You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:12 UTC
[24/56] [abbrv] incubator-carbondata git commit: Optimized detail
query flow and cleanup (#691)
Optimized detail query flow and cleanup (#691)
* Optimizing detail query
* Optimizing detail query flow
* Optimizing detail query flow
* Optimized raw detail query to improve push up performance.
* Fixed bugs
* reverted wrong check in
* Rebased the code
* Removed aggregation from core
* Refactored core package and fixed test cases
* Fixed bugs
* Fixed review comments and deleted aggregate classes after merge from master
* Removed unused code
* Optimized scanner flow
* Optimized scanner flow
* Optimized scanning flow
* Optimized scanner flow
* Refactored code
* Refactored code
* Removed unused code
* Reverted unnecessary comment
* Reverted queryinterface package from core
* Removed queryinterface package from core
* Handled review comments
* Handled review comments
* Added assert
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/29360501
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/29360501
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/29360501
Branch: refs/heads/master
Commit: 29360501336ddc54a349257512fffb5bd8d87126
Parents: 656577d
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Sat Jun 18 21:39:54 2016 +0530
Committer: Jacky Li <ja...@huawei.com>
Committed: Sun Jun 19 00:09:54 2016 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 6 +
.../DirectDictionaryKeyGeneratorFactory.java | 2 +-
.../TimeStampDirectDictionaryGenerator.java | 7 +
.../aggregator/ScannedResultAggregator.java | 43 ----
.../impl/ListBasedResultAggregator.java | 219 -----------------
.../collector/ScannedResultCollector.java | 43 ++++
.../impl/ListBasedResultCollector.java | 212 +++++++++++++++++
.../executor/impl/DetailQueryExecutor.java | 8 +-
.../impl/DetailRawRecordQueryExecutor.java | 10 +-
.../executor/infos/BlockExecutionInfo.java | 28 +--
.../internal/InternalQueryExecutor.java | 3 +-
.../impl/InternalDetailQueryExecutor.java | 130 -----------
.../executor/internal/impl/QueryRunner.java | 77 ------
.../merger/AbstractScannedResultMerger.java | 169 --------------
.../carbon/merger/ScannedResultMerger.java | 45 ----
.../impl/UnSortedScannedResultMerger.java | 34 ---
.../query/carbon/model/QueryModel.java | 14 ++
.../processor/AbstractDataBlockIterator.java | 126 ++++++++++
.../processor/AbstractDataBlockProcessor.java | 100 --------
.../query/carbon/processor/BlockProcessor.java | 36 ---
.../impl/AggregateQueryBlockProcessor.java | 62 -----
.../processor/impl/DataBlockIteratorImpl.java | 56 +++++
.../impl/DetailQueryBlockProcessor.java | 70 ------
.../query/carbon/result/BatchRawResult.java | 55 -----
.../AbstractDetailQueryResultIterator.java | 108 ++++-----
.../iterator/DetailQueryResultIterator.java | 71 +++---
.../iterator/DetailRawQueryResultIterator.java | 118 ----------
.../impl/RawQueryResultPreparatorImpl.java | 67 +++++-
.../query/carbon/util/DataTypeUtil.java | 4 +-
.../merger/exception/ResultMergerException.java | 91 --------
.../exception/DataProcessorException.java | 91 --------
.../queryinterface/filter/CarbonFilterInfo.java | 182 ---------------
.../query/queryinterface/query/CarbonQuery.java | 233 -------------------
.../query/impl/CarbonQueryImpl.java | 223 ------------------
.../query/metadata/AbstractCarbonLevel.java | 57 -----
.../queryinterface/query/metadata/Axis.java | 61 -----
.../query/metadata/CarbonCalculatedMeasure.java | 99 --------
.../query/metadata/CarbonDimensionLevel.java | 81 -------
.../metadata/CarbonDimensionLevelFilter.java | 217 -----------------
.../query/metadata/CarbonLevel.java | 83 -------
.../query/metadata/CarbonLevelHolder.java | 113 ---------
.../query/metadata/CarbonMeasure.java | 81 -------
.../query/metadata/CarbonMeasureFilter.java | 142 -----------
.../query/metadata/CarbonMember.java | 92 --------
.../query/metadata/CarbonTuple.java | 76 ------
.../query/metadata/DSLTransformation.java | 111 ---------
.../queryinterface/query/metadata/TopCount.java | 110 ---------
.../spark/merger/CarbonCompactionExecutor.java | 1 +
.../spark/partition/api/impl/PartitionImpl.java | 11 -
.../api/impl/SampleDataPartitionerImpl.java | 24 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 22 +-
.../org/apache/spark/sql/CarbonOperators.scala | 10 +-
.../TimeStampDirectDictionaryGenerator_UT.java | 6 +-
53 files changed, 627 insertions(+), 3513 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 34b609c..15a21e3 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -733,6 +733,12 @@ public final class CarbonCommonConstants {
*/
public static final String INMEMORY_REOCRD_SIZE = "carbon.inmemory.record.size";
public static final int INMEMORY_REOCRD_SIZE_DEFAULT = 240000;
+
+ /**
+ * DETAIL_QUERY_BATCH_SIZE
+ */
+ public static final String DETAIL_QUERY_BATCH_SIZE = "carbon.detail.batch.size";
+ public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 10000;
/**
* SPILL_OVER_DISK_PATH
*/
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
index 3859663..f3633bf 100644
--- a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
+++ b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
@@ -43,7 +43,7 @@ public final class DirectDictionaryKeyGeneratorFactory {
DirectDictionaryGenerator directDictionaryGenerator = null;
switch (dataType) {
case TIMESTAMP:
- directDictionaryGenerator = new TimeStampDirectDictionaryGenerator();
+ directDictionaryGenerator = TimeStampDirectDictionaryGenerator.instance;
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index 3954e14..d46725f 100644
--- a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -40,6 +40,13 @@ import static org.carbondata.core.keygenerator.directdictionary.timestamp.TimeSt
*/
public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGenerator {
+ private TimeStampDirectDictionaryGenerator() {
+
+ }
+
+ public static TimeStampDirectDictionaryGenerator instance =
+ new TimeStampDirectDictionaryGenerator();
+
/**
* Logger instance
*/
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java b/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java
deleted file mode 100644
index f5e4b5f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.aggregator;
-
-import org.carbondata.query.carbon.result.AbstractScannedResult;
-import org.carbondata.query.carbon.result.Result;
-
-/**
- * Interface which will be used to aggregate the scan result
- */
-public interface ScannedResultAggregator {
-
- /**
- * Below method will be used to aggregate the scanned result
- *
- * @param scannedResult scanned result
- * @return how many records was aggregated
- */
- int aggregateData(AbstractScannedResult scannedResult);
-
- /**
- * Below method will be used to get the aggregated result
- *
- * @return
- */
- Result getAggregatedResult();
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java b/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java
deleted file mode 100644
index 10bf88c..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.aggregator.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.core.carbon.metadata.datatype.DataType;
-import org.carbondata.core.keygenerator.KeyGenException;
-import org.carbondata.query.carbon.aggregator.ScannedResultAggregator;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.infos.KeyStructureInfo;
-import org.carbondata.query.carbon.executor.util.QueryUtil;
-import org.carbondata.query.carbon.result.AbstractScannedResult;
-import org.carbondata.query.carbon.result.ListBasedResultWrapper;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.result.impl.ListBasedResult;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-
-/**
- * It is not a aggregator it is just a scanned result holder.
- *
- * @TODO change it to some other name
- */
-public class ListBasedResultAggregator implements ScannedResultAggregator {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(ListBasedResultAggregator.class.getName());
-
- /**
- * to keep a track of number of row processed to handle limit push down in
- * case of detail query scenario
- */
- private int rowCounter;
-
- /**
- * number of records asked in limit query if -1 then its either is
- * detail+order by query or detail query
- */
- private int limit;
-
- /**
- * dimension values list
- */
- private List<ListBasedResultWrapper> listBasedResult;
-
- /**
- * restructuring info
- */
- private KeyStructureInfo restructureInfos;
-
- /**
- * table block execution infos
- */
- private BlockExecutionInfo tableBlockExecutionInfos;
-
- private int[] measuresOrdinal;
-
- /**
- * to check whether measure exists in current table block or not this to
- * handle restructuring scenario
- */
- private boolean[] isMeasureExistsInCurrentBlock;
-
- /**
- * default value of the measures in case of restructuring some measure wont
- * be present in the table so in that default value will be used to
- * aggregate the data for that measure columns
- */
- private Object[] measureDefaultValue;
-
- /**
- * measure datatypes.
- */
- private DataType[] measureDatatypes;
-
- public ListBasedResultAggregator(BlockExecutionInfo blockExecutionInfos) {
- limit = blockExecutionInfos.getLimit();
- this.tableBlockExecutionInfos = blockExecutionInfos;
- restructureInfos = blockExecutionInfos.getKeyStructureInfo();
- measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
- isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
- measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
- this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
- }
-
- @Override
- /**
- * This method will add a record both key and value to list object
- * it will keep track of how many record is processed, to handle limit scenario
- * @param scanned result
- *
- */
- public int aggregateData(AbstractScannedResult scannedResult) {
- this.listBasedResult =
- new ArrayList<>(limit == -1 ? scannedResult.numberOfOutputRows() : limit);
- boolean isMsrsPresent = measureDatatypes.length > 0;
- ByteArrayWrapper wrapper = null;
- // scan the record and add to list
- ListBasedResultWrapper resultWrapper;
- while (scannedResult.hasNext() && (limit == -1 || rowCounter < limit)) {
- resultWrapper = new ListBasedResultWrapper();
- if(tableBlockExecutionInfos.isDimensionsExistInQuery()) {
- wrapper = new ByteArrayWrapper();
- wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
- wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
- wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
- resultWrapper.setKey(wrapper);
- } else {
- scannedResult.incrementCounter();
- }
- if(isMsrsPresent) {
- Object[] msrValues = new Object[measureDatatypes.length];
- fillMeasureData(msrValues, scannedResult);
- resultWrapper.setValue(msrValues);
- }
- listBasedResult.add(resultWrapper);
- rowCounter++;
- }
- return rowCounter;
- }
-
- private void fillMeasureData(Object[] msrValues, AbstractScannedResult scannedResult) {
- for (short i = 0; i < measuresOrdinal.length; i++) {
- // if measure exists is block then pass measure column
- // data chunk to the aggregator
- if (isMeasureExistsInCurrentBlock[i]) {
- msrValues[i] =
- getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
- scannedResult.getCurrenrRowId(),measureDatatypes[i]);
- } else {
- // if not then get the default value and use that value in aggregation
- msrValues[i] = measureDefaultValue[i];
- }
- }
- }
-
- private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) {
- if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
- Object msrVal;
- switch (dataType) {
- case LONG:
- msrVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
- break;
- case DECIMAL:
- msrVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
- break;
- default:
- msrVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
- }
- return DataTypeUtil.getMeasureDataBasedOnDataType(msrVal, dataType);
- }
- return null;
- }
-
- /**
- * Below method will used to get the result
- */
- @Override public Result getAggregatedResult() {
- Result<List<ListBasedResultWrapper>, Object> result = new ListBasedResult();
- if (!tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
- updateKeyWithLatestBlockKeyGenerator();
- }
- result.addScannedResult(listBasedResult);
- return result;
- }
-
-
-
- /**
- * Below method will be used to update the fixed length key with the
- * latest block key generator
- *
- * @return updated block
- */
- private void updateKeyWithLatestBlockKeyGenerator() {
- try {
- long[] data = null;
- ByteArrayWrapper key = null;
- for (int i = 0; i < listBasedResult.size(); i++) {
- // get the key
- key = listBasedResult.get(i).getKey();
- // unpack the key with table block key generator
- data = tableBlockExecutionInfos.getBlockKeyGenerator()
- .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock());
- // packed the key with latest block key generator
- // and generate the masked key for that key
- key.setDictionaryKey(QueryUtil
- .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
- restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(),
- restructureInfos.getMaskByteRanges().length));
- listBasedResult.get(i).setKey(key);
- }
- } catch (KeyGenException e) {
- LOGGER.error(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java b/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java
new file mode 100644
index 0000000..9e5d401
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.collector;
+
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+import org.carbondata.query.carbon.result.Result;
+
+/**
+ * Interface which will be used to collect the scanned result
+ */
+public interface ScannedResultCollector {
+
+ /**
+ * Below method will be used to collect the scanned result
+ *
+ * @param scannedResult scanned result
+ * @return how many records were collected
+ */
+ int collectData(AbstractScannedResult scannedResult, int batchSize);
+
+ /**
+ * Below method will be used to get the collected result
+ *
+ * @return
+ */
+ Result getCollectedResult();
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java b/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java
new file mode 100644
index 0000000..30d33b8
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.collector.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.carbon.metadata.datatype.DataType;
+import org.carbondata.core.keygenerator.KeyGenException;
+import org.carbondata.query.carbon.collector.ScannedResultCollector;
+import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
+import org.carbondata.query.carbon.executor.infos.KeyStructureInfo;
+import org.carbondata.query.carbon.executor.util.QueryUtil;
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.result.impl.ListBasedResult;
+import org.carbondata.query.carbon.util.DataTypeUtil;
+import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
+
+/**
+ * Collects the scanned result rows into a list; it does not perform any aggregation.
+ *
+ */
+public class ListBasedResultCollector implements ScannedResultCollector {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(ListBasedResultCollector.class.getName());
+
+ /**
+ * to keep a track of number of row processed to handle limit push down in
+ * case of detail query scenario
+ */
+ private int rowCounter;
+
+ /**
+ * dimension values list
+ */
+ private List<ListBasedResultWrapper> listBasedResult;
+
+ /**
+ * restructuring info
+ */
+ private KeyStructureInfo restructureInfos;
+
+ /**
+ * table block execution infos
+ */
+ private BlockExecutionInfo tableBlockExecutionInfos;
+
+ private int[] measuresOrdinal;
+
+ /**
+ * to check whether measure exists in current table block or not this to
+ * handle restructuring scenario
+ */
+ private boolean[] isMeasureExistsInCurrentBlock;
+
+ /**
+ * default values of the measures: in case of restructuring some measures
+ * won't be present in the table block, so in that case the default value
+ * will be used as the value of those measure columns
+ */
+ private Object[] measureDefaultValue;
+
+ /**
+ * measure datatypes.
+ */
+ private DataType[] measureDatatypes;
+
+ public ListBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+ this.tableBlockExecutionInfos = blockExecutionInfos;
+ restructureInfos = blockExecutionInfos.getKeyStructureInfo();
+ measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
+ isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
+ measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
+ this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
+ }
+
+ @Override
+ /**
+ * This method will add each record, both key and value, to the list object.
+ * It keeps track of how many records are processed and stops at batchSize.
+ * @param scannedResult scanned result
+ *
+ */
+ public int collectData(AbstractScannedResult scannedResult, int batchSize) {
+ this.listBasedResult =
+ new ArrayList<>(batchSize);
+ boolean isMsrsPresent = measureDatatypes.length > 0;
+ ByteArrayWrapper wrapper = null;
+ // scan the record and add to list
+ ListBasedResultWrapper resultWrapper;
+ int rowCounter = 0;
+ while (scannedResult.hasNext() && rowCounter < batchSize) {
+ resultWrapper = new ListBasedResultWrapper();
+ if(tableBlockExecutionInfos.isDimensionsExistInQuery()) {
+ wrapper = new ByteArrayWrapper();
+ wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
+ wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
+ wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
+ resultWrapper.setKey(wrapper);
+ } else {
+ scannedResult.incrementCounter();
+ }
+ if(isMsrsPresent) {
+ Object[] msrValues = new Object[measureDatatypes.length];
+ fillMeasureData(msrValues, scannedResult);
+ resultWrapper.setValue(msrValues);
+ }
+ listBasedResult.add(resultWrapper);
+ rowCounter++;
+ }
+ return rowCounter;
+ }
+
+ private void fillMeasureData(Object[] msrValues, AbstractScannedResult scannedResult) {
+ for (short i = 0; i < measuresOrdinal.length; i++) {
+ // if measure exists is block then pass measure column
+ // data chunk to the collector
+ if (isMeasureExistsInCurrentBlock[i]) {
+ msrValues[i] =
+ getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
+ scannedResult.getCurrenrRowId(),measureDatatypes[i]);
+ } else {
+ // if not then get the default value and use that value in aggregation
+ msrValues[i] = measureDefaultValue[i];
+ }
+ }
+ }
+
+ private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) {
+ if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+ Object msrVal;
+ switch (dataType) {
+ case LONG:
+ msrVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+ break;
+ case DECIMAL:
+ msrVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+ break;
+ default:
+ msrVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+ }
+ return DataTypeUtil.getMeasureDataBasedOnDataType(msrVal, dataType);
+ }
+ return null;
+ }
+
+ /**
+ * Below method will be used to get the result
+ */
+ @Override public Result getCollectedResult() {
+ Result<List<ListBasedResultWrapper>, Object> result = new ListBasedResult();
+ if (!tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
+ updateKeyWithLatestBlockKeyGenerator();
+ }
+ result.addScannedResult(listBasedResult);
+ return result;
+ }
+
+
+
+ /**
+ * Below method will be used to update the fixed length key of every
+ * collected row with the latest block key generator. The keys are
+ * updated in place; a KeyGenException is logged and not rethrown.
+ *
+ */
+ private void updateKeyWithLatestBlockKeyGenerator() {
+ try {
+ long[] data = null;
+ ByteArrayWrapper key = null;
+ for (int i = 0; i < listBasedResult.size(); i++) {
+ // get the key
+ key = listBasedResult.get(i).getKey();
+ // unpack the key with table block key generator
+ data = tableBlockExecutionInfos.getBlockKeyGenerator()
+ .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock());
+ // packed the key with latest block key generator
+ // and generate the masked key for that key
+ key.setDictionaryKey(QueryUtil
+ .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
+ restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(),
+ restructureInfos.getMaskByteRanges().length));
+ listBasedResult.get(i).setKey(key);
+ }
+ } catch (KeyGenException e) {
+ LOGGER.error(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
index b2f323c..0255cbb 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
@@ -23,11 +23,10 @@ import java.util.List;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
-import org.carbondata.query.carbon.executor.internal.impl.InternalDetailQueryExecutor;
import org.carbondata.query.carbon.model.QueryModel;
import org.carbondata.query.carbon.result.iterator.ChunkRowIterator;
import org.carbondata.query.carbon.result.iterator.DetailQueryResultIterator;
+import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl;
/**
* Below class will be used to execute the detail query
@@ -39,10 +38,9 @@ public class DetailQueryExecutor extends AbstractQueryExecutor {
@Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
throws QueryExecutionException {
List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
- InternalQueryExecutor queryExecutor = new InternalDetailQueryExecutor();
return new ChunkRowIterator(
- new DetailQueryResultIterator(blockExecutionInfoList, queryProperties, queryModel,
- queryExecutor));
+ new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
+ new DetailQueryResultPreparatorImpl(queryProperties, queryModel)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
index d1967cd..e72c638 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
@@ -5,11 +5,10 @@ import java.util.List;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
-import org.carbondata.query.carbon.executor.internal.impl.InternalDetailQueryExecutor;
import org.carbondata.query.carbon.model.QueryModel;
import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.iterator.DetailRawQueryResultIterator;
+import org.carbondata.query.carbon.result.iterator.DetailQueryResultIterator;
+import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparatorImpl;
/**
* Executor for raw records, it does not parse to actual data
@@ -19,8 +18,7 @@ public class DetailRawRecordQueryExecutor extends AbstractQueryExecutor<BatchRes
@Override public CarbonIterator<BatchResult> execute(QueryModel queryModel)
throws QueryExecutionException {
List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
- InternalQueryExecutor queryExecutor = new InternalDetailQueryExecutor();
- return new DetailRawQueryResultIterator(blockExecutionInfoList, queryProperties, queryModel,
- queryExecutor);
+ return new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
+ new RawQueryResultPreparatorImpl(queryProperties, queryModel));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
index 7bed33d..202a932 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
@@ -26,7 +26,6 @@ import org.carbondata.core.carbon.datastore.IndexKey;
import org.carbondata.core.carbon.datastore.block.AbstractIndex;
import org.carbondata.core.datastorage.store.impl.FileFactory.FileType;
import org.carbondata.core.keygenerator.KeyGenerator;
-import org.carbondata.query.carbon.merger.ScannedResultMerger;
import org.carbondata.query.filter.executer.FilterExecuter;
/**
@@ -156,11 +155,6 @@ public class BlockExecutionInfo {
private int[] noDictionaryBlockIndexes;
/**
- * to process the scanned result
- */
- private ScannedResultMerger scannedResultProcessor;
-
- /**
* key generator used for generating the table block fixed length key
*/
private KeyGenerator blockKeyGenerator;
@@ -214,7 +208,7 @@ public class BlockExecutionInfo {
}
/**
- * @param tableBlock the tableBlock to set
+ * @param blockIndex the blockIndex to set
*/
public void setDataBlock(AbstractIndex blockIndex) {
this.blockIndex = blockIndex;
@@ -410,7 +404,7 @@ public class BlockExecutionInfo {
}
/**
- * @param restructureInfos the restructureInfos to set
+ * @param keyStructureInfo the keyStructureInfo to set
*/
public void setKeyStructureInfo(KeyStructureInfo keyStructureInfo) {
this.keyStructureInfo = keyStructureInfo;
@@ -424,7 +418,7 @@ public class BlockExecutionInfo {
}
/**
- * @param sortInfos the sortInfos to set
+ * @param sortInfo the sortInfo to set
*/
public void setSortInfo(SortInfo sortInfo) {
this.sortInfo = sortInfo;
@@ -473,20 +467,6 @@ public class BlockExecutionInfo {
}
/**
- * @return the scannedResultProcessor
- */
- public ScannedResultMerger getScannedResultProcessor() {
- return scannedResultProcessor;
- }
-
- /**
- * @param scannedResultProcessor the scannedResultProcessor to set
- */
- public void setScannedResultProcessor(ScannedResultMerger scannedResultProcessor) {
- this.scannedResultProcessor = scannedResultProcessor;
- }
-
- /**
* @return the filterEvaluatorTree
*/
public FilterExecuter getFilterExecuterTree() {
@@ -494,7 +474,7 @@ public class BlockExecutionInfo {
}
/**
- * @param filterEvaluatorTree the filterEvaluatorTree to set
+ * @param filterExecuterTree the filterExecuterTree to set
*/
public void setFilterExecuterTree(FilterExecuter filterExecuterTree) {
this.filterExecuterTree = filterExecuterTree;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
index 7b3691c..089ee82 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
@@ -20,6 +20,7 @@ package org.carbondata.query.carbon.executor.internal;
import java.util.List;
+import org.carbondata.core.datastorage.store.FileHolder;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
@@ -42,5 +43,5 @@ public interface InternalQueryExecutor {
* @throws QueryExecutionException
*/
CarbonIterator<Result> executeQuery(List<BlockExecutionInfo> blockExecutionInfos,
- int[] blockIndexToBeExecuted) throws QueryExecutionException;
+ int[] blockIndexToBeExecuted, FileHolder fileReader) throws QueryExecutionException;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
deleted file mode 100644
index 866596f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.executor.internal.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
-import org.carbondata.query.carbon.merger.ScannedResultMerger;
-import org.carbondata.query.carbon.merger.impl.UnSortedScannedResultMerger;
-import org.carbondata.query.carbon.result.Result;
-
-/**
- * Below Class will be used to execute the detail query
- */
-public class InternalDetailQueryExecutor implements InternalQueryExecutor {
-
- /**
- * LOGGER.
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(InternalDetailQueryExecutor.class.getName());
-
- /**
- * number of cores can be used to execute the query
- */
- private int numberOfCores;
-
- public InternalDetailQueryExecutor() {
-
- // below code will be used to update the number of cores based on number
- // records we
- // can keep in memory while executing the query execution
- int recordSize = 0;
- String defaultInMemoryRecordsSize =
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.INMEMORY_REOCRD_SIZE);
- if (null != defaultInMemoryRecordsSize) {
- try {
- recordSize = Integer.parseInt(defaultInMemoryRecordsSize);
- } catch (NumberFormatException ne) {
- LOGGER.error("Invalid inmemory records size. Using default value");
- recordSize = CarbonCommonConstants.INMEMORY_REOCRD_SIZE_DEFAULT;
- }
- }
- numberOfCores = recordSize / Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
- CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
- if (numberOfCores == 0) {
- numberOfCores++;
- }
- }
-
- /**
- * Below method will be used to used to execute the detail query
- * and it will return iterator over result
- *
- * @param executionInfos block execution info which will have all the properties
- * required for query execution
- * @param sliceIndexes slice indexes to be executed
- * @return query result
- */
- @Override public CarbonIterator<Result> executeQuery(
- List<BlockExecutionInfo> executionInfos,
- int[] sliceIndexes) throws QueryExecutionException {
- long startTime = System.currentTimeMillis();
- QueryRunner task;
- ScannedResultMerger scannedResultProcessor =
- new UnSortedScannedResultMerger(executionInfos.get(executionInfos.size() - 1),
- sliceIndexes.length);
- ExecutorService execService = Executors.newFixedThreadPool(numberOfCores);
- List<Future> listFutureObjects = new ArrayList<Future>();
- try {
- for (int currentSliceIndex : sliceIndexes) {
- if (currentSliceIndex == -1) {
- continue;
- }
- executionInfos.get(currentSliceIndex).setScannedResultProcessor(scannedResultProcessor);
- task = new QueryRunner(executionInfos.get(currentSliceIndex));
- listFutureObjects.add(execService.submit(task));
- }
- execService.shutdown();
- execService.awaitTermination(2, TimeUnit.DAYS);
- LOGGER.info("Total time taken for scan " + (System.currentTimeMillis() - startTime));
- for (Future future : listFutureObjects) {
- try {
- future.get();
- } catch (ExecutionException e) {
- throw new QueryExecutionException(e.getMessage());
- }
- }
- return scannedResultProcessor.getQueryResultIterator();
- } catch (QueryExecutionException exception) {
- throw new QueryExecutionException(exception);
- } catch (InterruptedException e) {
- LOGGER.error(e, e.getMessage());
- throw new QueryExecutionException(e);
- } finally {
- execService = null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java
deleted file mode 100644
index f341fa9..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.executor.internal.impl;
-
-import java.util.concurrent.Callable;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.common.logging.impl.StandardLogService;
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.processor.BlockProcessor;
-import org.carbondata.query.carbon.processor.impl.DetailQueryBlockProcessor;
-
-/**
- * Class which will execute the query
- */
-public class QueryRunner implements Callable<Void> {
-
- /**
- * LOGGER.
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(QueryRunner.class.getName());
- /**
- * block processor
- */
- private BlockProcessor dataBlockProcessor;
- /**
- * file reader which will be used to execute the query
- */
- private FileHolder fileReader;
- /**
- * block execution info which is required to run the query
- */
- private BlockExecutionInfo blockExecutionInfo;
-
- public QueryRunner(BlockExecutionInfo executionInfo) {
- this.blockExecutionInfo = executionInfo;
- this.fileReader = FileFactory.getFileHolder(executionInfo.getFileType());
- // if detail query detail query processor will be used to process the
- // block
- dataBlockProcessor = new DetailQueryBlockProcessor(executionInfo, fileReader);
- }
-
- @Override public Void call() throws Exception {
- StandardLogService
- .setThreadName(blockExecutionInfo.getPartitionId(), blockExecutionInfo.getQueryId());
- try {
- this.dataBlockProcessor.processBlock();
- } catch (Exception e) {
- LOGGER.error(e);
- throw new Exception(e.getMessage());
- } finally {
- this.fileReader.finish();
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java b/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java
deleted file mode 100644
index 9358f42..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.merger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.result.impl.ListBasedResult;
-import org.carbondata.query.carbon.result.iterator.MemoryBasedResultIterator;
-
-/**
- * Class which processed the scanned result
- * Processing can be merging sorting
- */
-public abstract class AbstractScannedResultMerger implements ScannedResultMerger {
-
- /**
- * LOGGER
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(AbstractScannedResultMerger.class.getName());
- /**
- * merging will done using thread
- */
- protected ExecutorService execService;
-
- /**
- * Merged scanned result which will merge all the result from all the blocks
- * executor
- */
- protected Result mergedScannedResult;
-
- /**
- * scanned result list
- */
- protected List<Result> scannedResultList;
-
- /**
- * tableBlockExecutionInfo
- */
- protected BlockExecutionInfo blockExecutionInfo;
-
- /**
- * max number of scanned result can keep in memory
- */
- private int maxNumberOfScannedResultList;
-
- /**
- * lockObject
- */
- private Object lockObject;
-
- public AbstractScannedResultMerger(BlockExecutionInfo blockExecutionInfo,
- int maxNumberOfScannedresultList) {
-
- this.lockObject = new Object();
- this.maxNumberOfScannedResultList = maxNumberOfScannedresultList;
- execService = Executors.newFixedThreadPool(1);
- scannedResultList = new ArrayList<Result>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- this.blockExecutionInfo = blockExecutionInfo;
- initialiseResult();
- }
-
- /**
- * for initializing the map based or list based result.
- */
- protected void initialiseResult() {
- mergedScannedResult = new ListBasedResult();
- }
-
- /**
- * Below method will be used to add the scanned result
- * If number of scanned result in the list of more than
- * the maxNumberOfScannedResultList than results present in the
- * list will be merged to merged scanned result
- *
- * @param scannedResult
- */
- @Override public void addScannedResult(Result scannedResult) throws QueryExecutionException {
- synchronized (this.lockObject) {
- scannedResultList.add(scannedResult);
- if ((scannedResultList.size() > maxNumberOfScannedResultList)) {
- List<Result> localResult = scannedResultList;
- scannedResultList = new ArrayList<Result>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- execService.submit(new MergerThread(localResult));
- }
- }
- }
-
- /**
- * Below method will be used to merge the scanned result
- *
- * @param scannedResultList scanned result list
- */
- protected void mergeScannedResults(List<Result> scannedResultList) {
- long start = System.currentTimeMillis();
- LOGGER.debug("Started a slice result merging");
-
- for (int i = 0; i < scannedResultList.size(); i++) {
- mergedScannedResult.merge(scannedResultList.get(i));
- }
- LOGGER.debug("Finished current slice result merging in time (MS) " + (System.currentTimeMillis()
- - start));
- }
-
- /**
- * Below method will be used to get the final query
- * return
- *
- * @return iterator over result
- */
- @Override public CarbonIterator<Result> getQueryResultIterator() throws QueryExecutionException {
- execService.shutdown();
- try {
- execService.awaitTermination(1, TimeUnit.DAYS);
- } catch (InterruptedException e1) {
- LOGGER.error("Problem in thread termination" + e1.getMessage());
- }
- if (scannedResultList.size() > 0) {
- mergeScannedResults(scannedResultList);
- scannedResultList = null;
- }
- LOGGER.debug("Finished result merging from all slices");
- return new MemoryBasedResultIterator(mergedScannedResult);
- }
-
- /**
- * Thread class to merge the scanned result
- */
- private final class MergerThread implements Callable<Void> {
- private List<Result> scannedResult;
-
- private MergerThread(List<Result> scannedResult) {
- this.scannedResult = scannedResult;
- }
-
- @Override public Void call() throws Exception {
- mergeScannedResults(scannedResult);
- return null;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java b/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java
deleted file mode 100644
index 19ed6cd..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.merger;
-
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.result.Result;
-
-/**
- * Interface for merging the scanned result
- */
-public interface ScannedResultMerger {
-
- /**
- * Below method will be used to add the scanned result
- *
- * @param scannedResult scanned result
- * @throws QueryExecutionException throw exception in case of failure
- */
- void addScannedResult(Result scannedResult) throws QueryExecutionException;
-
- /**
- * Below method will be used to get the query result
- *
- * @return query result
- * @throws QueryExecutionException throw exception in case of any failure
- */
- CarbonIterator<Result> getQueryResultIterator() throws QueryExecutionException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java b/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java
deleted file mode 100644
index 8a6c302..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.merger.impl;
-
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.merger.AbstractScannedResultMerger;
-
-/**
- * Below class will be used merge the unsorted result
- */
-public class UnSortedScannedResultMerger extends AbstractScannedResultMerger {
-
- public UnSortedScannedResultMerger(BlockExecutionInfo blockExecutionInfo,
- int maxNumberOfScannedresultList) {
- super(blockExecutionInfo, maxNumberOfScannedresultList);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java b/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
index 64b519a..92334db 100644
--- a/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
+++ b/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
@@ -140,6 +140,12 @@ public class QueryModel implements Serializable {
*/
private CarbonTable table;
+ /**
+ * This is used only when [forcedDetailRawQuery = true]. By default forcedDetailRawQuery returns
+ * dictionary values. If the user wants raw bytes in the detail query, set this field to true.
+ */
+ private boolean rawBytesDetailQuery;
+
public QueryModel() {
tableBlockInfos = new ArrayList<TableBlockInfo>();
queryDimension = new ArrayList<QueryDimension>();
@@ -499,4 +505,12 @@ public class QueryModel implements Serializable {
public void setColumnToDictionaryMapping(Map<String, Dictionary> columnToDictionaryMapping) {
this.columnToDictionaryMapping = columnToDictionaryMapping;
}
+
+ public boolean isRawBytesDetailQuery() {
+ return rawBytesDetailQuery;
+ }
+
+ public void setRawBytesDetailQuery(boolean rawBytesDetailQuery) {
+ this.rawBytesDetailQuery = rawBytesDetailQuery;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java
new file mode 100644
index 0000000..52b1bdf
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.processor;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.DataRefNode;
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.query.carbon.collector.ScannedResultCollector;
+import org.carbondata.query.carbon.collector.impl.ListBasedResultCollector;
+import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
+import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.scanner.BlockletScanner;
+import org.carbondata.query.carbon.scanner.impl.FilterScanner;
+import org.carbondata.query.carbon.scanner.impl.NonFilterScanner;
+
+/**
+ * This abstract class provides a skeletal implementation of the
+ * Block iterator.
+ */
+public abstract class AbstractDataBlockIterator extends CarbonIterator<Result> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(AbstractDataBlockIterator.class.getName());
+ /**
+ * iterator which will be used to iterate over data blocks
+ */
+ protected CarbonIterator<DataRefNode> dataBlockIterator;
+
+ /**
+ * execution details
+ */
+ protected BlockExecutionInfo blockExecutionInfo;
+
+ /**
+ * result collector which will be used to aggregate the scanned result
+ */
+ protected ScannedResultCollector scannerResultAggregator;
+
+ /**
+ * scanner which will be used to process the block; processing can be
+ * filter processing or non filter processing
+ */
+ protected BlockletScanner blockletScanner;
+
+ /**
+ * to hold the data block
+ */
+ protected BlocksChunkHolder blocksChunkHolder;
+
+ /**
+ * batch size of result
+ */
+ protected int batchSize;
+
+ protected AbstractScannedResult scannedResult;
+
+ public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo,
+ FileHolder fileReader, int batchSize) {
+ this.blockExecutionInfo = blockExecutionInfo;
+ dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+ blockExecutionInfo.getNumberOfBlockToScan());
+ blocksChunkHolder = new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
+ blockExecutionInfo.getTotalNumberOfMeasureBlock());
+ blocksChunkHolder.setFileReader(fileReader);
+
+ if (blockExecutionInfo.getFilterExecuterTree() != null) {
+ blockletScanner = new FilterScanner(blockExecutionInfo);
+ } else {
+ blockletScanner = new NonFilterScanner(blockExecutionInfo);
+ }
+
+ this.scannerResultAggregator =
+ new ListBasedResultCollector(blockExecutionInfo);
+ this.batchSize = batchSize;
+ }
+
+ public boolean hasNext() {
+ try {
+ if (scannedResult != null && scannedResult.hasNext()) {
+ return true;
+ } else {
+ scannedResult = getNextScannedResult();
+ while (scannedResult != null) {
+ if (scannedResult.hasNext()) {
+ return true;
+ }
+ scannedResult = getNextScannedResult();
+ }
+ return false;
+ }
+ } catch (QueryExecutionException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private AbstractScannedResult getNextScannedResult() throws QueryExecutionException {
+ if (dataBlockIterator.hasNext()) {
+ blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+ blocksChunkHolder.reset();
+ return blockletScanner.scanBlocklet(blocksChunkHolder);
+ }
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java
deleted file mode 100644
index fdcf6f1..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.processor;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.datastore.DataRefNode;
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.aggregator.ScannedResultAggregator;
-import org.carbondata.query.carbon.aggregator.impl.ListBasedResultAggregator;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.scanner.BlockletScanner;
-import org.carbondata.query.carbon.scanner.impl.FilterScanner;
-import org.carbondata.query.carbon.scanner.impl.NonFilterScanner;
-
-/**
- * This class provides a skeletal implementation of the
- * {@link BlockProcessor} interface to minimize the effort required to
- * implement this interface.
- */
-public abstract class AbstractDataBlockProcessor implements BlockProcessor {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(AbstractDataBlockProcessor.class.getName());
- /**
- * iterator which will be used to iterate over data blocks
- */
- protected CarbonIterator<DataRefNode> dataBlockIterator;
-
- /**
- * execution details
- */
- protected BlockExecutionInfo blockExecutionInfo;
-
- /**
- * result aggregator which will be used to aggregate the scanned result
- */
- protected ScannedResultAggregator scannerResultAggregator;
-
- /**
- * processor which will be used to process the block processing can be
- * filter processing or non filter processing
- */
- protected BlockletScanner blockletScanner;
-
- /**
- * to hold the data block
- */
- protected BlocksChunkHolder blocksChunkHolder;
-
- public AbstractDataBlockProcessor(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader) {
- this.blockExecutionInfo = blockExecutionInfo;
- dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
- blockExecutionInfo.getNumberOfBlockToScan());
- blocksChunkHolder = new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
- blockExecutionInfo.getTotalNumberOfMeasureBlock());
- blocksChunkHolder.setFileReader(fileReader);
-
- if (blockExecutionInfo.getFilterExecuterTree() != null) {
- blockletScanner = new FilterScanner(blockExecutionInfo);
- } else {
- blockletScanner = new NonFilterScanner(blockExecutionInfo);
- }
-
- this.scannerResultAggregator =
- new ListBasedResultAggregator(blockExecutionInfo);
- }
-
- /**
- * Below method will be used to add the scanned result to scanned result
- * processor
- */
- protected void finishScanning() {
- try {
- this.blockExecutionInfo.getScannedResultProcessor()
- .addScannedResult(scannerResultAggregator.getAggregatedResult());
- } catch (QueryExecutionException e) {
- LOGGER.error(e,
- "Problem while adding the result to Scanned Result Processor");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java
deleted file mode 100644
index 84e2d5f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.carbon.processor;
-
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-
-/**
- * Scanner interface which will be used
- * to scan the blocks.
- */
-public interface BlockProcessor {
-
- /**
- * Below method can be used to scan the block based on the query execution infos
- *
- * @throws QueryExecutionException
- */
- void processBlock() throws QueryExecutionException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java
deleted file mode 100644
index 260b894..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.processor.impl;
-
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.processor.AbstractDataBlockProcessor;
-
-/**
- * Below class will be used to process the blocks for
- * aggregated query
- */
-public class AggregateQueryBlockProcessor extends AbstractDataBlockProcessor {
-
- /**
- * AggregateQueryScanner constructor
- *
- * @param blockExecutionInfos
- */
- public AggregateQueryBlockProcessor(BlockExecutionInfo tableBlockExecutionInfos,
- FileHolder fileReader) {
- super(tableBlockExecutionInfos, fileReader);
- }
-
- /**
- * Below method will be used to scan the block
- * then it will call processor to process the data
- * and the it will call aggregator to aggregate the data
- * it will call finish once all the blocks of a table is scanned
- *
- * @throws QueryExecutionException
- */
- @Override public void processBlock() throws QueryExecutionException {
- while (dataBlockIterator.hasNext()) {
- try {
- blocksChunkHolder.setDataBlock(dataBlockIterator.next());
- blocksChunkHolder.reset();
- this.scannerResultAggregator.aggregateData(blockletScanner.scanBlocklet(blocksChunkHolder));
- } catch (Exception e) {
- throw new QueryExecutionException(e);
- }
- }
- finishScanning();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java
new file mode 100644
index 0000000..ade1965
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.processor.impl;
+
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
+import org.carbondata.query.carbon.processor.AbstractDataBlockIterator;
+import org.carbondata.query.carbon.result.Result;
+
+/**
+ * Below class will be used to process the block for detail query
+ */
+public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
+  /**
+   * Builds the iterator by handing every argument on to the parent constructor.
+   * @param blockExecutionInfo execution information
+   * @param fileReader file holder forwarded to the parent constructor
+   * @param batchSize batch size forwarded to the parent constructor
+   */
+  public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader,
+      int batchSize) {
+    super(blockExecutionInfo, fileReader, batchSize);
+  }
+
+  /**
+   * Gathers one batch: collects from the current scanned result, then merges in
+   * further collected results while the batch is below batchSize and hasNext() holds.
+   * @return the merged Result; its size can stay below batchSize once hasNext() is false
+   */
+  public Result next() {
+    scannerResultAggregator.collectData(scannedResult, batchSize);
+    final Result batch = scannerResultAggregator.getCollectedResult();
+    // keep topping up from further scanned results while the batch is short
+    while (batch.size() < batchSize && hasNext()) {
+      scannerResultAggregator.collectData(scannedResult, batchSize - batch.size());
+      batch.merge(scannerResultAggregator.getCollectedResult());
+    }
+    return batch;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java
deleted file mode 100644
index b079a84..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.processor.impl;
-
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.processor.AbstractDataBlockProcessor;
-
-/**
- * Below class will be used to process the block for detail query
- */
-public class DetailQueryBlockProcessor extends AbstractDataBlockProcessor {
-
- /**
- * counter for number of records processed
- */
- private int counter;
-
- /**
- * DetailQueryScanner Constructor
- *
- * @param blockExecutionInfo execution information
- */
- public DetailQueryBlockProcessor(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader) {
- super(blockExecutionInfo, fileReader);
- }
-
- /**
- * Below method will be used scan the blocks and then process the scanned blocks
- * as its a detail query so its will use dummy aggregator
- * to aggregate the data.
- * This scanner will handle the limit scenario if detail query is without order by.
- * In case of detail query once one block is process it will pass to scanned result processor
- * as in this case number of records will be more and it will take more memory
- *
- * @throws QueryExecutionException
- */
- @Override public void processBlock() throws QueryExecutionException {
-
- while (dataBlockIterator.hasNext()) {
- blocksChunkHolder.setDataBlock(dataBlockIterator.next());
- blocksChunkHolder.reset();
- counter += this.scannerResultAggregator
- .aggregateData(blockletScanner.scanBlocklet(blocksChunkHolder));
- // finishScanning();
- if (blockExecutionInfo.getLimit() != -1 && counter >= blockExecutionInfo.getLimit()) {
- break;
- }
- }
- finishScanning();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java b/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
index 4aa2adf..b00c021 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
@@ -19,21 +19,11 @@
package org.carbondata.query.carbon.result;
-import org.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.carbondata.query.carbon.model.QueryDimension;
-import org.carbondata.query.carbon.model.QuerySchemaInfo;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-
/**
* Below class holds the query result of batches.
*/
public class BatchRawResult extends BatchResult {
- private QuerySchemaInfo querySchemaInfo;
-
/**
* This method will return one row at a time based on the counter given.
* @param counter
@@ -42,51 +32,6 @@ public class BatchRawResult extends BatchResult {
public Object[] getRawRow(int counter) {
return rows[counter];
}
- /**
- * Returns the next element in the iteration.
- *
- * @return the next element in the iteration
- */
- @Override public Object[] next() {
- return parseData();
- }
-
- private Object[] parseData() {
- int[] order = querySchemaInfo.getQueryReverseOrder();
- Object[] row = rows[counter];
- ByteArrayWrapper key = (ByteArrayWrapper) row[0];
- QueryDimension[] queryDimensions = querySchemaInfo.getQueryDimensions();
- Object[] parsedData = new Object[queryDimensions.length + row.length - 1];
- if(key != null) {
- long[] surrogateResult = querySchemaInfo.getKeyGenerator()
- .getKeyArray(key.getDictionaryKey(), querySchemaInfo.getMaskedByteIndexes());
- int noDictionaryColumnIndex = 0;
- for (int i = 0; i < queryDimensions.length; i++) {
- if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
- parsedData[order[i]] = DataTypeUtil.getDataBasedOnDataType(
- new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++)),
- queryDimensions[i].getDimension().getDataType());
- } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
- parsedData[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
- (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()]);
- } else {
- parsedData[order[i]] =
- (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()];
- }
- }
- }
- for (int i = 0; i < row.length - 1; i++) {
- parsedData[order[i + queryDimensions.length]] = row[i + 1];
- }
- counter++;
- return parsedData;
- }
-
- public void setQuerySchemaInfo(QuerySchemaInfo querySchemaInfo) {
- this.querySchemaInfo = querySchemaInfo;
- }
/**
* For getting the total size.