You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:16:04 UTC

[16/56] [abbrv] incubator-carbondata git commit: Refactored core package and fixed all testcases (#684)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java b/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java
deleted file mode 100644
index 42c3027..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.carbon.result.impl;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-
-/**
- * To store aggregated result
- */
-public class MapBasedResult implements Result<Map<ByteArrayWrapper, MeasureAggregator[]>> {
-  /**
-   * iterator over result
-   */
-  private Iterator<Entry<ByteArrayWrapper, MeasureAggregator[]>> resultIterator;
-
-  /**
-   * result entry
-   */
-  private Entry<ByteArrayWrapper, MeasureAggregator[]> resultEntry;
-
-  /**
-   * scanned result
-   */
-  private Map<ByteArrayWrapper, MeasureAggregator[]> scannerResult;
-
-  /**
-   * total number of result
-   */
-  private int resulSize;
-
-  public MapBasedResult() {
-    scannerResult = new HashMap<ByteArrayWrapper, MeasureAggregator[]>(
-        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    this.resultIterator = scannerResult.entrySet().iterator();
-  }
-
-  /**
-   * @return the key
-   */
-  @Override public ByteArrayWrapper getKey() {
-    resultEntry = this.resultIterator.next();
-    return resultEntry.getKey();
-  }
-
-  /**
-   * return the value
-   */
-  @Override public MeasureAggregator[] getValue() {
-    return resultEntry.getValue();
-  }
-
-  /**
-   * Method to check more result is present
-   * or not
-   */
-  @Override public boolean hasNext() {
-    return this.resultIterator.hasNext();
-  }
-
-  /***
-   * below method will be used to merge the
-   * scanned result
-   *
-   * @param otherResult return to be merged
-   */
-  @Override public void addScannedResult(Map<ByteArrayWrapper, MeasureAggregator[]> scannerResult) {
-    this.scannerResult = scannerResult;
-    resulSize = scannerResult.size();
-    this.resultIterator = scannerResult.entrySet().iterator();
-  }
-
-  /***
-   * below method will be used to merge the
-   * scanned result, in case of map based the
-   * result we need to aggregate the result
-   *
-   * @param otherResult return to be merged
-   */
-  @Override public void merge(Result<Map<ByteArrayWrapper, MeasureAggregator[]>> result) {
-    ByteArrayWrapper key = null;
-    MeasureAggregator[] value = null;
-    Map<ByteArrayWrapper, MeasureAggregator[]> otherResult = result.getResult();
-    if (otherResult != null) {
-      while (resultIterator.hasNext()) {
-        Entry<ByteArrayWrapper, MeasureAggregator[]> entry = resultIterator.next();
-        key = entry.getKey();
-        value = entry.getValue();
-        MeasureAggregator[] agg = otherResult.get(key);
-        if (agg != null) {
-          for (int j = 0; j < agg.length; j++) {
-            agg[j].merge(value[j]);
-          }
-        } else {
-          otherResult.put(key, value);
-        }
-      }
-      resulSize = otherResult.size();
-      this.resultIterator = otherResult.entrySet().iterator();
-      this.scannerResult = otherResult;
-    }
-  }
-
-  /**
-   * Return the size of the result
-   */
-  @Override public int size() {
-    return resulSize;
-  }
-
-  /**
-   * @return the complete result
-   */
-  @Override public Map<ByteArrayWrapper, MeasureAggregator[]> getResult() {
-    return this.scannerResult;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
index bd56ec2..09fa50c 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
@@ -39,7 +39,7 @@ import org.carbondata.query.carbon.model.QueryModel;
  * executing that query are returning a iterator over block and every time next
  * call will come it will execute the block and return the result
  */
-public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> {
+public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
 
   /**
    * LOGGER.
@@ -75,7 +75,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
   /**
    * current counter to check how blocklet has been executed
    */
-  private long currentCounter;
+  protected long currentCounter;
 
   /**
    * keep the track of number of blocklet of a block has been executed
@@ -138,7 +138,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
     return currentCounter < totalNumberOfNode;
   }
 
-  protected void updateSliceIndexToBeExecuted() {
+  protected int updateSliceIndexToBeExecuted() {
     Arrays.fill(blockIndexToBeExecuted, -1);
     int currentSliceIndex = 0;
     int i = 0;
@@ -154,7 +154,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
         break;
       }
     }
-    currentCounter += i;
+    return i;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java
new file mode 100644
index 0000000..826f816
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.query.carbon.result.iterator;
+
+import java.util.List;
+
+import org.carbondata.core.iterator.CarbonIterator;
+import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
+import org.carbondata.query.carbon.model.QueryModel;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
+import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl;
+
+/**
+ * Iterator over chunk result
+ */
+public class ChunkBasedDetailResultIterator extends CarbonIterator<BatchResult> {
+
+  /**
+   * query result prepartor which will be used to create a query result
+   */
+  private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
+
+  /**
+   * iterator over result
+   */
+  private CarbonIterator<Result> queryResultIterator;
+
+  public ChunkBasedDetailResultIterator(CarbonIterator<Result> queryResultIterator,
+      QueryExecutorProperties executerProperties, QueryModel queryModel) {
+    this.queryResultIterator = queryResultIterator;
+    this.queryResultPreparator =
+        new DetailQueryResultPreparatorImpl(executerProperties, queryModel);
+
+  }
+
+  /**
+   * Returns {@code true} if the iteration has more elements. (In other words,
+   * returns {@code true}
+   *
+   * @return {@code true} if the iteration has more elements
+   */
+  @Override public boolean hasNext() {
+    return queryResultIterator.hasNext();
+  }
+
+  /**
+   * Returns the next element in the iteration.
+   *
+   * @return the next element in the iteration
+   */
+  @Override public BatchResult next() {
+    return queryResultPreparator.prepareQueryResult(queryResultIterator.next());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java
deleted file mode 100644
index 71b311f..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.carbon.result.iterator;
-
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
-import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
-import org.carbondata.query.carbon.result.preparator.impl.QueryResultPreparatorImpl;
-
-/**
- * Iterator over chunk result
- */
-public class ChunkBasedResultIterator extends CarbonIterator<BatchResult> {
-
-  /**
-   * query result prepartor which will be used to create a query result
-   */
-  private QueryResultPreparator<BatchResult> queryResultPreparator;
-
-  /**
-   * iterator over result
-   */
-  private CarbonIterator<Result> queryResultIterator;
-
-  public ChunkBasedResultIterator(CarbonIterator<Result> queryResultIterator,
-      QueryExecutorProperties executerProperties, QueryModel queryModel) {
-    this.queryResultIterator = queryResultIterator;
-    this.queryResultPreparator = new QueryResultPreparatorImpl(executerProperties, queryModel);
-
-  }
-
-  /**
-   * Returns {@code true} if the iteration has more elements. (In other words,
-   * returns {@code true}
-   *
-   * @return {@code true} if the iteration has more elements
-   */
-  @Override public boolean hasNext() {
-    return queryResultIterator.hasNext();
-  }
-
-  /**
-   * Returns the next element in the iteration.
-   *
-   * @return the next element in the iteration
-   */
-  @Override public BatchResult next() {
-    return queryResultPreparator.prepareQueryResult(queryResultIterator.next());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java
deleted file mode 100644
index ea4d65c..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.carbondata.query.carbon.result.iterator;
-
-import org.carbondata.core.iterator.CarbonIterator;
-import org.carbondata.query.carbon.result.BatchRawResult;
-
-public class ChunkRawRowIterartor extends CarbonIterator<Object[]> {
-
-  /**
-   * iterator over chunk result
-   */
-  private CarbonIterator<BatchRawResult> iterator;
-
-  /**
-   * currect chunk
-   */
-  private BatchRawResult currentchunk;
-
-  public ChunkRawRowIterartor(CarbonIterator<BatchRawResult> iterator) {
-    this.iterator = iterator;
-    if (iterator.hasNext()) {
-      currentchunk = iterator.next();
-    }
-  }
-
-  /**
-   * Returns {@code true} if the iteration has more elements. (In other words,
-   * returns {@code true} if {@link #next} would return an element rather than
-   * throwing an exception.)
-   *
-   * @return {@code true} if the iteration has more elements
-   */
-  @Override public boolean hasNext() {
-    if (null != currentchunk) {
-      if ((currentchunk.hasNext())) {
-        return true;
-      } else if (!currentchunk.hasNext()) {
-        while (iterator.hasNext()) {
-          currentchunk = iterator.next();
-          if (currentchunk != null && currentchunk.hasNext()) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Returns the next element in the iteration.
-   *
-   * @return the next element in the iteration
-   */
-  @Override public Object[] next() {
-    return currentchunk.next();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
index 6ba54cd..3db3404 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java
@@ -21,12 +21,11 @@ package org.carbondata.query.carbon.result.iterator;
 
 import org.carbondata.core.iterator.CarbonIterator;
 import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.RowResult;
 
 /**
  * Iterator over row result
  */
-public class ChunkRowIterator extends CarbonIterator<RowResult> {
+public class ChunkRowIterator extends CarbonIterator<Object[]> {
 
   /**
    * iterator over chunk result
@@ -73,7 +72,7 @@ public class ChunkRowIterator extends CarbonIterator<RowResult> {
    *
    * @return the next element in the iteration
    */
-  @Override public RowResult next() {
+  @Override public Object[] next() {
     return currentchunk.next();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
index f07eb20..3641e75 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
@@ -29,16 +29,17 @@ import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
 import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
 import org.carbondata.query.carbon.model.QueryModel;
 import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
 import org.carbondata.query.carbon.result.Result;
 import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
-import org.carbondata.query.carbon.result.preparator.impl.QueryResultPreparatorImpl;
+import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl;
 
 /**
  * In case of detail query we cannot keep all the records in memory so for
  * executing that query are returning a iterator over block and every time next
  * call will come it will execute the block and return the result
  */
-public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> {
+public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator {
 
   /**
    * LOGGER.
@@ -49,17 +50,18 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator
   /**
    * to prepare the result
    */
-  private QueryResultPreparator<BatchResult> queryResultPreparator;
+  private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
 
   public DetailQueryResultIterator(List<BlockExecutionInfo> infos,
       QueryExecutorProperties executerProperties, QueryModel queryModel,
       InternalQueryExecutor queryExecutor) {
     super(infos, executerProperties, queryModel, queryExecutor);
-    this.queryResultPreparator = new QueryResultPreparatorImpl(executerProperties, queryModel);
+    this.queryResultPreparator =
+        new DetailQueryResultPreparatorImpl(executerProperties, queryModel);
   }
 
   @Override public BatchResult next() {
-    updateSliceIndexToBeExecuted();
+    currentCounter += updateSliceIndexToBeExecuted();
     CarbonIterator<Result> result = null;
     try {
       result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
index 4f9dbe2..2b14793 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java
@@ -19,6 +19,10 @@
 package org.carbondata.query.carbon.result.iterator;
 
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.carbondata.core.iterator.CarbonIterator;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
@@ -26,7 +30,8 @@ import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
 import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo;
 import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor;
 import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchRawResult;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
 import org.carbondata.query.carbon.result.Result;
 import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
 import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparatorImpl;
@@ -36,10 +41,13 @@ import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparat
  * executing that query are returning a iterator over block and every time next
  * call will come it will execute the block and return the result
  */
-public class DetailRawQueryResultIterator
-    extends AbstractDetailQueryResultIterator<BatchRawResult> {
+public class DetailRawQueryResultIterator extends AbstractDetailQueryResultIterator {
 
-  private QueryResultPreparator<BatchRawResult> queryResultPreparator;
+  private ExecutorService execService = Executors.newFixedThreadPool(1);
+
+  private Future<ResultInfo> future;
+
+  private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator;
 
   public DetailRawQueryResultIterator(List<BlockExecutionInfo> infos,
       QueryExecutorProperties executerProperties, QueryModel queryModel,
@@ -48,26 +56,63 @@ public class DetailRawQueryResultIterator
     this.queryResultPreparator = new RawQueryResultPreparatorImpl(executerProperties, queryModel);
   }
 
-  @Override public BatchRawResult next() {
-    updateSliceIndexToBeExecuted();
-    CarbonIterator<Result> result = null;
-    try {
-      result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted);
-    } catch (QueryExecutionException ex) {
-      throw new RuntimeException(ex.getCause());
+  @Override public BatchResult next() {
+    BatchResult result;
+    if (future == null) {
+      future = execute();
     }
-    for (int i = 0; i < blockIndexToBeExecuted.length; i++) {
-      if (blockIndexToBeExecuted[i] != -1) {
-        blockExecutionInfos.get(blockIndexToBeExecuted[i]).setFirstDataBlock(
-            blockExecutionInfos.get(blockIndexToBeExecuted[i]).getFirstDataBlock()
-                .getNextDataRefNode());
-      }
+    ResultInfo resultFromFuture = getResultFromFuture(future);
+    result = resultFromFuture.result;
+    currentCounter += resultFromFuture.counter;
+    if (hasNext()) {
+      future = execute();
     }
-    if (null != result) {
-      Result next = result.next();
-      return queryResultPreparator.prepareQueryResult(next);
-    } else {
-      return queryResultPreparator.prepareQueryResult(null);
+    return result;
+  }
+
+  private ResultInfo getResultFromFuture(Future<ResultInfo> future) {
+    try {
+      return future.get();
+    } catch (Exception e) {
+      e.printStackTrace();
     }
+    return new ResultInfo();
+  }
+
+  private Future<ResultInfo> execute() {
+    return execService.submit(new Callable<ResultInfo>() {
+      @Override public ResultInfo call() {
+        CarbonIterator<Result> result = null;
+        int counter =  updateSliceIndexToBeExecuted();
+        try {
+          result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted);
+        } catch (QueryExecutionException ex) {
+          throw new RuntimeException(ex.getCause());
+        }
+        for (int i = 0; i < blockIndexToBeExecuted.length; i++) {
+          if (blockIndexToBeExecuted[i] != -1) {
+            blockExecutionInfos.get(blockIndexToBeExecuted[i]).setFirstDataBlock(
+                blockExecutionInfos.get(blockIndexToBeExecuted[i]).getFirstDataBlock()
+                    .getNextDataRefNode());
+          }
+        }
+        BatchResult batchResult;
+        if (null != result) {
+          Result next = result.next();
+          batchResult = queryResultPreparator.prepareQueryResult(next);
+        } else {
+          batchResult = queryResultPreparator.prepareQueryResult(null);
+        }
+        ResultInfo resultInfo = new ResultInfo();
+        resultInfo.counter = counter;
+        resultInfo.result = batchResult;
+        return resultInfo;
+      }
+    });
+  }
+
+  private static class ResultInfo {
+    private int counter;
+    private BatchResult result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
index 431e163..fbf3074 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java
@@ -1,9 +1,10 @@
 package org.carbondata.query.carbon.result.preparator;
 
+import org.carbondata.query.carbon.result.BatchResult;
 import org.carbondata.query.carbon.result.Result;
 
-public interface QueryResultPreparator<E> {
+public interface QueryResultPreparator<K, V> {
 
-  public E prepareQueryResult(Result scannedResult);
+  public BatchResult prepareQueryResult(Result<K, V> scannedResult);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
index 1890baf..ad5b4c5 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java
@@ -1,17 +1,19 @@
 package org.carbondata.query.carbon.result.preparator.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 
-import org.carbondata.query.aggregator.MeasureAggregator;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
+import org.carbondata.query.carbon.model.QueryDimension;
 import org.carbondata.query.carbon.model.QueryModel;
 import org.carbondata.query.carbon.result.BatchResult;
 import org.carbondata.query.carbon.result.preparator.QueryResultPreparator;
-import org.carbondata.query.scanner.impl.CarbonKey;
-import org.carbondata.query.scanner.impl.CarbonValue;
+import org.carbondata.query.carbon.util.DataTypeUtil;
 
-public abstract class AbstractQueryResultPreparator<E> implements QueryResultPreparator<E> {
+public abstract class AbstractQueryResultPreparator<K, V> implements QueryResultPreparator<K, V> {
 
   /**
    * query properties
@@ -29,13 +31,35 @@ public abstract class AbstractQueryResultPreparator<E> implements QueryResultPre
     this.queryModel = queryModel;
   }
 
-  protected void fillMeasureValueForAggGroupByQuery(QueryModel queryModel,
-      Object[][] surrogateResult, int dimensionCount, int columnIndex, MeasureAggregator[] v) {
-    int msrCount = queryModel.getQueryMeasures().size();
-    for (int i = 0; i < msrCount; i++) {
-      v[queryExecuterProperties.measureStartIndex + i] =
-          ((MeasureAggregator) surrogateResult[dimensionCount
-              + queryExecuterProperties.measureStartIndex + i][columnIndex]);
+  protected void fillDimensionData(Object[][] convertedResult, List<QueryDimension> queryDimensions,
+      int dimensionCount, Object[] row, int rowIndex) {
+    QueryDimension queryDimension;
+    for (int i = 0; i < dimensionCount; i++) {
+      queryDimension = queryDimensions.get(i);
+      if (!CarbonUtil
+          .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
+        row[queryDimension.getQueryOrder()] = convertedResult[i][rowIndex];
+      } else if (CarbonUtil
+          .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
+        row[queryDimension.getQueryOrder()] = directDictionaryGenerator
+            .getValueFromSurrogate((Integer) convertedResult[i][rowIndex]);
+      } else {
+        if (queryExecuterProperties.sortDimIndexes[i] == 1) {
+          row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
+              queryExecuterProperties.columnToDictionayMapping
+                  .get(queryDimension.getDimension().getColumnId())
+                  .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][rowIndex]),
+              queryDimension.getDimension().getDataType());
+        } else {
+          row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
+              queryExecuterProperties.columnToDictionayMapping
+                  .get(queryDimension.getDimension().getColumnId())
+                  .getDictionaryValueForKey((Integer) convertedResult[i][rowIndex]),
+              queryDimension.getDimension().getDataType());
+        }
+      }
     }
   }
 
@@ -54,18 +78,9 @@ public abstract class AbstractQueryResultPreparator<E> implements QueryResultPre
   }
 
   protected BatchResult getEmptyChunkResult(int size) {
-    List<CarbonKey> keys = new ArrayList<CarbonKey>(size);
-    List<CarbonValue> values = new ArrayList<CarbonValue>(size);
-    Object[] row = new Object[1];
-    for (int i = 0; i < size; i++)
-
-    {
-      values.add(new CarbonValue(new MeasureAggregator[0]));
-      keys.add(new CarbonKey(row));
-    }
+    Object[][] row = new Object[size][1];
     BatchResult chunkResult = new BatchResult();
-    chunkResult.setKeys(keys);
-    chunkResult.setValues(values);
+    chunkResult.setRows(row);
     return chunkResult;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java
new file mode 100644
index 0000000..712894a
--- /dev/null
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.query.carbon.result.preparator.impl;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
+import org.carbondata.query.carbon.model.QueryDimension;
+import org.carbondata.query.carbon.model.QueryMeasure;
+import org.carbondata.query.carbon.model.QueryModel;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
+import org.carbondata.query.carbon.result.Result;
+import org.carbondata.query.carbon.util.DataTypeUtil;
+import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
+
+/**
+ * Below class will be used to get the result by converting to actual data
+ * Actual data conversion can be converting the surrogate key to actual data
+ *
+ * @TODO there are many things in class which is very confusing, need to check
+ * why it was handled like that and how we can handle that in a better
+ * way.Need to revisit this class. IF aggregation is push down to spark
+ * layer and if we can process the data in byte array format then this
+ * class wont be useful so in future we can delete this class.
+ * @TODO need to expose one interface which will return the result based on required type
+ * for example its implementation case return converted result or directly result with out
+ * converting to actual value
+ */
+public class DetailQueryResultPreparatorImpl
+    extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DetailQueryResultPreparatorImpl.class.getName());
+
+  public DetailQueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
+      QueryModel queryModel) {
+    super(executerProperties, queryModel);
+  }
+
+  @Override public BatchResult prepareQueryResult(
+      Result<List<ListBasedResultWrapper>, Object> scannedResult) {
+    if ((null == scannedResult || scannedResult.size() < 1)) {
+      return new BatchResult();
+    }
+    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
+    int dimensionCount = queryDimension.size();
+    int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureDataTypes.length;
+    Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
+    if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0
+        && scannedResult.size() > 0) {
+      return getEmptyChunkResult(scannedResult.size());
+    }
+    int currentRow = 0;
+    long[] surrogateResult = null;
+    int noDictionaryColumnIndex = 0;
+    ByteArrayWrapper key = null;
+    Object[] value = null;
+    while (scannedResult.hasNext()) {
+      key = scannedResult.getKey();
+      value = scannedResult.getValue();
+      if (key != null) {
+        surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator()
+            .getKeyArray(key.getDictionaryKey(),
+                queryExecuterProperties.keyStructureInfo.getMaskedBytes());
+        for (int i = 0; i < dimensionCount; i++) {
+          if (!CarbonUtil.hasEncoding(queryDimension.get(i).getDimension().getEncoder(),
+              Encoding.DICTIONARY)) {
+            resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType(
+                new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++),
+                    Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
+                queryDimension.get(i).getDimension().getDataType());
+          } else {
+            resultData[currentRow][i] =
+                (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()];
+          }
+        }
+      }
+      if (value != null) {
+        System.arraycopy(value, 0, resultData[currentRow], dimensionCount,
+            queryExecuterProperties.measureDataTypes.length);
+      }
+      currentRow++;
+      noDictionaryColumnIndex = 0;
+    }
+    if (resultData.length > 0) {
+      resultData = encodeToRows(resultData);
+    }
+    return getResult(queryModel, resultData);
+  }
+
+  private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) {
+
+    int rowSize = convertedResult[0].length;
+    Object[][] rows = new Object[rowSize][];
+    List<QueryDimension> queryDimensions = queryModel.getQueryDimension();
+    int dimensionCount = queryDimensions.size();
+    int msrCount = queryExecuterProperties.measureDataTypes.length;
+    Object[] row;
+    for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) {
+      row = new Object[dimensionCount + msrCount];
+      fillDimensionData(convertedResult, queryDimensions, dimensionCount, row, rowIndex);
+
+      QueryMeasure msr;
+      for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
+        msr = queryModel.getQueryMeasures().get(i);
+        row[msr.getQueryOrder()] = convertedResult[dimensionCount + i][rowIndex];
+      }
+      rows[rowIndex] = row;
+    }
+    LOGGER.info(
+        "###########################################------ Total Number of records" + rowSize);
+    BatchResult chunkResult = new BatchResult();
+    chunkResult.setRows(rows);
+    return chunkResult;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
deleted file mode 100644
index 5604ecd..0000000
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.carbondata.query.carbon.result.preparator.impl;
-
-import java.math.BigDecimal;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.count.CountAggregator;
-import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregator;
-import org.carbondata.query.aggregator.impl.distinct.DistinctStringCountAggregator;
-import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
-import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
-import org.carbondata.query.carbon.model.QueryDimension;
-import org.carbondata.query.carbon.model.QueryMeasure;
-import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchResult;
-import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
-import org.carbondata.query.scanner.impl.CarbonKey;
-import org.carbondata.query.scanner.impl.CarbonValue;
-
-/**
- * Below class will be used to get the result by converting to actual data
- * Actual data conversion can be converting the surrogate key to actual data
- *
- * @TODO there are many things in class which is very confusing, need to check
- * why it was handled like that and how we can handle that in a better
- * way.Need to revisit this class. IF aggregation is push down to spark
- * layer and if we can process the data in byte array format then this
- * class wont be useful so in future we can delete this class.
- * @TODO need to expose one interface which will return the result based on required type
- * for example its implementation case return converted result or directly result with out
- * converting to actual value
- */
-public class QueryResultPreparatorImpl extends AbstractQueryResultPreparator<BatchResult> {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(QueryResultPreparatorImpl.class.getName());
-
-  public QueryResultPreparatorImpl(QueryExecutorProperties executerProperties,
-      QueryModel queryModel) {
-    super(executerProperties, queryModel);
-  }
-
-  @Override public BatchResult prepareQueryResult(Result scannedResult) {
-    if ((null == scannedResult || scannedResult.size() < 1)) {
-      return new BatchResult();
-    }
-    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
-    int dimensionCount = queryDimension.size();
-    int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureAggregators.length;
-    Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
-    if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0
-        && scannedResult.size() > 0) {
-      return getEmptyChunkResult(scannedResult.size());
-    }
-    int currentRow = 0;
-    long[] surrogateResult = null;
-    int noDictionaryColumnIndex = 0;
-    ByteArrayWrapper key = null;
-    MeasureAggregator[] value = null;
-    while (scannedResult.hasNext()) {
-      key = scannedResult.getKey();
-      value = scannedResult.getValue();
-      surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator()
-          .getKeyArray(key.getDictionaryKey(),
-              queryExecuterProperties.keyStructureInfo.getMaskedBytes());
-      for (int i = 0; i < dimensionCount; i++) {
-        if (!CarbonUtil
-            .hasEncoding(queryDimension.get(i).getDimension().getEncoder(), Encoding.DICTIONARY)) {
-          resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType(
-              new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++),
-                  Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
-              queryDimension.get(i).getDimension().getDataType());
-        } else {
-          resultData[currentRow][i] =
-              (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()];
-        }
-      }
-
-      // @TODO need to check why it was handled like this
-      if (queryExecuterProperties.isFunctionQuery) {
-        if (value[0].toString().contains("Long")) {
-          Long sizeOfListL = value[0].getLongValue();
-          return getEmptyChunkResult(sizeOfListL.intValue());
-        } else if (value[0].toString().contains("Decimal")) {
-          BigDecimal sizeOfListD = value[0].getBigDecimalValue();
-          return getEmptyChunkResult(sizeOfListD.intValue());
-        } else {
-          Double sizeOfList = value[0].getDoubleValue();
-          return getEmptyChunkResult(sizeOfList.intValue());
-        }
-
-      }
-      for (int i = 0; i < queryExecuterProperties.measureAggregators.length; i++) {
-        resultData[currentRow][dimensionCount + i] = value[i];
-      }
-      currentRow++;
-      noDictionaryColumnIndex = 0;
-    }
-    if (resultData.length > 0) {
-      resultData = encodeToRows(resultData);
-    }
-    return getResult(queryModel, resultData);
-  }
-
-  private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) {
-
-    List<CarbonKey> keys = new ArrayList<CarbonKey>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<CarbonValue> values =
-        new ArrayList<CarbonValue>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<QueryDimension> queryDimensions = queryModel.getQueryDimension();
-    int dimensionCount = queryDimensions.size();
-    int msrCount = queryExecuterProperties.measureAggregators.length;
-    Object[][] resultDataA = null;
-    // @TODO no sure why this check is here as in the caller of this method
-    // is returning in case of
-    // function query. Need to confirm with other developer who handled this
-    // scneario
-    if (queryExecuterProperties.isFunctionQuery) {
-      msrCount = 1;
-      resultDataA = new Object[dimensionCount + msrCount][msrCount];
-    } else {
-      resultDataA = new Object[dimensionCount + msrCount][convertedResult[0].length];
-    }
-    Object[] row = null;
-    QueryDimension queryDimension = null;
-    for (int columnIndex = 0; columnIndex < resultDataA[0].length; columnIndex++) {
-      row = new Object[dimensionCount + msrCount];
-      for (int i = 0; i < dimensionCount; i++) {
-        queryDimension = queryDimensions.get(i);
-        if (!CarbonUtil
-            .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) {
-          row[queryDimension.getQueryOrder()] = convertedResult[i][columnIndex];
-        } else if (CarbonUtil
-            .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) {
-          DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-              .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType());
-          row[queryDimension.getQueryOrder()] = directDictionaryGenerator
-              .getValueFromSurrogate((Integer) convertedResult[i][columnIndex]);
-        } else {
-          if (queryExecuterProperties.sortDimIndexes[i] == 1) {
-            row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
-                queryExecuterProperties.columnToDictionayMapping
-                    .get(queryDimension.getDimension().getColumnId())
-                    .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][columnIndex]),
-                queryDimension.getDimension().getDataType());
-          } else {
-            row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType(
-                queryExecuterProperties.columnToDictionayMapping
-                    .get(queryDimension.getDimension().getColumnId())
-                    .getDictionaryValueForKey((Integer) convertedResult[i][columnIndex]),
-                queryDimension.getDimension().getDataType());
-          }
-        }
-      }
-      MeasureAggregator[] msrAgg =
-          new MeasureAggregator[queryExecuterProperties.measureAggregators.length];
-
-      fillMeasureValueForAggGroupByQuery(queryModel, convertedResult, dimensionCount, columnIndex,
-          msrAgg);
-      fillDimensionAggValue(queryModel, convertedResult, dimensionCount, columnIndex, msrAgg);
-
-      if (!queryModel.isDetailQuery()) {
-        for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
-          row[queryModel.getQueryMeasures().get(i).getQueryOrder()] =
-              msrAgg[queryExecuterProperties.measureStartIndex + i].get();
-        }
-        int index = 0;
-        for (int i = 0; i < queryModel.getDimAggregationInfo().size(); i++) {
-          DimensionAggregatorInfo dimensionAggregatorInfo =
-              queryModel.getDimAggregationInfo().get(i);
-          for (int j = 0; j < dimensionAggregatorInfo.getOrderList().size(); j++) {
-            row[dimensionAggregatorInfo.getOrderList().get(j)] = msrAgg[index++].get();
-          }
-        }
-        for (int i = 0; i < queryModel.getExpressions().size(); i++) {
-          row[queryModel.getExpressions().get(i).getQueryOrder()] =
-              ((MeasureAggregator) convertedResult[dimensionCount
-                  + queryExecuterProperties.aggExpressionStartIndex + i][columnIndex]).get();
-        }
-      } else {
-        QueryMeasure msr = null;
-        for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
-          msr = queryModel.getQueryMeasures().get(i);
-          if (msrAgg[queryExecuterProperties.measureStartIndex + i].isFirstTime()) {
-            row[msr.getQueryOrder()] = null;
-          } else {
-            Object msrVal;
-            switch (msr.getMeasure().getDataType()) {
-              case LONG:
-                msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getLongValue();
-                break;
-              case DECIMAL:
-                msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getBigDecimalValue();
-                break;
-              default:
-                msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getDoubleValue();
-            }
-            row[msr.getQueryOrder()] = DataTypeUtil
-                .getMeasureDataBasedOnDataType(msrVal,msr.getMeasure().getDataType());
-          }
-        }
-      }
-      values.add(new CarbonValue(new MeasureAggregator[0]));
-      keys.add(new CarbonKey(row));
-    }
-    LOGGER.info("###########################################------ Total Number of records"
-            + resultDataA[0].length);
-    BatchResult chunkResult = new BatchResult();
-    chunkResult.setKeys(keys);
-    chunkResult.setValues(values);
-    return chunkResult;
-  }
-
-  private void fillDimensionAggValue(QueryModel queryModel, Object[][] surrogateResult,
-      int dimensionCount, int columnIndex, MeasureAggregator[] v) {
-    Iterator<DimensionAggregatorInfo> dimAggInfoIterator =
-        queryModel.getDimAggregationInfo().iterator();
-    DimensionAggregatorInfo dimensionAggregatorInfo = null;
-    List<String> partitionColumns = queryModel.getParitionColumns();
-    int rowIndex = -1;
-    int index = 0;
-    while (dimAggInfoIterator.hasNext()) {
-      dimensionAggregatorInfo = dimAggInfoIterator.next();
-      for (int j = 0; j < dimensionAggregatorInfo.getAggList().size(); j++) {
-        ++rowIndex;
-        if (!dimensionAggregatorInfo.getAggList().get(j)
-            .equals(CarbonCommonConstants.DISTINCT_COUNT)) {
-          v[index++] =
-              ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]);
-        } else if (partitionColumns.size() == 1 && partitionColumns
-            .contains(dimensionAggregatorInfo.getColumnName()) && dimensionAggregatorInfo
-            .getAggList().get(j).equals(CarbonCommonConstants.DISTINCT_COUNT)) {
-          double value =
-              ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex])
-                  .getDoubleValue();
-
-          MeasureAggregator countAggregator = new CountAggregator();
-          countAggregator.setNewValue(value);
-          v[index++] = countAggregator;
-        } else {
-          if (surrogateResult[dimensionCount
-              + rowIndex][columnIndex] instanceof DistinctCountAggregator) {
-
-            Iterator<Integer> iterator =
-                ((DistinctCountAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex])
-                    .getBitMap().iterator();
-
-            MeasureAggregator distinctCountAggregatorObjct = new DistinctStringCountAggregator();
-            while (iterator.hasNext()) {
-              String member = queryExecuterProperties.columnToDictionayMapping
-                  .get(dimensionAggregatorInfo.getDim().getColumnId())
-                  .getDictionaryValueForKey(iterator.next());
-              if (!member.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-                distinctCountAggregatorObjct.agg(member);
-              }
-            }
-            v[index++] = distinctCountAggregatorObjct;
-          } else {
-            v[index++] =
-                ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]);
-          }
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
index 0eb60ff..0ae6651 100644
--- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
+++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java
@@ -1,22 +1,24 @@
 package org.carbondata.query.carbon.result.preparator.impl;
 
+import java.util.List;
+
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.query.aggregator.MeasureAggregator;
 import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties;
 import org.carbondata.query.carbon.model.QueryDimension;
 import org.carbondata.query.carbon.model.QueryMeasure;
 import org.carbondata.query.carbon.model.QueryModel;
 import org.carbondata.query.carbon.model.QuerySchemaInfo;
 import org.carbondata.query.carbon.result.BatchRawResult;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.ListBasedResultWrapper;
 import org.carbondata.query.carbon.result.Result;
-import org.carbondata.query.carbon.util.DataTypeUtil;
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper;
 
 /**
  * It does not decode the dictionary.
  */
-public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<BatchRawResult> {
+public class RawQueryResultPreparatorImpl
+    extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(RawQueryResultPreparatorImpl.class.getName());
@@ -33,7 +35,7 @@ public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<
         .toArray(new QueryDimension[queryModel.getQueryDimension().size()]));
     querySchemaInfo.setQueryMeasures(queryModel.getQueryMeasures()
         .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()]));
-    int msrSize = queryExecuterProperties.measureAggregators.length;
+    int msrSize = queryExecuterProperties.measureDataTypes.length;
     int dimensionCount = queryModel.getQueryDimension().size();
     int[] queryOrder = new int[dimensionCount + msrSize];
     int[] queryReverseOrder = new int[dimensionCount + msrSize];
@@ -49,75 +51,34 @@ public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<
     querySchemaInfo.setQueryReverseOrder(queryReverseOrder);
   }
 
-  @Override public BatchRawResult prepareQueryResult(Result scannedResult) {
+  @Override public BatchResult prepareQueryResult(
+      Result<List<ListBasedResultWrapper>, Object> scannedResult) {
     if ((null == scannedResult || scannedResult.size() < 1)) {
-      BatchRawResult batchRawResult = new BatchRawResult(new Object[0][0]);
+      BatchRawResult batchRawResult = new BatchRawResult();
       batchRawResult.setQuerySchemaInfo(querySchemaInfo);
       return batchRawResult;
     }
-    int msrSize = queryExecuterProperties.measureAggregators.length;
-    int totalNumberOfColumn = msrSize + 1;
-    Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn];
-    int currentRow = 0;
-    ByteArrayWrapper key = null;
-    MeasureAggregator[] value = null;
+    int msrSize = queryExecuterProperties.measureDataTypes.length;
+    Object[][] resultData = new Object[scannedResult.size()][];
+    Object[] value;
+    Object[] row;
+    int counter = 0;
     while (scannedResult.hasNext()) {
-      key = scannedResult.getKey();
       value = scannedResult.getValue();
-      resultData[currentRow][0] = key;
-      for (int i = 0; i < msrSize; i++) {
-        resultData[currentRow][1 + i] = value[i];
+      row = new Object[msrSize + 1];
+      row[0] = scannedResult.getKey();
+      if(value != null) {
+        System.arraycopy(value, 0, row, 1, msrSize);
       }
-      currentRow++;
-    }
-
-    if (resultData.length > 0) {
-      resultData = encodeToRows(resultData);
+      resultData[counter] = row;
+      counter ++;
     }
-    BatchRawResult result = getResult(queryModel, resultData);
+    LOGGER.info("###########################---- Total Number of records" + scannedResult.size());
+    BatchRawResult result = new BatchRawResult();
+    result.setRows(resultData);
     result.setQuerySchemaInfo(querySchemaInfo);
     return result;
   }
 
-
-  private BatchRawResult getResult(QueryModel queryModel, Object[][] convertedResult) {
-
-    int msrCount = queryExecuterProperties.measureAggregators.length;
-    Object[][] resultDataA = new Object[1 + msrCount][convertedResult[0].length];
-
-    for (int columnIndex = 0; columnIndex < resultDataA[0].length; columnIndex++) {
-      resultDataA[0][columnIndex] = convertedResult[0][columnIndex];
-      MeasureAggregator[] msrAgg =
-          new MeasureAggregator[queryExecuterProperties.measureAggregators.length];
-
-      fillMeasureValueForAggGroupByQuery(queryModel, convertedResult, 1, columnIndex, msrAgg);
-
-      QueryMeasure msr = null;
-      for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) {
-        msr = queryModel.getQueryMeasures().get(i);
-        if (msrAgg[queryExecuterProperties.measureStartIndex + i].isFirstTime()) {
-          resultDataA[i + 1][columnIndex] = null;
-        } else {
-          Object msrVal;
-          switch (msr.getMeasure().getDataType()) {
-            case LONG:
-              msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getLongValue();
-              break;
-            case DECIMAL:
-              msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getBigDecimalValue();
-              break;
-            default:
-              msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getDoubleValue();
-          }
-          resultDataA[i + 1][columnIndex] = DataTypeUtil
-              .getMeasureDataBasedOnDataType(msrVal,
-                  msr.getMeasure().getDataType());
-        }
-      }
-    }
-    LOGGER.info("###########################################------ Total Number of records"
-            + resultDataA[0].length);
-    return new BatchRawResult(resultDataA);
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
index ca7c77a..0377580 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java
@@ -35,8 +35,6 @@ import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.util.MeasureAggregatorFactory;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbon.processor.BlocksChunkHolder;
 import org.carbondata.query.carbon.util.DataTypeUtil;
@@ -209,44 +207,25 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
       if (!msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice()) {
         record[msrColumnEvalutorInfo.getRowIndex()] = msrColumnEvalutorInfo.getDefaultValue();
       } else {
-        if (msrColumnEvalutorInfo.isCustomMeasureValue()) {
-          MeasureAggregator aggregator = MeasureAggregatorFactory
-              .getAggregator(msrColumnEvalutorInfo.getAggregator(),
-                  msrColumnEvalutorInfo.getType());
-          aggregator.merge(
-              blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-                  .getMeasureDataHolder().getReadableByteArrayValueByIndex(index));
-          switch (msrType) {
-            case LONG:
-              record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getLongValue();
-              break;
-            case DECIMAL:
-              record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getBigDecimalValue();
-              break;
-            default:
-              record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getDoubleValue();
-          }
-        } else {
-          Object msrValue;
-          switch (msrType) {
-            case LONG:
-              msrValue =
-                  blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-                      .getMeasureDataHolder().getReadableLongValueByIndex(index);
-              break;
-            case DECIMAL:
-              msrValue =
-                  blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-                      .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
-              break;
-            default:
-              msrValue =
-                  blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
-                      .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
-          }
-          record[msrColumnEvalutorInfo.getRowIndex()] = msrValue;
-
+        Object msrValue;
+        switch (msrType) {
+          case LONG:
+            msrValue =
+                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                    .getMeasureDataHolder().getReadableLongValueByIndex(index);
+            break;
+          case DECIMAL:
+            msrValue =
+                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                    .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+            break;
+          default:
+            msrValue =
+                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                    .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
         }
+        record[msrColumnEvalutorInfo.getRowIndex()] = msrValue;
+
       }
     }
     row.setValues(record);
@@ -275,7 +254,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
    * Read the actual filter member by passing the dictionary value from
    * the forward dictionary cache which which holds column wise cache
    *
-   * @param dimColumnEvaluaatorInfo
+   * @param dimColumnEvaluatorInfo
    * @param dictionaryValue
    * @param forwardDictionary
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java b/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
index 9cefbdb..e6877d5 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java
@@ -31,8 +31,6 @@ public class MeasureColumnResolvedFilterInfo implements Serializable {
 
   private int rowIndex = -1;
 
-  private boolean isCustomMeasureValue;
-
   private Object uniqueValue;
 
   private String aggregator;
@@ -59,14 +57,6 @@ public class MeasureColumnResolvedFilterInfo implements Serializable {
     this.rowIndex = rowIndex;
   }
 
-  public boolean isCustomMeasureValue() {
-    return isCustomMeasureValue;
-  }
-
-  public void setCustomMeasureValue(boolean isCustomMeasureValue) {
-    this.isCustomMeasureValue = isCustomMeasureValue;
-  }
-
   public Object getUniqueValue() {
     return uniqueValue;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java b/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java
deleted file mode 100644
index 7f6d7f2..0000000
--- a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.carbondata.query.scanner.impl;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-/**
- * @author R00900208
- */
-public class CarbonKey implements Serializable, Comparable<CarbonKey> {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = -8773813519739848506L;
-
-  private Object[] key;
-
-  public CarbonKey(Object[] key) {
-    this.key = key;
-  }
-
-  /**
-   * @return the key
-   */
-  public Object[] getKey() {
-    return key;
-  }
-
-  public CarbonKey getSubKey(int size) {
-    Object[] crop = new Object[size];
-    System.arraycopy(key, 0, crop, 0, size);
-    return new CarbonKey(crop);
-  }
-
-  /* (non-Javadoc)
-   * @see java.lang.Object#hashCode()
-   */
-  @Override public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + Arrays.hashCode(key);
-    return result;
-  }
-
-  /* (non-Javadoc)
-   * @see java.lang.Object#equals(java.lang.Object)
-   */
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    CarbonKey other = (CarbonKey) obj;
-    if (!Arrays.equals(key, other.key)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override public String toString() {
-    return Arrays.toString(key);
-  }
-
-  @Override public int compareTo(CarbonKey other) {
-    Object[] oKey = other.key;
-
-    int l = 0;
-    for (int i = 0; i < key.length; i++) {
-      l = ((Comparable) key[i]).compareTo(oKey[i]);
-      if (l != 0) {
-        return l;
-      }
-    }
-
-    return 0;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java b/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java
deleted file mode 100644
index 68e7226..0000000
--- a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.scanner.impl;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public class CarbonValue implements Serializable, Comparable<CarbonValue> {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 8034398963696130423L;
-
-  private MeasureAggregator[] values;
-
-  private int topNIndex;
-
-  public CarbonValue(MeasureAggregator[] values) {
-    this.values = values;
-  }
-
-  /**
-   * @return the values
-   */
-  public MeasureAggregator[] getValues() {
-    return values;
-  }
-
-  public CarbonValue merge(CarbonValue another) {
-    for (int i = 0; i < values.length; i++) {
-      values[i].merge(another.values[i]);
-    }
-    return this;
-  }
-
-  public void setTopNIndex(int index) {
-    this.topNIndex = index;
-  }
-
-  public void addGroup(CarbonKey key, CarbonValue value) {
-
-  }
-
-  public CarbonValue mergeKeyVal(CarbonValue another) {
-    return another;
-  }
-
-  @Override public String toString() {
-    return Arrays.toString(values);
-  }
-
-  @Override public int compareTo(CarbonValue o) {
-    return values[topNIndex].compareTo(o.values[topNIndex]);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(!(obj instanceof CarbonValue)) {
-      return false;
-    }
-    CarbonValue o = (CarbonValue)obj;
-    return values[topNIndex].equals(o.values[o.topNIndex]);
-  }
-
-  @Override public int hashCode() {
-    return values[topNIndex].hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
index 8ea0104..3d54d96 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java
@@ -13,8 +13,8 @@ import org.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.carbondata.query.carbon.executor.QueryExecutorFactory;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbon.model.QueryModel;
-import org.carbondata.query.carbon.result.BatchRawResult;
-import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor;
+import org.carbondata.query.carbon.result.BatchResult;
+import org.carbondata.query.carbon.result.iterator.ChunkRowIterator;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -48,8 +48,8 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
     readSupport
         .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
     try {
-      carbonIterator = new ChunkRawRowIterartor(
-          (CarbonIterator<BatchRawResult>) QueryExecutorFactory.getQueryExecutor(queryModel)
+      carbonIterator = new ChunkRowIterator(
+          (CarbonIterator<BatchResult>) QueryExecutorFactory.getQueryExecutor(queryModel)
               .execute(queryModel));
     } catch (QueryExecutionException e) {
       throw new InterruptedException(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index f728a32..b6f589d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,17 +17,12 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.MutableList
-
-import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
 import org.apache.spark.sql.execution.command.tableModel
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
 import org.apache.spark.sql.types._
@@ -202,328 +197,3 @@ case class FakeCarbonCast(child: Literal, dataType: DataType)
 
   override def eval(input: InternalRow): Any = child.value
 }
-
-/**
- * A pattern that matches any number of project or filter operations on top of another relational
- * operator.  All filter operators are collected and their conditions are broken up and returned
- * together with the top project operator.
- * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if
- * necessary.
- */
-object PhysicalOperation1 extends PredicateHelper {
-  type ReturnType = (Seq[NamedExpression], Seq[Expression], Option[Seq[Expression]],
-    Option[Seq[SortOrder]], Option[Expression], LogicalPlan)
-
-  def apply(plan: LogicalPlan): Option[ReturnType] = {
-    val (fields, filters, child, _, groupby, sortOrder, limit) =
-      collectProjectsAndFilters(plan)
-
-    Some((fields.getOrElse(child.output), filters, groupby, sortOrder, limit, child))
-  }
-
-  /**
-   * Collects projects and filters, in-lining/substituting aliases if necessary.  Here are two
-   * examples for alias in-lining/substitution.  Before:
-   * {{{
-   *   SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
-   *   SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
-   * }}}
-   * After:
-   * {{{
-   *   SELECT key AS c1 FROM t1 WHERE key > 10
-   *   SELECT key AS c2 FROM t1 WHERE key > 10
-   * }}}
-   */
-  def collectProjectsAndFilters(plan: LogicalPlan):
-  (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan,
-    Map[Attribute, Expression], Option[Seq[Expression]],
-    Option[Seq[SortOrder]], Option[Expression]) = {
-    plan match {
-      case Project(fields, child) =>
-        val (_, filters, other, aliases, groupby, sortOrder, limit) = collectProjectsAndFilters(
-          child)
-        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
-        (Some(substitutedFields), filters, other, collectAliases(
-          substitutedFields), groupby, sortOrder, limit)
-
-      case Filter(condition, child) =>
-        val (fields, filters, other, aliases, groupby, sortOrder, limit) =
-          collectProjectsAndFilters(child)
-        val substitutedCondition = substitute(aliases)(condition)
-        (fields, filters ++ splitConjunctivePredicates(
-          substitutedCondition), other, aliases, groupby, sortOrder, limit)
-
-      case Aggregate(groupingExpressions, aggregateExpressions, child) =>
-        val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters(
-          child)
-
-        var aggExps: Seq[AggregateExpression] = Nil
-        aggregateExpressions.foreach(v => {
-          val list = findAggreagateExpression(v)
-          aggExps = aggExps ++ list
-        })
-
-        (fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some(
-          aggregateExpressions), sortOrder, limit)
-      case Sort(order, _, child) =>
-        val (fields, filters, other, aliases, groupby, _, limit) = collectProjectsAndFilters(child)
-        val substitutedOrder = order.map(s => SortOrder(substitute(aliases)(s.child), s.direction))
-        (fields, filters, other, aliases, groupby, Some(substitutedOrder), limit)
-      case Limit(limitExpr, child) =>
-        val (fields, filters, other, aliases, groupby, sortOrder, _) = collectProjectsAndFilters(
-          child)
-        (fields, filters, other, aliases, groupby, sortOrder, Some(limitExpr))
-      case other =>
-        (None, Nil, other, Map.empty, None, None, None)
-    }
-  }
-
-  def findAggreagateExpression(expr: Expression): Seq[AggregateExpression] = {
-    val exprList = expr match {
-      case d: AggregateExpression => d :: Nil
-      case Alias(ref, name) => findAggreagateExpression(ref)
-      case other =>
-        var listout: Seq[AggregateExpression] = Nil
-
-        other.children.foreach(v => {
-          val list = findAggreagateExpression(v)
-          listout = listout ++ list
-        })
-        listout
-    }
-    exprList
-  }
-
-  def collectProjectsAndFilters1(plan: LogicalPlan):
-  (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression],
-    Option[Seq[Expression]], Option[Seq[SortOrder]], Option[Expression]) = {
-    plan match {
-      case Project(fields, child) =>
-        val (_, filters, other, aliases, groupby, sortOrder, limit) = collectProjectsAndFilters(
-          child)
-        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
-        (Some(substitutedFields), filters, other, collectAliases(
-          substitutedFields), groupby, sortOrder, limit)
-
-      case Filter(condition, child) =>
-        val (fields, filters, other, aliases, groupby, sortOrder, limit) =
-          collectProjectsAndFilters(child)
-        val substitutedCondition = substitute(aliases)(condition)
-        (fields, filters ++ splitConjunctivePredicates(
-          substitutedCondition), other, aliases, groupby, sortOrder, limit)
-
-      case Aggregate(groupingExpressions, aggregateExpressions, child) =>
-        val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters(
-          child)
-        val aggExps = aggregateExpressions.map {
-          case Alias(ref, name) => ref
-          case others => others
-        }.filter {
-          case d: AggregateExpression => true
-          case _ => false
-        }
-        (fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some(
-          aggExps), sortOrder, limit)
-      case Sort(order, _, child) =>
-        val (fields, filters, other, aliases, groupby, _, limit) = collectProjectsAndFilters(child)
-        val substitutedOrder = order.map(s => SortOrder(substitute(aliases)(s.child), s.direction))
-        (fields, filters, other, aliases, groupby, Some(substitutedOrder), limit)
-      case Limit(limitExpr, child) =>
-        val (fields, filters, other, aliases, groupby, sortOrder, _) = collectProjectsAndFilters(
-          child)
-        (fields, filters, other, aliases, groupby, sortOrder, Some(limitExpr))
-      case other =>
-        (None, Nil, other, Map.empty, None, None, None)
-    }
-  }
-
-  private def collectAliases(fields: Seq[Expression]) = {
-    fields.collect {
-      case a@Alias(child, _) => a.toAttribute -> child
-    }.toMap
-  }
-
-  private def substitute(aliases: Map[Attribute, Expression])(expr: Expression) = {
-    expr.transform {
-      case a@Alias(ref: AttributeReference, name) =>
-        aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifiers)).getOrElse(a)
-
-      case a: AttributeReference =>
-        aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifiers)).getOrElse(a)
-    }
-  }
-}
-
-case class PositionLiteral(expr: Expression, intermediateDataType: DataType)
-  extends LeafExpression with CodegenFallback {
-  override def dataType: DataType = expr.dataType
-
-  override def nullable: Boolean = false
-
-  type EvaluatedType = Any
-  var position = -1
-
-  def setPosition(pos: Int): Unit = position = pos
-
-  override def toString: String = s"PositionLiteral($position : $expr)"
-
-  override def eval(input: InternalRow): Any = {
-    if (position != -1) {
-      input.get(position, intermediateDataType)
-    } else {
-      expr.eval(input)
-    }
-  }
-}
-
-/**
- * Matches a logical aggregation that can be performed on distributed data in two steps.  The first
- * operates on the data in each partition performing partial aggregation for each group.  The second
- * occurs after the shuffle and completes the aggregation.
- *
- * This pattern will only match if all aggregate expressions can be computed partially and will
- * return the rewritten aggregation expressions for both phases.
- *
- * The returned values for this match are as follows:
- * - Grouping attributes for the final aggregation.
- * - Aggregates for the final aggregation.
- * - Grouping expressions for the partial aggregation.
- * - Partial aggregate expressions.
- * - Input to the aggregation.
- */
-object CarbonAggregation {
-  type ReturnType = (Seq[Expression], Seq[NamedExpression], LogicalPlan)
-
-  private def convertAggregatesForPushdown(convertUnknown: Boolean,
-      rewrittenAggregateExpressions: Seq[Expression],
-      oneAttr: AttributeReference) = {
-    if (canBeConvertedToCarbonAggregate(rewrittenAggregateExpressions)) {
-      var counter: Int = 0
-      var updatedExpressions = MutableList[Expression]()
-      rewrittenAggregateExpressions.foreach(v => {
-        val updated = convertAggregate(v, counter, convertUnknown, oneAttr)
-        updatedExpressions += updated
-        counter = counter + 1
-      })
-      updatedExpressions
-    } else {
-      rewrittenAggregateExpressions
-    }
-  }
-
-  def makePositionLiteral(expr: Expression, index: Int, dataType: DataType): PositionLiteral = {
-    val posLiteral = PositionLiteral(expr, dataType)
-    posLiteral.setPosition(index)
-    posLiteral
-  }
-
-  def convertAggregate(current: Expression,
-      index: Int,
-      convertUnknown: Boolean,
-      oneAttr: AttributeReference): Expression = {
-    if (!convertUnknown && canBeConverted(current)) {
-      current.transform {
-        case Average(attr: AttributeReference) =>
-          val convertedDataType = transformArrayType(attr)
-          CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
-        case Average(Cast(attr: AttributeReference, dataType)) =>
-          val convertedDataType = transformArrayType(attr)
-          CarbonAverage(
-              makePositionLiteral(convertedDataType, index, convertedDataType.dataType))
-        case Count(Seq(s: Literal)) =>
-          CarbonCount(s, Some(makePositionLiteral(transformLongType(oneAttr), index, LongType)))
-        case Count(Seq(attr: AttributeReference)) =>
-          CarbonCount(makePositionLiteral(transformLongType(attr), index, LongType))
-        case Sum(attr: AttributeReference) =>
-          Sum(makePositionLiteral(attr, index, attr.dataType))
-        case Sum(Cast(attr: AttributeReference, dataType)) =>
-          Sum(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
-        case Min(attr: AttributeReference) => Min(makePositionLiteral(attr, index, attr.dataType))
-        case Min(Cast(attr: AttributeReference, dataType)) =>
-          Min(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
-        case Max(attr: AttributeReference) =>
-          Max(makePositionLiteral(attr, index, attr.dataType))
-        case Max(Cast(attr: AttributeReference, dataType)) =>
-          Max(Cast(makePositionLiteral(attr, index, attr.dataType), dataType))
-      }
-    } else {
-      current
-    }
-  }
-
-  def canBeConverted(current: Expression): Boolean = current match {
-    case Alias(AggregateExpression(Average(attr: AttributeReference), _, false), _) => true
-    case Alias(AggregateExpression(Average(Cast(attr: AttributeReference, _)), _, false), _) => true
-    case Alias(AggregateExpression(Count(Seq(s: Literal)), _, false), _) => true
-    case Alias(AggregateExpression(Count(Seq(attr: AttributeReference)), _, false), _) => true
-    case Alias(AggregateExpression(Sum(attr: AttributeReference), _, false), _) => true
-    case Alias(AggregateExpression(Sum(Cast(attr: AttributeReference, _)), _, false), _) => true
-    case Alias(AggregateExpression(Min(attr: AttributeReference), _, false), _) => true
-    case Alias(AggregateExpression(Min(Cast(attr: AttributeReference, _)), _, false), _) => true
-    case Alias(AggregateExpression(Max(attr: AttributeReference), _, false), _) => true
-    case Alias(AggregateExpression(Max(Cast(attr: AttributeReference, _)), _, false), _) => true
-    case _ => false
-  }
-
-  def transformArrayType(attr: AttributeReference): AttributeReference = {
-    AttributeReference(attr.name, ArrayType(DoubleType), attr.nullable, attr.metadata)(attr.exprId,
-      attr.qualifiers)
-  }
-
-  def transformLongType(attr: AttributeReference): AttributeReference = {
-    AttributeReference(attr.name, LongType, attr.nullable, attr.metadata)(attr.exprId,
-      attr.qualifiers)
-  }
-
-  /**
-   * There should be sync between carbonOperators validation and here. we should not convert to
-   * carbon aggregates if the validation does not satisfy.
-   */
-  def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = {
-    val detailQuery = expressions.map {
-      case attr@AttributeReference(_, _, _, _) => true
-      case Alias(agg: AggregateExpression, name) => true
-      case _ => false
-    }.exists(!_)
-    !detailQuery
-  }
-
-  def unapply(plan: LogicalPlan): Option[ReturnType] = unapply((plan, false))
-
-  def unapply(combinedPlan: (LogicalPlan, Boolean)): Option[ReturnType] = {
-    val oneAttr = getOneAttribute(combinedPlan._1)
-    combinedPlan._1 match {
-      case Aggregate(groupingExpressions, aggregateExpressionsOrig, child) =>
-
-        // if detailed query dont convert aggregate expressions to Carbon Aggregate expressions
-        val aggregateExpressions =
-          if (combinedPlan._2) {
-            aggregateExpressionsOrig
-          }
-          else {
-            convertAggregatesForPushdown(false, aggregateExpressionsOrig, oneAttr)
-          }
-        Some((groupingExpressions, aggregateExpressions.asInstanceOf[Seq[NamedExpression]], child))
-      case _ => None
-    }
-  }
-
-  def getOneAttribute(plan: LogicalPlan): AttributeReference = {
-    var relation: LogicalRelation = null
-    plan collect {
-      case l: LogicalRelation => relation = l
-    }
-    if (relation != null) {
-      relation.output.find { p =>
-        p.dataType match {
-          case n: NumericType => true
-          case _ => false
-        }
-      }.getOrElse(relation.output.head)
-    } else {
-      null
-    }
-  }
-}
-
-