Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/16 19:05:46 UTC

[carbondata] 13/22: [CARBONDATA-3343] Compaction for Range Sort

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 7449c346499cc3d454317e5b223c22bb034358a6
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Mon Apr 22 18:52:45 2019 +0530

    [CARBONDATA-3343] Compaction for Range Sort
    
    Problem: Compaction for Range Sort needs to be supported correctly; earlier it grouped the ranges/partitions by taskId, which was incorrect.
    
    Solution: Combine the data of all segments, create new ranges using Spark's RangePartitioner, assign each
    range to one task, and apply the filter query per range to get the compacted segment.
    
    This closes #3182
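    
    For illustration only, below is a minimal, self-contained Spark sketch of this idea (it is not the
    actual CarbonMergerRDD/DataSkewRangePartitioner code from this commit; the segment data, the Int
    range column, and the 3-way split are made-up assumptions):
    
        import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
    
        object RangeCompactionSketch {
          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(
              new SparkConf().setAppName("range-compaction-sketch").setMaster("local[2]"))
    
            // Rows from two hypothetical segments, keyed by the range column (an Int id here).
            val segment0 = sc.parallelize(Seq((5, "row-a"), (1, "row-b"), (9, "row-c")))
            val segment1 = sc.parallelize(Seq((3, "row-d"), (7, "row-e"), (2, "row-f")))
    
            // 1. Combine the data of all segments taking part in the compaction.
            val combined = segment0.union(segment1)
    
            // 2. Spark's RangePartitioner samples the combined data and computes new
            //    range boundaries; each resulting partition corresponds to one range.
            val ranged = combined.partitionBy(new RangePartitioner(3, combined))
    
            // 3. One task per range then writes its slice of the compacted segment
            //    (a stand-in for the per-range filter query executed in CarbonMergerRDD).
            ranged
              .mapPartitionsWithIndex { (rangeId, rows) =>
                rows.map { case (key, value) => s"range-$rangeId: $key -> $value" }
              }
              .collect()
              .foreach(println)
    
            sc.stop()
          }
        }
    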
---
 .../core/constants/CarbonCommonConstants.java      |   1 +
 .../core/metadata/schema/table/CarbonTable.java    |  24 +-
 .../core/scan/expression/Expression.java           |  13 +
 .../scan/filter/FilterExpressionProcessor.java     |   5 +-
 .../carbondata/core/scan/filter/FilterUtil.java    |  52 +-
 .../resolver/ConditionalFilterResolverImpl.java    |   2 +-
 .../resolver/RowLevelRangeFilterResolverImpl.java  |  40 +-
 .../core/scan/model/QueryModelBuilder.java         |  18 +-
 .../core/scan/result/BlockletScannedResult.java    |  62 +-
 .../scan/result/impl/FilterQueryScannedResult.java |  20 +-
 .../result/impl/NonFilterQueryScannedResult.java   |  59 +-
 .../dataload/TestRangeColumnDataLoad.scala         | 669 ++++++++++++++++++++-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  43 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 202 ++++++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |   7 +-
 .../org/apache/spark/CarbonInputMetrics.scala      |   0
 .../apache/spark/DataSkewRangePartitioner.scala    |  26 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |  12 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |   1 -
 .../merger/CarbonCompactionExecutor.java           |  20 +-
 .../processing/merger/CarbonCompactionUtil.java    | 140 +++++
 .../merger/RowResultMergerProcessor.java           |   6 +-
 22 files changed, 1274 insertions(+), 148 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 608b5fb..ba8e20a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1759,6 +1759,7 @@ public final class CarbonCommonConstants {
   public static final String ARRAY = "array";
   public static final String STRUCT = "struct";
   public static final String MAP = "map";
+  public static final String DECIMAL = "decimal";
   public static final String FROM = "from";
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 54ea772..c66d1fc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1081,22 +1081,26 @@ public class CarbonTable implements Serializable {
     return dataSize + indexSize;
   }
 
-  public void processFilterExpression(Expression filterExpression,
-      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
-    QueryModel.FilterProcessVO processVO =
-        new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
-            getMeasureByTableName(getTableName()), getImplicitDimensionByTableName(getTableName()));
-    QueryModel.processFilterExpression(processVO, filterExpression, isFilterDimensions,
-        isFilterMeasures, this);
-
+  public void processFilterExpression(Expression filterExpression, boolean[] isFilterDimensions,
+      boolean[] isFilterMeasures) {
+    processFilterExpressionWithoutRange(filterExpression, isFilterDimensions, isFilterMeasures);
     if (null != filterExpression) {
      // Optimize Filter Expression and fit RANGE filters if conditions apply.
-      FilterOptimizer rangeFilterOptimizer =
-          new RangeFilterOptmizer(filterExpression);
+      FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(filterExpression);
       rangeFilterOptimizer.optimizeFilter();
     }
   }
 
+  public void processFilterExpressionWithoutRange(Expression filterExpression,
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
+    QueryModel.FilterProcessVO processVO =
+        new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
+            getMeasureByTableName(getTableName()), getImplicitDimensionByTableName(getTableName()));
+    QueryModel
+        .processFilterExpression(processVO, filterExpression, isFilterDimensions, isFilterMeasures,
+            this);
+  }
+
   /**
    * Resolve the filter expression.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
index 13acc63..2513b0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
@@ -33,6 +33,11 @@ public abstract class Expression implements Serializable {
   protected List<Expression> children =
       new ArrayList<Expression>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
+  // When a filter expression already has the dictionary surrogate values in it,
+  // isAlreadyResolved is set to true so that we do not resolve the
+  // filter expression again in further steps.
+  protected boolean isAlreadyResolved;
+
   public abstract ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException;
 
@@ -52,4 +57,12 @@ public abstract class Expression implements Serializable {
   public abstract String getString();
 
   public abstract String getStatement();
+
+  public boolean isAlreadyResolved() {
+    return isAlreadyResolved;
+  }
+
+  public void setAlreadyResolved(boolean alreadyResolved) {
+    isAlreadyResolved = alreadyResolved;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 7269304..fd75496 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -377,7 +377,10 @@ public class FilterExpressionProcessor implements FilterProcessor {
           // getting new dim index.
           if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
               .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
-              .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+              .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) || currentCondExpression
+              .isAlreadyResolved()) {
+            // In case of Range Column Dictionary Include we do not need to resolve the range
+            // expression as it is already resolved and has the surrogates in the filter value
             if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
                 && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
                 FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 9d8fe8d..cef3af1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1028,6 +1028,37 @@ public final class FilterUtil {
     return filterValuesList.toArray(new byte[filterValuesList.size()][]);
   }
 
+  // This function calculates the filter values when the Range Column
+  // is given as a Dictionary Include column
+  private static byte[][] getFilterValueInBytesForDictRange(ColumnFilterInfo columnFilterInfo,
+      KeyGenerator blockLevelKeyGenerator, int[] dimColumnsCardinality, int[] keys,
+      List<byte[]> filterValuesList, int keyOrdinalOfDimensionFromCurrentBlock) {
+    if (null != columnFilterInfo) {
+      int[] rangesForMaskedByte =
+          getRangesForMaskedByte(keyOrdinalOfDimensionFromCurrentBlock, blockLevelKeyGenerator);
+      List<Integer> listOfsurrogates = columnFilterInfo.getFilterList();
+      if (listOfsurrogates == null || listOfsurrogates.size() > 1) {
+        throw new RuntimeException(
+            "Filter values cannot be null in case of range in dictionary include");
+      }
+      // Here we only take the first filter value, as there can be only one range column.
+      try {
+        if (listOfsurrogates.get(0)
+            <= dimColumnsCardinality[keyOrdinalOfDimensionFromCurrentBlock]) {
+          keys[keyOrdinalOfDimensionFromCurrentBlock] = listOfsurrogates.get(0);
+        } else {
+          keys[keyOrdinalOfDimensionFromCurrentBlock] =
+              dimColumnsCardinality[keyOrdinalOfDimensionFromCurrentBlock];
+        }
+        filterValuesList
+            .add(getMaskedKey(rangesForMaskedByte, blockLevelKeyGenerator.generateKey(keys)));
+      } catch (KeyGenException e) {
+        LOGGER.error(e.getMessage(), e);
+      }
+    }
+    return filterValuesList.toArray(new byte[filterValuesList.size()][]);
+  }
+
   /**
    * This method will be used to get the Filter key array list for blocks which do not contain
    * filter column and the column Encoding is Direct Dictionary
@@ -1057,10 +1088,12 @@ public final class FilterUtil {
    * @param columnFilterInfo
    * @param carbonDimension
    * @param segmentProperties
+   * @param isDictRange
    * @return
    */
   public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo,
-      CarbonDimension carbonDimension, SegmentProperties segmentProperties,  boolean isExclude) {
+      CarbonDimension carbonDimension, SegmentProperties segmentProperties, boolean isExclude,
+      boolean isDictRange) {
     if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
       return columnFilterInfo.getNoDictionaryFilterValuesList()
           .toArray((new byte[columnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
@@ -1071,8 +1104,14 @@ public final class FilterUtil {
     List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
     Arrays.fill(keys, 0);
     int keyOrdinalOfDimensionFromCurrentBlock = carbonDimension.getKeyOrdinal();
-    return getFilterValuesInBytes(columnFilterInfo, isExclude, blockLevelKeyGenerator,
-        dimColumnsCardinality, keys, filterValuesList, keyOrdinalOfDimensionFromCurrentBlock);
+    if (!isDictRange) {
+      return getFilterValuesInBytes(columnFilterInfo, isExclude, blockLevelKeyGenerator,
+          dimColumnsCardinality, keys, filterValuesList, keyOrdinalOfDimensionFromCurrentBlock);
+    } else {
+      // For Dictionary Include Range Column
+      return getFilterValueInBytesForDictRange(columnFilterInfo, blockLevelKeyGenerator,
+          dimColumnsCardinality, keys, filterValuesList, keyOrdinalOfDimensionFromCurrentBlock);
+    }
   }
 
   /**
@@ -1500,10 +1539,11 @@ public final class FilterUtil {
       if (filterValues == null) {
         dimColumnExecuterInfo.setFilterKeys(new byte[0][]);
       } else {
-        byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, segmentProperties, false);
+        byte[][] keysBasedOnFilter =
+            getKeyArray(filterValues, dimension, segmentProperties, false, false);
         if (!filterValues.isIncludeFilter() || filterValues.isOptimized()) {
-          dimColumnExecuterInfo
-              .setExcludeFilterKeys(getKeyArray(filterValues, dimension, segmentProperties, true));
+          dimColumnExecuterInfo.setExcludeFilterKeys(
+              getKeyArray(filterValues, dimension, segmentProperties, true, false));
         }
         dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
       }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/ConditionalFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/ConditionalFilterResolverImpl.java
index 8ad0c48..2fd1996 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/ConditionalFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/ConditionalFilterResolverImpl.java
@@ -300,7 +300,7 @@ public class ConditionalFilterResolverImpl implements FilterResolverIntf {
     } else if (null != dimColResolvedFilterInfo.getFilterValues() && dimColResolvedFilterInfo
         .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
       return FilterUtil.getKeyArray(this.dimColResolvedFilterInfo.getFilterValues(),
-          this.dimColResolvedFilterInfo.getDimension(), segmentProperties, false);
+          this.dimColResolvedFilterInfo.getDimension(), segmentProperties, false, false);
     }
     return null;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index 4a713d5..963b445 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -89,7 +89,18 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
           .getDimensionFromCurrentBlock(this.dimColEvaluatorInfoList.get(0).getDimension());
       if (null != dimensionFromCurrentBlock) {
         return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(),
-            dimensionFromCurrentBlock, segmentProperties, false);
+            dimensionFromCurrentBlock, segmentProperties, false, false);
+      } else {
+        return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(), false);
+      }
+    } else if (dimColEvaluatorInfoList.size() > 0 && null != dimColEvaluatorInfoList.get(0)
+        .getFilterValues() && dimColEvaluatorInfoList.get(0).getDimension()
+        .hasEncoding(Encoding.DICTIONARY)) {
+      CarbonDimension dimensionFromCurrentBlock = segmentProperties
+          .getDimensionFromCurrentBlock(this.dimColEvaluatorInfoList.get(0).getDimension());
+      if (null != dimensionFromCurrentBlock) {
+        return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(),
+            dimensionFromCurrentBlock, segmentProperties, false, true);
       } else {
         return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(), false);
       }
@@ -249,6 +260,13 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
             } else {
               filterInfo.setFilterList(getDirectSurrogateValues(columnExpression));
             }
+          } else if (columnExpression.getDimension().hasEncoding(Encoding.DICTIONARY)
+              && !columnExpression.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            if (!isIncludeFilter) {
+              filterInfo.setExcludeFilterList(getSurrogateValues());
+            } else {
+              filterInfo.setFilterList(getSurrogateValues());
+            }
           } else {
             filterInfo.setFilterListForNoDictionaryCols(getNoDictionaryRangeValues());
           }
@@ -303,6 +321,26 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
     return filterValuesList;
   }
 
+  private List<Integer> getSurrogateValues() throws FilterUnsupportedException {
+    List<ExpressionResult> listOfExpressionResults = new ArrayList<ExpressionResult>(20);
+
+    if (this.getFilterExpression() instanceof BinaryConditionalExpression) {
+      listOfExpressionResults =
+          ((BinaryConditionalExpression) this.getFilterExpression()).getLiterals();
+    }
+    List<Integer> filterValuesList = new ArrayList<Integer>(20);
+    try {
+      // If any filter member provided by the user is invalid, throw an error;
+      // otherwise the system can display inconsistent results.
+      for (ExpressionResult result : listOfExpressionResults) {
+        filterValuesList.add(result.getInt());
+      }
+    } catch (FilterIllegalMemberException e) {
+      throw new FilterUnsupportedException(e);
+    }
+    return filterValuesList;
+  }
+
   /**
    * Method will return the DimColumnResolvedFilterInfo instance which consists
    * the mapping of the respective dimension and its surrogates involved in
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index d736805..e91d14d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -43,6 +43,7 @@ public class QueryModelBuilder {
   private DataTypeConverter dataTypeConverter;
   private boolean forcedDetailRawQuery;
   private boolean readPageByPage;
+  private boolean convertToRangeFilter = true;
   /**
    * log information
    */
@@ -301,6 +302,15 @@ public class QueryModelBuilder {
     return this;
   }
 
+  public QueryModelBuilder convertToRangeFilter(boolean convertToRangeFilter) {
+    this.convertToRangeFilter = convertToRangeFilter;
+    return this;
+  }
+
+  public boolean isConvertToRangeFilter() {
+    return this.convertToRangeFilter;
+  }
+
   public void enableReadPageByPage() {
     this.readPageByPage = true;
   }
@@ -316,7 +326,13 @@ public class QueryModelBuilder {
       // set the filter to the query model in order to filter blocklet before scan
       boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
       boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
-      table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
+      // In case of Dictionary Include Range Column we do not optimize the range expression
+      if (isConvertToRangeFilter()) {
+        table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
+      } else {
+        table.processFilterExpressionWithoutRange(filterExpression, isFilterDimensions,
+            isFilterMeasures);
+      }
       queryModel.setIsFilterDimensions(isFilterDimensions);
       queryModel.setIsFilterMeasures(isFilterMeasures);
       FilterResolverIntf filterIntf =
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index ad4d2b3..ee8a254 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -518,19 +518,11 @@ public abstract class BlockletScannedResult {
    * @param batchSize
    * @return
    */
-  protected void fillValidRowIdsBatchFilling(int rowId, int batchSize) {
-    // row id will be different for every batch so clear it before filling
-    clearValidRowIdList();
-    int startPosition = rowId;
-    for (int i = 0; i < batchSize; i++) {
-      if (!containsDeletedRow(startPosition)) {
-        validRowIds.add(startPosition);
-      }
-      startPosition++;
-    }
-  }
 
-  private void clearValidRowIdList() {
+
+  public abstract void fillValidRowIdsBatchFilling(int rowId, int batchSize);
+
+  protected void clearValidRowIdList() {
     if (null != validRowIds && !validRowIds.isEmpty()) {
       validRowIds.clear();
     }
@@ -773,7 +765,30 @@ public abstract class BlockletScannedResult {
    * @param batchSize
    * @return
    */
-  public abstract List<byte[]> getDictionaryKeyArrayBatch(int batchSize);
+  public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
+    // rowId from where computation needs to start
+    int startRowId = currentRow + 1;
+    fillValidRowIdsBatchFilling(startRowId, batchSize);
+    List<byte[]> dictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
+    int[] columnDataOffsets = null;
+    byte[] completeKey = null;
+    // initialized new every time because in case of prefetch the data can be modified
+    for (int i = 0; i < validRowIds.size(); i++) {
+      completeKey = new byte[fixedLengthKeySize];
+      dictionaryKeyArrayList.add(completeKey);
+    }
+    // initialize offset array only if data is present
+    if (this.dictionaryColumnChunkIndexes.length > 0) {
+      columnDataOffsets = new int[validRowIds.size()];
+    }
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      for (int j = 0; j < validRowIds.size(); j++) {
+        columnDataOffsets[j] += dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
+            .fillRawData(validRowIds.get(j), columnDataOffsets[j], dictionaryKeyArrayList.get(j));
+      }
+    }
+    return dictionaryKeyArrayList;
+  }
 
   /**
    * Below method will be used to get the complex type key array
@@ -806,7 +821,26 @@ public abstract class BlockletScannedResult {
    *
    * @return no dictionary keys for all no dictionary dimension
    */
-  public abstract List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize);
+  public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
+    List<byte[][]> noDictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
+    byte[][] noDictionaryColumnsKeys = null;
+    // initialized new every time because in case of prefetch the data can be modified
+    for (int i = 0; i < validRowIds.size(); i++) {
+      noDictionaryColumnsKeys = new byte[noDictionaryColumnChunkIndexes.length][];
+      noDictionaryKeyArrayList.add(noDictionaryColumnsKeys);
+    }
+    int columnPosition = 0;
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      for (int j = 0; j < validRowIds.size(); j++) {
+        byte[][] noDictionaryArray = noDictionaryKeyArrayList.get(j);
+        noDictionaryArray[columnPosition] =
+            dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
+                .getChunkData(validRowIds.get(j));
+      }
+      columnPosition++;
+    }
+    return noDictionaryKeyArrayList;
+  }
 
   /**
    * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later.
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
index 1b83110..0a7338f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java
@@ -53,8 +53,18 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
-  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
-    throw new UnsupportedOperationException("Operation not supported");
+  @Override public void fillValidRowIdsBatchFilling(int rowId, int batchSize) {
+    // row id will be different for every batch so clear it before filling
+    clearValidRowIdList();
+    int startPosition = rowId;
+    int minSize = Math.min(batchSize, pageFilteredRowId[pageCounter].length);
+    for (int j = startPosition; j < startPosition + minSize; ) {
+      int pos = pageFilteredRowId[pageCounter][j];
+      if (!containsDeletedRow(pos)) {
+        validRowIds.add(pos);
+      }
+      j++;
+    }
   }
 
   /**
@@ -67,7 +77,7 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
   }
 
   @Override public List<byte[][]> getComplexTypeKeyArrayBatch(int batchSize) {
-    throw new UnsupportedOperationException("Operation not supported");
+    return getComplexTypeKeyArrayBatch();
   }
 
   /**
@@ -80,10 +90,6 @@ public class FilterQueryScannedResult extends BlockletScannedResult {
     return getNoDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]);
   }
 
-  @Override public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
-    throw new UnsupportedOperationException("Operation not supported");
-  }
-
   /**
    * will return the current valid row id
    *
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
index b5f9d66..36a1017 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.scan.result.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -53,31 +52,6 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
     return getDictionaryKeyIntegerArray(currentRow);
   }
 
-  @Override public List<byte[]> getDictionaryKeyArrayBatch(int batchSize) {
-    // rowId from where computing need to start
-    int startRowId = currentRow + 1;
-    fillValidRowIdsBatchFilling(startRowId, batchSize);
-    List<byte[]> dictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
-    int[] columnDataOffsets = null;
-    byte[] completeKey = null;
-    // everyTime it is initialized new as in case of prefetch it can modify the data
-    for (int i = 0; i < validRowIds.size(); i++) {
-      completeKey = new byte[fixedLengthKeySize];
-      dictionaryKeyArrayList.add(completeKey);
-    }
-    // initialize offset array onli if data is present
-    if (this.dictionaryColumnChunkIndexes.length > 0) {
-      columnDataOffsets = new int[validRowIds.size()];
-    }
-    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
-      for (int j = 0; j < validRowIds.size(); j++) {
-        columnDataOffsets[j] += dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter]
-            .fillRawData(validRowIds.get(j), columnDataOffsets[j], dictionaryKeyArrayList.get(j));
-      }
-    }
-    return dictionaryKeyArrayList;
-  }
-
   /**
    * Below method will be used to get the complex type key array
    *
@@ -101,35 +75,18 @@ public class NonFilterQueryScannedResult extends BlockletScannedResult {
     return getNoDictionaryKeyArray(currentRow);
   }
 
-  /**
-   * Below method will be used to get the dimension key array
-   * for all the no dictionary dimension present in the query
-   * This method will fill the data column wise for the given batch size
-   *
-   * @return no dictionary keys for all no dictionary dimension
-   */
-  @Override public List<byte[][]> getNoDictionaryKeyArrayBatch(int batchSize) {
-    List<byte[][]> noDictionaryKeyArrayList = new ArrayList<>(validRowIds.size());
-    byte[][] noDictionaryColumnsKeys = null;
-    // everyTime it is initialized new as in case of prefetch it can modify the data
-    for (int i = 0; i < validRowIds.size(); i++) {
-      noDictionaryColumnsKeys = new byte[noDictionaryColumnChunkIndexes.length][];
-      noDictionaryKeyArrayList.add(noDictionaryColumnsKeys);
-    }
-    int columnPosition = 0;
-    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
-      for (int j = 0; j < validRowIds.size(); j++) {
-        byte[][] noDictionaryArray = noDictionaryKeyArrayList.get(j);
-        noDictionaryArray[columnPosition] =
-            dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter]
-                .getChunkData(validRowIds.get(j));
+  @Override public void fillValidRowIdsBatchFilling(int rowId, int batchSize) {
+    // row id will be different for every batch so clear it before filling
+    clearValidRowIdList();
+    int startPosition = rowId;
+    for (int i = 0; i < batchSize; i++) {
+      if (!containsDeletedRow(startPosition)) {
+        validRowIds.add(startPosition);
       }
-      columnPosition++;
+      startPosition++;
     }
-    return noDictionaryKeyArrayList;
   }
 
-
   /**
    * will return the current valid row id
    *
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
index 2caf46c..ff383f9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestRangeColumnDataLoad.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import java.io.{File, PrintWriter}
+
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.classTag
 
@@ -38,6 +40,9 @@ import org.apache.carbondata.spark.load.PrimtiveOrdering
 
 class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   var filePath: String = s"$resourcesPath/globalsort"
+  var filePath2: String = s"$resourcesPath/range_compact_test"
+  var filePath3: String = s"$resourcesPath/range_compact_test1"
+  var filePath4: String = s"$resourcesPath/range_compact_test2"
 
   override def beforeAll(): Unit = {
     dropTable
@@ -137,9 +142,586 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
     checkAnswer(sql("SELECT COUNT(*) FROM carbon_range_column4"), Seq(Row(20)))
   }
 
+  test("Test compaction for range_column - SHORT Datatype") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age SHORT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='age, city', 'range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+  }
+
+  test("Test compaction for range_column - INT Datatype") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='age, city', 'range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - 2 levels") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='age, city',
+        | 'range_column'='age')
+      """.stripMargin)
+
+    for (i <- 0 until 12) {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+          "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+    }
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MINOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - CUSTOM Compaction") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='age, city',
+        | 'range_column'='age')
+      """.stripMargin)
+
+    for (i <- 0 until 12) {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+          "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+    }
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'CUSTOM' WHERE SEGMENT.ID IN(3,4,5)")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - INT Datatype with null values") {
+    deleteFile(filePath3)
+    createFile(filePath3, 2000, 3)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city',
+        | 'range_column'='name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    deleteFile(filePath3)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - BOOLEAN Datatype") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    val exception = intercept[MalformedCarbonCommandException](
+      sql(
+        """
+          | CREATE TABLE carbon_range_column1(id Boolean, name STRING, city STRING, age INT)
+          | STORED BY 'org.apache.carbondata.format'
+          | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='id, city',
+          | 'range_column'='id')
+        """.stripMargin)
+    )
+
+    assertResult("RANGE_COLUMN doesn't support boolean data type: id")(exception.getMessage)
+  }
+
+  test("Test compaction for range_column - DECIMAL Datatype") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    val exception = intercept[MalformedCarbonCommandException](
+      sql(
+        """
+          | CREATE TABLE carbon_range_column1(id decimal, name STRING, city STRING, age INT)
+          | STORED BY 'org.apache.carbondata.format'
+          | TBLPROPERTIES('range_column'='id')
+        """.stripMargin)
+    )
+
+    assertResult("RANGE_COLUMN doesn't support decimal data type: id")(exception.getMessage)
+  }
+
+  test("Test compaction for range_column - INT Datatype with no overlapping") {
+    deleteFile(filePath2)
+    createFile(filePath2, 1000, 4)
+    deleteFile(filePath3)
+    createFile(filePath3, 1000, 5)
+    deleteFile(filePath4)
+    createFile(filePath4, 1000, 6)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='id, city',
+        | 'range_column'='id')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='1')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='2')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath4' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    deleteFile(filePath2)
+    deleteFile(filePath3)
+    deleteFile(filePath4)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - INT Datatype with overlapping") {
+    deleteFile(filePath2)
+    createFile(filePath2, 10, 9)
+    deleteFile(filePath3)
+    createFile(filePath3, 10, 10)
+    deleteFile(filePath4)
+    createFile(filePath4, 10, 11)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='id, city',
+        | 'range_column'='id')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath4' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    deleteFile(filePath2)
+    deleteFile(filePath3)
+    deleteFile(filePath4)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - INT Datatype with Global Dict") {
+    deleteFile(filePath2)
+    createFile(filePath2, 10, 9)
+    deleteFile(filePath3)
+    createFile(filePath3, 10, 10)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city',
+        | 'range_column'='name', 'DICTIONARY_INCLUDE'='name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    deleteFile(filePath2)
+    deleteFile(filePath3)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - STRING Datatype with Global Dict") {
+    deleteFile(filePath2)
+    createFile(filePath2, 1000, 9)
+    deleteFile(filePath3)
+    createFile(filePath3, 10, 10)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city',
+        | 'range_column'='name', 'DICTIONARY_INCLUDE'='name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath3' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    deleteFile(filePath2)
+    deleteFile(filePath3)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Compact range_column with data skew") {
+    sql("DROP TABLE IF EXISTS carbon_range_column4")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column4(c1 int, c2 string)
+        | STORED AS carbondata
+        | TBLPROPERTIES('sort_columns'='c1,c2', 'sort_scope'='local_sort', 'range_column'='c2')
+      """.stripMargin)
+
+    val dataSkewPath = s"$resourcesPath/range_column"
+
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$dataSkewPath'
+         | INTO TABLE carbon_range_column4
+         | OPTIONS('FILEHEADER'='c1,c2', 'global_sort_partitions'='10')
+        """.stripMargin)
+
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$dataSkewPath'
+         | INTO TABLE carbon_range_column4
+         | OPTIONS('FILEHEADER'='c1,c2', 'global_sort_partitions'='10')
+        """.stripMargin)
+
+    val res = sql("SELECT * FROM carbon_range_column4").collect()
+
+    sql("ALTER TABLE carbon_range_column4 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("SELECT * FROM carbon_range_column4"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column4")
+  }
+
+  test("Test compaction for range_column - INT Datatype without SORT Column") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+  }
+
+  test("Test compaction for range_column - INT Datatype with single value in range column") {
+    deleteFile(filePath2)
+    createFile(filePath2, 10, 8)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('range_column'='id')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+    deleteFile(filePath2)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - LONG Datatype") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='age, city', 'range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+  }
+
+  test("Test compaction for range_column - LONG Datatype without SORT Column") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+  }
+
+  test("Test compaction for range_column - STRING Datatype") {
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city',
+        | 'range_column'='name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('SORT_SCOPE'='GLOBAL_SORT','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('SORT_SCOPE'='NO_SORT','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+  }
+
+  test("Test compaction for range_column - STRING Datatype min/max not stored") {
+    deleteFile(filePath2)
+    createFile(filePath2, 1000, 7)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "10")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age LONG)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='name, city',
+        | 'range_column'='name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        "OPTIONS('HEADER'='false','GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select name from carbon_range_column1 order by name").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select name from carbon_range_column1 order by name"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT,
+        CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT)
+    deleteFile(filePath2)
+  }
+
+  test("Test compaction for range_column - DATE Datatype") {
+    deleteFile(filePath2)
+    createFile(filePath2, 12, 0)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age DATE)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='age, city' ,
+        | 'range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        s"OPTIONS('HEADER'='false', 'GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        s"OPTIONS('HEADER'='false', 'GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+
+    deleteFile(filePath2)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
+
+  test("Test compaction for range_column - TIMESTAMP Datatype skewed data") {
+    deleteFile(filePath2)
+    createFile(filePath2, 12, 1)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:SS")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, age TIMESTAMP)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_SCOPE'='LOCAL_SORT', 'SORT_COLUMNS'='city' ,
+        | 'range_column'='age')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        s"OPTIONS('HEADER'='false', 'GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        s"OPTIONS('HEADER'='false', 'GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+
+    deleteFile(filePath2)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+
+  test("Test compaction for range_column - Float Datatype") {
+    deleteFile(filePath2)
+    createFile(filePath2, 12, 2)
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+    sql(
+      """
+        | CREATE TABLE carbon_range_column1(id INT, name STRING, city STRING, floatval FLOAT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('range_column'='floatval')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        s"OPTIONS('HEADER'='false', 'GLOBAL_SORT_PARTITIONS'='3')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$filePath2' INTO TABLE carbon_range_column1 " +
+        s"OPTIONS('HEADER'='false', 'GLOBAL_SORT_PARTITIONS'='3')")
+
+    var res = sql("select * from carbon_range_column1").collect()
+
+    sql("ALTER TABLE carbon_range_column1 COMPACT 'MAJOR'")
+
+    checkAnswer(sql("select * from carbon_range_column1"), res)
+
+    sql("DROP TABLE IF EXISTS carbon_range_column1")
+
+    deleteFile(filePath2)
+
+  }
+
   test("DataSkewRangePartitioner.combineDataSkew") {
     val partitioner =
-      new DataSkewRangePartitioner(1, null)(new PrimtiveOrdering(DataTypes.STRING),
+      new DataSkewRangePartitioner(1, null,
+        false)(new PrimtiveOrdering(DataTypes.STRING),
         classTag[Object])
 
     testCombineDataSkew(
@@ -263,4 +845,89 @@ class TestRangeColumnDataLoad extends QueryTest with BeforeAndAfterEach with Bef
         .size()
     }
   }
+
+  def createFile(fileName: String, line: Int = 10000, lastCol: Int = 0): Boolean = {
+    try {
+      val write = new PrintWriter(new File(fileName))
+      val start = 0
+      if (0 == lastCol) {
+        // Date data generation
+        for (i <- start until (start + line)) {
+          write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i) + "-10-10")
+        }
+      } else if (1 == lastCol) {
+        // Timestamp data generation
+        for (i <- start until (start + line)) {
+          if (i == start) {
+            write
+              .println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i) + "-10-10 " +
+                       "00:00:00")
+          } else {
+            write.println(i + "," + "n" + i + "," + "c" + (i % 10000) + ",")
+          }
+        }
+      } else if (2 == lastCol) {
+        // Float data generation
+        for (i <- start until (start + line)) {
+          write
+            .println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i) + (i % 3.14))
+        }
+      } else if (3 == lastCol) {
+        // Null data generation
+        for (i <- start until (start + line)) {
+          if (i % 3 != 0) {
+            write
+              .println(i + "," + "," + "c" + (i % 10000) + "," + (1990 + i))
+          } else {
+            write
+              .println(i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+          }
+        }
+      } else if (4 <= lastCol && 6 >= lastCol) {
+        // No overlap data generation 1
+        for (i <- start until (start + line)) {
+          write
+            .println(
+              (line * lastCol + i) + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+        }
+      } else if (7 == lastCol) {
+        // Min/max not stored data generation
+        for (i <- start until (start + line)) {
+          write
+            .println(
+              (100 * lastCol + i) + "," + "nnnnnnnnnnnn" + i + "," + "c" + (i % 10000) + "," +
+              (1990 + i))
+        }
+      } else if (8 == lastCol) {
+        // Range values less than default parallelism (Single value)
+        for (i <- start until (start + line)) {
+          write
+            .println(
+              100 + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+        }
+      } else if (9 <= lastCol) {
+        for (i <- lastCol until (lastCol + line)) {
+          write
+            .println(
+              i + "," + "n" + i + "," + "c" + (i % 10000) + "," + (1990 + i))
+        }
+      }
+      write.close()
+    } catch {
+      case _: Exception => false
+    }
+    true
+  }
+
+  def deleteFile(fileName: String): Boolean = {
+    try {
+      val file = new File(fileName)
+      if (file.exists()) {
+        file.delete()
+      }
+    } catch {
+      case _: Exception => false
+    }
+    true
+  }
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 77d0d84..a751887 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -27,11 +27,13 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util._
@@ -220,7 +222,10 @@ object DataLoadProcessBuilderOnSpark {
     val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
     val rangeRDD = keyRDD
       .partitionBy(
-        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+        new DataSkewRangePartitioner(
+          numPartitions,
+          sampleRDD,
+          false)(objectOrdering, classTag[Object]))
       .map(_._2)
 
     // 4. Sort and Write data
@@ -306,23 +311,29 @@ object DataLoadProcessBuilderOnSpark {
         // better to generate a CarbonData file for each partition
         val totalSize = model.getTotalSize.toDouble
         val table = model.getCarbonDataLoadSchema.getCarbonTable
-        val blockSize = 1024L * 1024 * table.getBlockSizeInMB
-        val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
-        val scaleFactor = if (model.getScaleFactor == 0) {
-          // use system properties
-          CarbonProperties.getInstance().getRangeColumnScaleFactor
-        } else {
-          model.getScaleFactor
-        }
-        // For Range_Column, it will try to generate one big file for each partition.
-        // And the size of the big file is about TABLE_BLOCKSIZE of this table.
-        val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * scaleFactor
-        numPartitions = Math.ceil(totalSize / splitSize).toInt
+        numPartitions = getNumPatitionsBasedOnSize(totalSize, table, model)
       }
     }
     numPartitions
   }
 
+  def getNumPatitionsBasedOnSize(totalSize: Double,
+      table: CarbonTable,
+      model: CarbonLoadModel): Int = {
+    val blockSize = 1024L * 1024 * table.getBlockSizeInMB
+    val blockletSize = 1024L * 1024 * table.getBlockletSizeInMB
+    val scaleFactor = if (model.getScaleFactor == 0) {
+      // use system properties
+      CarbonProperties.getInstance().getRangeColumnScaleFactor
+    } else {
+      model.getScaleFactor
+    }
+    // For Range_Column, it will try to generate one big file for each partition.
+    // And the size of the big file is about TABLE_BLOCKSIZE of this table.
+    val splitSize = Math.max(blockletSize, (blockSize - blockletSize)) * scaleFactor
+    Math.ceil(totalSize / splitSize).toInt
+  }
+
   private def indexOfColumn(column: CarbonColumn, fields: Array[DataField]): Int = {
     (0 until fields.length)
       .find(index => fields(index).getColumn.getColName.equals(column.getColName))
@@ -371,3 +382,9 @@ class ByteArrayOrdering() extends Ordering[Object] {
     UnsafeComparer.INSTANCE.compareTo(x.asInstanceOf[Array[Byte]], y.asInstanceOf[Array[Byte]])
   }
 }
+
+class StringOrdering() extends Ordering[Object] {
+  override def compare(x: Object, y: Object): Int = {
+    (x.asInstanceOf[UTF8String]).compare(y.asInstanceOf[UTF8String])
+  }
+}
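
As a rough, self-contained illustration of the split-size arithmetic in getNumPatitionsBasedOnSize above (the block size, blocklet size, scale factor and segment size below are hypothetical, not read from any real table):

    // Sketch in plain Scala; prints the number of ranges/tasks that would be created
    object NumPartitionsSketch {
      def main(args: Array[String]): Unit = {
        val blockSize = 1024L * 1024 * 1024          // assume table_blocksize = 1024 MB
        val blockletSize = 1024L * 1024 * 64         // assume table_blocklet_size = 64 MB
        val scaleFactor = 3                          // assume a range column scale factor of 3
        val totalSize = 10.0 * 1024 * 1024 * 1024    // assume 10 GB of segment data to compact
        // One big file per partition, roughly TABLE_BLOCKSIZE in size
        val splitSize = Math.max(blockletSize, blockSize - blockletSize) * scaleFactor
        val numPartitions = Math.ceil(totalSize / splitSize).toInt
        println(s"splitSize=$splitSize bytes, numPartitions=$numPartitions")   // numPartitions = 4
      }
    }
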
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0e44f6d..e361c14 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -19,20 +19,22 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.IOException
 import java.util
-import java.util.{Collections, List}
+import java.util.{Collections, List, Map}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.reflect.classTag
 
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.{InputSplit, Job}
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -43,12 +45,17 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.mutate.UpdateVO
+import org.apache.carbondata.core.scan.expression
+import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.loading.TableProcessingOperations
@@ -56,7 +63,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
+import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class CarbonMergerRDD[K, V](
     @transient private val ss: SparkSession,
@@ -77,12 +85,14 @@ class CarbonMergerRDD[K, V](
   val databaseName = carbonMergerMapping.databaseName
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
+  var expressionMapForRangeCol: util.Map[Integer, Expression] = null
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val queryStartTime = System.currentTimeMillis()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val rangeColumn = carbonTable.getRangeColumn
       val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
       if (carbonTable.isPartitionTable) {
         carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
@@ -181,7 +191,12 @@ class CarbonMergerRDD[K, V](
         }
         try {
           // fire a query and get the results.
-          rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration)
+          var expr: expression.Expression = null
+          if (null != expressionMapForRangeCol) {
+            expr = expressionMapForRangeCol
+              .get(theSplit.asInstanceOf[CarbonSparkPartition].idx)
+          }
+          rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration, expr)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)
@@ -281,6 +296,14 @@ class CarbonMergerRDD[K, V](
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val rangeColumn = carbonTable.getRangeColumn
+    val dataType: DataType = if (null != rangeColumn) {
+      rangeColumn.getDataType
+    } else {
+      null
+    }
+    val isRangeColSortCol = rangeColumn != null && rangeColumn.isDimension &&
+                            rangeColumn.asInstanceOf[CarbonDimension].isSortColumn
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       carbonTable)
     val jobConf: JobConf = new JobConf(getConf)
@@ -303,14 +326,30 @@ class CarbonMergerRDD[K, V](
 
     val taskInfoList = new java.util.ArrayList[Distributable]
     var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+    var allSplits = new java.util.ArrayList[InputSplit]
 
     var splitsOfLastSegment: List[CarbonInputSplit] = null
     // map for keeping the relation of a task and its blocks.
     val taskIdMapping: java.util.Map[String, java.util.List[CarbonInputSplit]] = new
         java.util.HashMap[String, java.util.List[CarbonInputSplit]]
 
+    var totalSize: Double = 0
+    var loadMetadataDetails: Array[LoadMetadataDetails] = null
+    // Only for a range column, read the load metadata to get the segment sizes
+    if (null != rangeColumn) {
+      loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
+    }
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
+      // For a range column, accumulate the segment sizes to calculate the number of ranges
+      if (null != rangeColumn) {
+        for (details <- loadMetadataDetails) {
+          if (details.getLoadName == eachSeg.getSegmentNo) {
+            totalSize = totalSize + (details.getDataSize.toDouble)
+          }
+        }
+      }
 
       // map for keeping the relation of a task and its blocks.
       job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
@@ -329,7 +368,7 @@ class CarbonMergerRDD[K, V](
           .map(_.asInstanceOf[CarbonInputSplit])
           .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
       }
-      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
+      val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
         val blockInfo = new TableBlockInfo(entry.getFilePath,
           entry.getStart, entry.getSegmentId,
           entry.getLocations, entry.getLength, entry.getVersion,
@@ -342,6 +381,31 @@ class CarbonMergerRDD[K, V](
             updateDetails, updateStatusManager)))) &&
         FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
       }
+      carbonInputSplits ++:= filteredSplits
+      allSplits.addAll(filteredSplits.asJava)
+    }
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    var allRanges: Array[Object] = new Array[Object](0)
+    if (rangeColumn != null) {
+      // Calculate the number of ranges to create; at least 2 ranges/tasks are created in any case
+      val numOfPartitions = Math
+        .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, DataLoadProcessBuilderOnSpark
+          .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel))
+      val colName = rangeColumn.getColName
+      LOGGER.info(s"Compacting on range column: $colName")
+      allRanges = getRangesFromRDD(rangeColumn,
+        carbonTable,
+        numOfPartitions,
+        allSplits,
+        dataType)
+      // If RangePartitioner does not give usable ranges (e.g. when the data is skewed with
+      // a lot of null records), take the overall min/max from the footers and use them for tasks
+      if (null == allRanges || (allRanges.size == 1 && allRanges(0) == null)) {
+        allRanges = CarbonCompactionUtil.getOverallMinMax(carbonInputSplits.toList.toArray,
+          rangeColumn,
+          isRangeColSortCol)
+      }
+      LOGGER.info(s"Number of ranges:" + allRanges.size)
     }
 
     // prepare the details required to extract the segment properties using last segment.
@@ -362,20 +426,28 @@ class CarbonMergerRDD[K, V](
     val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
     val partitionTaskMap = new util.HashMap[PartitionSpec, String]()
     val counter = new AtomicInteger()
+    var indexOfRangeColumn = -1
+    var taskIdCount = 0
+    // Null values are already handled separately in the filter expression, so remove
+    // nulls from the ranges we get; otherwise it may lead to duplicate data
+    val newRanges = allRanges.filter { range =>
+      range != null
+    }
     carbonInputSplits.foreach { split =>
-      val taskNo = getTaskNo(split, partitionTaskMap, counter)
       var dataFileFooter: DataFileFooter = null
-
-      val splitList = taskIdMapping.get(taskNo)
-      noOfBlocks += 1
-      if (null == splitList) {
-        val splitTempList = new util.ArrayList[CarbonInputSplit]()
-        splitTempList.add(split)
-        taskIdMapping.put(taskNo, splitTempList)
-      } else {
-        splitList.add(split)
+      if (null == rangeColumn) {
+        val taskNo = getTaskNo(split, partitionTaskMap, counter)
+        var sizeOfSplit = split.getDetailInfo.getBlockSize
+        val splitList = taskIdMapping.get(taskNo)
+        noOfBlocks += 1
+        if (null == splitList) {
+          val splitTempList = new util.ArrayList[CarbonInputSplit]()
+          splitTempList.add(split)
+          taskIdMapping.put(taskNo, splitTempList)
+        } else {
+          splitList.add(split)
+        }
       }
-
       // Check the cardinality of each columns and set the highest.
       try {
         dataFileFooter = CarbonUtil.readMetadataFile(
@@ -390,6 +462,48 @@ class CarbonMergerRDD[K, V](
         .addColumnCardinalityToMap(columnToCardinalityMap,
           dataFileFooter.getColumnInTable,
           dataFileFooter.getSegmentInfo.getColumnCardinality)
+
+      // For the range column, create the taskIdMapping and the per-range filter expressions.
+      if (null != rangeColumn) {
+        if (null == expressionMapForRangeCol) {
+          expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
+        }
+        if (-1 == indexOfRangeColumn) {
+          val allColumns = dataFileFooter.getColumnInTable
+          for (i <- 0 until allColumns.size()) {
+            if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
+              indexOfRangeColumn = i
+            }
+          }
+        }
+        // Create ranges and add splits to the tasks
+        for (i <- 0 until (newRanges.size + 1)) {
+          if (null == expressionMapForRangeCol.get(i)) {
+            // Creating FilterExpression for the range column
+            var minVal: Object = null
+            var maxVal: Object = null
+            // For the first task we create an Or filter that also accommodates null values
+            // For the last task we use a GreaterThan expression on the last bound value
+            if (i != 0) {
+              minVal = newRanges(i - 1)
+            }
+            if (i != newRanges.size) {
+              maxVal = newRanges(i)
+            }
+            val filterExpr = CarbonCompactionUtil
+              .getFilterExpressionForRange(rangeColumn,
+                minVal, maxVal, dataType)
+            expressionMapForRangeCol.put(i, filterExpr)
+          }
+          var splitList = taskIdMapping.get(i.toString)
+          noOfBlocks += 1
+          if (null == splitList) {
+            splitList = new util.ArrayList[CarbonInputSplit]()
+            taskIdMapping.put(i.toString, splitList)
+          }
+          splitList.add(split)
+        }
+      }
     }
     val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
     // update cardinality and column schema list according to master schema
@@ -472,6 +586,52 @@ class CarbonMergerRDD[K, V](
     result.toArray(new Array[Partition](result.size))
   }
 
+  private def getRangesFromRDD(rangeColumn: CarbonColumn,
+      carbonTable: CarbonTable,
+      defaultParallelism: Int,
+      allSplits: java.util.ArrayList[InputSplit],
+      dataType: DataType): Array[Object] = {
+    val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
+    val projection = new CarbonProjection
+    projection.addColumn(rangeColumn.getColName)
+    val scanRdd = new CarbonScanRDD[InternalRow](
+      ss,
+      projection,
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      carbonTable.getTableInfo.serialize(),
+      carbonTable.getTableInfo,
+      inputMetricsStats,
+      partitionNames = null,
+      splits = allSplits)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
+    val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
+    // Read the range column using its actual Spark data type so that all supported types work
+    val sampleRdd = scanRdd
+      .map(row => (row.get(0, sparkDataType), null))
+    val value = new DataSkewRangePartitioner(
+      defaultParallelism, sampleRdd, true)(objectOrdering, classTag[Object])
+    value.rangeBounds
+  }
+
+  private def createOrderingForColumn(column: CarbonColumn): Ordering[Object] = {
+    if (column.isDimension) {
+      val dimension = column.asInstanceOf[CarbonDimension]
+      if ((dimension.isGlobalDictionaryEncoding || dimension.isDirectDictionaryEncoding) &&
+          dimension.getDataType != DataTypes.TIMESTAMP) {
+        new PrimtiveOrdering(DataTypes.INT)
+      } else {
+        if (DataTypeUtil.isPrimitiveColumn(column.getDataType)) {
+          new PrimtiveOrdering(column.getDataType)
+        } else {
+          new StringOrdering()
+        }
+      }
+    } else {
+      new PrimtiveOrdering(column.getDataType)
+    }
+  }
+
   private def getTaskNo(
       split: CarbonInputSplit,
       partitionTaskMap: util.Map[PartitionSpec, String],
@@ -495,8 +655,6 @@ class CarbonMergerRDD[K, V](
     }
   }
 
-
-
   private def getPartitionNamesFromTask(taskId: String,
       partitionTaskMap: util.Map[PartitionSpec, String]): Option[PartitionSpec] = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
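
As a rough illustration of how CarbonMergerRDD above turns the partitioner's range bounds into per-task filter ranges (pure Scala with toy Integer bounds standing in for real range-column values; the interval labels mirror the filter expressions built per task, and every task scans all splits but keeps only its own range):

    object RangeTaskSketch {
      def main(args: Array[String]): Unit = {
        // Bounds as they might come back from the partitioner; nulls are dropped
        // because null values are handled separately by the first task's filter
        val allRanges: Array[Integer] =
          Array(Integer.valueOf(10), null, Integer.valueOf(20), Integer.valueOf(30))
        val newRanges = allRanges.filter(_ != null)
        // N bounds produce N + 1 tasks
        (0 to newRanges.length).foreach { i =>
          val minVal = if (i != 0) Some(newRanges(i - 1)) else None
          val maxVal = if (i != newRanges.length) Some(newRanges(i)) else None
          val filter = (minVal, maxVal) match {
            case (None, None)         => "value IS NULL"
            case (None, Some(hi))     => s"value IS NULL OR value <= $hi"
            case (Some(lo), None)     => s"value > $lo"
            case (Some(lo), Some(hi)) => s"$lo < value <= $hi"
          }
          println(s"task $i -> $filter")
        }
      }
    }
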
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d0ed815..b62a7e2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -82,7 +82,8 @@ class CarbonScanRDD[T: ClassTag](
     inputMetricsStats: InitInputMetrics,
     @transient val partitionNames: Seq[PartitionSpec],
     val dataTypeConverterClz: Class[_ <: DataTypeConverter] = classOf[SparkDataTypeConverterImpl],
-    val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass)
+    val readSupportClz: Class[_ <: CarbonReadSupport[_]] = SparkReadSupport.readSupportClass,
+    @transient var splits: java.util.List[InputSplit] = null)
   extends CarbonRDDWithTableInfo[T](spark, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
@@ -126,7 +127,9 @@ class CarbonScanRDD[T: ClassTag](
 
       // get splits
       getSplitsStartTime = System.currentTimeMillis()
-      val splits = format.getSplits(job)
+      if (null == splits) {
+        splits = format.getSplits(job)
+      }
       getSplitsEndTime = System.currentTimeMillis()
       if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
         throw new SparkException(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
similarity index 100%
rename from integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
rename to integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
index 12285d3..d434108 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala
@@ -80,7 +80,8 @@ import org.apache.spark.util.{CollectionsUtils, Utils}
  */
 class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
     partitions: Int,
-    rdd: RDD[_ <: Product2[K, V]])
+    rdd: RDD[_ <: Product2[K, V]],
+    withoutSkew: Boolean)
   extends Partitioner {
 
   // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
@@ -92,7 +93,8 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
   // dataSkewCount: how many bounds happened data skew
   // dataSkewIndex: the index of data skew bounds
   // dataSkewNum: how many partition of each data skew bound
-  private var (rangeBounds: Array[K], skewCount: Int, skewIndexes: Array[Int],
+  // Min and Max values of complete range
+  var (rangeBounds: Array[K], skewCount: Int, skewIndexes: Array[Int],
   skewWeights: Array[Int]) = {
     if (partitions <= 1) {
       (Array.empty[K], 0, Array.empty[Int], Array.empty[Int])
@@ -103,7 +105,7 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
       if (numItems == 0L) {
-        (Array.empty[K], 0, Array.empty[Int], Array.empty[Int])
+        (Array.empty[K], 0, Array.empty[Int], Array.empty[Int], Array.empty[K])
       } else {
         // If a partition contains much more than the average number of items, we re-sample from it
         // to ensure that enough items are collected from that partition.
@@ -129,14 +131,25 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
           val weight = (1.0 / fraction).toFloat
           candidates ++= reSampled.map(x => (x, weight))
         }
-        determineBounds(candidates, partitions)
+        // For compaction we do not need the skew-handled ranges, so we use Spark's
+        // RangePartitioner bounds, but we still require the overall min/max for creating
+        // the separate ranges. withoutSkew = true is passed for compaction only
+        if (!withoutSkew) {
+          determineBounds(candidates, partitions, false)
+        } else {
+          var ranges = RangePartitioner.determineBounds(candidates, partitions)
+          var otherRangeParams = determineBounds(candidates, partitions, true)
+          (ranges, otherRangeParams._2, otherRangeParams._3,
+            otherRangeParams._4)
+        }
       }
     }
   }
 
   def determineBounds(
       candidates: ArrayBuffer[(K, Float)],
-      partitions: Int): (Array[K], Int, Array[Int], Array[Int]) = {
+      partitions: Int,
+      withoutSkew: Boolean): (Array[K], Int, Array[Int], Array[Int]) = {
     val ordered = candidates.sortBy(_._1)
     val numCandidates = ordered.size
     val sumWeights = ordered.map(_._2.toDouble).sum
@@ -196,7 +209,8 @@ class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
       dataSkewNumTmp += dataSkewCountTmp
     }
     if (dataSkewIndexTmp.size > 0) {
-      (finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, dataSkewNumTmp.toArray)
+      (finalBounds.toArray, dataSkewIndexTmp.size, dataSkewIndexTmp.toArray, dataSkewNumTmp
+        .toArray)
     } else {
       (finalBounds.toArray, 0, Array.empty[Int], Array.empty[Int])
     }
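
A minimal usage sketch of the new constructor flag, assuming a local Spark session and the patched DataSkewRangePartitioner on the classpath; the toy Integer keys stand in for sampled range-column values:

    import scala.reflect.classTag

    import org.apache.spark.DataSkewRangePartitioner
    import org.apache.spark.sql.SparkSession

    object RangeBoundsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("range-bounds").getOrCreate()
        val keys = spark.sparkContext
          .parallelize(1 to 1000)
          .map(i => (Integer.valueOf(i % 97).asInstanceOf[Object], null.asInstanceOf[Object]))
        val ordering: Ordering[Object] =
          Ordering.by((o: Object) => o.asInstanceOf[Integer].intValue())
        // withoutSkew = false: skew-aware bounds, as used while loading with RANGE_COLUMN
        val loadBounds =
          new DataSkewRangePartitioner(8, keys, false)(ordering, classTag[Object]).rangeBounds
        // withoutSkew = true: plain RangePartitioner bounds, enough for compaction ranges
        val compactBounds =
          new DataSkewRangePartitioner(8, keys, true)(ordering, classTag[Object]).rangeBounds
        println(loadBounds.mkString(", "))
        println(compactBounds.mkString(", "))
        spark.stop()
      }
    }
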
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index d978128..a2b2af6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
-import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -784,13 +784,19 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         throw new MalformedCarbonCommandException(errorMsg)
       }
       val rangeField = fields.find(_.column.equalsIgnoreCase(rangeColumn))
+      val dataType = rangeField.flatMap(_.dataType).getOrElse("")
       if (rangeField.isEmpty) {
         val errorMsg = "range_column: " + rangeColumn +
                        " does not exist in table. Please check the create table statement."
         throw new MalformedCarbonCommandException(errorMsg)
-      } else if (DataTypes.BINARY.getName.equalsIgnoreCase(rangeField.get.dataType.get)) {
+      } else if (DataTypes.BINARY.getName.equalsIgnoreCase(dataType) ||
+                 DataTypes.BOOLEAN.getName.equalsIgnoreCase(dataType) ||
+                 CarbonCommonConstants.ARRAY.equalsIgnoreCase(dataType) ||
+                 CarbonCommonConstants.STRUCT.equalsIgnoreCase(dataType) ||
+                 CarbonCommonConstants.MAP.equalsIgnoreCase(dataType) ||
+                 CarbonCommonConstants.DECIMAL.equalsIgnoreCase(dataType)) {
         throw new MalformedCarbonCommandException(
-          "RANGE_COLUMN doesn't support binary data type:" + rangeColumn)
+          s"RANGE_COLUMN doesn't support $dataType data type: " + rangeColumn)
       } else {
         tableProperties.put(CarbonCommonConstants.RANGE_COLUMN, rangeField.get.column)
       }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 57dd356..09763fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.rdd.{CarbonScanRDD, SparkReadSupport}
 
 case class CarbonDatasourceHadoopRelation(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 619b45a..d9c7be7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
@@ -108,9 +109,10 @@ public class CarbonCompactionExecutor {
    * Map has 2 elements: UNSORTED and SORTED
    * Map(UNSORTED) = List of Iterators which yield sorted data
    * Map(Sorted) = List of Iterators which yield sorted data
+   * For Range Column compaction, a filter expression is passed in to process only that range
    */
-  public Map<String, List<RawResultIterator>> processTableBlocks(Configuration configuration)
-      throws QueryExecutionException, IOException {
+  public Map<String, List<RawResultIterator>> processTableBlocks(Configuration configuration,
+      Expression filterExpr) throws QueryExecutionException, IOException {
 
     Map<String, List<RawResultIterator>> resultList = new HashMap<>(2);
     resultList.put(CarbonCompactionUtil.UNSORTED_IDX,
@@ -119,10 +121,16 @@ public class CarbonCompactionExecutor {
         new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
 
     List<TableBlockInfo> tableBlockInfos = null;
-    QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
-        .projectAllColumns()
-        .dataConverter(dataTypeConverter)
-        .enableForcedDetailRawQuery();
+    QueryModelBuilder builder = null;
+    if (null == filterExpr) {
+      builder =
+          new QueryModelBuilder(carbonTable).projectAllColumns().dataConverter(dataTypeConverter)
+              .enableForcedDetailRawQuery();
+    } else {
+      builder = new QueryModelBuilder(carbonTable).projectAllColumns().filterExpression(filterExpr)
+          .dataConverter(dataTypeConverter).enableForcedDetailRawQuery()
+          .convertToRangeFilter(false);
+    }
     if (enablePageLevelReaderForCompaction()) {
       builder.enableReadPageByPage();
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index c4b6843..f4a15bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -30,14 +30,28 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.log4j.Logger;
@@ -461,6 +475,132 @@ public class CarbonCompactionUtil {
     return false;
   }
 
+  // This method returns an Expression (And/Or) for each range based on the data type.
+  // The Expression is passed to each task as a filter query to fetch only that range's data.
+  public static Expression getFilterExpressionForRange(CarbonColumn rangeColumn, Object minVal,
+      Object maxVal, DataType dataType) {
+    Expression finalExpr;
+    Expression exp1, exp2;
+    String colName = rangeColumn.getColName();
+
+    // In case of null values create an OrExpression and
+    // for other cases create an AndExpression
+    if (null == minVal) {
+      // First task
+      exp1 = new EqualToExpression(new ColumnExpression(colName, dataType),
+          new LiteralExpression(null, dataType), true);
+      if (null == maxVal) {
+        // If both the min/max values are null, that means, if data contains only
+        // null value then pass only one expression as a filter expression
+        finalExpr = exp1;
+      } else {
+        exp2 = new LessThanEqualToExpression(new ColumnExpression(colName, dataType),
+            new LiteralExpression(maxVal, dataType));
+        if (rangeColumn.hasEncoding(Encoding.DICTIONARY)) {
+          exp2.setAlreadyResolved(true);
+        }
+        finalExpr = new OrExpression(exp1, exp2);
+      }
+    } else if (null == maxVal) {
+      // Last task
+      finalExpr = new GreaterThanExpression(new ColumnExpression(colName, dataType),
+          new LiteralExpression(minVal, dataType));
+      if (rangeColumn.hasEncoding(Encoding.DICTIONARY)) {
+        finalExpr.setAlreadyResolved(true);
+      }
+    } else {
+      // Remaining all intermediate ranges
+      exp1 = new GreaterThanExpression(new ColumnExpression(colName, dataType),
+          new LiteralExpression(minVal, dataType));
+      exp2 = new LessThanEqualToExpression(new ColumnExpression(colName, dataType),
+          new LiteralExpression(maxVal, dataType));
+      if (rangeColumn.hasEncoding(Encoding.DICTIONARY)) {
+        exp2.setAlreadyResolved(true);
+        exp1.setAlreadyResolved(true);
+      }
+      finalExpr = new AndExpression(exp1, exp2);
+    }
+    return finalExpr;
+  }
+
+  public static Object[] getOverallMinMax(CarbonInputSplit[] carbonInputSplits,
+      CarbonColumn rangeCol, boolean isSortCol) {
+    byte[] minVal = null;
+    byte[] maxVal = null;
+    int dictMinVal = Integer.MAX_VALUE;
+    int dictMaxVal = Integer.MIN_VALUE;
+    int idx = -1;
+    DataType dataType = rangeCol.getDataType();
+    Object[] minMaxVals = new Object[2];
+    boolean isDictEncode = rangeCol.hasEncoding(Encoding.DICTIONARY);
+    try {
+      for (CarbonInputSplit split : carbonInputSplits) {
+        DataFileFooter dataFileFooter = null;
+        dataFileFooter =
+            CarbonUtil.readMetadataFile(CarbonInputSplit.getTableBlockInfo(split), true);
+
+        if (-1 == idx) {
+          List<ColumnSchema> allColumns = dataFileFooter.getColumnInTable();
+          for (int i = 0; i < allColumns.size(); i++) {
+            if (allColumns.get(i).getColumnName().equalsIgnoreCase(rangeCol.getColName())) {
+              idx = i;
+              break;
+            }
+          }
+        }
+        if (isDictEncode) {
+          byte[] tempMin = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMinValues()[idx];
+          int tempMinVal = CarbonUtil.getSurrogateInternal(tempMin, 0, tempMin.length);
+          byte[] tempMax = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues()[idx];
+          int tempMaxVal = CarbonUtil.getSurrogateInternal(tempMax, 0, tempMax.length);
+          if (dictMinVal > tempMinVal) {
+            dictMinVal = tempMinVal;
+          }
+          if (dictMaxVal < tempMaxVal) {
+            dictMaxVal = tempMaxVal;
+          }
+        } else {
+          if (null == minVal) {
+            minVal = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMinValues()[idx];
+            maxVal = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues()[idx];
+          } else {
+            byte[] tempMin = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMinValues()[idx];
+            byte[] tempMax = dataFileFooter.getBlockletIndex().getMinMaxIndex().getMaxValues()[idx];
+            if (ByteUtil.compare(tempMin, minVal) <= 0) {
+              minVal = tempMin;
+            }
+            if (ByteUtil.compare(tempMax, maxVal) >= 0) {
+              maxVal = tempMax;
+            }
+          }
+        }
+      }
+
+      // Convert the min/max bytes based on how the value is stored in the footer
+      if (isDictEncode) {
+        minMaxVals[0] = dictMinVal;
+        minMaxVals[1] = dictMaxVal;
+      } else {
+        if (!isSortCol && (dataType == DataTypes.INT || dataType == DataTypes.LONG)) {
+          minMaxVals[0] = ByteUtil.toLong(minVal, 0, minVal.length);
+          minMaxVals[1] = ByteUtil.toLong(maxVal, 0, maxVal.length);
+        } else if (dataType == DataTypes.DOUBLE) {
+          minMaxVals[0] = ByteUtil.toDouble(minVal, 0, minVal.length);
+          minMaxVals[1] = ByteUtil.toDouble(maxVal, 0, maxVal.length);
+        } else {
+          minMaxVals[0] =
+              DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minVal, dataType, true);
+          minMaxVals[1] =
+              DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxVal, dataType, true);
+        }
+      }
+
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+    return minMaxVals;
+  }
+
   /**
    * Returns if the DataFileFooter containing carbondata file contains
    * sorted data or not.
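
As a rough, self-contained illustration of the min/max fallback in getOverallMinMax above: fold the per-block min/max byte arrays from the footers into one overall pair using an unsigned lexicographic compare (the byte values here are made up, and the real method additionally handles dictionary-encoded and sort-column representations):

    object MinMaxFallbackSketch {
      // Unsigned lexicographic comparison, similar in spirit to ByteUtil.compare
      def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
        var i = 0
        while (i < math.min(a.length, b.length)) {
          val cmp = (a(i) & 0xFF) - (b(i) & 0xFF)
          if (cmp != 0) return cmp
          i += 1
        }
        a.length - b.length
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical per-block (min, max) pairs as read from the file footers
        val blockMinMax = Seq(
          (Array[Byte](2, 10), Array[Byte](5, 1)),
          (Array[Byte](1, 7), Array[Byte](4, 9)),
          (Array[Byte](3, 0), Array[Byte](6, 2)))
        val (overallMin, overallMax) = blockMinMax.reduce { (acc, cur) =>
          (if (compareBytes(cur._1, acc._1) <= 0) cur._1 else acc._1,
           if (compareBytes(cur._2, acc._2) >= 0) cur._2 else acc._2)
        }
        println(overallMin.mkString("[", ",", "]") + " .. " + overallMax.mkString("[", ",", "]"))
      }
    }
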
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 7234c33..bec51e6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -117,8 +117,10 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
 
       // add all iterators to the queue
       for (RawResultIterator leaftTupleIterator : finalIteratorList) {
-        this.recordHolderHeap.add(leaftTupleIterator);
-        index++;
+        if (leaftTupleIterator.hasNext()) {
+          this.recordHolderHeap.add(leaftTupleIterator);
+          index++;
+        }
       }
       RawResultIterator iterator = null;
       while (index > 1) {