Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:12 UTC

[24/56] [abbrv] incubator-carbondata git commit: Optimized detail query flow and cleanup (#691)

Optimized detail query flow and cleanup (#691)

* Optimizing detail query

* Optimizing detail query flow

* Optimizing detail query flow

* Optimized raw detail query to improve push up performance.

* Fixed bugs

* Reverted wrong check-in

* Rebased the code

* Removed aggregation from core

* Refactored core package and fixed test cases

* Fixed bugs

* Fixed review comments and deleted aggregate classes after merge from master

* Removed unused code

* Optimized scanner flow

* Optimized scanner flow

* Optimized scanning flow

* Optimized scanner flow

* Refactored code

* Refactored code

* Removed unused code

* Reverted unnecessary comment

* Reverted queryinterface package from core

* Removed queryinterface package from core

* Handled review comments

* Handled review comments

* Added assert


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/29360501
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/29360501
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/29360501

Branch: refs/heads/master
Commit: 29360501336ddc54a349257512fffb5bd8d87126
Parents: 656577d
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Sat Jun 18 21:39:54 2016 +0530
Committer: Jacky Li <ja...@huawei.com>
Committed: Sun Jun 19 00:09:54 2016 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../DirectDictionaryKeyGeneratorFactory.java    |   2 +-
 .../TimeStampDirectDictionaryGenerator.java     |   7 +
 .../aggregator/ScannedResultAggregator.java     |  43 ----
 .../impl/ListBasedResultAggregator.java         | 219 -----------------
 .../collector/ScannedResultCollector.java       |  43 ++++
 .../impl/ListBasedResultCollector.java          | 212 +++++++++++++++++
 .../executor/impl/DetailQueryExecutor.java      |   8 +-
 .../impl/DetailRawRecordQueryExecutor.java      |  10 +-
 .../executor/infos/BlockExecutionInfo.java      |  28 +--
 .../internal/InternalQueryExecutor.java         |   3 +-
 .../impl/InternalDetailQueryExecutor.java       | 130 -----------
 .../executor/internal/impl/QueryRunner.java     |  77 ------
 .../merger/AbstractScannedResultMerger.java     | 169 --------------
 .../carbon/merger/ScannedResultMerger.java      |  45 ----
 .../impl/UnSortedScannedResultMerger.java       |  34 ---
 .../query/carbon/model/QueryModel.java          |  14 ++
 .../processor/AbstractDataBlockIterator.java    | 126 ++++++++++
 .../processor/AbstractDataBlockProcessor.java   | 100 --------
 .../query/carbon/processor/BlockProcessor.java  |  36 ---
 .../impl/AggregateQueryBlockProcessor.java      |  62 -----
 .../processor/impl/DataBlockIteratorImpl.java   |  56 +++++
 .../impl/DetailQueryBlockProcessor.java         |  70 ------
 .../query/carbon/result/BatchRawResult.java     |  55 -----
 .../AbstractDetailQueryResultIterator.java      | 108 ++++-----
 .../iterator/DetailQueryResultIterator.java     |  71 +++---
 .../iterator/DetailRawQueryResultIterator.java  | 118 ----------
 .../impl/RawQueryResultPreparatorImpl.java      |  67 +++++-
 .../query/carbon/util/DataTypeUtil.java         |   4 +-
 .../merger/exception/ResultMergerException.java |  91 --------
 .../exception/DataProcessorException.java       |  91 --------
 .../queryinterface/filter/CarbonFilterInfo.java | 182 ---------------
 .../query/queryinterface/query/CarbonQuery.java | 233 -------------------
 .../query/impl/CarbonQueryImpl.java             | 223 ------------------
 .../query/metadata/AbstractCarbonLevel.java     |  57 -----
 .../queryinterface/query/metadata/Axis.java     |  61 -----
 .../query/metadata/CarbonCalculatedMeasure.java |  99 --------
 .../query/metadata/CarbonDimensionLevel.java    |  81 -------
 .../metadata/CarbonDimensionLevelFilter.java    | 217 -----------------
 .../query/metadata/CarbonLevel.java             |  83 -------
 .../query/metadata/CarbonLevelHolder.java       | 113 ---------
 .../query/metadata/CarbonMeasure.java           |  81 -------
 .../query/metadata/CarbonMeasureFilter.java     | 142 -----------
 .../query/metadata/CarbonMember.java            |  92 --------
 .../query/metadata/CarbonTuple.java             |  76 ------
 .../query/metadata/DSLTransformation.java       | 111 ---------
 .../queryinterface/query/metadata/TopCount.java | 110 ---------
 .../spark/merger/CarbonCompactionExecutor.java  |   1 +
 .../spark/partition/api/impl/PartitionImpl.java |  11 -
 .../api/impl/SampleDataPartitionerImpl.java     |  24 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  22 +-
 .../org/apache/spark/sql/CarbonOperators.scala  |  10 +-
 .../TimeStampDirectDictionaryGenerator_UT.java  |   6 +-
 53 files changed, 627 insertions(+), 3513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 34b609c..15a21e3 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -733,6 +733,12 @@ public final class CarbonCommonConstants {
    */
   public static final String INMEMORY_REOCRD_SIZE = "carbon.inmemory.record.size";
   public static final int INMEMORY_REOCRD_SIZE_DEFAULT = 240000;
+
+  /**
+   * DETAIL_QUERY_BATCH_SIZE
+   */
+  public static final String DETAIL_QUERY_BATCH_SIZE = "carbon.detail.batch.size";
+  public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 10000;
   /**
    * SPILL_OVER_DISK_PATH
    */

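The new batch size is exposed through the standard CarbonProperties lookup. Below is a minimal sketch of resolving it, assuming the same getProperty/parse-with-fallback pattern used elsewhere in this commit; the helper class and method names are illustrative, not part of the change.

import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.util.CarbonProperties;

// Illustrative helper: resolves carbon.detail.batch.size, falling back to the
// shipped default when the property is absent or not a valid integer.
final class DetailQueryBatchSize {
  static int resolve() {
    String configured = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
    if (configured == null) {
      return CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
    }
    try {
      return Integer.parseInt(configured);
    } catch (NumberFormatException e) {
      return CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
    }
  }
}
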
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
index 3859663..f3633bf 100644
--- a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
+++ b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/DirectDictionaryKeyGeneratorFactory.java
@@ -43,7 +43,7 @@ public final class DirectDictionaryKeyGeneratorFactory {
     DirectDictionaryGenerator directDictionaryGenerator = null;
     switch (dataType) {
       case TIMESTAMP:
-        directDictionaryGenerator = new TimeStampDirectDictionaryGenerator();
+        directDictionaryGenerator = TimeStampDirectDictionaryGenerator.instance;
         break;
       default:
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index 3954e14..d46725f 100644
--- a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -40,6 +40,13 @@ import static org.carbondata.core.keygenerator.directdictionary.timestamp.TimeSt
  */
 public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGenerator {
 
+  private TimeStampDirectDictionaryGenerator() {
+
+  }
+
+  public static TimeStampDirectDictionaryGenerator instance =
+      new TimeStampDirectDictionaryGenerator();
+
   /**
    * Logger instance
    */

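With the constructor made private, callers are expected to reuse the shared instance rather than construct a new generator per lookup, as the factory change above does. A brief sketch under that assumption; the wrapper class is illustrative and the import paths are inferred from the file layout in the diff.

import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampDirectDictionaryGenerator;

// Illustrative only: any call site now obtains the single shared generator.
final class TimestampGeneratorUsage {
  static final DirectDictionaryGenerator TIMESTAMP_GENERATOR =
      TimeStampDirectDictionaryGenerator.instance;
}
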
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java b/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java
deleted file mode 100644
index f5e4b5f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/aggregator/ScannedResultAggregator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.aggregator;
-
-import org.carbondata.query.carbon.result.AbstractScannedResult;
-import org.carbondata.query.carbon.result.Result;
-
-/**
- * Interface which will be used to aggregate the scan result
- */
-public interface ScannedResultAggregator {
-
-  /**
-   * Below method will be used to aggregate the scanned result
-   *
-   * @param scannedResult scanned result
-   * @return how many records was aggregated
-   */
-  int aggregateData(AbstractScannedResult scannedResult);
-
-  /**
-   * Below method will be used to get the aggregated result
-   *
-   * @return
-   */
-  Result getAggregatedResult();
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java b/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java
deleted file mode 100644
index 10bf88c..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/aggregator/impl/ListBasedResultAggregator.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.aggregator.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.core.carbon.metadata.datatype.DataType;
-import org.carbondata.core.keygenerator.KeyGenException;
-import org.carbondata.query.carbon.aggregator.ScannedResultAggregator;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.infos.KeyStructureInfo;
-import org.carbondata.query.carbon.executor.util.QueryUtil;
-import org.carbondata.query.carbon.result.AbstractScannedResult;
-import org.carbondata.query.carbon.result.ListBasedResultWrapper;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.result.impl.ListBasedResult;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-
-/**
- * It is not a aggregator it is just a scanned result holder.
- *
- * @TODO change it to some other name
- */
-public class ListBasedResultAggregator implements ScannedResultAggregator {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ListBasedResultAggregator.class.getName());
-
-  /**
-   * to keep a track of number of row processed to handle limit push down in
-   * case of detail query scenario
-   */
-  private int rowCounter;
-
-  /**
-   * number of records asked in limit query if -1 then its either is
-   * detail+order by query or detail query
-   */
-  private int limit;
-
-  /**
-   * dimension values list
-   */
-  private List<ListBasedResultWrapper> listBasedResult;
-
-  /**
-   * restructuring info
-   */
-  private KeyStructureInfo restructureInfos;
-
-  /**
-   * table block execution infos
-   */
-  private BlockExecutionInfo tableBlockExecutionInfos;
-
-  private int[] measuresOrdinal;
-
-  /**
-   * to check whether measure exists in current table block or not this to
-   * handle restructuring scenario
-   */
-  private boolean[] isMeasureExistsInCurrentBlock;
-
-  /**
-   * default value of the measures in case of restructuring some measure wont
-   * be present in the table so in that default value will be used to
-   * aggregate the data for that measure columns
-   */
-  private Object[] measureDefaultValue;
-
-  /**
-   * measure datatypes.
-   */
-  private DataType[] measureDatatypes;
-
-  public ListBasedResultAggregator(BlockExecutionInfo blockExecutionInfos) {
-    limit = blockExecutionInfos.getLimit();
-    this.tableBlockExecutionInfos = blockExecutionInfos;
-    restructureInfos = blockExecutionInfos.getKeyStructureInfo();
-    measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
-    isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
-    measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
-    this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
-  }
-
-  @Override
-  /**
-   * This method will add a record both key and value to list object
-   * it will keep track of how many record is processed, to handle limit scenario
-   * @param scanned result
-   *
-   */
-  public int aggregateData(AbstractScannedResult scannedResult) {
-    this.listBasedResult =
-        new ArrayList<>(limit == -1 ? scannedResult.numberOfOutputRows() : limit);
-    boolean isMsrsPresent = measureDatatypes.length > 0;
-    ByteArrayWrapper wrapper = null;
-    // scan the record and add to list
-    ListBasedResultWrapper resultWrapper;
-    while (scannedResult.hasNext() && (limit == -1 || rowCounter < limit)) {
-      resultWrapper = new ListBasedResultWrapper();
-      if(tableBlockExecutionInfos.isDimensionsExistInQuery()) {
-        wrapper = new ByteArrayWrapper();
-        wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
-        wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
-        wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
-        resultWrapper.setKey(wrapper);
-      } else {
-        scannedResult.incrementCounter();
-      }
-      if(isMsrsPresent) {
-        Object[] msrValues = new Object[measureDatatypes.length];
-        fillMeasureData(msrValues, scannedResult);
-        resultWrapper.setValue(msrValues);
-      }
-      listBasedResult.add(resultWrapper);
-      rowCounter++;
-    }
-    return rowCounter;
-  }
-
-  private void fillMeasureData(Object[] msrValues, AbstractScannedResult scannedResult) {
-    for (short i = 0; i < measuresOrdinal.length; i++) {
-      // if measure exists is block then pass measure column
-      // data chunk to the aggregator
-      if (isMeasureExistsInCurrentBlock[i]) {
-        msrValues[i] =
-            getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
-                scannedResult.getCurrenrRowId(),measureDatatypes[i]);
-      } else {
-        // if not then get the default value and use that value in aggregation
-        msrValues[i] = measureDefaultValue[i];
-      }
-    }
-  }
-
-  private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      Object msrVal;
-      switch (dataType) {
-        case LONG:
-          msrVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
-          break;
-        case DECIMAL:
-          msrVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
-          break;
-        default:
-          msrVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
-      }
-      return DataTypeUtil.getMeasureDataBasedOnDataType(msrVal, dataType);
-    }
-    return null;
-  }
-
-  /**
-   * Below method will used to get the result
-   */
-  @Override public Result getAggregatedResult() {
-    Result<List<ListBasedResultWrapper>, Object> result = new ListBasedResult();
-    if (!tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
-      updateKeyWithLatestBlockKeyGenerator();
-    }
-    result.addScannedResult(listBasedResult);
-    return result;
-  }
-
-
-
-  /**
-   * Below method will be used to update the fixed length key with the
-   * latest block key generator
-   *
-   * @return updated block
-   */
-  private void updateKeyWithLatestBlockKeyGenerator() {
-    try {
-      long[] data = null;
-      ByteArrayWrapper key = null;
-      for (int i = 0; i < listBasedResult.size(); i++) {
-        // get the key
-        key = listBasedResult.get(i).getKey();
-        // unpack the key with table block key generator
-        data = tableBlockExecutionInfos.getBlockKeyGenerator()
-            .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock());
-        // packed the key with latest block key generator
-        // and generate the masked key for that key
-        key.setDictionaryKey(QueryUtil
-            .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
-                restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(),
-                restructureInfos.getMaskByteRanges().length));
-        listBasedResult.get(i).setKey(key);
-      }
-    } catch (KeyGenException e) {
-      LOGGER.error(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java b/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java
new file mode 100644
index 0000000..9e5d401
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/collector/ScannedResultCollector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.collector;
+
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+import org.carbondata.query.carbon.result.Result;
+
+/**
+ * Interface which will be used to aggregate the scan result
+ */
+public interface ScannedResultCollector {
+
+  /**
+   * Below method will be used to aggregate the scanned result
+   *
+   * @param scannedResult scanned result
+   * @return how many records was aggregated
+   */
+  int collectData(AbstractScannedResult scannedResult, int batchSize);
+
+  /**
+   * Below method will be used to get the aggregated result
+   *
+   * @return
+   */
+  Result getCollectedResult();
+}

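The collector replaces the old aggregator contract: instead of draining the whole scanned result at once under a limit, it fills at most batchSize rows per call. A hedged driver sketch, assuming the ListBasedResultCollector shown next; the driver class and method names themselves are illustrative.

import java.util.List;

import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.query.carbon.collector.ScannedResultCollector;
import org.carbondata.query.carbon.collector.impl.ListBasedResultCollector;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
import org.carbondata.query.carbon.result.AbstractScannedResult;
import org.carbondata.query.carbon.result.Result;

// Illustrative driver: drains one scanned result in fixed-size batches
// through the new collector contract.
final class CollectorSketch {
  static void drain(BlockExecutionInfo info, AbstractScannedResult scannedResult) {
    ScannedResultCollector collector = new ListBasedResultCollector(info);
    int batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
    while (scannedResult.hasNext()) {
      // number of rows actually filled in this batch (may be < batchSize at the end)
      int collected = collector.collectData(scannedResult, batchSize);
      Result batch = collector.getCollectedResult();
      // a real caller would hand 'batch' to the result preparator / iterator here
    }
  }
}
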
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java b/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java
new file mode 100644
index 0000000..30d33b8
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/collector/impl/ListBasedResultCollector.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.collector.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.carbondata.core.carbon.metadata.datatype.DataType;
+import org.carbondata.core.keygenerator.KeyGenException;
+import org.carbondata.query.carbon.collector.ScannedResultCollector;
+import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
+import org.carbondata.query.carbon.executor.infos.KeyStructureInfo;
+import org.carbondata.query.carbon.executor.util.QueryUtil;
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.result.impl.ListBasedResult;
+import org.carbondata.query.carbon.util.DataTypeUtil;
+import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
+
+/**
+ * It is not a collector it is just a scanned result holder.
+ *
+ */
+public class ListBasedResultCollector implements ScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ListBasedResultCollector.class.getName());
+
+  /**
+   * to keep a track of number of row processed to handle limit push down in
+   * case of detail query scenario
+   */
+  private int rowCounter;
+
+  /**
+   * dimension values list
+   */
+  private List<ListBasedResultWrapper> listBasedResult;
+
+  /**
+   * restructuring info
+   */
+  private KeyStructureInfo restructureInfos;
+
+  /**
+   * table block execution infos
+   */
+  private BlockExecutionInfo tableBlockExecutionInfos;
+
+  private int[] measuresOrdinal;
+
+  /**
+   * to check whether measure exists in current table block or not this to
+   * handle restructuring scenario
+   */
+  private boolean[] isMeasureExistsInCurrentBlock;
+
+  /**
+   * default value of the measures in case of restructuring some measure wont
+   * be present in the table so in that default value will be used to
+   * aggregate the data for that measure columns
+   */
+  private Object[] measureDefaultValue;
+
+  /**
+   * measure datatypes.
+   */
+  private DataType[] measureDatatypes;
+
+  public ListBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    this.tableBlockExecutionInfos = blockExecutionInfos;
+    restructureInfos = blockExecutionInfos.getKeyStructureInfo();
+    measuresOrdinal = tableBlockExecutionInfos.getAggregatorInfo().getMeasureOrdinals();
+    isMeasureExistsInCurrentBlock = tableBlockExecutionInfos.getAggregatorInfo().getMeasureExists();
+    measureDefaultValue = tableBlockExecutionInfos.getAggregatorInfo().getDefaultValues();
+    this.measureDatatypes = tableBlockExecutionInfos.getAggregatorInfo().getMeasureDataTypes();
+  }
+
+  @Override
+  /**
+   * This method will add a record both key and value to list object
+   * it will keep track of how many record is processed, to handle limit scenario
+   * @param scanned result
+   *
+   */
+  public int collectData(AbstractScannedResult scannedResult, int batchSize) {
+    this.listBasedResult =
+        new ArrayList<>(batchSize);
+    boolean isMsrsPresent = measureDatatypes.length > 0;
+    ByteArrayWrapper wrapper = null;
+    // scan the record and add to list
+    ListBasedResultWrapper resultWrapper;
+    int rowCounter = 0;
+    while (scannedResult.hasNext() && rowCounter < batchSize) {
+      resultWrapper = new ListBasedResultWrapper();
+      if(tableBlockExecutionInfos.isDimensionsExistInQuery()) {
+        wrapper = new ByteArrayWrapper();
+        wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
+        wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
+        wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
+        resultWrapper.setKey(wrapper);
+      } else {
+        scannedResult.incrementCounter();
+      }
+      if(isMsrsPresent) {
+        Object[] msrValues = new Object[measureDatatypes.length];
+        fillMeasureData(msrValues, scannedResult);
+        resultWrapper.setValue(msrValues);
+      }
+      listBasedResult.add(resultWrapper);
+      rowCounter++;
+    }
+    return rowCounter;
+  }
+
+  private void fillMeasureData(Object[] msrValues, AbstractScannedResult scannedResult) {
+    for (short i = 0; i < measuresOrdinal.length; i++) {
+      // if measure exists is block then pass measure column
+      // data chunk to the collector
+      if (isMeasureExistsInCurrentBlock[i]) {
+        msrValues[i] =
+            getMeasureData(scannedResult.getMeasureChunk(measuresOrdinal[i]),
+                scannedResult.getCurrenrRowId(),measureDatatypes[i]);
+      } else {
+        // if not then get the default value and use that value in aggregation
+        msrValues[i] = measureDefaultValue[i];
+      }
+    }
+  }
+
+  private Object getMeasureData(MeasureColumnDataChunk dataChunk, int index, DataType dataType) {
+    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
+      Object msrVal;
+      switch (dataType) {
+        case LONG:
+          msrVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+          break;
+        case DECIMAL:
+          msrVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+          break;
+        default:
+          msrVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+      }
+      return DataTypeUtil.getMeasureDataBasedOnDataType(msrVal, dataType);
+    }
+    return null;
+  }
+
+  /**
+   * Below method will used to get the result
+   */
+  @Override public Result getCollectedResult() {
+    Result<List<ListBasedResultWrapper>, Object> result = new ListBasedResult();
+    if (!tableBlockExecutionInfos.isFixedKeyUpdateRequired()) {
+      updateKeyWithLatestBlockKeyGenerator();
+    }
+    result.addScannedResult(listBasedResult);
+    return result;
+  }
+
+
+
+  /**
+   * Below method will be used to update the fixed length key with the
+   * latest block key generator
+   *
+   * @return updated block
+   */
+  private void updateKeyWithLatestBlockKeyGenerator() {
+    try {
+      long[] data = null;
+      ByteArrayWrapper key = null;
+      for (int i = 0; i < listBasedResult.size(); i++) {
+        // get the key
+        key = listBasedResult.get(i).getKey();
+        // unpack the key with table block key generator
+        data = tableBlockExecutionInfos.getBlockKeyGenerator()
+            .getKeyArray(key.getDictionaryKey(), tableBlockExecutionInfos.getMaskedByteForBlock());
+        // packed the key with latest block key generator
+        // and generate the masked key for that key
+        key.setDictionaryKey(QueryUtil
+            .getMaskedKey(restructureInfos.getKeyGenerator().generateKey(data),
+                restructureInfos.getMaxKey(), restructureInfos.getMaskByteRanges(),
+                restructureInfos.getMaskByteRanges().length));
+        listBasedResult.get(i).setKey(key);
+      }
+    } catch (KeyGenException e) {
+      LOGGER.error(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
index b2f323c..0255cbb 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailQueryExecutor.java
@@ -23,11 +23,10 @@ import java.util.List;
 import org.carbondata.core.iterator.CarbonIterator;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
-import org.carbondata.query.carbon.executor.internal.impl.InternalDetailQueryExecutor;
 import org.carbondata.query.carbon.model.QueryModel;
 import org.carbondata.query.carbon.result.iterator.ChunkRowIterator;
 import org.carbondata.query.carbon.result.iterator.DetailQueryResultIterator;
+import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl;
 
 /**
  * Below class will be used to execute the detail query
@@ -39,10 +38,9 @@ public class DetailQueryExecutor extends AbstractQueryExecutor {
   @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
       throws QueryExecutionException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    InternalQueryExecutor queryExecutor = new InternalDetailQueryExecutor();
     return new ChunkRowIterator(
-        new DetailQueryResultIterator(blockExecutionInfoList, queryProperties, queryModel,
-            queryExecutor));
+        new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
+            new DetailQueryResultPreparatorImpl(queryProperties, queryModel)));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
index d1967cd..e72c638 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/impl/DetailRawRecordQueryExecutor.java
@@ -5,11 +5,10 @@ import java.util.List;
 import org.carbondata.core.iterator.CarbonIterator;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
-import org.carbondata.query.carbon.executor.internal.impl.InternalDetailQueryExecutor;
 import org.carbondata.query.carbon.model.QueryModel;
 import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.iterator.DetailRawQueryResultIterator;
+import org.carbondata.query.carbon.result.iterator.DetailQueryResultIterator;
+import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparatorImpl;
 
 /**
  * Executor for raw records, it does not parse to actual data
@@ -19,8 +18,7 @@ public class DetailRawRecordQueryExecutor extends AbstractQueryExecutor<BatchRes
   @Override public CarbonIterator<BatchResult> execute(QueryModel queryModel)
       throws QueryExecutionException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
-    InternalQueryExecutor queryExecutor = new InternalDetailQueryExecutor();
-    return new DetailRawQueryResultIterator(blockExecutionInfoList, queryProperties, queryModel,
-        queryExecutor);
+    return new DetailQueryResultIterator(blockExecutionInfoList, queryModel,
+        new RawQueryResultPreparatorImpl(queryProperties, queryModel));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
index 7bed33d..202a932 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/infos/BlockExecutionInfo.java
@@ -26,7 +26,6 @@ import org.carbondata.core.carbon.datastore.IndexKey;
 import org.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.carbondata.core.keygenerator.KeyGenerator;
-import org.carbondata.query.carbon.merger.ScannedResultMerger;
 import org.carbondata.query.filter.executer.FilterExecuter;
 
 /**
@@ -156,11 +155,6 @@ public class BlockExecutionInfo {
   private int[] noDictionaryBlockIndexes;
 
   /**
-   * to process the scanned result
-   */
-  private ScannedResultMerger scannedResultProcessor;
-
-  /**
    * key generator used for generating the table block fixed length key
    */
   private KeyGenerator blockKeyGenerator;
@@ -214,7 +208,7 @@ public class BlockExecutionInfo {
   }
 
   /**
-   * @param tableBlock the tableBlock to set
+   * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
     this.blockIndex = blockIndex;
@@ -410,7 +404,7 @@ public class BlockExecutionInfo {
   }
 
   /**
-   * @param restructureInfos the restructureInfos to set
+   * @param keyStructureInfo the restructureInfos to set
    */
   public void setKeyStructureInfo(KeyStructureInfo keyStructureInfo) {
     this.keyStructureInfo = keyStructureInfo;
@@ -424,7 +418,7 @@ public class BlockExecutionInfo {
   }
 
   /**
-   * @param sortInfos the sortInfos to set
+   * @param sortInfo the sortInfos to set
    */
   public void setSortInfo(SortInfo sortInfo) {
     this.sortInfo = sortInfo;
@@ -473,20 +467,6 @@ public class BlockExecutionInfo {
   }
 
   /**
-   * @return the scannedResultProcessor
-   */
-  public ScannedResultMerger getScannedResultProcessor() {
-    return scannedResultProcessor;
-  }
-
-  /**
-   * @param scannedResultProcessor the scannedResultProcessor to set
-   */
-  public void setScannedResultProcessor(ScannedResultMerger scannedResultProcessor) {
-    this.scannedResultProcessor = scannedResultProcessor;
-  }
-
-  /**
    * @return the filterEvaluatorTree
    */
   public FilterExecuter getFilterExecuterTree() {
@@ -494,7 +474,7 @@ public class BlockExecutionInfo {
   }
 
   /**
-   * @param filterEvaluatorTree the filterEvaluatorTree to set
+   * @param filterExecuterTree the filterEvaluatorTree to set
    */
   public void setFilterExecuterTree(FilterExecuter filterExecuterTree) {
     this.filterExecuterTree = filterExecuterTree;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
index 7b3691c..089ee82 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/internal/InternalQueryExecutor.java
@@ -20,6 +20,7 @@ package org.carbondata.query.carbon.executor.internal;
 
 import java.util.List;
 
+import org.carbondata.core.datastorage.store.FileHolder;
 import org.carbondata.core.iterator.CarbonIterator;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
@@ -42,5 +43,5 @@ public interface InternalQueryExecutor {
    * @throws QueryExecutionException
    */
   CarbonIterator<Result> executeQuery(List<BlockExecutionInfo> blockExecutionInfos,
-      int[] blockIndexToBeExecuted) throws QueryExecutionException;
+      int[] blockIndexToBeExecuted, FileHolder fileReader) throws QueryExecutionException;
 }

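The executor interface now takes the FileHolder from the caller, so one reader can be shared across blocks and closed once at the end, rather than each internal runner creating its own as the removed QueryRunner did. A hedged call-site sketch; the wrapper class, its method, and the consume loop are illustrative, not part of the commit.

import java.util.List;

import org.carbondata.core.datastorage.store.FileHolder;
import org.carbondata.core.datastorage.store.impl.FileFactory;
import org.carbondata.core.iterator.CarbonIterator;
import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
import org.carbondata.query.carbon.result.Result;

// Illustrative call site for the revised contract: the caller owns the
// FileHolder lifecycle and releases it after consuming the results.
final class ExecutorCallSketch {
  static void runAndClose(InternalQueryExecutor executor,
      List<BlockExecutionInfo> infos, int[] blockIndexToBeExecuted)
      throws QueryExecutionException {
    FileHolder fileReader = FileFactory.getFileHolder(infos.get(0).getFileType());
    try {
      CarbonIterator<Result> results =
          executor.executeQuery(infos, blockIndexToBeExecuted, fileReader);
      while (results.hasNext()) {
        Result next = results.next(); // consume or forward each result
      }
    } finally {
      // release the shared reader once all results have been consumed
      fileReader.finish();
    }
  }
}
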
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
deleted file mode 100644
index 866596f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalDetailQueryExecutor.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.executor.internal.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
-import org.carbondata.query.carbon.merger.ScannedResultMerger;
-import org.carbondata.query.carbon.merger.impl.UnSortedScannedResultMerger;
-import org.carbondata.query.carbon.result.Result;
-
-/**
- * Below Class will be used to execute the detail query
- */
-public class InternalDetailQueryExecutor implements InternalQueryExecutor {
-
-  /**
-   * LOGGER.
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(InternalDetailQueryExecutor.class.getName());
-
-  /**
-   * number of cores can be used to execute the query
-   */
-  private int numberOfCores;
-
-  public InternalDetailQueryExecutor() {
-
-    // below code will be used to update the number of cores based on number
-    // records we
-    // can keep in memory while executing the query execution
-    int recordSize = 0;
-    String defaultInMemoryRecordsSize =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.INMEMORY_REOCRD_SIZE);
-    if (null != defaultInMemoryRecordsSize) {
-      try {
-        recordSize = Integer.parseInt(defaultInMemoryRecordsSize);
-      } catch (NumberFormatException ne) {
-        LOGGER.error("Invalid inmemory records size. Using default value");
-        recordSize = CarbonCommonConstants.INMEMORY_REOCRD_SIZE_DEFAULT;
-      }
-    }
-    numberOfCores = recordSize / Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
-            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
-    if (numberOfCores == 0) {
-      numberOfCores++;
-    }
-  }
-
-  /**
-   * Below method will be used to used to execute the detail query
-   * and it will return iterator over result
-   *
-   * @param executionInfos block execution info which will have all the properties
-   *                       required for query execution
-   * @param sliceIndexes   slice indexes to be executed
-   * @return query result
-   */
-  @Override public CarbonIterator<Result> executeQuery(
-      List<BlockExecutionInfo> executionInfos,
-      int[] sliceIndexes) throws QueryExecutionException {
-    long startTime = System.currentTimeMillis();
-    QueryRunner task;
-    ScannedResultMerger scannedResultProcessor =
-        new UnSortedScannedResultMerger(executionInfos.get(executionInfos.size() - 1),
-            sliceIndexes.length);
-    ExecutorService execService = Executors.newFixedThreadPool(numberOfCores);
-    List<Future> listFutureObjects = new ArrayList<Future>();
-    try {
-      for (int currentSliceIndex : sliceIndexes) {
-        if (currentSliceIndex == -1) {
-          continue;
-        }
-        executionInfos.get(currentSliceIndex).setScannedResultProcessor(scannedResultProcessor);
-        task = new QueryRunner(executionInfos.get(currentSliceIndex));
-        listFutureObjects.add(execService.submit(task));
-      }
-      execService.shutdown();
-      execService.awaitTermination(2, TimeUnit.DAYS);
-      LOGGER.info("Total time taken for scan " + (System.currentTimeMillis() - startTime));
-      for (Future future : listFutureObjects) {
-        try {
-          future.get();
-        } catch (ExecutionException e) {
-          throw new QueryExecutionException(e.getMessage());
-        }
-      }
-      return scannedResultProcessor.getQueryResultIterator();
-    } catch (QueryExecutionException exception) {
-      throw new QueryExecutionException(exception);
-    } catch (InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-      throw new QueryExecutionException(e);
-    } finally {
-      execService = null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java
deleted file mode 100644
index f341fa9..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/QueryRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.executor.internal.impl;
-
-import java.util.concurrent.Callable;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.common.logging.impl.StandardLogService;
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.core.datastorage.store.impl.FileFactory;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.processor.BlockProcessor;
-import org.carbondata.query.carbon.processor.impl.DetailQueryBlockProcessor;
-
-/**
- * Class which will execute the query
- */
-public class QueryRunner implements Callable<Void> {
-
-  /**
-   * LOGGER.
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(QueryRunner.class.getName());
-  /**
-   * block processor
-   */
-  private BlockProcessor dataBlockProcessor;
-  /**
-   * file reader which will be used to execute the query
-   */
-  private FileHolder fileReader;
-  /**
-   * block execution info which is required to run the query
-   */
-  private BlockExecutionInfo blockExecutionInfo;
-
-  public QueryRunner(BlockExecutionInfo executionInfo) {
-    this.blockExecutionInfo = executionInfo;
-    this.fileReader = FileFactory.getFileHolder(executionInfo.getFileType());
-    // if detail query detail query processor will be used to process the
-    // block
-    dataBlockProcessor = new DetailQueryBlockProcessor(executionInfo, fileReader);
-  }
-
-  @Override public Void call() throws Exception {
-    StandardLogService
-        .setThreadName(blockExecutionInfo.getPartitionId(), blockExecutionInfo.getQueryId());
-    try {
-      this.dataBlockProcessor.processBlock();
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new Exception(e.getMessage());
-    } finally {
-      this.fileReader.finish();
-    }
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java b/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java
deleted file mode 100644
index 9358f42..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/merger/AbstractScannedResultMerger.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.merger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.result.impl.ListBasedResult;
-import org.carbondata.query.carbon.result.iterator.MemoryBasedResultIterator;
-
-/**
- * Class which processed the scanned result
- * Processing can be merging sorting
- */
-public abstract class AbstractScannedResultMerger implements ScannedResultMerger {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractScannedResultMerger.class.getName());
-  /**
-   * merging will done using thread
-   */
-  protected ExecutorService execService;
-
-  /**
-   * Merged scanned result which will merge all the result from all the blocks
-   * executor
-   */
-  protected Result mergedScannedResult;
-
-  /**
-   * scanned result list
-   */
-  protected List<Result> scannedResultList;
-
-  /**
-   * tableBlockExecutionInfo
-   */
-  protected BlockExecutionInfo blockExecutionInfo;
-
-  /**
-   * max number of scanned result can keep in memory
-   */
-  private int maxNumberOfScannedResultList;
-
-  /**
-   * lockObject
-   */
-  private Object lockObject;
-
-  public AbstractScannedResultMerger(BlockExecutionInfo blockExecutionInfo,
-      int maxNumberOfScannedresultList) {
-
-    this.lockObject = new Object();
-    this.maxNumberOfScannedResultList = maxNumberOfScannedresultList;
-    execService = Executors.newFixedThreadPool(1);
-    scannedResultList = new ArrayList<Result>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    this.blockExecutionInfo = blockExecutionInfo;
-    initialiseResult();
-  }
-
-  /**
-   * for initializing the map based or list based result.
-   */
-  protected void initialiseResult() {
-    mergedScannedResult = new ListBasedResult();
-  }
-
-  /**
-   * Below method will be used to add the scanned result
-   * If number of scanned result in the list of more than
-   * the maxNumberOfScannedResultList than results present in the
-   * list will be merged to merged scanned result
-   *
-   * @param scannedResult
-   */
-  @Override public void addScannedResult(Result scannedResult) throws QueryExecutionException {
-    synchronized (this.lockObject) {
-      scannedResultList.add(scannedResult);
-      if ((scannedResultList.size() > maxNumberOfScannedResultList)) {
-        List<Result> localResult = scannedResultList;
-        scannedResultList = new ArrayList<Result>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        execService.submit(new MergerThread(localResult));
-      }
-    }
-  }
-
-  /**
-   * Below method will be used to merge the scanned result
-   *
-   * @param scannedResultList scanned result list
-   */
-  protected void mergeScannedResults(List<Result> scannedResultList) {
-    long start = System.currentTimeMillis();
-    LOGGER.debug("Started a slice result merging");
-
-    for (int i = 0; i < scannedResultList.size(); i++) {
-      mergedScannedResult.merge(scannedResultList.get(i));
-    }
-    LOGGER.debug("Finished current slice result merging in time (MS) " + (System.currentTimeMillis()
-            - start));
-  }
-
-  /**
-   * Below method will be used to get the final query
-   * return
-   *
-   * @return iterator over result
-   */
-  @Override public CarbonIterator<Result> getQueryResultIterator() throws QueryExecutionException {
-    execService.shutdown();
-    try {
-      execService.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e1) {
-      LOGGER.error("Problem in thread termination" + e1.getMessage());
-    }
-    if (scannedResultList.size() > 0) {
-      mergeScannedResults(scannedResultList);
-      scannedResultList = null;
-    }
-    LOGGER.debug("Finished result merging from all slices");
-    return new MemoryBasedResultIterator(mergedScannedResult);
-  }
-
-  /**
-   * Thread class to merge the scanned result
-   */
-  private final class MergerThread implements Callable<Void> {
-    private List<Result> scannedResult;
-
-    private MergerThread(List<Result> scannedResult) {
-      this.scannedResult = scannedResult;
-    }
-
-    @Override public Void call() throws Exception {
-      mergeScannedResults(scannedResult);
-      return null;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java b/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java
deleted file mode 100644
index 19ed6cd..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/merger/ScannedResultMerger.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.merger;
-
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.result.Result;
-
-/**
- * Interface for merging the scanned result
- */
-public interface ScannedResultMerger {
-
-  /**
-   * Below method will be used to add the scanned result
-   *
-   * @param scannedResult scanned result
-   * @throws QueryExecutionException throw exception in case of failure
-   */
-  void addScannedResult(Result scannedResult) throws QueryExecutionException;
-
-  /**
-   * Below method will be used to get the query result
-   *
-   * @return query result
-   * @throws QueryExecutionException throw exception in case of any failure
-   */
-  CarbonIterator<Result> getQueryResultIterator() throws QueryExecutionException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java b/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java
deleted file mode 100644
index 8a6c302..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/merger/impl/UnSortedScannedResultMerger.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.merger.impl;
-
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.merger.AbstractScannedResultMerger;
-
-/**
- * Below class will be used merge the unsorted result
- */
-public class UnSortedScannedResultMerger extends AbstractScannedResultMerger {
-
-  public UnSortedScannedResultMerger(BlockExecutionInfo blockExecutionInfo,
-      int maxNumberOfScannedresultList) {
-    super(blockExecutionInfo, maxNumberOfScannedresultList);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java b/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
index 64b519a..92334db 100644
--- a/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
+++ b/core/src/main/java/org/carbondata/query/carbon/model/QueryModel.java
@@ -140,6 +140,12 @@ public class QueryModel implements Serializable {
    */
   private CarbonTable table;
 
+  /**
+   * This is used only when [forcedDetailRawQuery = true]. By default forcedDetailRawQuery returns
+   * dictionary values, but if the user wants raw bytes in the detail result, this field is set to true.
+   */
+  private boolean rawBytesDetailQuery;
+
   public QueryModel() {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     queryDimension = new ArrayList<QueryDimension>();
@@ -499,4 +505,12 @@ public class QueryModel implements Serializable {
   public void setColumnToDictionaryMapping(Map<String, Dictionary> columnToDictionaryMapping) {
     this.columnToDictionaryMapping = columnToDictionaryMapping;
   }
+
+  public boolean isRawBytesDetailQuery() {
+    return rawBytesDetailQuery;
+  }
+
+  public void setRawBytesDetailQuery(boolean rawBytesDetailQuery) {
+    this.rawBytesDetailQuery = rawBytesDetailQuery;
+  }
 }
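
The snippet below is a minimal sketch of how the new rawBytesDetailQuery flag might be used together with a forced detail raw query; the setForcedDetailRawQuery setter name is assumed from the forcedDetailRawQuery field referenced in the javadoc above and is not part of this diff.

  import org.carbondata.query.carbon.model.QueryModel;

  public final class RawBytesDetailQueryExample {
    public static void main(String[] args) {
      QueryModel queryModel = new QueryModel();
      // Assumed setter for the existing forcedDetailRawQuery field.
      queryModel.setForcedDetailRawQuery(true);
      // New flag from this change: keep raw bytes instead of resolving dictionary values.
      queryModel.setRawBytesDetailQuery(true);
    }
  }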

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java
new file mode 100644
index 0000000..52b1bdf
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockIterator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.processor;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.DataRefNode;
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.query.carbon.collector.ScannedResultCollector;
+import org.carbondata.query.carbon.collector.impl.ListBasedResultCollector;
+import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
+import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
+import org.carbondata.query.carbon.result.AbstractScannedResult;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.scanner.BlockletScanner;
+import org.carbondata.query.carbon.scanner.impl.FilterScanner;
+import org.carbondata.query.carbon.scanner.impl.NonFilterScanner;
+
+/**
+ * This abstract class provides a skeletal implementation of the
+ * Block iterator.
+ */
+public abstract class AbstractDataBlockIterator extends CarbonIterator<Result> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractDataBlockIterator.class.getName());
+  /**
+   * iterator which will be used to iterate over data blocks
+   */
+  protected CarbonIterator<DataRefNode> dataBlockIterator;
+
+  /**
+   * execution details
+   */
+  protected BlockExecutionInfo blockExecutionInfo;
+
+  /**
+   * result collector which will be used to aggregate the scanned result
+   */
+  protected ScannedResultCollector scannerResultAggregator;
+
+  /**
+   * scanner which will be used to scan the blocklet; the scanning can be
+   * either filter based or non filter based
+   */
+  protected BlockletScanner blockletScanner;
+
+  /**
+   * to hold the data block
+   */
+  protected BlocksChunkHolder blocksChunkHolder;
+
+  /**
+   * batch size of result
+   */
+  protected int batchSize;
+
+  protected AbstractScannedResult scannedResult;
+
+  public AbstractDataBlockIterator(BlockExecutionInfo blockExecutionInfo,
+      FileHolder fileReader, int batchSize) {
+    this.blockExecutionInfo = blockExecutionInfo;
+    dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
+        blockExecutionInfo.getNumberOfBlockToScan());
+    blocksChunkHolder = new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
+        blockExecutionInfo.getTotalNumberOfMeasureBlock());
+    blocksChunkHolder.setFileReader(fileReader);
+
+    if (blockExecutionInfo.getFilterExecuterTree() != null) {
+      blockletScanner = new FilterScanner(blockExecutionInfo);
+    } else {
+      blockletScanner = new NonFilterScanner(blockExecutionInfo);
+    }
+
+    this.scannerResultAggregator =
+        new ListBasedResultCollector(blockExecutionInfo);
+    this.batchSize = batchSize;
+  }
+
+  public boolean hasNext() {
+    try {
+      if (scannedResult != null && scannedResult.hasNext()) {
+        return true;
+      } else {
+        scannedResult = getNextScannedResult();
+        while (scannedResult != null) {
+          if (scannedResult.hasNext()) {
+            return true;
+          }
+          scannedResult = getNextScannedResult();
+        }
+        return false;
+      }
+    } catch (QueryExecutionException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private AbstractScannedResult getNextScannedResult() throws QueryExecutionException {
+    if (dataBlockIterator.hasNext()) {
+      blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+      blocksChunkHolder.reset();
+      return blockletScanner.scanBlocklet(blocksChunkHolder);
+    }
+    return null;
+  }
+
+
+}
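
For reference, a small illustrative consumer of the new iterator, assuming only what the diff shows: hasNext() keeps scanning blocklets until one produces rows, and next() (implemented by subclasses such as DataBlockIteratorImpl below) returns a batched Result that exposes size().

  import org.carbondata.query.carbon.processor.AbstractDataBlockIterator;
  import org.carbondata.query.carbon.result.Result;

  final class BlockIteratorDrainExample {
    // Drains a block iterator batch by batch and counts the scanned rows.
    static long countRows(AbstractDataBlockIterator blockIterator) {
      long totalRows = 0;
      while (blockIterator.hasNext()) {
        Result batch = blockIterator.next();
        totalRows += batch.size();
      }
      return totalRows;
    }
  }

Because the constructor picks FilterScanner when a filter executer tree is present and NonFilterScanner otherwise, the same drain loop works for both filtered and unfiltered scans.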

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java
deleted file mode 100644
index fdcf6f1..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/AbstractDataBlockProcessor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.processor;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.datastore.DataRefNode;
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.aggregator.ScannedResultAggregator;
-import org.carbondata.query.carbon.aggregator.impl.ListBasedResultAggregator;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.scanner.BlockletScanner;
-import org.carbondata.query.carbon.scanner.impl.FilterScanner;
-import org.carbondata.query.carbon.scanner.impl.NonFilterScanner;
-
-/**
- * This class provides a skeletal implementation of the
- * {@link BlockProcessor} interface to minimize the effort required to
- * implement this interface.
- */
-public abstract class AbstractDataBlockProcessor implements BlockProcessor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractDataBlockProcessor.class.getName());
-  /**
-   * iterator which will be used to iterate over data blocks
-   */
-  protected CarbonIterator<DataRefNode> dataBlockIterator;
-
-  /**
-   * execution details
-   */
-  protected BlockExecutionInfo blockExecutionInfo;
-
-  /**
-   * result aggregator which will be used to aggregate the scanned result
-   */
-  protected ScannedResultAggregator scannerResultAggregator;
-
-  /**
-   * processor which will be used to process the block processing can be
-   * filter processing or non filter processing
-   */
-  protected BlockletScanner blockletScanner;
-
-  /**
-   * to hold the data block
-   */
-  protected BlocksChunkHolder blocksChunkHolder;
-
-  public AbstractDataBlockProcessor(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader) {
-    this.blockExecutionInfo = blockExecutionInfo;
-    dataBlockIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(),
-        blockExecutionInfo.getNumberOfBlockToScan());
-    blocksChunkHolder = new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
-        blockExecutionInfo.getTotalNumberOfMeasureBlock());
-    blocksChunkHolder.setFileReader(fileReader);
-
-    if (blockExecutionInfo.getFilterExecuterTree() != null) {
-      blockletScanner = new FilterScanner(blockExecutionInfo);
-    } else {
-      blockletScanner = new NonFilterScanner(blockExecutionInfo);
-    }
-
-    this.scannerResultAggregator =
-        new ListBasedResultAggregator(blockExecutionInfo);
-  }
-
-  /**
-   * Below method will be used to add the scanned result to scanned result
-   * processor
-   */
-  protected void finishScanning() {
-    try {
-      this.blockExecutionInfo.getScannedResultProcessor()
-          .addScannedResult(scannerResultAggregator.getAggregatedResult());
-    } catch (QueryExecutionException e) {
-      LOGGER.error(e,
-          "Problem while adding the result to Scanned Result Processor");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java
deleted file mode 100644
index 84e2d5f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/BlockProcessor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.carbon.processor;
-
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-
-/**
- * Scanner interface which will be used
- * to scan the blocks.
- */
-public interface BlockProcessor {
-
-  /**
-   * Below method can be used to scan the block based on the query execution infos
-   *
-   * @throws QueryExecutionException
-   */
-  void processBlock() throws QueryExecutionException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java
deleted file mode 100644
index 260b894..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/impl/AggregateQueryBlockProcessor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.processor.impl;
-
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.processor.AbstractDataBlockProcessor;
-
-/**
- * Below class will be used to process the blocks for
- * aggregated query
- */
-public class AggregateQueryBlockProcessor extends AbstractDataBlockProcessor {
-
-  /**
-   * AggregateQueryScanner constructor
-   *
-   * @param blockExecutionInfos
-   */
-  public AggregateQueryBlockProcessor(BlockExecutionInfo tableBlockExecutionInfos,
-      FileHolder fileReader) {
-    super(tableBlockExecutionInfos, fileReader);
-  }
-
-  /**
-   * Below method will be used to scan the block
-   * then it will call processor to process the data
-   * and the it will call aggregator to aggregate the data
-   * it will call finish once all the blocks of a table is scanned
-   *
-   * @throws QueryExecutionException
-   */
-  @Override public void processBlock() throws QueryExecutionException {
-    while (dataBlockIterator.hasNext()) {
-      try {
-        blocksChunkHolder.setDataBlock(dataBlockIterator.next());
-        blocksChunkHolder.reset();
-        this.scannerResultAggregator.aggregateData(blockletScanner.scanBlocklet(blocksChunkHolder));
-      } catch (Exception e) {
-        throw new QueryExecutionException(e);
-      }
-    }
-    finishScanning();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java
new file mode 100644
index 0000000..ade1965
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/processor/impl/DataBlockIteratorImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.processor.impl;
+
+import org.carbondata.core.datastorage.store.FileHolder;
+import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
+import org.carbondata.query.carbon.processor.AbstractDataBlockIterator;
+import org.carbondata.query.carbon.result.Result;
+
+/**
+ * Below class will be used to process the block for detail query
+ */
+public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
+
+  /**
+   * DataBlockIteratorImpl Constructor
+   *
+   * @param blockExecutionInfo execution information
+   */
+  public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo,
+      FileHolder fileReader, int batchSize) {
+    super(blockExecutionInfo, fileReader, batchSize);
+  }
+
+  /**
+   * It scans the block and returns a result containing at most batchSize rows
+   *
+   * @return Result holding at most batchSize rows
+   */
+  public Result next() {
+    this.scannerResultAggregator.collectData(scannedResult, batchSize);
+    Result result = this.scannerResultAggregator.getCollectedResult();
+    while (result.size() < batchSize && hasNext()) {
+      this.scannerResultAggregator.collectData(scannedResult, batchSize - result.size());
+      result.merge(this.scannerResultAggregator.getCollectedResult());
+    }
+    return result;
+  }
+
+}
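
A usage sketch for the new iterator implementation; blockExecutionInfo and fileReader are assumed to be prepared by the query executor, and the batch size of 100 is only an example value.

  import org.carbondata.core.datastorage.store.FileHolder;
  import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
  import org.carbondata.query.carbon.processor.impl.DataBlockIteratorImpl;
  import org.carbondata.query.carbon.result.Result;

  final class DataBlockIteratorUsage {
    static void scan(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader) {
      DataBlockIteratorImpl iterator =
          new DataBlockIteratorImpl(blockExecutionInfo, fileReader, 100);
      while (iterator.hasNext()) {
        Result batch = iterator.next();  // at most 100 rows per batch
        // hand the batch over to the result collector or preparator
      }
    }
  }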

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java b/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java
deleted file mode 100644
index b079a84..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/processor/impl/DetailQueryBlockProcessor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.processor.impl;
-
-import org.carbondata.core.datastorage.store.FileHolder;
-import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
-import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
-import org.carbondata.query.carbon.processor.AbstractDataBlockProcessor;
-
-/**
- * Below class will be used to process the block for detail query
- */
-public class DetailQueryBlockProcessor extends AbstractDataBlockProcessor {
-
-  /**
-   * counter for number of records processed
-   */
-  private int counter;
-
-  /**
-   * DetailQueryScanner Constructor
-   *
-   * @param blockExecutionInfo execution information
-   */
-  public DetailQueryBlockProcessor(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader) {
-    super(blockExecutionInfo, fileReader);
-  }
-
-  /**
-   * Below method will be used scan the blocks and then process the scanned blocks
-   * as its a detail query so its will use dummy aggregator
-   * to aggregate the data.
-   * This scanner will handle the limit scenario if detail query is without order by.
-   * In case of detail query once one block is process it will pass to scanned result processor
-   * as in this case number of records will be more and it will take more memory
-   *
-   * @throws QueryExecutionException
-   */
-  @Override public void processBlock() throws QueryExecutionException {
-
-    while (dataBlockIterator.hasNext()) {
-      blocksChunkHolder.setDataBlock(dataBlockIterator.next());
-      blocksChunkHolder.reset();
-      counter += this.scannerResultAggregator
-          .aggregateData(blockletScanner.scanBlocklet(blocksChunkHolder));
-      //      finishScanning();
-      if (blockExecutionInfo.getLimit() != -1 && counter >= blockExecutionInfo.getLimit()) {
-        break;
-      }
-    }
-    finishScanning();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/29360501/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java b/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
index 4aa2adf..b00c021 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/BatchRawResult.java
@@ -19,21 +19,11 @@
 
 package org.carbondata.query.carbon.result;
 
-import org.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.carbondata.query.carbon.model.QueryDimension;
-import org.carbondata.query.carbon.model.QuerySchemaInfo;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-
 /**
  * Below class holds the query result of batches.
  */
 public class BatchRawResult extends BatchResult {
 
-  private QuerySchemaInfo querySchemaInfo;
-
   /**
    * This method will return one row at a time based on the counter given.
    * @param counter
@@ -42,51 +32,6 @@ public class BatchRawResult extends BatchResult {
   public Object[] getRawRow(int counter) {
     return rows[counter];
   }
-  /**
-   * Returns the next element in the iteration.
-   *
-   * @return the next element in the iteration
-   */
-  @Override public Object[] next() {
-    return parseData();
-  }
-
-  private Object[] parseData() {
-    int[] order = querySchemaInfo.getQueryReverseOrder();
-    Object[] row = rows[counter];
-    ByteArrayWrapper key = (ByteArrayWrapper) row[0];
-    QueryDimension[] queryDimensions = querySchemaInfo.getQueryDimensions();
-    Object[] parsedData = new Object[queryDimensions.length + row.length - 1];
-    if(key != null) {
-      long[] surrogateResult = querySchemaInfo.getKeyGenerator()
-          .getKeyArray(key.getDictionaryKey(), querySchemaInfo.getMaskedByteIndexes());
-      int noDictionaryColumnIndex = 0;
-      for (int i = 0; i < queryDimensions.length; i++) {
-        if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
-          parsedData[order[i]] = DataTypeUtil.getDataBasedOnDataType(
-              new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++)),
-              queryDimensions[i].getDimension().getDataType());
-        } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-              .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
-          parsedData[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
-              (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()]);
-        } else {
-          parsedData[order[i]] =
-              (int) surrogateResult[queryDimensions[i].getDimension().getKeyOrdinal()];
-        }
-      }
-    }
-    for (int i = 0; i < row.length - 1; i++) {
-      parsedData[order[i + queryDimensions.length]] = row[i + 1];
-    }
-    counter++;
-    return parsedData;
-  }
-
-  public void setQuerySchemaInfo(QuerySchemaInfo querySchemaInfo) {
-    this.querySchemaInfo = querySchemaInfo;
-  }
 
   /**
    * For getting the total size.