Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/16 09:28:53 UTC

[10/14] incubator-carbondata git commit: Handled review comments

Handled review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6b3b16c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6b3b16c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6b3b16c5

Branch: refs/heads/master
Commit: 6b3b16c5971bd383f7ed4bb3ae2164b378044020
Parents: 35739e5
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Mar 15 10:50:49 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 16 14:50:43 2017 +0530

----------------------------------------------------------------------
 .../core/cache/dictionary/ManageDictionary.java | 113 ++++++++++
 .../core/datastore/block/SegmentProperties.java |  20 ++
 .../core/metadata/schema/table/CarbonTable.java | 139 +++++++++++-
 .../impl/DictionaryBasedResultCollector.java    | 215 ++++++++++--------
 .../DictionaryBasedVectorResultCollector.java   |  71 +++---
 .../collector/impl/RawBasedResultCollector.java |  44 +++-
 ...structureBasedDictionaryResultCollector.java | 119 ++--------
 .../RestructureBasedRawResultCollector.java     |  80 +++----
 .../RestructureBasedVectorResultCollector.java  | 221 +++++--------------
 .../executor/impl/AbstractQueryExecutor.java    | 103 ++++++---
 .../core/scan/executor/infos/DimensionInfo.java |  54 +++++
 .../core/scan/executor/util/QueryUtil.java      |  67 ------
 .../scan/executor/util/RestructureUtil.java     |  58 +++--
 .../carbondata/core/scan/filter/FilterUtil.java |  82 ++-----
 .../executer/RestructureEvaluatorImpl.java      |  81 +++++++
 .../RestructureExcludeFilterExecutorImpl.java   |  12 +-
 .../RestructureIncludeFilterExecutorImpl.java   |  12 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  27 +--
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  10 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  10 +-
 ...velRangeLessThanEqualFilterExecuterImpl.java |   9 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |   9 +-
 .../RowLevelRangeFilterResolverImpl.java        |   6 +-
 .../visitor/NoDictionaryTypeVisitor.java        |  17 +-
 .../carbondata/core/scan/model/QueryModel.java  |   2 +-
 .../scan/result/vector/CarbonColumnVector.java  |  14 ++
 .../apache/carbondata/core/util/CarbonUtil.java | 113 ++--------
 .../carbondata/core/util/DataTypeUtil.java      |  40 +---
 .../scan/executor/util/RestructureUtilTest.java |   5 +-
 .../spark/merger/CarbonCompactionExecutor.java  |  16 +-
 .../spark/merger/CarbonCompactionUtil.java      |  18 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  14 +-
 .../spark/tasks/DictionaryWriterTask.scala      |  30 ++-
 .../spark/tasks/SortIndexWriterTask.scala       |  22 +-
 .../spark/util/GlobalDictionaryUtil.scala       |  49 ++--
 .../vectorreader/ColumnarVectorWrapper.java     |  34 +++
 .../spark/sql/CarbonDictionaryDecoder.scala     |   6 +-
 .../execution/command/carbonTableSchema.scala   |   3 +-
 .../apache/spark/sql/hive/CarbonMetastore.scala |   4 +-
 .../newflow/DataLoadProcessBuilder.java         |  16 +-
 .../util/CarbonDataProcessorUtil.java           |   8 +-
 41 files changed, 1054 insertions(+), 919 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
new file mode 100644
index 0000000..706bc20
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.cache.dictionary;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * This class manages dictionary files for column additions and deletions
+ * and clears the cache for dictionary files that no longer exist
+ */
+public class ManageDictionary {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ManageDictionary.class.getName());
+
+  /**
+   * This method will delete the dictionary files for the given column IDs and
+   * clear the dictionary cache
+   *
+   * @param dictionaryColumns
+   * @param carbonTable
+   */
+  public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns,
+      CarbonTable carbonTable) {
+    if (!dictionaryColumns.isEmpty()) {
+      CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+      CarbonTablePath carbonTablePath =
+          CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier);
+      String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
+      CarbonFile metadataDir = FileFactory
+          .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
+      for (final CarbonColumn column : dictionaryColumns) {
+        // sort index file is created with dictionary size appended to it. So all the files
+        // with a given column ID need to be listed
+        CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+          @Override public boolean accept(CarbonFile path) {
+            if (path.getName().startsWith(column.getColumnId())) {
+              return true;
+            }
+            return false;
+          }
+        });
+        for (CarbonFile file : listFiles) {
+          // try catch is inside for loop because even if one deletion fails, other files
+          // still need to be deleted
+          try {
+            FileFactory.deleteFile(file.getCanonicalPath(),
+                FileFactory.getFileType(file.getCanonicalPath()));
+          } catch (IOException e) {
+            LOGGER.error(
+                "Failed to delete dictionary or sortIndex file for column " + column.getColName()
+                    + " with column ID " + column.getColumnId());
+          }
+        }
+        // remove dictionary cache
+        removeDictionaryColumnFromCache(carbonTable, column.getColumnId());
+      }
+    }
+  }
+
+  /**
+   * This method will remove dictionary cache from driver for both reverse and forward dictionary
+   *
+   * @param carbonTable
+   * @param columnId
+   */
+  public static void removeDictionaryColumnFromCache(CarbonTable carbonTable, String columnId) {
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance()
+        .createCache(CacheType.REVERSE_DICTIONARY, carbonTable.getStorePath());
+    DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
+        new DictionaryColumnUniqueIdentifier(carbonTable.getCarbonTableIdentifier(),
+            new ColumnIdentifier(columnId, null, null));
+    dictCache.invalidate(dictionaryColumnUniqueIdentifier);
+    dictCache = CacheProvider.getInstance()
+        .createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+    dictCache.invalidate(dictionaryColumnUniqueIdentifier);
+  }
+
+}
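
For readers unfamiliar with the new class, here is a minimal caller sketch (not part of this commit) showing how a drop-column flow might hand dropped dictionary columns to ManageDictionary. The class name DropColumnDictionaryCleanup, the tableName/droppedColumns parameters, and the CarbonDimension import path are assumptions made for illustration; only methods visible in this commit's diff (getDimensionByName, hasEncoding, deleteDictionaryFileAndCache) are called.

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.cache.dictionary.ManageDictionary;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;

public final class DropColumnDictionaryCleanup {

  // Hypothetical helper: collect the dropped columns that are dictionary encoded and
  // delegate dictionary/sort-index file deletion plus cache invalidation to ManageDictionary.
  public static void cleanUp(CarbonTable table, String tableName, List<String> droppedColumns) {
    List<CarbonColumn> dictionaryColumns = new ArrayList<>();
    for (String columnName : droppedColumns) {
      CarbonDimension dimension = table.getDimensionByName(tableName, columnName);
      if (dimension != null && dimension.hasEncoding(Encoding.DICTIONARY)) {
        dictionaryColumns.add(dimension);
      }
    }
    // Deletes every <columnId>* file under the table's Metadata directory and invalidates
    // both the reverse and forward dictionary caches for each column.
    ManageDictionary.deleteDictionaryFileAndCache(dictionaryColumns, table);
  }
}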

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 57cd145..84bf208 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -761,4 +761,24 @@ public class SegmentProperties {
     return blockTodimensionOrdinalMapping;
   }
 
+  /**
+   * This method will search for the given dimension and return the matching dimension from the current block
+   *
+   * @param queryDimension
+   * @return
+   */
+  public CarbonDimension getDimensionFromCurrentBlock(CarbonDimension queryDimension) {
+    return CarbonUtil.getDimensionFromCurrentBlock(this.dimensions, queryDimension);
+  }
+
+  /**
+   * This method will search for a given measure in the current block measures list
+   *
+   * @param columnId
+   * @return
+   */
+  public CarbonMeasure getMeasureFromCurrentBlock(String columnId) {
+    return CarbonUtil.getMeasureFromCurrentBlock(this.measures, columnId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index fd6992c..4527b77 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -55,11 +55,22 @@ public class CarbonTable implements Serializable {
   private AbsoluteTableIdentifier absoluteTableIdentifier;
 
   /**
-   * TableName, Dimensions list
+   * TableName, Dimensions list. This map will contain dimensions which are visible
    */
-  private Map<String, List<CarbonDimension>> tableDimensionsMap;
+  private transient Map<String, List<CarbonDimension>> tableDimensionsMap;
+
+  /**
+   * TableName, Dimensions list. This map will contain both visible and not visible dimensions
+   */
+  private transient Map<String, List<CarbonDimension>> tableDimensionsMapWithDeletedColumns;
+
+  /**
+   * list of all the dimensions
+   */
+  private List<CarbonDimension> dimensions;
 
   private Map<String, List<CarbonColumn>> createOrderColumn;
+
   /**
    * TableName, Dimensions and children dimensions list
    */
@@ -71,9 +82,19 @@ public class CarbonTable implements Serializable {
   private Map<String, List<CarbonDimension>> tableImplicitDimensionsMap;
 
   /**
-   * table measures list.
+   * table measures list. This map will contain measures which are visible
    */
-  private Map<String, List<CarbonMeasure>> tableMeasuresMap;
+  private transient Map<String, List<CarbonMeasure>> tableMeasuresMap;
+
+  /**
+   * table measures list. This map will contain both visible and not visible measures
+   */
+  private transient Map<String, List<CarbonMeasure>> tableMeasuresMapWithDeletedMeasures;
+
+  /**
+   * list of measures
+   */
+  private List<CarbonMeasure> measures;
 
   /**
    * table bucket map.
@@ -106,9 +127,7 @@ public class CarbonTable implements Serializable {
   private int blockSize;
 
   public CarbonTable() {
-    this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
-    this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
     this.tableBucketMap = new HashMap<>();
     this.aggregateTablesName = new ArrayList<String>();
     this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
@@ -148,6 +167,8 @@ public class CarbonTable implements Serializable {
    */
   private void fillCreateOrderColumn(String tableName) {
     List<CarbonColumn> columns = new ArrayList<CarbonColumn>();
+    fillVisibleDimensions(tableName);
+    fillVisibleMeasures(tableName);
     List<CarbonDimension> dimensions = this.tableDimensionsMap.get(tableName);
     List<CarbonMeasure> measures = this.tableMeasuresMap.get(tableName);
     Iterator<CarbonDimension> dimItr = dimensions.iterator();
@@ -199,13 +220,11 @@ public class CarbonTable implements Serializable {
    * @param tableSchema
    */
   private void fillDimensionsAndMeasuresForTables(TableSchema tableSchema) {
-    List<CarbonDimension> dimensions = new ArrayList<CarbonDimension>();
     List<CarbonDimension> primitiveDimensions = new ArrayList<CarbonDimension>();
     List<CarbonDimension> implicitDimensions = new ArrayList<CarbonDimension>();
-    List<CarbonMeasure> measures = new ArrayList<CarbonMeasure>();
-    this.tableDimensionsMap.put(tableSchema.getTableName(), dimensions);
+    dimensions = new ArrayList<>();
+    measures = new ArrayList<>();
     this.tablePrimitiveDimensionsMap.put(this.tableUniqueName, primitiveDimensions);
-    this.tableMeasuresMap.put(tableSchema.getTableName(), measures);
     this.tableImplicitDimensionsMap.put(tableSchema.getTableName(), implicitDimensions);
     int dimensionOrdinal = 0;
     int measureOrdinal = 0;
@@ -385,6 +404,7 @@ public class CarbonTable implements Serializable {
    * @return number of dimension present the table
    */
   public int getNumberOfDimensions(String tableName) {
+    fillVisibleDimensions(tableName);
     return tableDimensionsMap.get(tableName).size();
   }
 
@@ -395,6 +415,7 @@ public class CarbonTable implements Serializable {
    * @return number of measures present the table
    */
   public int getNumberOfMeasures(String tableName) {
+    fillVisibleMeasures(tableName);
     return tableMeasuresMap.get(tableName).size();
   }
 
@@ -405,6 +426,7 @@ public class CarbonTable implements Serializable {
    * @return all dimension of a table
    */
   public List<CarbonDimension> getDimensionByTableName(String tableName) {
+    fillVisibleDimensions(tableName);
     return tableDimensionsMap.get(tableName);
   }
 
@@ -415,6 +437,7 @@ public class CarbonTable implements Serializable {
    * @return all measure of a table
    */
   public List<CarbonMeasure> getMeasureByTableName(String tableName) {
+    fillVisibleMeasures(tableName);
     return tableMeasuresMap.get(tableName);
   }
 
@@ -445,6 +468,7 @@ public class CarbonTable implements Serializable {
    * @return
    */
   public CarbonMeasure getMeasureByName(String tableName, String columnName) {
+    fillVisibleMeasures(tableName);
     List<CarbonMeasure> measureList = tableMeasuresMap.get(tableName);
     for (CarbonMeasure measure : measureList) {
       if (!measure.isInvisible() && measure.getColName().equalsIgnoreCase(columnName)) {
@@ -463,6 +487,7 @@ public class CarbonTable implements Serializable {
    */
   public CarbonDimension getDimensionByName(String tableName, String columnName) {
     CarbonDimension carbonDimension = null;
+    fillVisibleDimensions(tableName);
     List<CarbonDimension> dimList = tableDimensionsMap.get(tableName);
     for (CarbonDimension dim : dimList) {
       if (!dim.isInvisible() && dim.getColName().equalsIgnoreCase(columnName)) {
@@ -503,6 +528,7 @@ public class CarbonTable implements Serializable {
    * @return list of child dimensions
    */
   public List<CarbonDimension> getChildren(String dimName) {
+    fillVisibleDimensions(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
     for (List<CarbonDimension> list : tableDimensionsMap.values()) {
       List<CarbonDimension> childDims = getChildren(dimName, list);
       if (childDims != null) {
@@ -580,4 +606,97 @@ public class CarbonTable implements Serializable {
     }
     return null;
   }
+
+  /**
+   * fill all dimensions if empty and return the map
+   *
+   * @param tableName
+   * @return
+   */
+  public Map<String, List<CarbonDimension>> getTableDimensionsMapWithDeletedColumns(
+      String tableName) {
+    if (null == tableDimensionsMapWithDeletedColumns || null == tableDimensionsMapWithDeletedColumns
+        .get(tableName)) {
+      synchronized (this) {
+        if (null == tableDimensionsMapWithDeletedColumns
+            || null == tableDimensionsMapWithDeletedColumns.get(tableName)) {
+          if (null == tableDimensionsMapWithDeletedColumns) {
+            tableDimensionsMapWithDeletedColumns = new HashMap<>();
+          }
+          tableDimensionsMapWithDeletedColumns.put(tableName, dimensions);
+        }
+      }
+    }
+    return tableDimensionsMapWithDeletedColumns;
+  }
+
+  /**
+   * This method will fill all the visible dimensions
+   *
+   * @param tableName
+   */
+  private void fillVisibleDimensions(String tableName) {
+    if (null == tableDimensionsMap || null == tableDimensionsMap.get(tableName)) {
+      synchronized (this) {
+        if (null == tableDimensionsMap || null == tableDimensionsMap.get(tableName)) {
+          if (null == tableDimensionsMap) {
+            tableDimensionsMap = new HashMap<>();
+          }
+          List<CarbonDimension> visibleDimensions = new ArrayList<>(dimensions.size());
+          for (CarbonDimension dimension : dimensions) {
+            if (!dimension.isInvisible()) {
+              visibleDimensions.add(dimension);
+            }
+          }
+          tableDimensionsMap.put(tableName, visibleDimensions);
+        }
+      }
+    }
+  }
+
+  /**
+   * fill all measures if empty and return the map
+   *
+   * @param tableName
+   * @return
+   */
+  public Map<String, List<CarbonMeasure>> getTableMeasuresMapWithDeletedMeasures(String tableName) {
+    if (null == tableMeasuresMapWithDeletedMeasures || null == tableMeasuresMapWithDeletedMeasures
+        .get(tableName)) {
+      synchronized (this) {
+        if (null == tableMeasuresMapWithDeletedMeasures
+            || null == tableMeasuresMapWithDeletedMeasures.get(tableName)) {
+          if (null == tableMeasuresMapWithDeletedMeasures) {
+            tableMeasuresMapWithDeletedMeasures = new HashMap<>();
+          }
+          tableMeasuresMapWithDeletedMeasures.put(tableName, measures);
+        }
+      }
+    }
+    return tableMeasuresMapWithDeletedMeasures;
+  }
+
+  /**
+   * This method will fill all the visible measures
+   *
+   * @param tableName
+   */
+  private void fillVisibleMeasures(String tableName) {
+    if (null == tableMeasuresMap || null == tableMeasuresMap.get(tableName)) {
+      synchronized (this) {
+        if (null == tableMeasuresMap || null == tableMeasuresMap.get(tableName)) {
+          if (null == tableMeasuresMap) {
+            tableMeasuresMap = new HashMap<>();
+          }
+          List<CarbonMeasure> visibleMeasures = new ArrayList<>(measures.size());
+          for (CarbonMeasure measure : measures) {
+            if (!measure.isInvisible()) {
+              visibleMeasures.add(measure);
+            }
+          }
+          tableMeasuresMap.put(tableName, visibleMeasures);
+        }
+      }
+    }
+  }
 }
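
A note on the pattern used above: tableDimensionsMap and tableMeasuresMap are now transient and are rebuilt lazily from the full dimensions/measures lists, filtering out invisible (deleted) columns, with a double-checked null test inside a synchronized block. Below is a self-contained sketch of that idiom, independent of CarbonData's types; the class and field names are illustrative only, and the cached field is marked volatile, which safe double-checked locking in Java generally requires.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class LazyVisibleView<T> implements Serializable {

  private final List<T> allItems;                    // serialized: full list, including deleted items
  private transient volatile List<T> visibleItems;   // rebuilt on first access after deserialization

  public LazyVisibleView(List<T> allItems) {
    this.allItems = allItems;
  }

  public List<T> getVisible(Predicate<T> isVisible) {
    if (visibleItems == null) {                      // cheap unsynchronized check
      synchronized (this) {
        if (visibleItems == null) {                  // re-check while holding the lock
          List<T> filtered = new ArrayList<>(allItems.size());
          for (T item : allItems) {
            if (isVisible.test(item)) {
              filtered.add(item);
            }
          }
          visibleItems = filtered;
        }
      }
    }
    return visibleItems;
  }
}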

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index 14e1ab9..c5539b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -40,6 +40,31 @@ import org.apache.commons.lang3.ArrayUtils;
  */
 public class DictionaryBasedResultCollector extends AbstractScannedResultCollector {
 
+  protected QueryDimension[] queryDimensions;
+
+  protected QueryMeasure[] queryMeasures;
+
+  protected DirectDictionaryGenerator[] directDictionaryGenerators;
+
+  /**
+   * query order
+   */
+  protected int[] order;
+
+  protected int[] actualIndexInSurrogateKey;
+
+  protected boolean[] dictionaryEncodingArray;
+
+  protected boolean[] directDictionaryEncodingArray;
+
+  protected boolean[] implictColumnArray;
+
+  protected boolean[] complexDataTypeArray;
+
+  protected int dictionaryColumnIndex;
+  protected int noDictionaryColumnIndex;
+  protected int complexTypeColumnIndex;
+
   public DictionaryBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
   }
@@ -49,66 +74,22 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
    * it will keep track of how many record is processed, to handle limit scenario
    */
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
-
-    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
-    boolean isMsrsPresent = measureInfo.getMeasureDataTypes().length > 0;
-
-    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
-    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
-    for (int i = 0; i < queryDimensions.length; i++) {
-      if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
-          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
-      }
-    }
-    int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
-        new Integer[dictionaryIndexes.size()]));
-    Arrays.sort(primitive);
-    int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
-    int index = 0;
-    for (int i = 0; i < queryDimensions.length; i++) {
-      if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
-          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        actualIndexInSurrogateKey[index++] =
-            Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
-      }
-    }
-
-    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
-    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
-        scannedResult.getDeleteDeltaDataCache();
-    Map<Integer, GenericQueryType> comlexDimensionInfoMap =
-        tableBlockExecutionInfos.getComlexDimensionInfoMap();
-    boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
-    boolean[] directDictionaryEncodingArray =
-        CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
-    boolean[] implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
-    boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
-    int dimSize = queryDimensions.length;
-    boolean isDimensionsExist = dimSize > 0;
-    int[] order = new int[dimSize + queryMeasures.length];
-    for (int i = 0; i < dimSize; i++) {
-      order[i] = queryDimensions[i].getQueryOrder();
-    }
-    for (int i = 0; i < queryMeasures.length; i++) {
-      order[i + dimSize] = queryMeasures[i].getQueryOrder();
-    }
+    queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+    initDimensionAndMeasureIndexesForFillingData();
     // scan the record and add to list
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     int rowCounter = 0;
-    int dictionaryColumnIndex = 0;
-    int noDictionaryColumnIndex = 0;
-    int complexTypeColumnIndex = 0;
     int[] surrogateResult;
     String[] noDictionaryKeys;
     byte[][] complexTypeKeyArray;
-
-    DirectDictionaryGenerator[] directDictionaryGenerators = new DirectDictionaryGenerator[dimSize];
-    for (int i = 0; i < dimSize; i++) {
-      directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
-              .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
-    }
+    boolean isDimensionsExist = queryDimensions.length > 0;
+    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
+        scannedResult.getDeleteDeltaDataCache();
+    Map<Integer, GenericQueryType> comlexDimensionInfoMap =
+        tableBlockExecutionInfos.getComlexDimensionInfoMap();
     while (scannedResult.hasNext() && rowCounter < batchSize) {
-      Object[] row = new Object[dimSize + queryMeasures.length];
+      Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionsExist) {
         surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
         noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
@@ -116,38 +97,10 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
         dictionaryColumnIndex = 0;
         noDictionaryColumnIndex = 0;
         complexTypeColumnIndex = 0;
-        for (int i = 0; i < dimSize; i++) {
-          if (!dictionaryEncodingArray[i]) {
-            if (implictColumnArray[i]) {
-              if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
-                  .equals(queryDimensions[i].getDimension().getColName())) {
-                row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
-                    scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR
-                        + scannedResult.getCurrenrRowId(), DataType.STRING);
-              } else {
-                row[order[i]] = DataTypeUtil
-                    .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
-              }
-            } else {
-              row[order[i]] = DataTypeUtil
-                  .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
-                      queryDimensions[i].getDimension().getDataType());
-            }
-          } else if (directDictionaryEncodingArray[i]) {
-            if (directDictionaryGenerators[i] != null) {
-              row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
-                  surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
-            }
-          } else if (complexDataTypeArray[i]) {
-            row[order[i]] = comlexDimensionInfoMap
-                .get(queryDimensions[i].getDimension().getOrdinal())
-                .getDataBasedOnDataTypeFromSurrogates(
-                    ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
-          } else {
-            row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
-          }
+        for (int i = 0; i < queryDimensions.length; i++) {
+          fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
+              comlexDimensionInfoMap, row, i);
         }
-
       } else {
         scannedResult.incrementCounter();
       }
@@ -155,17 +108,95 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
           .contains(scannedResult.getCurrenrRowId())) {
         continue;
       }
-      if (isMsrsPresent) {
-        Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
-        fillMeasureData(msrValues, 0, scannedResult);
-        for (int i = 0; i < msrValues.length; i++) {
-          row[order[i + dimSize]] = msrValues[i];
-        }
-      }
+      fillMeasureData(scannedResult, row);
       listBasedResult.add(row);
       rowCounter++;
     }
     return listBasedResult;
   }
 
+  protected void fillDimensionData(AbstractScannedResult scannedResult, int[] surrogateResult,
+      String[] noDictionaryKeys, byte[][] complexTypeKeyArray,
+      Map<Integer, GenericQueryType> comlexDimensionInfoMap, Object[] row, int i) {
+    if (!dictionaryEncodingArray[i]) {
+      if (implictColumnArray[i]) {
+        if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
+            .equals(queryDimensions[i].getDimension().getColName())) {
+          row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
+              scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR
+                  + scannedResult.getCurrenrRowId(), DataType.STRING);
+        } else {
+          row[order[i]] = DataTypeUtil
+              .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
+        }
+      } else {
+        row[order[i]] = DataTypeUtil
+            .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
+                queryDimensions[i].getDimension().getDataType());
+      }
+    } else if (directDictionaryEncodingArray[i]) {
+      if (directDictionaryGenerators[i] != null) {
+        row[order[i]] = directDictionaryGenerators[i].getValueFromSurrogate(
+            surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
+      }
+    } else if (complexDataTypeArray[i]) {
+      row[order[i]] =
+          comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+              .getDataBasedOnDataTypeFromSurrogates(
+                  ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
+    } else {
+      row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
+    }
+  }
+
+  protected void fillMeasureData(AbstractScannedResult scannedResult, Object[] row) {
+    if (measureInfo.getMeasureDataTypes().length > 0) {
+      Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
+      fillMeasureData(msrValues, 0, scannedResult);
+      for (int i = 0; i < msrValues.length; i++) {
+        row[order[i + queryDimensions.length]] = msrValues[i];
+      }
+    }
+  }
+
+  protected void initDimensionAndMeasureIndexesForFillingData() {
+    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
+          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+    int[] primitive =
+        ArrayUtils.toPrimitive(dictionaryIndexes.toArray(new Integer[dictionaryIndexes.size()]));
+    Arrays.sort(primitive);
+    actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
+    int index = 0;
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
+          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        actualIndexInSurrogateKey[index++] =
+            Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+
+    dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
+    directDictionaryEncodingArray = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
+    implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
+    complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
+    order = new int[queryDimensions.length + queryMeasures.length];
+    for (int i = 0; i < queryDimensions.length; i++) {
+      order[i] = queryDimensions[i].getQueryOrder();
+    }
+    for (int i = 0; i < queryMeasures.length; i++) {
+      order[i + queryDimensions.length] = queryMeasures[i].getQueryOrder();
+    }
+    directDictionaryGenerators =
+        new DirectDictionaryGenerator[queryDimensions.length];
+    for (int i = 0; i < queryDimensions.length; i++) {
+      directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
+    }
+  }
+
 }
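
The net effect of the collector changes above is a template-method style: DictionaryBasedResultCollector now owns the row-filling loop and exposes protected hooks (initDimensionAndMeasureIndexesForFillingData, fillDimensionData, fillMeasureData) that the restructure collectors reuse instead of duplicating the logic. Below is a simplified, self-contained sketch of that shape; the class and method names are illustrative, not CarbonData APIs.

// Base collector: owns the per-row loop and exposes hooks for subclasses.
public class BaseCollector {

  protected int[] order = new int[0];

  // hook: compute per-query bookkeeping once before filling rows
  protected void initIndexes(int columnCount) {
    order = new int[columnCount];
    for (int i = 0; i < columnCount; i++) {
      order[i] = i;                       // identity ordering in this sketch
    }
  }

  // hook: fill one dimension cell of the output row
  protected void fillDimensionData(Object[] row, int i, Object blockValue) {
    row[order[i]] = blockValue;
  }

  public Object[] collectRow(Object[] blockValues) {
    initIndexes(blockValues.length);
    Object[] row = new Object[blockValues.length];
    for (int i = 0; i < blockValues.length; i++) {
      fillDimensionData(row, i, blockValues[i]);
    }
    return row;
  }
}

// Restructure-aware subclass: only the "column missing in this block" branch differs.
class RestructureAwareCollector extends BaseCollector {

  private final boolean[] columnExistsInBlock;
  private final Object[] defaultValues;

  RestructureAwareCollector(boolean[] columnExistsInBlock, Object[] defaultValues) {
    this.columnExistsInBlock = columnExistsInBlock;
    this.defaultValues = defaultValues;
  }

  @Override protected void fillDimensionData(Object[] row, int i, Object blockValue) {
    if (!columnExistsInBlock[i]) {
      // newly added column not present in this (older) block: fall back to its default value
      row[order[i]] = defaultValues[i];
      return;
    }
    super.fillDimensionData(row, i, blockValue);
  }
}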

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 670df1a..82eaac7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -35,22 +35,30 @@ import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
  */
 public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
 
-  private ColumnVectorInfo[] dictionaryInfo;
+  protected QueryDimension[] queryDimensions;
 
-  private ColumnVectorInfo[] noDictionaryInfo;
+  protected QueryMeasure[] queryMeasures;
 
-  private ColumnVectorInfo[] complexInfo;
+  protected ColumnVectorInfo[] dictionaryInfo;
 
-  private ColumnVectorInfo[] measureColumnInfo;
+  protected ColumnVectorInfo[] noDictionaryInfo;
 
-  private ColumnVectorInfo[] allColumnInfo;
+  protected ColumnVectorInfo[] complexInfo;
+
+  protected ColumnVectorInfo[] measureColumnInfo;
+
+  protected ColumnVectorInfo[] allColumnInfo;
 
   public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
-    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
-    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
-    measureColumnInfo = new ColumnVectorInfo[queryMeasures.length];
+    queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
     allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
+    prepareDimensionAndMeasureColumnVectors();
+  }
+
+  protected void prepareDimensionAndMeasureColumnVectors() {
+    measureColumnInfo = new ColumnVectorInfo[queryMeasures.length];
     List<ColumnVectorInfo> dictInfoList = new ArrayList<>();
     List<ColumnVectorInfo> noDictInfoList = new ArrayList<>();
     List<ColumnVectorInfo> complexList = new ArrayList<>();
@@ -122,26 +130,35 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
       if (requiredRows < 1) {
         return;
       }
-      for (int i = 0; i < allColumnInfo.length; i++) {
-        allColumnInfo[i].size = requiredRows;
-        allColumnInfo[i].offset = rowCounter;
-        allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
-        allColumnInfo[i].vector = columnarBatch.columnVectors[i];
-      }
+      fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+      scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
+    }
+  }
 
-      scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
-      scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
-      scannedResult.fillColumnarMeasureBatch(measureColumnInfo, measureInfo.getMeasureOrdinals());
-      scannedResult.fillColumnarComplexBatch(complexInfo);
-      // it means fetched all data out of page so increment the page counter
-      if (availableRows == requiredRows) {
-        scannedResult.incrementPageCounter();
-      } else {
-        // Or set the row counter.
-        scannedResult.setRowCounter(rowCounter + requiredRows);
-      }
-      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
-      columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+  protected void scanAndFillResult(AbstractScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch, int rowCounter, int availableRows, int requiredRows) {
+    scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
+    scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
+    scannedResult.fillColumnarMeasureBatch(measureColumnInfo, measureInfo.getMeasureOrdinals());
+    scannedResult.fillColumnarComplexBatch(complexInfo);
+    // it means fetched all data out of page so increment the page counter
+    if (availableRows == requiredRows) {
+      scannedResult.incrementPageCounter();
+    } else {
+      // Or set the row counter.
+      scannedResult.setRowCounter(rowCounter + requiredRows);
+    }
+    columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
+    columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+  }
+
+  protected void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch, int rowCounter,
+      int requiredRows) {
+    for (int i = 0; i < allColumnInfo.length; i++) {
+      allColumnInfo[i].size = requiredRows;
+      allColumnInfo[i].offset = rowCounter;
+      allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+      allColumnInfo[i].vector = columnarBatch.columnVectors[i];
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
index 1acd830..bdaffc8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RawBasedResultCollector.java
@@ -33,6 +33,16 @@ import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
  */
 public class RawBasedResultCollector extends AbstractScannedResultCollector {
 
+  protected ByteArrayWrapper wrapper;
+
+  protected byte[] dictionaryKeyArray;
+
+  protected byte[][] noDictionaryKeyArray;
+
+  protected byte[][] complexTypeKeyArray;
+
+  protected byte[] implicitColumnByteArray;
+
   public RawBasedResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
   }
@@ -44,28 +54,40 @@ public class RawBasedResultCollector extends AbstractScannedResultCollector {
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
-    ByteArrayWrapper wrapper = null;
     BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
         scannedResult.getDeleteDeltaDataCache();
     // scan the record and add to list
     int rowCounter = 0;
     while (scannedResult.hasNext() && rowCounter < batchSize) {
-      Object[] row = new Object[1 + queryMeasures.length];
-      wrapper = new ByteArrayWrapper();
-      wrapper.setDictionaryKey(scannedResult.getDictionaryKeyArray());
-      wrapper.setNoDictionaryKeys(scannedResult.getNoDictionaryKeyArray());
-      wrapper.setComplexTypesKeys(scannedResult.getComplexTypeKeyArray());
-      wrapper.setImplicitColumnByteArray(scannedResult.getBlockletId()
-          .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      scanResultAndGetData(scannedResult);
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
           .contains(scannedResult.getCurrenrRowId())) {
         continue;
       }
-      row[0] = wrapper;
-      fillMeasureData(row, 1, scannedResult);
-      listBasedResult.add(row);
+      prepareRow(scannedResult, listBasedResult, queryMeasures);
       rowCounter++;
     }
     return listBasedResult;
   }
+
+  protected void prepareRow(AbstractScannedResult scannedResult, List<Object[]> listBasedResult,
+      QueryMeasure[] queryMeasures) {
+    Object[] row = new Object[1 + queryMeasures.length];
+    wrapper = new ByteArrayWrapper();
+    wrapper.setDictionaryKey(dictionaryKeyArray);
+    wrapper.setNoDictionaryKeys(noDictionaryKeyArray);
+    wrapper.setComplexTypesKeys(complexTypeKeyArray);
+    wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
+    row[0] = wrapper;
+    fillMeasureData(row, 1, scannedResult);
+    listBasedResult.add(row);
+  }
+
+  protected void scanResultAndGetData(AbstractScannedResult scannedResult) {
+    dictionaryKeyArray = scannedResult.getDictionaryKeyArray();
+    noDictionaryKeyArray = scannedResult.getNoDictionaryKeyArray();
+    complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
+    implicitColumnByteArray = scannedResult.getBlockletId()
+        .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
index 99bdae4..5ab400a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedDictionaryResultCollector.java
@@ -16,31 +16,19 @@
  */
 package org.apache.carbondata.core.scan.collector.impl;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.cache.update.BlockletLevelDeleteDeltaDataCache;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 
-import org.apache.commons.lang3.ArrayUtils;
 /**
- * It is not a collector it is just a scanned result holder.
+ * class for handling restructure scenarios for filling result
  */
-public class RestructureBasedDictionaryResultCollector extends AbstractScannedResultCollector {
+public class RestructureBasedDictionaryResultCollector extends DictionaryBasedResultCollector {
 
   public RestructureBasedDictionaryResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
@@ -51,60 +39,22 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
    * it will keep track of how many record is processed, to handle limit scenario
    */
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
-
-    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
-    boolean isMsrsPresent = measureInfo.getMeasureDataTypes().length > 0;
-
-    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
-    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
-    for (int i = 0; i < queryDimensions.length; i++) {
-      if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
-          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
-      }
-    }
-    int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
-        new Integer[dictionaryIndexes.size()]));
-    Arrays.sort(primitive);
-    int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
-    int index = 0;
-    for (int i = 0; i < queryDimensions.length; i++) {
-      if (queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) || queryDimensions[i]
-          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        actualIndexInSurrogateKey[index++] =
-            Arrays.binarySearch(primitive, queryDimensions[i].getDimension().getOrdinal());
-      }
-    }
-
-    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
-    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
-        scannedResult.getDeleteDeltaDataCache();
-    Map<Integer, GenericQueryType> comlexDimensionInfoMap =
-        tableBlockExecutionInfos.getComlexDimensionInfoMap();
-    boolean[] dictionaryEncodingArray = CarbonUtil.getDictionaryEncodingArray(queryDimensions);
-    boolean[] directDictionaryEncodingArray =
-        CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions);
-    boolean[] implictColumnArray = CarbonUtil.getImplicitColumnArray(queryDimensions);
-    boolean[] complexDataTypeArray = CarbonUtil.getComplexDataTypeArray(queryDimensions);
-    int dimSize = queryDimensions.length;
-    boolean isDimensionsExist = dimSize > 0;
-    int[] order = new int[dimSize + queryMeasures.length];
-    for (int i = 0; i < dimSize; i++) {
-      order[i] = queryDimensions[i].getQueryOrder();
-    }
-    for (int i = 0; i < queryMeasures.length; i++) {
-      order[i + dimSize] = queryMeasures[i].getQueryOrder();
-    }
+    queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
+    queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
+    initDimensionAndMeasureIndexesForFillingData();
     // scan the record and add to list
+    List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     int rowCounter = 0;
-    int dictionaryColumnIndex = 0;
-    int noDictionaryColumnIndex = 0;
-    int complexTypeColumnIndex = 0;
     int[] surrogateResult;
     String[] noDictionaryKeys;
     byte[][] complexTypeKeyArray;
+    boolean isDimensionsExist = queryDimensions.length > 0;
+    BlockletLevelDeleteDeltaDataCache deleteDeltaDataCache =
+        scannedResult.getDeleteDeltaDataCache();
+    Map<Integer, GenericQueryType> comlexDimensionInfoMap =
+        tableBlockExecutionInfos.getComlexDimensionInfoMap();
     while (scannedResult.hasNext() && rowCounter < batchSize) {
-      Object[] row = new Object[dimSize + queryMeasures.length];
+      Object[] row = new Object[queryDimensions.length + queryMeasures.length];
       if (isDimensionsExist) {
         surrogateResult = scannedResult.getDictionaryKeyIntegerArray();
         noDictionaryKeys = scannedResult.getNoDictionaryKeyStringArray();
@@ -112,7 +62,7 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
         dictionaryColumnIndex = 0;
         noDictionaryColumnIndex = 0;
         complexTypeColumnIndex = 0;
-        for (int i = 0; i < dimSize; i++) {
+        for (int i = 0; i < queryDimensions.length; i++) {
           // fill default value in case the dimension does not exist in the current block
           if (!dimensionInfo.getDimensionExists()[i]) {
             if (dictionaryEncodingArray[i] || directDictionaryEncodingArray[i]) {
@@ -123,40 +73,9 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
             }
             continue;
           }
-          if (!dictionaryEncodingArray[i]) {
-            if (implictColumnArray[i]) {
-              if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID
-                  .equals(queryDimensions[i].getDimension().getColName())) {
-                row[order[i]] = DataTypeUtil.getDataBasedOnDataType(
-                    scannedResult.getBlockletId() + CarbonCommonConstants.FILE_SEPARATOR
-                        + scannedResult.getCurrenrRowId(), DataType.STRING);
-              } else {
-                row[order[i]] = DataTypeUtil
-                    .getDataBasedOnDataType(scannedResult.getBlockletId(), DataType.STRING);
-              }
-            } else {
-              row[order[i]] = DataTypeUtil
-                  .getDataBasedOnDataType(noDictionaryKeys[noDictionaryColumnIndex++],
-                      queryDimensions[i].getDimension().getDataType());
-            }
-          } else if (directDictionaryEncodingArray[i]) {
-            DirectDictionaryGenerator directDictionaryGenerator =
-                DirectDictionaryKeyGeneratorFactory
-                    .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
-            if (directDictionaryGenerator != null) {
-              row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
-                  surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
-            }
-          } else if (complexDataTypeArray[i]) {
-            row[order[i]] = comlexDimensionInfoMap
-                .get(queryDimensions[i].getDimension().getOrdinal())
-                .getDataBasedOnDataTypeFromSurrogates(
-                    ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
-          } else {
-            row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
-          }
+          fillDimensionData(scannedResult, surrogateResult, noDictionaryKeys, complexTypeKeyArray,
+              comlexDimensionInfoMap, row, i);
         }
-
       } else {
         scannedResult.incrementCounter();
       }
@@ -164,13 +83,7 @@ public class RestructureBasedDictionaryResultCollector extends AbstractScannedRe
           .contains(scannedResult.getCurrenrRowId())) {
         continue;
       }
-      if (isMsrsPresent) {
-        Object[] msrValues = new Object[measureInfo.getMeasureDataTypes().length];
-        fillMeasureData(msrValues, 0, scannedResult);
-        for (int i = 0; i < msrValues.length; i++) {
-          row[order[i + dimSize]] = msrValues[i];
-        }
-      }
+      fillMeasureData(scannedResult, row);
       listBasedResult.add(row);
       rowCounter++;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 14867d6..77dc3db 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.scan.collector.impl;
 
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,7 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * It is not a collector it is just a scanned result holder.
  */
-public class RestructureBasedRawResultCollector extends AbstractScannedResultCollector {
+public class RestructureBasedRawResultCollector extends RawBasedResultCollector {
 
   /**
    * logger
@@ -85,10 +84,10 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
           // will only be for dictionary encoded columns
           CarbonDimension currentBlockDimension = segmentProperties.getDimensions()
               .get(dictionaryColumnBlockIndex[dimCounterInCurrentBlock]);
-          updatedColumnCardinality.add(segmentProperties
-              .getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()]);
-          updatedDimensionPartitioner.add(segmentProperties
-              .getDimensionPartitions()[currentBlockDimension.getKeyOrdinal()]);
+          updatedColumnCardinality.add(
+              segmentProperties.getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()]);
+          updatedDimensionPartitioner.add(
+              segmentProperties.getDimensionPartitions()[currentBlockDimension.getKeyOrdinal()]);
           dimCounterInCurrentBlock++;
         } else {
           // partitioner index will be 1 every column will be in columnar format
@@ -133,8 +132,8 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
     for (int i = 0; i < dictionaryColumnBlockIndex.length; i++) {
       // get the dictionary key ordinal as column cardinality in segment properties
       // will only be for dictionary encoded columns
-      CarbonDimension currentBlockDimension = segmentProperties.getDimensions()
-          .get(dictionaryColumnBlockIndex[i]);
+      CarbonDimension currentBlockDimension =
+          segmentProperties.getDimensions().get(dictionaryColumnBlockIndex[i]);
       updatedColumnCardinality[i] =
           segmentProperties.getDimColumnsCardinality()[currentBlockDimension.getKeyOrdinal()];
       updatedDimensionPartitioner[i] =
@@ -160,24 +159,19 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
     // scan the record and add to list
     int rowCounter = 0;
     while (scannedResult.hasNext() && rowCounter < batchSize) {
-      byte[] dictionaryKeyArray = scannedResult.getDictionaryKeyArray();
-      byte[][] noDictionaryKeyArray = scannedResult.getNoDictionaryKeyArray();
-      byte[][] complexTypeKeyArray = scannedResult.getComplexTypeKeyArray();
-      byte[] implicitColumnByteArray = scannedResult.getBlockletId()
-          .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      scanResultAndGetData(scannedResult);
       if (null != deleteDeltaDataCache && deleteDeltaDataCache
           .contains(scannedResult.getCurrenrRowId())) {
         continue;
       }
-      Object[] row = new Object[1 + queryMeasures.length];
-      wrapper = new ByteArrayWrapper();
-      wrapper.setDictionaryKey(fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray));
-      wrapper.setNoDictionaryKeys(fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray));
-      wrapper.setComplexTypesKeys(complexTypeKeyArray);
-      wrapper.setImplicitColumnByteArray(implicitColumnByteArray);
-      row[0] = wrapper;
-      fillMeasureData(row, 1, scannedResult);
-      listBasedResult.add(row);
+      // re-fill dictionary and no dictionary key arrays for the newly added columns
+      if (dimensionInfo.isDictionaryColumnAdded()) {
+        dictionaryKeyArray = fillDictionaryKeyArrayWithLatestSchema(dictionaryKeyArray);
+      }
+      if (dimensionInfo.isNoDictionaryColumnAdded()) {
+        noDictionaryKeyArray = fillNoDictionaryKeyArrayWithLatestSchema(noDictionaryKeyArray);
+      }
+      prepareRow(scannedResult, listBasedResult, queryMeasures);
       rowCounter++;
     }
     return listBasedResult;
@@ -191,17 +185,19 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
    */
   private byte[] fillDictionaryKeyArrayWithLatestSchema(byte[] dictionaryKeyArray) {
     QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
-    List<Long> keyArrayWithNewColumnValues = new ArrayList<>(actualQueryDimensions.length);
     long[] keyArray = updatedCurrentBlockKeyGenerator.getKeyArray(dictionaryKeyArray);
+    long[] keyArrayWithNewAddedColumns =
+        new long[keyArray.length + dimensionInfo.getNewDictionaryColumnCount()];
     int existingColumnKeyArrayIndex = 0;
+    int newKeyArrayIndex = 0;
     for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
       if (CarbonUtil
           .hasEncoding(actualQueryDimensions[i].getDimension().getEncoder(), Encoding.DICTIONARY)) {
         // if dimension exists then add the key array value else add the default value
         if (dimensionInfo.getDimensionExists()[i]) {
-          keyArrayWithNewColumnValues.add(keyArray[existingColumnKeyArrayIndex++]);
+          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = keyArray[existingColumnKeyArrayIndex++];
         } else {
-          Long defaultValueAsLong = null;
+          long defaultValueAsLong;
           Object defaultValue = dimensionInfo.getDefaultValues()[i];
           if (null != defaultValue) {
             defaultValueAsLong = ((Integer) defaultValue).longValue();
@@ -209,18 +205,14 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
             defaultValueAsLong =
                 new Integer(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY).longValue();
           }
-          keyArrayWithNewColumnValues.add(defaultValueAsLong);
+          keyArrayWithNewAddedColumns[newKeyArrayIndex++] = defaultValueAsLong;
         }
       }
     }
-    if (!keyArrayWithNewColumnValues.isEmpty()) {
-      long[] keyArrayWithLatestSchema = ArrayUtils.toPrimitive(
-          keyArrayWithNewColumnValues.toArray(new Long[keyArrayWithNewColumnValues.size()]));
-      try {
-        dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithLatestSchema);
-      } catch (KeyGenException e) {
-        LOGGER.error(e, e.getMessage());
-      }
+    try {
+      dictionaryKeyArray = restructuredKeyGenerator.generateKey(keyArrayWithNewAddedColumns);
+    } catch (KeyGenException e) {
+      LOGGER.error(e, e.getMessage());
     }
     return dictionaryKeyArray;
   }
@@ -233,33 +225,29 @@ public class RestructureBasedRawResultCollector extends AbstractScannedResultCol
    */
   private byte[][] fillNoDictionaryKeyArrayWithLatestSchema(byte[][] noDictionaryKeyArray) {
     QueryDimension[] actualQueryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
-    List<byte[]> noDictionaryValueList = new ArrayList<>(actualQueryDimensions.length);
+    byte[][] noDictionaryKeyArrayWithNewlyAddedColumns =
+        new byte[noDictionaryKeyArray.length + dimensionInfo.getNewNoDictionaryColumnCount()][];
     int existingColumnValueIndex = 0;
+    int newKeyArrayIndex = 0;
     for (int i = 0; i < dimensionInfo.getDimensionExists().length; i++) {
       if (!actualQueryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
         // if dimension exists then add the byte array value else add the default value
         if (dimensionInfo.getDimensionExists()[i]) {
-          noDictionaryValueList.add(noDictionaryKeyArray[existingColumnValueIndex++]);
+          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] =
+              noDictionaryKeyArray[existingColumnValueIndex++];
         } else {
           byte[] newColumnDefaultValue = null;
           Object defaultValue = dimensionInfo.getDefaultValues()[i];
           if (null != defaultValue) {
-            newColumnDefaultValue = ((UTF8String)defaultValue).getBytes();
+            newColumnDefaultValue = ((UTF8String) defaultValue).getBytes();
           } else {
             newColumnDefaultValue =
                 UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL).getBytes();
           }
-          noDictionaryValueList.add(newColumnDefaultValue);
+          noDictionaryKeyArrayWithNewlyAddedColumns[newKeyArrayIndex++] = newColumnDefaultValue;
         }
       }
     }
-    // fill the 2-D byte array with all the values of columns in latest schema
-    if (!noDictionaryValueList.isEmpty()) {
-      noDictionaryKeyArray = new byte[noDictionaryValueList.size()][];
-      for (int i = 0; i < noDictionaryKeyArray.length; i++) {
-        noDictionaryKeyArray[i] = noDictionaryValueList.get(i);
-      }
-    }
-    return noDictionaryKeyArray;
+    return noDictionaryKeyArrayWithNewlyAddedColumns;
   }
 }

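For reference, the reworked fillDictionaryKeyArrayWithLatestSchema() above widens the decoded
key array so that dictionary columns added after this block was written receive a default
surrogate key instead of a lookup. A minimal, self-contained sketch of that widening step
(class, field and constant names are illustrative placeholders, and unlike the actual code the
sketch does not filter on Encoding.DICTIONARY):

    // Sketch only: widen a decoded dictionary key array for newly added columns.
    public final class KeyArrayWideningSketch {

      // assumed stand-in for CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY
      private static final long DEFAULT_SURROGATE_KEY = 1L;

      /**
       * @param keyArray        surrogate keys decoded from the old block
       * @param dimensionExists per query dimension: true if the column exists in the old block
       * @param defaultValues   per query dimension default surrogate (Integer) or null
       * @return key array laid out as per the latest schema
       */
      public static long[] widenKeyArray(long[] keyArray, boolean[] dimensionExists,
          Object[] defaultValues) {
        int newColumns = 0;
        for (boolean exists : dimensionExists) {
          if (!exists) {
            newColumns++;
          }
        }
        long[] widened = new long[keyArray.length + newColumns];
        int existingIndex = 0;
        int targetIndex = 0;
        for (int i = 0; i < dimensionExists.length; i++) {
          if (dimensionExists[i]) {
            // column existed when the block was written: carry its decoded key forward
            widened[targetIndex++] = keyArray[existingIndex++];
          } else if (defaultValues[i] != null) {
            // newly added column with a user supplied default value
            widened[targetIndex++] = ((Integer) defaultValues[i]).longValue();
          } else {
            // newly added column without a default value
            widened[targetIndex++] = DEFAULT_SURROGATE_KEY;
          }
        }
        return widened;
      }

      public static void main(String[] args) {
        long[] widened = widenKeyArray(new long[] { 5, 9 },
            new boolean[] { true, false, true }, new Object[] { null, 3, null });
        // prints [5, 3, 9]: the middle column was added later and gets its default surrogate
        System.out.println(java.util.Arrays.toString(widened));
      }
    }

The widened array is then re-encoded with the restructured key generator, as the generateKey()
call in the hunk above does.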
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index b1ce040..dd84e6f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -17,114 +17,59 @@
 package org.apache.carbondata.core.scan.collector.impl;
 
 import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
-import org.apache.carbondata.core.scan.model.QueryDimension;
-import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
-import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
 
 import org.apache.spark.sql.types.Decimal;
 
 /**
  * It is not a collector; it is just a scanned result holder.
  */
-public class RestructureBasedVectorResultCollector extends AbstractScannedResultCollector {
-
-  private ColumnVectorInfo[] dictionaryInfo;
-
-  private ColumnVectorInfo[] noDictionaryInfo;
-
-  private ColumnVectorInfo[] complexInfo;
-
-  private ColumnVectorInfo[] measureColumnInfo;
-
-  private ColumnVectorInfo[] allColumnInfo;
+public class RestructureBasedVectorResultCollector extends DictionaryBasedVectorResultCollector {
 
   public RestructureBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
-    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
-    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
-    measureColumnInfo = new ColumnVectorInfo[queryMeasures.length];
+    queryDimensions = tableBlockExecutionInfos.getActualQueryDimensions();
+    queryMeasures = tableBlockExecutionInfos.getActualQueryMeasures();
     allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
-    List<ColumnVectorInfo> dictInfoList = new ArrayList<>();
-    List<ColumnVectorInfo> noDictInfoList = new ArrayList<>();
-    List<ColumnVectorInfo> complexList = new ArrayList<>();
-    int dimensionExistIndex = 0;
+    createVectorForNewlyAddedDimensions();
+    createVectorForNewlyAddedMeasures();
+    prepareDimensionAndMeasureColumnVectors();
+  }
+
+  /**
+   * create column vector for newly added dimension columns
+   */
+  private void createVectorForNewlyAddedDimensions() {
     for (int i = 0; i < queryDimensions.length; i++) {
       if (!dimensionInfo.getDimensionExists()[i]) {
         // add a dummy column vector result collector object
         ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
         allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
-        continue;
-      }
-      // get the current block dimension and fetch the required information from it
-      QueryDimension currentBlockDimension =
-          tableBlockExecutionInfos.getQueryDimensions()[dimensionExistIndex++];
-      if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
-        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
-        noDictInfoList.add(columnVectorInfo);
-        columnVectorInfo.dimension = currentBlockDimension;
-        columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
-        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
-      } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
-        dictInfoList.add(columnVectorInfo);
-        columnVectorInfo.dimension = currentBlockDimension;
-        columnVectorInfo.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(currentBlockDimension.getDimension().getDataType());
-        columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
-        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
-      } else if (queryDimensions[i].getDimension().isComplex()) {
-        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
-        complexList.add(columnVectorInfo);
-        columnVectorInfo.dimension = currentBlockDimension;
-        columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
-        columnVectorInfo.genericQueryType =
-            tableBlockExecutionInfos.getComlexDimensionInfoMap().get(columnVectorInfo.ordinal);
-        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
-      } else {
-        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
-        dictInfoList.add(columnVectorInfo);
-        columnVectorInfo.dimension = currentBlockDimension;
-        columnVectorInfo.ordinal = currentBlockDimension.getDimension().getOrdinal();
-        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
       }
     }
-    int measureExistIndex = 0;
+  }
+
+  /**
+   * create column vector for newly added measure columns
+   */
+  private void createVectorForNewlyAddedMeasures() {
     for (int i = 0; i < queryMeasures.length; i++) {
       if (!measureInfo.getMeasureExists()[i]) {
         // add a dummy column vector result collector object
         ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
         allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo;
-        continue;
       }
-      QueryMeasure currentBlockMeasure =
-          tableBlockExecutionInfos.getQueryMeasures()[measureExistIndex++];
-      ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
-      columnVectorInfo.measureVectorFiller = MeasureDataVectorProcessor.MeasureVectorFillerFactory
-          .getMeasureVectorFiller(currentBlockMeasure.getMeasure().getDataType());
-      columnVectorInfo.ordinal = currentBlockMeasure.getMeasure().getOrdinal();
-      columnVectorInfo.measure = currentBlockMeasure;
-      this.measureColumnInfo[i] = columnVectorInfo;
-      allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo;
     }
-    dictionaryInfo = dictInfoList.toArray(new ColumnVectorInfo[dictInfoList.size()]);
-    noDictionaryInfo = noDictInfoList.toArray(new ColumnVectorInfo[noDictInfoList.size()]);
-    complexInfo = complexList.toArray(new ColumnVectorInfo[complexList.size()]);
-    Arrays.sort(dictionaryInfo);
-    Arrays.sort(noDictionaryInfo);
-    Arrays.sort(complexInfo);
   }
 
   @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
@@ -147,29 +92,12 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
       if (requiredRows < 1) {
         return;
       }
-      for (int i = 0; i < allColumnInfo.length; i++) {
-        allColumnInfo[i].size = requiredRows;
-        allColumnInfo[i].offset = rowCounter;
-        allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
-        allColumnInfo[i].vector = columnarBatch.columnVectors[i];
-      }
-
-      scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
-      scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
-      scannedResult.fillColumnarMeasureBatch(measureColumnInfo, measureInfo.getMeasureOrdinals());
-      scannedResult.fillColumnarComplexBatch(complexInfo);
+      fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+      // fill default values for non-existing dimensions and measures
       fillDataForNonExistingDimensions();
       fillDataForNonExistingMeasures();
-      // it means fetched all data out of page so increment the page counter
-      if (availableRows == requiredRows) {
-        scannedResult.incrementPageCounter();
-      } else {
-        // Or set the row counter.
-        scannedResult.setRowCounter(rowCounter + requiredRows);
-      }
-      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
-      columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+      // fill data for dimensions and measures existing in the current block
+      scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
     }
   }
 
@@ -203,17 +131,11 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
    *
    * @param vector
    * @param columnVectorInfo
-   *
    * @param defaultValue
    */
   private void fillDictionaryData(CarbonColumnVector vector, ColumnVectorInfo columnVectorInfo,
       Object defaultValue) {
-    int offset = columnVectorInfo.offset;
-    int vectorOffset = columnVectorInfo.vectorOffset;
-    int len = columnVectorInfo.size + offset;
-    for (int j = offset; j < len; j++) {
-      vector.putInt(vectorOffset++, (int) defaultValue);
-    }
+    vector.putInts(columnVectorInfo.vectorOffset, columnVectorInfo.size, (int) defaultValue);
   }
 
   /**
@@ -225,15 +147,10 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
    */
   private void fillDirectDictionaryData(CarbonColumnVector vector,
       ColumnVectorInfo columnVectorInfo, Object defaultValue) {
-    int offset = columnVectorInfo.offset;
-    int vectorOffset = columnVectorInfo.vectorOffset;
-    int len = columnVectorInfo.size + offset;
-    for (int j = offset; j < len; j++) {
-      if (null != defaultValue) {
-        vector.putLong(vectorOffset++, (long) defaultValue);
-      } else {
-        vector.putNull(vectorOffset++);
-      }
+    if (null != defaultValue) {
+      vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size, (long) defaultValue);
+    } else {
+      vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
     }
   }
 
@@ -246,15 +163,10 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
    */
   private void fillNoDictionaryData(CarbonColumnVector vector, ColumnVectorInfo columnVectorInfo,
       byte[] defaultValue) {
-    int offset = columnVectorInfo.offset;
-    int vectorOffset = columnVectorInfo.vectorOffset;
-    int len = columnVectorInfo.size + offset;
-    for (int j = offset; j < len; j++) {
-      if (null != defaultValue) {
-        vector.putBytes(vectorOffset++, defaultValue);
-      } else {
-        vector.putNull(vectorOffset++);
-      }
+    if (null != defaultValue) {
+      vector.putBytes(columnVectorInfo.vectorOffset, columnVectorInfo.size, defaultValue);
+    } else {
+      vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
     }
   }
 
@@ -267,55 +179,36 @@ public class RestructureBasedVectorResultCollector extends AbstractScannedResult
         CarbonMeasure measure = tableBlockExecutionInfos.getActualQueryMeasures()[i].getMeasure();
         ColumnVectorInfo columnVectorInfo = allColumnInfo[i];
         CarbonColumnVector vector = allColumnInfo[i].vector;
-        int offset = columnVectorInfo.offset;
-        int len = offset + columnVectorInfo.size;
-        int vectorOffset = columnVectorInfo.vectorOffset;
-        // convert decimal default value to spark decimal type so that new object is not getting
-        // created for every row added
-        Object defaultValue = convertDecimalValue(measure, measureInfo.getDefaultValues()[i]);
-        for (int j = offset; j < len; j++) {
-          if (null == defaultValue) {
-            vector.putNull(vectorOffset++);
-          } else {
-            switch (measureInfo.getMeasureDataTypes()[i]) {
-              case SHORT:
-                vector.putShort(vectorOffset++, (short) defaultValue);
-                break;
-              case INT:
-                vector.putInt(vectorOffset++, (int) defaultValue);
-                break;
-              case LONG:
-                vector.putLong(vectorOffset++, (long) defaultValue);
-                break;
-              case DECIMAL:
-                vector.putDecimal(vectorOffset, (Decimal) defaultValue, measure.getPrecision());
-                break;
-              default:
-                vector.putDouble(vectorOffset++, (double) defaultValue);
-            }
+        Object defaultValue = RestructureUtil
+            .getMeasureDefaultValue(measure.getColumnSchema(), measure.getDefaultValue());
+        if (null == defaultValue) {
+          vector.putNulls(columnVectorInfo.vectorOffset, columnVectorInfo.size);
+        } else {
+          switch (measureInfo.getMeasureDataTypes()[i]) {
+            case SHORT:
+              vector.putShorts(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+                  (short) defaultValue);
+              break;
+            case INT:
+              vector.putInts(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+                  (int) defaultValue);
+              break;
+            case LONG:
+              vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+                  (long) defaultValue);
+              break;
+            case DECIMAL:
+              Decimal convertToSparkType = Decimal.apply((BigDecimal) defaultValue);
+              vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+                  convertToSparkType, measure.getPrecision());
+              break;
+            default:
+              vector.putDoubles(columnVectorInfo.vectorOffset, columnVectorInfo.size,
+                  (double) defaultValue);
           }
         }
       }
     }
   }
 
-  /**
-   * This method will parse the measure default value based on data type
-   *
-   * @param measure
-   * @return
-   */
-  private Object convertDecimalValue(CarbonMeasure measure, Object defaultValue) {
-    if (null != measure.getDefaultValue()) {
-      switch (measure.getDataType()) {
-        case DECIMAL:
-          defaultValue = org.apache.spark.sql.types.Decimal.apply((BigDecimal) defaultValue);
-          return defaultValue;
-        default:
-          return defaultValue;
-      }
-    }
-    return defaultValue;
-  }
-
 }

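For reference, the per-row loops removed above are replaced by bulk fills on the column vector
(putInts, putLongs, putShorts, putDecimals, putDoubles, putNulls and the batch putBytes). A
minimal sketch of how such bulk default-fill methods can be layered over a single-row vector
API; the interface and class names below are illustrative placeholders, not the actual
CarbonColumnVector signatures:

    // Sketch only: bulk variants fill 'count' consecutive rows starting at 'rowId'.
    interface VectorSketch {
      void putInt(int rowId, int value);
      void putNull(int rowId);

      default void putInts(int rowId, int count, int value) {
        for (int i = 0; i < count; i++) {
          putInt(rowId + i, value);
        }
      }

      default void putNulls(int rowId, int count) {
        for (int i = 0; i < count; i++) {
          putNull(rowId + i);
        }
      }
    }

    // Tiny array-backed implementation used only to exercise the sketch.
    class IntVectorSketch implements VectorSketch {
      final Integer[] data;
      IntVectorSketch(int size) { data = new Integer[size]; }
      public void putInt(int rowId, int value) { data[rowId] = value; }
      public void putNull(int rowId) { data[rowId] = null; }
    }

    class VectorSketchDemo {
      public static void main(String[] args) {
        IntVectorSketch vector = new IntVectorSketch(4);
        // default value for a newly added dictionary column, filled for 3 rows in one call
        vector.putInts(0, 3, 7);
        vector.putNulls(3, 1);
        System.out.println(java.util.Arrays.toString(vector.data)); // [7, 7, 7, null]
      }
    }

A concrete backend can override the bulk methods with a genuinely vectorized fill; the
single-row fallback above only illustrates the contract.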
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6b3b16c5/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0bd0bf6..2a5c342 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -223,8 +223,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
 
     // below is to get only those dimension in query which is present in the
     // table block
-    List<QueryDimension> updatedQueryDimension = RestructureUtil
-        .createDimensionInfoAndGetUpdatedQueryDimension(blockExecutionInfo,
+    List<QueryDimension> currentBlockQueryDimensions = RestructureUtil
+        .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
             queryModel.getQueryDimension(), tableBlockDimensions,
             segmentProperties.getComplexDimensions());
     int tableFactPathLength = CarbonStorePath
@@ -234,13 +234,13 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength));
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
-    blockExecutionInfo.setQueryDimensions(
-        updatedQueryDimension.toArray(new QueryDimension[updatedQueryDimension.size()]));
+    blockExecutionInfo.setQueryDimensions(currentBlockQueryDimensions
+        .toArray(new QueryDimension[currentBlockQueryDimensions.size()]));
     // get measures present in the current block
-    List<QueryMeasure> updatedQueryMeasures =
-        getUpdatedQueryMeasures(blockExecutionInfo, queryModel, blockIndex);
+    List<QueryMeasure> currentBlockQueryMeasures =
+        getCurrentBlockQueryMeasures(blockExecutionInfo, queryModel, blockIndex);
     blockExecutionInfo.setQueryMeasures(
-        updatedQueryMeasures.toArray(new QueryMeasure[updatedQueryMeasures.size()]));
+        currentBlockQueryMeasures.toArray(new QueryMeasure[currentBlockQueryMeasures.size()]));
     blockExecutionInfo.setDataBlock(blockIndex);
     blockExecutionInfo.setBlockKeyGenerator(blockKeyGenerator);
     // setting whether raw record query or not
@@ -252,7 +252,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .setTotalNumberOfMeasureBlock(segmentProperties.getMeasuresOrdinalToBlockMapping().size());
     blockExecutionInfo.setAbsoluteTableIdentifier(queryModel.getAbsoluteTableIdentifier());
     blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil
-        .getComplexDimensionsMap(updatedQueryDimension,
+        .getComplexDimensionsMap(currentBlockQueryDimensions,
             segmentProperties.getDimensionOrdinalToBlockMapping(),
             segmentProperties.getEachComplexDimColumnValueSize(),
             queryProperties.columnToDictionayMapping, queryProperties.complexFilterDimension));
@@ -291,12 +291,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // list of dimensions to be projected
     Set<Integer> allProjectionListDimensionIdexes = new LinkedHashSet<>();
     // create a list of filter dimensions present in the current block
-    Set<CarbonDimension> updatedFilterDimensions = QueryUtil
-        .getUpdatedFilterDimensions(queryProperties.complexFilterDimension,
-            segmentProperties.getDimensions());
-    int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
+    Set<CarbonDimension> currentBlockFilterDimensions =
+        getCurrentBlockFilterDimensions(queryProperties.complexFilterDimension, segmentProperties);
+    int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(currentBlockQueryDimensions,
         segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
-        updatedFilterDimensions, allProjectionListDimensionIdexes);
+        currentBlockFilterDimensions, allProjectionListDimensionIdexes);
     int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
             CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE));
@@ -313,13 +312,13 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
     }
     // get the list of updated filter measures present in the current block
-    Set<CarbonMeasure> updatedFilterMeasures = QueryUtil
-        .getUpdatedFilterMeasures(queryProperties.filterMeasures, segmentProperties.getMeasures());
+    Set<CarbonMeasure> currentBlockFilterMeasures =
+        getCurrentBlockFilterMeasures(queryProperties.filterMeasures, segmentProperties);
     // list of measures to be projected
     List<Integer> allProjectionListMeasureIndexes = new ArrayList<>();
     int[] measureBlockIndexes = QueryUtil
-        .getMeasureBlockIndexes(updatedQueryMeasures, expressionMeasures,
-            segmentProperties.getMeasuresOrdinalToBlockMapping(), updatedFilterMeasures,
+        .getMeasureBlockIndexes(currentBlockQueryMeasures, expressionMeasures,
+            segmentProperties.getMeasuresOrdinalToBlockMapping(), currentBlockFilterMeasures,
             allProjectionListMeasureIndexes);
     if (measureBlockIndexes.length > 0) {
 
@@ -342,16 +341,14 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     blockExecutionInfo.setProjectionListMeasureIndexes(ArrayUtils.toPrimitive(
         allProjectionListMeasureIndexes
             .toArray(new Integer[allProjectionListMeasureIndexes.size()])));
-    // setting the key structure info which will be required
-    // to update the older block key with new key generator
-    // blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);
     // setting the size of fixed key column (dictionary column)
-    blockExecutionInfo.setFixedLengthKeySize(getKeySize(updatedQueryDimension, segmentProperties));
+    blockExecutionInfo
+        .setFixedLengthKeySize(getKeySize(currentBlockQueryDimensions, segmentProperties));
     Set<Integer> dictionaryColumnBlockIndex = new HashSet<Integer>();
     List<Integer> noDictionaryColumnBlockIndex = new ArrayList<Integer>();
     // get the block index to be read from file for query dimension
     // for both dictionary columns and no dictionary columns
-    QueryUtil.fillQueryDimensionsBlockIndexes(updatedQueryDimension,
+    QueryUtil.fillQueryDimensionsBlockIndexes(currentBlockQueryDimensions,
         segmentProperties.getDimensionOrdinalToBlockMapping(), dictionaryColumnBlockIndex,
         noDictionaryColumnBlockIndex);
     int[] queryDictionaryColumnBlockIndexes = ArrayUtils.toPrimitive(
@@ -368,7 +365,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // setting each column value size
     blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
     blockExecutionInfo.setComplexColumnParentBlockIndexes(
-        getComplexDimensionParentBlockIndexes(updatedQueryDimension));
+        getComplexDimensionParentBlockIndexes(currentBlockQueryDimensions));
     blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
     try {
       // to set column group and its key structure info which will be used
@@ -376,7 +373,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       // for getting the column group column data in case of final row
       // and in case of dimension aggregation
       blockExecutionInfo.setColumnGroupToKeyStructureInfo(
-          QueryUtil.getColumnGroupKeyStructureInfo(updatedQueryDimension, segmentProperties));
+          QueryUtil.getColumnGroupKeyStructureInfo(currentBlockQueryDimensions, segmentProperties));
     } catch (KeyGenException e) {
       throw new QueryExecutionException(e);
     }
@@ -398,6 +395,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   private int getKeySize(List<QueryDimension> queryDimension,
       SegmentProperties blockMetadataInfo) {
+    // add the dimension block ordinal of each dictionary column present in the current
+    // block dimensions. A Set is used because, in case of column groups, the ordinal of all
+    // columns within a column group will be the same
     Set<Integer> fixedLengthDimensionOrdinal =
         new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     int counter = 0;
@@ -416,6 +416,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     }
     int[] dictionaryColumnOrdinal = ArrayUtils.toPrimitive(
         fixedLengthDimensionOrdinal.toArray(new Integer[fixedLengthDimensionOrdinal.size()]));
+    // calculate the size of existing query dictionary columns in this block
     if (dictionaryColumnOrdinal.length > 0) {
       int[] eachColumnValueSize = blockMetadataInfo.getEachDimColumnValueSize();
       int keySize = 0;
@@ -435,11 +436,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    * @param tableBlock         table block
    * @return
    */
-  private List<QueryMeasure> getUpdatedQueryMeasures(BlockExecutionInfo blockExecutionInfo,
+  private List<QueryMeasure> getCurrentBlockQueryMeasures(BlockExecutionInfo blockExecutionInfo,
       QueryModel queryModel, AbstractIndex tableBlock) throws QueryExecutionException {
-    // getting the aggregate infos which will be used during aggregation
+    // getting the measure info which will be used while filling measure data
     List<QueryMeasure> updatedQueryMeasures = RestructureUtil
-        .createMeasureInfoAndGetUpdatedQueryMeasures(blockExecutionInfo,
+        .createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo,
             queryModel.getQueryMeasures(), tableBlock.getSegmentProperties().getMeasures());
     // setting the measure aggregator for all aggregation function selected
     // in query
@@ -460,6 +461,54 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
   }
 
   /**
+   * This method will create the updated list of filter measures present in the current block
+   *
+   * @param queryFilterMeasures
+   * @param segmentProperties
+   * @return
+   */
+  private Set<CarbonMeasure> getCurrentBlockFilterMeasures(Set<CarbonMeasure> queryFilterMeasures,
+      SegmentProperties segmentProperties) {
+    if (!queryFilterMeasures.isEmpty()) {
+      Set<CarbonMeasure> updatedFilterMeasures = new HashSet<>(queryFilterMeasures.size());
+      for (CarbonMeasure queryMeasure : queryFilterMeasures) {
+        CarbonMeasure measureFromCurrentBlock =
+            segmentProperties.getMeasureFromCurrentBlock(queryMeasure.getColumnId());
+        if (null != measureFromCurrentBlock) {
+          updatedFilterMeasures.add(measureFromCurrentBlock);
+        }
+      }
+      return updatedFilterMeasures;
+    } else {
+      return queryFilterMeasures;
+    }
+  }
+
+  /**
+   * This method will create the updated list of filter dimensions present in the current block
+   *
+   * @param queryFilterDimensions
+   * @param segmentProperties
+   * @return
+   */
+  private Set<CarbonDimension> getCurrentBlockFilterDimensions(
+      Set<CarbonDimension> queryFilterDimensions, SegmentProperties segmentProperties) {
+    if (!queryFilterDimensions.isEmpty()) {
+      Set<CarbonDimension> updatedFilterDimensions = new HashSet<>(queryFilterDimensions.size());
+      for (CarbonDimension queryDimension : queryFilterDimensions) {
+        CarbonDimension dimensionFromCurrentBlock =
+            segmentProperties.getDimensionFromCurrentBlock(queryDimension);
+        if (null != dimensionFromCurrentBlock) {
+          updatedFilterDimensions.add(dimensionFromCurrentBlock);
+        }
+      }
+      return updatedFilterDimensions;
+    } else {
+      return queryFilterDimensions;
+    }
+  }
+
+  /**
    * Below method will be used to finish the execution
    *
    * @throws QueryExecutionException