You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:14 UTC
[26/56] [abbrv] incubator-carbondata git commit: Refactor
org.carbondata.query package (#692)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
new file mode 100644
index 0000000..0c61947
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.processor.impl;
+
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.processor.AbstractDataBlockIterator;
+import org.carbondata.scan.result.Result;
+
+/**
+ * Below class will be used to process the block for detail query
+ */
+public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
+
+ /**
+ * DataBlockIteratorImpl Constructor
+ *
+ * @param blockExecutionInfo execution information
+ */
+ public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo,
+ FileHolder fileReader, int batchSize) {
+ super(blockExecutionInfo, fileReader, batchSize);
+ }
+
+ /**
+ * It scans the block and returns the result with @batchSize
+ *
+ * @return Result of @batchSize
+ */
+ public Result next() {
+ this.scannerResultAggregator.collectData(scannedResult, batchSize);
+ Result result = this.scannerResultAggregator.getCollectedResult();
+ while (result.size() < batchSize && hasNext()) {
+ this.scannerResultAggregator.collectData(scannedResult, batchSize-result.size());
+ result.merge(this.scannerResultAggregator.getCollectedResult());
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java b/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java
new file mode 100644
index 0000000..5d00e32
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/AbstractScannedResult.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.result;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.executor.infos.KeyStructureInfo;
+
+/**
+ * Scanned result class which will store and provide the result on request
+ */
+public abstract class AbstractScannedResult {
+
+ /**
+ * current row number
+ */
+ protected int currentRow = -1;
+ /**
+ * row mapping indexes
+ */
+ protected int[] rowMapping;
+ /**
+ * key size of the fixed length column
+ */
+ private int fixedLengthKeySize;
+ /**
+ * total number of rows
+ */
+ private int totalNumberOfRows;
+ /**
+ * to keep track of number of rows process
+ */
+ private int rowCounter;
+ /**
+ * dimension column data chunk
+ */
+ private DimensionColumnDataChunk[] dataChunks;
+ /**
+ * measure column data chunk
+ */
+ private MeasureColumnDataChunk[] measureDataChunks;
+ /**
+ * dictionary column block index in file
+ */
+ private int[] dictionaryColumnBlockIndexes;
+
+ /**
+ * no dictionary column block index in file
+ */
+ private int[] noDictionaryColumnBlockIndexes;
+
+ /**
+ * column group to is key structure info
+ * which will be used to get the key from the complete
+ * column group key
+ * For example if only one dimension of the column group is selected
+ * then from complete column group key it will be used to mask the key and
+ * get the particular column key
+ */
+ private Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
+
+ public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) {
+ this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize();
+ this.noDictionaryColumnBlockIndexes = blockExecutionInfo.getNoDictionaryBlockIndexes();
+ this.dictionaryColumnBlockIndexes = blockExecutionInfo.getDictionaryColumnBlockIndex();
+ this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo();
+ }
+
+ /**
+ * Below method will be used to set the dimension chunks
+ * which will be used to create a row
+ *
+ * @param dataChunks dimension chunks used in query
+ */
+ public void setDimensionChunks(DimensionColumnDataChunk[] dataChunks) {
+ this.dataChunks = dataChunks;
+ }
+
+ /**
+ * Below method will be used to set the measure column chunks
+ *
+ * @param measureDataChunks measure data chunks
+ */
+ public void setMeasureChunks(MeasureColumnDataChunk[] measureDataChunks) {
+ this.measureDataChunks = measureDataChunks;
+ }
+
+ /**
+ * Below method will be used to get the chunk based in measure ordinal
+ *
+ * @param ordinal measure ordinal
+ * @return measure column chunk
+ */
+ public MeasureColumnDataChunk getMeasureChunk(int ordinal) {
+ return measureDataChunks[ordinal];
+ }
+
+ /**
+ * Below method will be used to get the key for all the dictionary dimensions
+ * which is present in the query
+ *
+ * @param rowId row id selected after scanning
+ * @return return the dictionary key
+ */
+ protected byte[] getDictionaryKeyArray(int rowId) {
+ byte[] completeKey = new byte[fixedLengthKeySize];
+ int offset = 0;
+ for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
+ offset += dataChunks[dictionaryColumnBlockIndexes[i]]
+ .fillChunkData(completeKey, offset, rowId,
+ columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
+ }
+ rowCounter++;
+ return completeKey;
+ }
+
+ /**
+ * Just increment the counter incase of query only on measures.
+ */
+ public void incrementCounter() {
+ rowCounter ++;
+ currentRow ++;
+ }
+
+ /**
+ * Below method will be used to get the dimension data based on dimension
+ * ordinal and index
+ *
+ * @param dimOrdinal dimension ordinal present in the query
+ * @param rowId row index
+ * @return dimension data based on row id
+ */
+ protected byte[] getDimensionData(int dimOrdinal, int rowId) {
+ return dataChunks[dimOrdinal].getChunkData(rowId);
+ }
+
+ /**
+ * Below method will be used to get the dimension key array
+ * for all the no dictionary dimension present in the query
+ *
+ * @param rowId row number
+ * @return no dictionary keys for all no dictionary dimension
+ */
+ protected byte[][] getNoDictionaryKeyArray(int rowId) {
+ byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnBlockIndexes.length][];
+ int position = 0;
+ for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
+ noDictionaryColumnsKeys[position++] =
+ dataChunks[noDictionaryColumnBlockIndexes[i]].getChunkData(rowId);
+ }
+ return noDictionaryColumnsKeys;
+ }
+
+ /**
+ * Below method will be used to get the complex type keys array based
+ * on row id for all the complex type dimension selected in query
+ *
+ * @param rowId row number
+ * @return complex type key array for all the complex dimension selected in query
+ */
+ protected byte[][] getComplexTypeKeyArray(int rowId) {
+ return new byte[0][];
+ }
+
+ /**
+ * @return return the total number of row after scanning
+ */
+ public int numberOfOutputRows() {
+ return this.totalNumberOfRows;
+ }
+
+ /**
+ * to check whether any more row is present in the result
+ *
+ * @return
+ */
+ public boolean hasNext() {
+ return rowCounter < this.totalNumberOfRows;
+ }
+
+ /**
+ * As this class will be a flyweight object so
+ * for one block all the blocklet scanning will use same result object
+ * in that case we need to reset the counter to zero so
+ * for new result it will give the result from zero
+ */
+ public void reset() {
+ rowCounter = 0;
+ currentRow = -1;
+ }
+
+ /**
+ * @param totalNumberOfRows set total of number rows valid after scanning
+ */
+ public void setNumberOfRows(int totalNumberOfRows) {
+ this.totalNumberOfRows = totalNumberOfRows;
+ }
+
+ /**
+ * After applying filter it will return the bit set with the valid row indexes
+ * so below method will be used to set the row indexes
+ *
+ * @param indexes
+ */
+ public void setIndexes(int[] indexes) {
+ this.rowMapping = indexes;
+ }
+
+ /**
+ * Below method will be used to check whether measure value is null or not
+ *
+ * @param ordinal measure ordinal
+ * @param rowIndex row number to be checked
+ * @return whether it is null or not
+ */
+ protected boolean isNullMeasureValue(int ordinal, int rowIndex) {
+ return measureDataChunks[ordinal].getNullValueIndexHolder().getBitSet().get(rowIndex);
+ }
+
+ /**
+ * Below method will be used to get the measure value of
+ * long type
+ *
+ * @param ordinal measure ordinal
+ * @param rowIndex row number of the measure value
+ * @return measure value of long type
+ */
+ protected long getLongMeasureValue(int ordinal, int rowIndex) {
+ return measureDataChunks[ordinal].getMeasureDataHolder().getReadableLongValueByIndex(rowIndex);
+ }
+
+ /**
+ * Below method will be used to get the measure value of double type
+ *
+ * @param ordinal measure ordinal
+ * @param rowIndex row number
+ * @return measure value of double type
+ */
+ protected double getDoubleMeasureValue(int ordinal, int rowIndex) {
+ return measureDataChunks[ordinal].getMeasureDataHolder()
+ .getReadableDoubleValueByIndex(rowIndex);
+ }
+
+ /**
+ * Below method will be used to get the measure type of big decimal data type
+ *
+ * @param ordinal ordinal of the of the measure
+ * @param rowIndex row number
+ * @return measure of big decimal type
+ */
+ protected BigDecimal getBigDecimalMeasureValue(int ordinal, int rowIndex) {
+ return measureDataChunks[ordinal].getMeasureDataHolder()
+ .getReadableBigDecimalValueByIndex(rowIndex);
+ }
+
+ /**
+ * will return the current valid row id
+ *
+ * @return valid row id
+ */
+ public abstract int getCurrenrRowId();
+
+ /**
+ * @return dictionary key array for all the dictionary dimension
+ * selected in query
+ */
+ public abstract byte[] getDictionaryKeyArray();
+
+ /**
+ * Return the dimension data based on dimension ordinal
+ *
+ * @param dimensionOrdinal dimension ordinal
+ * @return dimension data
+ */
+ public abstract byte[] getDimensionKey(int dimensionOrdinal);
+
+ /**
+ * Below method will be used to get the complex type key array
+ *
+ * @return complex type key array
+ */
+ public abstract byte[][] getComplexTypeKeyArray();
+
+ /**
+ * Below method will be used to get the no dictionary key
+ * array for all the no dictionary dimension selected in query
+ *
+ * @return no dictionary key array for all the no dictionary dimension
+ */
+ public abstract byte[][] getNoDictionaryKeyArray();
+
+ /**
+ * Below method will be used to to check whether measure value
+ * is null or for a measure
+ *
+ * @param ordinal measure ordinal
+ * @return is null or not
+ */
+ public abstract boolean isNullMeasureValue(int ordinal);
+
+ /**
+ * Below method will be used to get the measure value for measure
+ * of long data type
+ *
+ * @param ordinal measure ordinal
+ * @return long value of measure
+ */
+ public abstract long getLongMeasureValue(int ordinal);
+
+ /**
+ * Below method will be used to get the value of measure of double
+ * type
+ *
+ * @param ordinal measure ordinal
+ * @return measure value
+ */
+ public abstract double getDoubleMeasureValue(int ordinal);
+
+ /**
+ * Below method will be used to get the data of big decimal type
+ * of a measure
+ *
+ * @param ordinal measure ordinal
+ * @return measure value
+ */
+ public abstract BigDecimal getBigDecimalMeasureValue(int ordinal);
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java b/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java
new file mode 100644
index 0000000..c13b0f7
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/BatchRawResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.result;
+
+/**
+ * Below class holds the query result of batches.
+ */
+public class BatchRawResult extends BatchResult {
+
+ /**
+ * This method will return one row at a time based on the counter given.
+ * @param counter
+ * @return
+ */
+ public Object[] getRawRow(int counter) {
+ return rows[counter];
+ }
+
+ /**
+ * For getting the total size.
+ * @return
+ */
+ public int getSize() {
+ return rows.length;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/BatchResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/BatchResult.java b/core/src/main/java/org/carbondata/scan/result/BatchResult.java
new file mode 100644
index 0000000..dc14060
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/BatchResult.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.result;
+
+import java.util.NoSuchElementException;
+
+import org.carbondata.core.iterator.CarbonIterator;
+
+/**
+ * Below class holds the query result
+ */
+public class BatchResult extends CarbonIterator<Object[]> {
+
+ /**
+ * list of keys
+ */
+ protected Object[][] rows;
+
+ /**
+ * counter to check whether all the records are processed or not
+ */
+ protected int counter;
+
+ public BatchResult() {
+ this.rows = new Object[0][];
+ }
+
+ /**
+ * Below method will be used to get the rows
+ *
+ * @return
+ */
+ public Object[][] getRows() {
+ return rows;
+ }
+
+ /**
+ * Below method will be used to get the set the values
+ *
+ * @param rows
+ */
+ public void setRows(Object[][] rows) {
+ this.rows = rows;
+ }
+
+
+ /**
+ * Returns {@code true} if the iteration has more elements.
+ *
+ * @return {@code true} if the iteration has more elements
+ */
+ @Override public boolean hasNext() {
+ return counter < rows.length;
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ *
+ * @return the next element in the iteration
+ */
+ @Override public Object[] next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Object[] row = rows[counter];
+ counter++;
+ return row;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/ListBasedResultWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/ListBasedResultWrapper.java b/core/src/main/java/org/carbondata/scan/result/ListBasedResultWrapper.java
new file mode 100644
index 0000000..f3085ce
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/ListBasedResultWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.result;
+
+import org.carbondata.scan.wrappers.ByteArrayWrapper;
+
+public class ListBasedResultWrapper {
+
+ private ByteArrayWrapper key;
+
+ private Object[] value;
+
+ /**
+ * @return the key
+ */
+ public ByteArrayWrapper getKey() {
+ return key;
+ }
+
+ /**
+ * @param key the key to set
+ */
+ public void setKey(ByteArrayWrapper key) {
+ this.key = key;
+ }
+
+ /**
+ * @return the value
+ */
+ public Object[] getValue() {
+ return value;
+ }
+
+ /**
+ * @param value the value to set
+ */
+ public void setValue(Object[] value) {
+ this.value = value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/Result.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/Result.java b/core/src/main/java/org/carbondata/scan/result/Result.java
new file mode 100644
index 0000000..98466bb
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/Result.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.result;
+
+import org.carbondata.scan.wrappers.ByteArrayWrapper;
+
/**
 * Result interface for storing the scanned result.
 *
 * @param <K> type of the collected result container
 * @param <V> element type of the value array
 */
public interface Result<K, V> {
  /**
   * Below method will be used to
   * add the scanned result
   *
   * @param result scanned result to hold
   */
  void addScannedResult(K result);

  /**
   * Returns {@code true} if the iteration has more elements.
   *
   * @return {@code true} if the iteration has more elements
   */
  boolean hasNext();

  /**
   * Below method will return the result key
   *
   * @return key
   */
  ByteArrayWrapper getKey();

  /**
   * Below code will return the result value
   *
   * @return value
   */
  V[] getValue();

  /**
   * Merges another result into this one.
   *
   * @param otherResult result to be merged
   */
  void merge(Result<K, V> otherResult);

  /**
   * Below method will be used to get the result
   *
   * @return the collected result container
   */
  K getResult();

  /**
   * @return size of the result
   */
  int size();
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/carbondata/scan/result/impl/FilterQueryScannedResult.java
new file mode 100644
index 0000000..962d9a3
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/impl/FilterQueryScannedResult.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.result.impl;
+
+import java.math.BigDecimal;
+
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Result provider class in case of filter query
+ * In case of filter query data will be send
+ * based on filtered row index
+ */
+public class FilterQueryScannedResult extends AbstractScannedResult {
+
+ public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) {
+ super(tableBlockExecutionInfos);
+ }
+
+ /**
+ * @return dictionary key array for all the dictionary dimension
+ * selected in query
+ */
+ @Override public byte[] getDictionaryKeyArray() {
+ ++currentRow;
+ return getDictionaryKeyArray(rowMapping[currentRow]);
+ }
+
+ /**
+ * Below method will be used to get the complex type key array
+ *
+ * @return complex type key array
+ */
+ @Override public byte[][] getComplexTypeKeyArray() {
+ return getComplexTypeKeyArray(rowMapping[currentRow]);
+ }
+
+ /**
+ * Below method will be used to get the no dictionary key
+ * array for all the no dictionary dimension selected in query
+ *
+ * @return no dictionary key array for all the no dictionary dimension
+ */
+ @Override public byte[][] getNoDictionaryKeyArray() {
+ return getNoDictionaryKeyArray(rowMapping[currentRow]);
+ }
+
+ /**
+ * will return the current valid row id
+ *
+ * @return valid row id
+ */
+ @Override public int getCurrenrRowId() {
+ return rowMapping[currentRow];
+ }
+
+ /**
+ * Return the dimension data based on dimension ordinal
+ *
+ * @param dimensionOrdinal dimension ordinal
+ * @return dimension data
+ */
+ @Override public byte[] getDimensionKey(int dimensionOrdinal) {
+ return getDimensionData(dimensionOrdinal, rowMapping[currentRow]);
+ }
+
+ /**
+ * Below method will be used to to check whether measure value
+ * is null or for a measure
+ *
+ * @param ordinal measure ordinal
+ * @return is null or not
+ */
+ @Override public boolean isNullMeasureValue(int ordinal) {
+ return isNullMeasureValue(ordinal, rowMapping[currentRow]);
+ }
+
+ /**
+ * Below method will be used to get the measure value for measure
+ * of long data type
+ *
+ * @param ordinal measure ordinal
+ * @return long value of measure
+ */
+ @Override public long getLongMeasureValue(int ordinal) {
+ return getLongMeasureValue(ordinal, rowMapping[currentRow]);
+ }
+
+ /**
+ * Below method will be used to get the value of measure of double
+ * type
+ *
+ * @param ordinal measure ordinal
+ * @return measure value
+ */
+ @Override public double getDoubleMeasureValue(int ordinal) {
+ return getDoubleMeasureValue(ordinal, rowMapping[currentRow]);
+ }
+
+ /**
+ * Below method will be used to get the data of big decimal type
+ * of a measure
+ *
+ * @param ordinal measure ordinal
+ * @return measure value
+ */
+ @Override public BigDecimal getBigDecimalMeasureValue(int ordinal) {
+ return getBigDecimalMeasureValue(ordinal, rowMapping[currentRow]);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/impl/ListBasedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/impl/ListBasedResult.java b/core/src/main/java/org/carbondata/scan/result/impl/ListBasedResult.java
new file mode 100644
index 0000000..24ebf5b
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/impl/ListBasedResult.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.result.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.scan.result.ListBasedResultWrapper;
+import org.carbondata.scan.result.Result;
+import org.carbondata.scan.wrappers.ByteArrayWrapper;
+
/**
 * Below class is a holder over list based result wrapper.
 * Results are kept as a list of lists to avoid the reallocation/copy cost
 * of growing one large list while merging per-block results.
 */
public class ListBasedResult implements Result<List<ListBasedResultWrapper>, Object> {

  /**
   * current result list (the list the record cursor points into)
   */
  private List<ListBasedResultWrapper> currentRowPointer;

  /**
   * all result list , this is required because if we merge all the scanned
   * result from all the blocks in one list, that list creation will take more
   * time as every time list will create a big array and then it will do copy
   * the older element to new array, and creation of big array will also be a
   * problem if memory is fragmented then jvm in to do defragmentation to
   * create a big space, but if divide the data in multiple list than it avoid
   * copy and defragmentation
   */
  private List<List<ListBasedResultWrapper>> allRowsResult;

  /**
   * counter to check how many results processed across all lists;
   * -1 until hasNext() is called for the first time
   */
  private int totalRecordCounter = -1;

  /**
   * number of records
   */
  private int totalNumberOfRecords;

  /**
   * current counter of the record in the current list;
   * -1 until hasNext() is called for the first time
   */
  private int listRecordCounter = -1;

  /**
   * current list counter (index of the next list to switch to)
   */
  private int currentListCounter;

  public ListBasedResult() {
    currentRowPointer =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    allRowsResult =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
  }

  /**
   * below method will be used to add the scan result.
   * NOTE(review): this overwrites totalNumberOfRecords instead of
   * accumulating — it appears to assume at most one direct call, with
   * further results arriving via merge(); confirm against callers.
   */
  @Override public void addScannedResult(List<ListBasedResultWrapper> listBasedResult) {
    this.currentRowPointer = listBasedResult;
    totalNumberOfRecords = listBasedResult.size();
    allRowsResult.add(listBasedResult);
  }

  /**
   * Method to check whether more results are present or not.
   * Also advances the cursor (both counters), so each call consumes one
   * logical position — callers must call it exactly once per record.
   */
  @Override public boolean hasNext() {
    if (allRowsResult.size() == 0) {
      return false;
    }
    // As we are storing data in list of list, below code is to check whether
    // any more result is present
    // in the result.
    // first it will check list counter is zero if it is zero
    // than it will check list counter to check how many list has been processed
    // if more list are present and all the list of current list is processed
    // than it will take a new list from all row result list
    totalRecordCounter++;
    listRecordCounter++;
    if (listRecordCounter == 0 || (listRecordCounter >= currentRowPointer.size()
        && currentListCounter < allRowsResult.size())) {
      // either first access or current list exhausted: move to the next list
      listRecordCounter = 0;
      currentRowPointer = allRowsResult.get(currentListCounter);
      currentListCounter++;
    }
    return totalRecordCounter < totalNumberOfRecords;
  }

  /**
   * @return key of the record at the current cursor position
   */
  @Override public ByteArrayWrapper getKey() {
    return currentRowPointer.get(listRecordCounter).getKey();
  }

  /**
   * @return value array of the record at the current cursor position
   */
  @Override public Object[] getValue() {
    return currentRowPointer.get(listRecordCounter).getValue();
  }

  /***
   * below method will be used to merge the
   * scanned result
   *
   * @param otherResult result to be merged
   */
  @Override public void merge(Result<List<ListBasedResultWrapper>, Object> otherResult) {
    if (otherResult.size() > 0) {
      totalNumberOfRecords += otherResult.size();
      this.allRowsResult.add(otherResult.getResult());
    }
  }

  /**
   * Return the size of the result
   */
  @Override public int size() {
    return totalNumberOfRecords;
  }

  /**
   * @return the current result list (not the merged whole; see allRowsResult)
   */
  @Override public List<ListBasedResultWrapper> getResult() {
    return currentRowPointer;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/impl/NonFilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/carbondata/scan/result/impl/NonFilterQueryScannedResult.java
new file mode 100644
index 0000000..9782099
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/impl/NonFilterQueryScannedResult.java
@@ -0,0 +1,109 @@
+package org.carbondata.scan.result.impl;
+
+import java.math.BigDecimal;
+
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Result provide class for non filter query
+ * In case of no filter query we need to return
+ * complete data
+ */
+public class NonFilterQueryScannedResult extends AbstractScannedResult {
+
+ public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) {
+ super(blockExecutionInfo);
+ }
+
+ /**
+ * @return dictionary key array for all the dictionary dimension selected in
+ * query
+ */
+ @Override public byte[] getDictionaryKeyArray() {
+ ++currentRow;
+ return getDictionaryKeyArray(currentRow);
+ }
+
+ /**
+ * Below method will be used to get the complex type key array
+ *
+ * @return complex type key array
+ */
+ @Override public byte[][] getComplexTypeKeyArray() {
+ return getComplexTypeKeyArray(currentRow);
+ }
+
+ /**
+ * Below method will be used to get the no dictionary key array for all the
+ * no dictionary dimension selected in query
+ *
+ * @return no dictionary key array for all the no dictionary dimension
+ */
+ @Override public byte[][] getNoDictionaryKeyArray() {
+ return getNoDictionaryKeyArray(currentRow);
+ }
+
+ /**
+ * will return the current valid row id
+ *
+ * @return valid row id
+ */
+ @Override public int getCurrenrRowId() {
+ return currentRow;
+ }
+
+ /**
+ * Return the dimension data based on dimension ordinal
+ *
+ * @param dimensionOrdinal dimension ordinal
+ * @return dimension data
+ */
+ @Override public byte[] getDimensionKey(int dimensionOrdinal) {
+ return getDimensionData(dimensionOrdinal, currentRow);
+ }
+
+ /**
+ * Below method will be used to to check whether measure value is null or
+ * for a measure
+ *
+ * @param ordinal measure ordinal
+ * @return is null or not
+ */
+ @Override public boolean isNullMeasureValue(int ordinal) {
+ return isNullMeasureValue(ordinal, currentRow);
+ }
+
+ /**
+ * Below method will be used to get the measure value for measure of long
+ * data type
+ *
+ * @param ordinal measure ordinal
+ * @return long value of measure
+ */
+ @Override public long getLongMeasureValue(int ordinal) {
+ return getLongMeasureValue(ordinal, currentRow);
+ }
+
+ /**
+ * Below method will be used to get the value of measure of double type
+ *
+ * @param ordinal measure ordinal
+ * @return measure value
+ */
+ @Override public double getDoubleMeasureValue(int ordinal) {
+ return getDoubleMeasureValue(ordinal, currentRow);
+ }
+
+ /**
+ * Below method will be used to get the data of big decimal type of a
+ * measure
+ *
+ * @param ordinal measure ordinal
+ * @return measure value
+ */
+ @Override public BigDecimal getBigDecimalMeasureValue(int ordinal) {
+ return getBigDecimalMeasureValue(ordinal, currentRow);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
new file mode 100644
index 0000000..2356a9f
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.result.iterator;
+
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.DataRefNode;
+import org.carbondata.core.carbon.datastore.DataRefNodeFinder;
+import org.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.core.util.CarbonProperties;
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.model.QueryModel;
+import org.carbondata.scan.processor.AbstractDataBlockIterator;
+import org.carbondata.scan.processor.impl.DataBlockIteratorImpl;
+
+/**
+ * In case of detail query we cannot keep all the records in memory so for
+ * executing that query are returning a iterator over block and every time next
+ * call will come it will execute the block and return the result
+ */
+public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
+
+ /**
+ * LOGGER.
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractDetailQueryResultIterator.class.getName());
+
+ /**
+ * execution info of the block
+ */
+ protected List<BlockExecutionInfo> blockExecutionInfos;
+
+ /**
+ * number of cores which can be used
+ */
+ private int batchSize;
+
+ /**
+ * file reader which will be used to execute the query
+ */
+ protected FileHolder fileReader;
+
+ protected AbstractDataBlockIterator dataBlockIterator;
+
+ protected boolean nextBatch = false;
+
+ public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel) {
+ String batchSizeString =
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
+ if (null != batchSizeString) {
+ try {
+ batchSize = Integer.parseInt(batchSizeString);
+ } catch (NumberFormatException ne) {
+ LOGGER.error("Invalid inmemory records size. Using default value");
+ batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+ }
+ } else {
+ batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+ }
+
+ this.blockExecutionInfos = infos;
+ this.fileReader = FileFactory.getFileHolder(
+ FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getStorePath()));
+ intialiseInfos();
+ }
+
+ private void intialiseInfos() {
+ for (BlockExecutionInfo blockInfo : blockExecutionInfos) {
+ DataRefNodeFinder finder = new BTreeDataRefNodeFinder(blockInfo.getEachColumnValueSize());
+ DataRefNode startDataBlock = finder
+ .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
+ DataRefNode endDataBlock = finder
+ .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
+ long numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ blockInfo.setFirstDataBlock(startDataBlock);
+ blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
+ }
+ }
+
+ @Override public boolean hasNext() {
+ if ((dataBlockIterator != null && dataBlockIterator.hasNext()) || nextBatch) {
+ return true;
+ } else {
+ dataBlockIterator = getDataBlockIterator();
+ while (dataBlockIterator != null) {
+ if (dataBlockIterator.hasNext()) {
+ return true;
+ }
+ dataBlockIterator = getDataBlockIterator();
+ }
+ return false;
+ }
+ }
+
+ private DataBlockIteratorImpl getDataBlockIterator() {
+ if(blockExecutionInfos.size() > 0) {
+ BlockExecutionInfo executionInfo = blockExecutionInfos.get(0);
+ blockExecutionInfos.remove(executionInfo);
+ return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize);
+ }
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java
new file mode 100644
index 0000000..5cc4f1e
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/iterator/ChunkRowIterator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.scan.result.iterator;
+
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.scan.result.BatchResult;
+
+/**
+ * Iterator over row result
+ */
+public class ChunkRowIterator extends CarbonIterator<Object[]> {
+
+ /**
+ * iterator over chunk result
+ */
+ private CarbonIterator<BatchResult> iterator;
+
+ /**
+ * currect chunk
+ */
+ private BatchResult currentchunk;
+
+ public ChunkRowIterator(CarbonIterator<BatchResult> iterator) {
+ this.iterator = iterator;
+ if (iterator.hasNext()) {
+ currentchunk = iterator.next();
+ }
+ }
+
+ /**
+ * Returns {@code true} if the iteration has more elements. (In other words,
+ * returns {@code true} if {@link #next} would return an element rather than
+ * throwing an exception.)
+ *
+ * @return {@code true} if the iteration has more elements
+ */
+ @Override public boolean hasNext() {
+ if (null != currentchunk) {
+ if ((currentchunk.hasNext())) {
+ return true;
+ } else if (!currentchunk.hasNext()) {
+ while (iterator.hasNext()) {
+ currentchunk = iterator.next();
+ if (currentchunk != null && currentchunk.hasNext()) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns the next element in the iteration.
+ *
+ * @return the next element in the iteration
+ */
+ @Override public Object[] next() {
+ return currentchunk.next();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java
new file mode 100644
index 0000000..4eb50ac
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/iterator/DetailQueryResultIterator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.result.iterator;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.carbondata.scan.executor.exception.QueryExecutionException;
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.model.QueryModel;
+import org.carbondata.scan.result.BatchResult;
+import org.carbondata.scan.result.ListBasedResultWrapper;
+import org.carbondata.scan.result.preparator.QueryResultPreparator;
+
+/**
+ * In case of detail query we cannot keep all the records in memory so for
+ * executing that query are returning a iterator over block and every time next
+ * call will come it will execute the block and return the result
+ */
+public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator {
+
+ /**
+ * to prepare the result
+ */
+ private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
+
+ private ExecutorService execService = Executors.newFixedThreadPool(1);
+
+ private Future<BatchResult> future;
+
+ public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+ QueryResultPreparator queryResultPreparator) {
+ super(infos, queryModel);
+ this.queryResultPreparator = queryResultPreparator;
+ }
+
+ @Override public BatchResult next() {
+ BatchResult result;
+ try {
+ if (future == null) {
+ future = execute();
+ }
+ result = future.get();
+ nextBatch = false;
+ if (hasNext()) {
+ nextBatch = true;
+ future = execute();
+ } else {
+ fileReader.finish();
+ }
+ } catch (Exception ex) {
+ fileReader.finish();
+ throw new RuntimeException(ex.getCause().getMessage());
+ }
+ return result;
+ }
+
+ private Future<BatchResult> execute() {
+ return execService.submit(new Callable<BatchResult>() {
+ @Override public BatchResult call() throws QueryExecutionException {
+ return queryResultPreparator.prepareQueryResult(dataBlockIterator.next());
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java
new file mode 100644
index 0000000..8c028b2
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/iterator/RawResultIterator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.result.iterator;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.core.keygenerator.KeyGenException;
+import org.carbondata.scan.result.BatchRawResult;
+import org.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * This is a wrapper iterator over the detail raw query iterator.
+ * This iterator will handle the processing of the raw rows.
+ * This will handle the batch results and will iterate on the batches and give single row.
+ */
+public class RawResultIterator extends CarbonIterator<Object[]> {
+
+ private final SegmentProperties sourceSegProperties;
+
+ private final SegmentProperties destinationSegProperties;
+ /**
+ * Iterator of the Batch raw result.
+ */
+ private CarbonIterator<BatchRawResult> detailRawQueryResultIterator;
+
+ /**
+ * Counter to maintain the row counter.
+ */
+ private int counter = 0;
+
+ private Object[] currentConveretedRawRow = null;
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RawResultIterator.class.getName());
+
+ /**
+ * batch of the result.
+ */
+ private BatchRawResult batch;
+
+ public RawResultIterator(CarbonIterator<BatchRawResult> detailRawQueryResultIterator,
+ SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+ this.detailRawQueryResultIterator = detailRawQueryResultIterator;
+ this.sourceSegProperties = sourceSegProperties;
+ this.destinationSegProperties = destinationSegProperties;
+ }
+
+ @Override public boolean hasNext() {
+
+ if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
+ if (detailRawQueryResultIterator.hasNext()) {
+ batch = detailRawQueryResultIterator.next();
+ counter = 0; // batch changed so reset the counter.
+ } else {
+ return false;
+ }
+ }
+
+ if (!checkIfBatchIsProcessedCompletely(batch)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override public Object[] next() {
+ if (null == batch) { // for 1st time
+ batch = detailRawQueryResultIterator.next();
+ }
+ if (!checkIfBatchIsProcessedCompletely(batch)) {
+ try {
+ if(null != currentConveretedRawRow){
+ counter++;
+ Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+ currentConveretedRawRow = null;
+ return currentConveretedRawRowTemp;
+ }
+ return convertRow(batch.getRawRow(counter++));
+ } catch (KeyGenException e) {
+ LOGGER.error(e.getMessage());
+ return null;
+ }
+ } else { // completed one batch.
+ batch = detailRawQueryResultIterator.next();
+ counter = 0;
+ }
+ try {
+ if(null != currentConveretedRawRow){
+ counter++;
+ Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
+ currentConveretedRawRow = null;
+ return currentConveretedRawRowTemp;
+ }
+
+ return convertRow(batch.getRawRow(counter++));
+ } catch (KeyGenException e) {
+ LOGGER.error(e.getMessage());
+ return null;
+ }
+
+ }
+
+ /**
+ * for fetching the row with out incrementing counter.
+ * @return
+ */
+ public Object[] fetchConverted() throws KeyGenException {
+ if(null != currentConveretedRawRow){
+ return currentConveretedRawRow;
+ }
+ if(hasNext())
+ {
+ Object[] rawRow = batch.getRawRow(counter);
+ currentConveretedRawRow = convertRow(rawRow);;
+ return currentConveretedRawRow;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private Object[] convertRow(Object[] rawRow) throws KeyGenException {
+ byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey();
+ long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims);
+ byte[] covertedBytes =
+ destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray);
+ ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes);
+ return rawRow;
+ }
+
+ /**
+ * To check if the batch is processed completely
+ * @param batch
+ * @return
+ */
+ private boolean checkIfBatchIsProcessedCompletely(BatchRawResult batch){
+ if(counter < batch.getSize())
+ {
+ return false;
+ }
+ else{
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/preparator/QueryResultPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/preparator/QueryResultPreparator.java b/core/src/main/java/org/carbondata/scan/result/preparator/QueryResultPreparator.java
new file mode 100644
index 0000000..7ef5b6d
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/preparator/QueryResultPreparator.java
@@ -0,0 +1,10 @@
+package org.carbondata.scan.result.preparator;
+
+import org.carbondata.scan.result.BatchResult;
+import org.carbondata.scan.result.Result;
+
+public interface QueryResultPreparator<K, V> {
+
+ public BatchResult prepareQueryResult(Result<K, V> scannedResult);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/preparator/impl/AbstractQueryResultPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/preparator/impl/AbstractQueryResultPreparator.java b/core/src/main/java/org/carbondata/scan/result/preparator/impl/AbstractQueryResultPreparator.java
new file mode 100644
index 0000000..a42dc67
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/preparator/impl/AbstractQueryResultPreparator.java
@@ -0,0 +1,87 @@
+package org.carbondata.scan.result.preparator.impl;
+
+import java.util.List;
+
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.scan.executor.impl.QueryExecutorProperties;
+import org.carbondata.scan.model.QueryDimension;
+import org.carbondata.scan.model.QueryModel;
+import org.carbondata.scan.result.BatchResult;
+import org.carbondata.scan.result.preparator.QueryResultPreparator;
+import org.carbondata.scan.util.DataTypeUtil;
+
/**
 * Base class for result preparators. Holds the query properties/model and
 * provides shared helpers for decoding dimension values into row form.
 */
public abstract class AbstractQueryResultPreparator<K, V> implements QueryResultPreparator<K, V> {

  /**
   * query properties
   */
  protected QueryExecutorProperties queryExecuterProperties;

  /**
   * query model
   */
  protected QueryModel queryModel;

  public AbstractQueryResultPreparator(QueryExecutorProperties executerProperties,
      QueryModel queryModel) {
    this.queryExecuterProperties = executerProperties;
    this.queryModel = queryModel;
  }

  /**
   * Decodes each selected dimension of one row into its query-order slot in
   * {@code row}: no-dictionary values are copied as-is, direct-dictionary
   * surrogates go through the direct dictionary generator, and normal
   * dictionary surrogates are looked up in the loaded dictionary.
   * NOTE(review): assumes convertedResult is column-major, i.e.
   * convertedResult[dimensionIndex][rowIndex] — consistent with the layout
   * produced by encodeToRows(); confirm with callers.
   *
   * @param convertedResult column-major result data
   * @param queryDimensions dimensions selected in the query
   * @param dimensionCount  number of dimensions to fill
   * @param row             output row, indexed by query order
   * @param rowIndex        index of the row being filled
   */
  protected void fillDimensionData(Object[][] convertedResult, List<QueryDimension> queryDimensions,
      int dimensionCount, Object[] row, int rowIndex) {
    QueryDimension queryDimension;
    for (int i = 0; i < dimensionCount; i++) {
      queryDimension = queryDimensions.get(i);
      if (!CarbonUtil
          .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
        // no-dictionary column: the converted value is already the actual data
        row[queryDimension.getQueryOrder()] = convertedResult[i][rowIndex];
      } else if (CarbonUtil
          .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) {
        // direct dictionary (e.g. time-based): compute value from the surrogate
        DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
            .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
        row[queryDimension.getQueryOrder()] = directDictionaryGenerator
            .getValueFromSurrogate((Integer) convertedResult[i][rowIndex]);
      } else {
        // normal dictionary column: look the surrogate up in the dictionary;
        // sorted-index variant when the column was stored with a sort index
        if (queryExecuterProperties.sortDimIndexes[i] == 1) {
          row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
              queryExecuterProperties.columnToDictionayMapping
                  .get(queryDimension.getDimension().getColumnId())
                  .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][rowIndex]),
              queryDimension.getDimension().getDataType());
        } else {
          row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
              queryExecuterProperties.columnToDictionayMapping
                  .get(queryDimension.getDimension().getColumnId())
                  .getDictionaryValueForKey((Integer) convertedResult[i][rowIndex]),
              queryDimension.getDimension().getDataType());
        }
      }
    }
  }

  /**
   * Transposes row-major data into column-major form (and vice versa):
   * rData[i][j] = data[j][i]. Returns the input unchanged when it is empty.
   *
   * @param data matrix to transpose
   * @return transposed matrix
   */
  protected Object[][] encodeToRows(Object[][] data) {
    if (data.length == 0) {
      return data;
    }
    Object[][] rData = new Object[data[0].length][data.length];
    int len = data.length;
    for (int i = 0; i < rData.length; i++) {
      for (int j = 0; j < len; j++) {
        rData[i][j] = data[j][i];
      }
    }
    return rData;
  }

  /**
   * Builds a batch of {@code size} rows, each containing a single null
   * column — used when the query selects no columns.
   *
   * @param size number of (empty) rows
   * @return batch result holding the placeholder rows
   */
  protected BatchResult getEmptyChunkResult(int size) {
    Object[][] row = new Object[size][1];
    BatchResult chunkResult = new BatchResult();
    chunkResult.setRows(row);
    return chunkResult;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java
new file mode 100644
index 0000000..17735bc
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/preparator/impl/DetailQueryResultPreparatorImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.result.preparator.impl;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.scan.executor.impl.QueryExecutorProperties;
+import org.carbondata.scan.model.QueryDimension;
+import org.carbondata.scan.model.QueryMeasure;
+import org.carbondata.scan.model.QueryModel;
+import org.carbondata.scan.result.BatchResult;
+import org.carbondata.scan.result.ListBasedResultWrapper;
+import org.carbondata.scan.result.Result;
+import org.carbondata.scan.util.DataTypeUtil;
+import org.carbondata.scan.wrappers.ByteArrayWrapper;
+
/**
 * Below class will be used to get the result by converting to actual data
 * Actual data conversion can be converting the surrogate key to actual data
 *
 * @TODO there are many things in class which is very confusing, need to check
 * why it was handled like that and how we can handle that in a better
 * way.Need to revisit this class. IF aggregation is push down to spark
 * layer and if we can process the data in byte array format then this
 * class wont be useful so in future we can delete this class.
 * @TODO need to expose one interface which will return the result based on required type
 * for example its implementation case return converted result or directly result with out
 * converting to actual value
 */
public class DetailQueryResultPreparatorImpl
    extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(DetailQueryResultPreparatorImpl.class.getName());

  public DetailQueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
      QueryModel queryModel) {
    super(executerProperties, queryModel);
  }

  /**
   * Converts the scanned result into a BatchResult of decoded rows:
   * dictionary surrogates are unpacked from the masked key bytes and
   * no-dictionary values are decoded from their byte form.
   *
   * @param scannedResult scanned result to convert; may be null or empty
   * @return batch of decoded rows (empty batch when there is nothing to do)
   */
  @Override public BatchResult prepareQueryResult(
      Result<List<ListBasedResultWrapper>, Object> scannedResult) {
    if ((null == scannedResult || scannedResult.size() < 1)) {
      return new BatchResult();
    }
    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
    int dimensionCount = queryDimension.size();
    int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureDataTypes.length;
    Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
    // no columns selected: hand back placeholder rows of the right count
    if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0
        && scannedResult.size() > 0) {
      return getEmptyChunkResult(scannedResult.size());
    }
    int currentRow = 0;
    long[] surrogateResult = null;
    int noDictionaryColumnIndex = 0;
    ByteArrayWrapper key = null;
    Object[] value = null;
    // NOTE(review): hasNext() on this Result also advances its internal
    // cursor (see the list-based implementation) — getKey()/getValue() read
    // the row the cursor currently points at.
    while (scannedResult.hasNext()) {
      key = scannedResult.getKey();
      value = scannedResult.getValue();
      if (key != null) {
        // unpack all dictionary surrogates of this row from the masked key bytes
        surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator()
            .getKeyArray(key.getDictionaryKey(),
                queryExecuterProperties.keyStructureInfo.getMaskedBytes());
        for (int i = 0; i < dimensionCount; i++) {
          if (!CarbonUtil.hasEncoding(queryDimension.get(i).getDimension().getEncoder(),
              Encoding.DICTIONARY)) {
            // no-dictionary column: decode the raw bytes into the actual value
            resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType(
                new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++),
                    Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
                queryDimension.get(i).getDimension().getDataType());
          } else {
            // dictionary column: keep the surrogate; it is resolved later
            // by fillDimensionData() in getResult()
            resultData[currentRow][i] =
                (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()];
          }
        }
      }
      if (value != null) {
        // measures are appended after all dimension columns
        System.arraycopy(value, 0, resultData[currentRow], dimensionCount,
            queryExecuterProperties.measureDataTypes.length);
      }
      currentRow++;
      noDictionaryColumnIndex = 0;
    }
    if (resultData.length > 0) {
      // transpose to column-major as expected by getResult()/fillDimensionData()
      resultData = encodeToRows(resultData);
    }
    return getResult(queryModel, resultData);
  }

  /**
   * Builds the final row-major BatchResult from the column-major converted
   * data, resolving dictionary surrogates and placing each column at its
   * query order.
   *
   * @param queryModel      query model holding the selected dimensions/measures
   * @param convertedResult column-major data produced by prepareQueryResult
   * @return batch result of fully decoded rows
   */
  private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) {

    int rowSize = convertedResult[0].length;
    Object[][] rows = new Object[rowSize][];
    List<QueryDimension> queryDimensions = queryModel.getQueryDimension();
    int dimensionCount = queryDimensions.size();
    int msrCount = queryExecuterProperties.measureDataTypes.length;
    Object[] row;
    for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
      row = new Object[dimensionCount + msrCount];
      fillDimensionData(convertedResult, queryDimensions, dimensionCount, row, rowIndex);

      QueryMeasure msr;
      for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
        msr = queryModel.getQueryMeasures().get(i);
        row[msr.getQueryOrder()] = convertedResult[dimensionCount + i][rowIndex];
      }
      rows[rowIndex] = row;
    }
    LOGGER.info(
        "###########################################------ Total Number of records" + rowSize);
    BatchResult chunkResult = new BatchResult();
    chunkResult.setRows(rows);
    return chunkResult;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/result/preparator/impl/RawQueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/result/preparator/impl/RawQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/scan/result/preparator/impl/RawQueryResultPreparatorImpl.java
new file mode 100644
index 0000000..75d78a2
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/result/preparator/impl/RawQueryResultPreparatorImpl.java
@@ -0,0 +1,127 @@
+package org.carbondata.scan.result.preparator.impl;
+
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.scan.executor.impl.QueryExecutorProperties;
+import org.carbondata.scan.model.QueryDimension;
+import org.carbondata.scan.model.QueryMeasure;
+import org.carbondata.scan.model.QueryModel;
+import org.carbondata.scan.model.QuerySchemaInfo;
+import org.carbondata.scan.result.BatchRawResult;
+import org.carbondata.scan.result.BatchResult;
+import org.carbondata.scan.result.ListBasedResultWrapper;
+import org.carbondata.scan.result.Result;
+import org.carbondata.scan.util.DataTypeUtil;
+import org.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * It does not decode the dictionary.
+ */
+public class RawQueryResultPreparatorImpl
+ extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RawQueryResultPreparatorImpl.class.getName());
+
+ private QuerySchemaInfo querySchemaInfo;
+
+ public RawQueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
+ QueryModel queryModel) {
+ super(executerProperties, queryModel);
+ querySchemaInfo = new QuerySchemaInfo();
+ querySchemaInfo.setKeyGenerator(queryExecuterProperties.keyStructureInfo.getKeyGenerator());
+ querySchemaInfo.setMaskedByteIndexes(queryExecuterProperties.keyStructureInfo.getMaskedBytes());
+ querySchemaInfo.setQueryDimensions(queryModel.getQueryDimension()
+ .toArray(new QueryDimension[queryModel.getQueryDimension().size()]));
+ querySchemaInfo.setQueryMeasures(queryModel.getQueryMeasures()
+ .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
+ int msrSize = queryExecuterProperties.measureDataTypes.length;
+ int dimensionCount = queryModel.getQueryDimension().size();
+ int[] queryOrder = new int[dimensionCount + msrSize];
+ int[] queryReverseOrder = new int[dimensionCount + msrSize];
+ for (int i = 0; i < dimensionCount; i++) {
+ queryOrder[queryModel.getQueryDimension().get(i).getQueryOrder()] = i;
+ queryReverseOrder[i] = queryModel.getQueryDimension().get(i).getQueryOrder();
+ }
+ for (int i = 0; i < msrSize; i++) {
+ queryOrder[queryModel.getQueryMeasures().get(i).getQueryOrder()] = i + dimensionCount;
+ queryReverseOrder[i + dimensionCount] = queryModel.getQueryMeasures().get(i).getQueryOrder();
+ }
+ querySchemaInfo.setQueryOrder(queryOrder);
+ querySchemaInfo.setQueryReverseOrder(queryReverseOrder);
+ }
+
+ @Override public BatchResult prepareQueryResult(
+ Result<List<ListBasedResultWrapper>, Object> scannedResult) {
+ if ((null == scannedResult || scannedResult.size() < 1)) {
+ return new BatchRawResult();
+ }
+ QueryDimension[] queryDimensions = querySchemaInfo.getQueryDimensions();
+ int msrSize = queryExecuterProperties.measureDataTypes.length;
+ int dimSize = queryDimensions.length;
+ int[] order = querySchemaInfo.getQueryReverseOrder();
+ Object[][] resultData = new Object[scannedResult.size()][];
+ Object[] value;
+ Object[] row;
+ int counter = 0;
+ if (queryModel.isRawBytesDetailQuery()) {
+ while (scannedResult.hasNext()) {
+ value = scannedResult.getValue();
+ row = new Object[msrSize + 1];
+ row[0] = scannedResult.getKey();
+ if (value != null) {
+ assert (value.length == msrSize);
+ System.arraycopy(value, 0, row, 1, msrSize);
+ }
+ resultData[counter] = row;
+ counter++;
+ }
+ } else {
+ while (scannedResult.hasNext()) {
+ value = scannedResult.getValue();
+ row = new Object[msrSize + dimSize];
+ ByteArrayWrapper key = scannedResult.getKey();
+ if (key != null) {
+ long[] surrogateResult = querySchemaInfo.getKeyGenerator()
+ .getKeyArray(key.getDictionaryKey(), querySchemaInfo.getMaskedByteIndexes());
+ int noDictionaryColumnIndex = 0;
+ for (int i = 0; i < dimSize; i++) {
+ if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
+ row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
+ new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++)),
+ queryDimensions[i].getDimension().getDataType());
+ } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator directDictionaryGenerator =
+ DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+ queryDimensions[i].getDimension().getDataType());
+ if (directDictionaryGenerator != null) {
+ row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
+ (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()]);
+ }
+ } else {
+ row[order[i]] =
+ (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()];
+ }
+ }
+ }
+ for (int i = 0; i < msrSize; i++) {
+ row[order[i + queryDimensions.length]] = value[i];
+ }
+ resultData[counter] = row;
+ counter++;
+ }
+ }
+
+ LOGGER.info("###########################---- Total Number of records" + scannedResult.size());
+ BatchRawResult result = new BatchRawResult();
+ result.setRows(resultData);
+ return result;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/carbondata/scan/scanner/AbstractBlockletScanner.java
new file mode 100644
index 0000000..4d7f5d3
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/scanner/AbstractBlockletScanner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.scanner;
+
+import org.carbondata.scan.executor.exception.QueryExecutionException;
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+import org.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Blocklet scanner class to process the block
+ */
+public abstract class AbstractBlockletScanner implements BlockletScanner {
+
+ /**
+ * scanner result
+ */
+ protected AbstractScannedResult scannedResult;
+
+ /**
+ * block execution info
+ */
+ protected BlockExecutionInfo blockExecutionInfo;
+
+ public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) {
+ this.blockExecutionInfo = tableBlockExecutionInfos;
+ }
+
+ @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+ throws QueryExecutionException {
+ fillKeyValue(blocksChunkHolder);
+ return scannedResult;
+ }
+
+ protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) {
+ scannedResult.reset();
+ scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
+ .getMeasureChunks(blocksChunkHolder.getFileReader(),
+ blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
+ scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
+
+ scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock()
+ .getDimensionChunks(blocksChunkHolder.getFileReader(),
+ blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java b/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java
new file mode 100644
index 0000000..f1a0646
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/scanner/BlockletScanner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.scanner;
+
+import org.carbondata.scan.executor.exception.QueryExecutionException;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+import org.carbondata.scan.result.AbstractScannedResult;
+
+/**
+ * Interface for processing the block
+ * Processing can be filter based processing or non filter based processing
+ */
/**
 * Interface for processing a block.
 * Processing can be filter based or non filter based.
 */
public interface BlockletScanner {

  /**
   * Processes the data of one blocklet and returns the scanned result.
   *
   * @param blocksChunkHolder block chunk holder which holds the block data
   * @return result after processing the blocklet
   * @throws QueryExecutionException if the blocklet cannot be processed
   */
  AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
      throws QueryExecutionException;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java
new file mode 100644
index 0000000..830146d
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/scanner/impl/FilterScanner.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.scanner.impl;
+
+import java.util.BitSet;
+
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.core.util.CarbonProperties;
+import org.carbondata.scan.executor.exception.QueryExecutionException;
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.carbondata.scan.filter.executer.FilterExecuter;
+import org.carbondata.scan.processor.BlocksChunkHolder;
+import org.carbondata.scan.result.AbstractScannedResult;
+import org.carbondata.scan.result.impl.FilterQueryScannedResult;
+import org.carbondata.scan.scanner.AbstractBlockletScanner;
+
+/**
+ * Below class will be used for filter query processing
+ * this class will be first apply the filter then it will read the block if
+ * required and return the scanned result
+ */
+public class FilterScanner extends AbstractBlockletScanner {
+
+ /**
+ * filter tree
+ */
+ private FilterExecuter filterExecuter;
+
+ /**
+ * this will be used to apply min max
+ * this will be useful for dimension column which is on the right side
+ * as node finder will always give tentative blocks, if column data stored individually
+ * and data is in sorted order then we can check whether filter is in the range of min max or not
+ * if it present then only we can apply filter on complete data.
+ * this will be very useful in case of sparse data when rows are
+ * repeating.
+ */
+ private boolean isMinMaxEnabled;
+
+ public FilterScanner(BlockExecutionInfo blockExecutionInfo) {
+ super(blockExecutionInfo);
+ scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
+ // to check whether min max is enabled or not
+ String minMaxEnableValue = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED,
+ CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE);
+ if (null != minMaxEnableValue) {
+ isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue);
+ }
+ // get the filter tree
+ this.filterExecuter = blockExecutionInfo.getFilterExecuterTree();
+ }
+
+ /**
+ * Below method will be used to process the block
+ *
+ * @param blocksChunkHolder block chunk holder which holds the data
+ * @throws QueryExecutionException
+ * @throws FilterUnsupportedException
+ */
+ @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder)
+ throws QueryExecutionException {
+ try {
+ fillScannedResult(blocksChunkHolder);
+ } catch (FilterUnsupportedException e) {
+ throw new QueryExecutionException(e.getMessage());
+ }
+ return scannedResult;
+ }
+
+ /**
+ * This method will process the data in below order
+ * 1. first apply min max on the filter tree and check whether any of the filter
+ * is fall on the range of min max, if not then return empty result
+ * 2. If filter falls on min max range then apply filter on actual
+ * data and get the filtered row index
+ * 3. if row index is empty then return the empty result
+ * 4. if row indexes is not empty then read only those blocks(measure or dimension)
+ * which was present in the query but not present in the filter, as while applying filter
+ * some of the blocks where already read and present in chunk holder so not need to
+ * read those blocks again, this is to avoid reading of same blocks which was already read
+ * 5. Set the blocks and filter indexes to result
+ *
+ * @param blocksChunkHolder
+ * @throws FilterUnsupportedException
+ */
+ private void fillScannedResult(BlocksChunkHolder blocksChunkHolder)
+ throws FilterUnsupportedException {
+
+ scannedResult.reset();
+ // apply min max
+ if (isMinMaxEnabled) {
+ BitSet bitSet = this.filterExecuter
+ .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(),
+ blocksChunkHolder.getDataBlock().getColumnsMinValue());
+ if (bitSet.isEmpty()) {
+ scannedResult.setNumberOfRows(0);
+ scannedResult.setIndexes(new int[0]);
+ return;
+ }
+ }
+ // apply filter on actual data
+ BitSet bitSet = this.filterExecuter.applyFilter(blocksChunkHolder);
+ // if indexes is empty then return with empty result
+ if (bitSet.isEmpty()) {
+ scannedResult.setNumberOfRows(0);
+ scannedResult.setIndexes(new int[0]);
+ return;
+ }
+ // get the row indexes from bot set
+ int[] indexes = new int[bitSet.cardinality()];
+ int index = 0;
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+ indexes[index++] = i;
+ }
+
+ FileHolder fileReader = blocksChunkHolder.getFileReader();
+ int[] allSelectedDimensionBlocksIndexes =
+ blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
+ DimensionColumnDataChunk[] dimensionColumnDataChunk =
+ new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
+ // read dimension chunk blocks from file which is not present
+ for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
+ if (null == blocksChunkHolder.getDimensionDataChunk()[allSelectedDimensionBlocksIndexes[i]]) {
+ dimensionColumnDataChunk[allSelectedDimensionBlocksIndexes[i]] =
+ blocksChunkHolder.getDataBlock()
+ .getDimensionChunk(fileReader, allSelectedDimensionBlocksIndexes[i]);
+ } else {
+ dimensionColumnDataChunk[allSelectedDimensionBlocksIndexes[i]] =
+ blocksChunkHolder.getDimensionDataChunk()[allSelectedDimensionBlocksIndexes[i]];
+ }
+ }
+ MeasureColumnDataChunk[] measureColumnDataChunk =
+ new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
+ int[] allSelectedMeasureBlocksIndexes = blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
+
+ // read the measure chunk blocks which is not present
+ for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
+
+ if (null == blocksChunkHolder.getMeasureDataChunk()[allSelectedMeasureBlocksIndexes[i]]) {
+ measureColumnDataChunk[allSelectedMeasureBlocksIndexes[i]] =
+ blocksChunkHolder.getDataBlock()
+ .getMeasureChunk(fileReader, allSelectedMeasureBlocksIndexes[i]);
+ } else {
+ measureColumnDataChunk[allSelectedMeasureBlocksIndexes[i]] =
+ blocksChunkHolder.getMeasureDataChunk()[allSelectedMeasureBlocksIndexes[i]];
+ }
+ }
+ scannedResult.setDimensionChunks(dimensionColumnDataChunk);
+ scannedResult.setIndexes(indexes);
+ scannedResult.setMeasureChunks(measureColumnDataChunk);
+ scannedResult.setNumberOfRows(indexes.length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/1c725f5b/core/src/main/java/org/carbondata/scan/scanner/impl/NonFilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/carbondata/scan/scanner/impl/NonFilterScanner.java
new file mode 100644
index 0000000..b582a95
--- /dev/null
+++ b/core/src/main/java/org/carbondata/scan/scanner/impl/NonFilterScanner.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.scan.scanner.impl;
+
+import org.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.carbondata.scan.result.impl.NonFilterQueryScannedResult;
+import org.carbondata.scan.scanner.AbstractBlockletScanner;
+
+/**
+ * Non filter processor which will be used for non filter query
+ * In case of non filter query we just need to read all the blocks requested in the
+ * query and pass it to scanned result
+ */
/**
 * Non filter processor which will be used for non filter query.
 * In case of a non filter query we just need to read all the blocks requested
 * in the query and pass them to the scanned result; the base class performs
 * the actual chunk reading.
 */
public class NonFilterScanner extends AbstractBlockletScanner {

  public NonFilterScanner(BlockExecutionInfo blockExecutionInfo) {
    super(blockExecutionInfo);
    // as its a non filter query creating a non filter query scanned result object
    scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo);
  }
}