You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/07/31 11:49:19 UTC

[5/6] carbondata git commit: Rebased with new master and fixed binary comparisions and comments.

Rebased with new master and fixed binary comparisions and comments.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4e835095
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4e835095
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4e835095

Branch: refs/heads/master
Commit: 4e8350956efbba338316e3ead698106af7222f09
Parents: 266c473
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Mon Jul 31 12:45:13 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Mon Jul 31 16:55:33 2017 +0530

----------------------------------------------------------------------
 .../core/datastore/block/SegmentProperties.java |  20 +++
 ...CompressedMeasureChunkFileBasedReaderV1.java |   3 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |  11 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |  10 +-
 .../core/datastore/page/ColumnPage.java         |   2 +-
 .../page/encoding/EncodingStrategy.java         |   2 +-
 .../statistics/ColumnPageStatsCollector.java    |   3 +
 .../statistics/PrimitivePageStatsCollector.java |  47 +++++--
 .../statistics/VarLengthPageStatsCollector.java |   6 +
 .../blockletindex/BlockletDataMap.java          |   6 +-
 .../core/metadata/ColumnPageCodecMeta.java      |  53 ++++++--
 .../core/metadata/ValueEncoderMeta.java         |  22 +---
 .../core/scan/filter/ColumnFilterInfo.java      |   6 +-
 .../carbondata/core/scan/filter/FilterUtil.java |  49 ++++---
 .../executer/ExcludeFilterExecuterImpl.java     |  13 +-
 .../executer/IncludeFilterExecuterImpl.java     |  66 ++++------
 .../MeasureColumnExecuterFilterInfo.java        |   6 +-
 .../executer/RestructureEvaluatorImpl.java      |  19 ++-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  74 +++++------
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  74 +++++------
 ...velRangeLessThanEqualFilterExecuterImpl.java |  70 ++++------
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  67 ++++------
 .../RowLevelRangeTypeExecuterFacory.java        |  16 ++-
 .../filter/partition/PartitionFilterUtil.java   |   2 -
 .../resolver/ConditionalFilterResolverImpl.java |   2 +-
 .../RowLevelRangeFilterResolverImpl.java        |  35 +++--
 .../core/util/CarbonMetadataUtil.java           |  24 ----
 .../carbondata/core/util/DataTypeUtil.java      | 128 ++++---------------
 .../core/util/comparator/Comparator.java        |  58 +++++++++
 .../util/comparator/SerializableComparator.java |   2 +-
 30 files changed, 413 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 02bd7bd..23d2129 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -657,6 +657,26 @@ public class SegmentProperties {
     return dimensionValueSize;
   }
 
+  public int[] getColumnsValueSize() {
+    int[] dimensionValueSize =
+        new int[eachDimColumnValueSize.length + eachComplexDimColumnValueSize.length + measures
+            .size()];
+    System
+        .arraycopy(eachDimColumnValueSize, 0, dimensionValueSize, 0, eachDimColumnValueSize.length);
+    System.arraycopy(eachComplexDimColumnValueSize, 0, dimensionValueSize,
+        eachDimColumnValueSize.length, eachComplexDimColumnValueSize.length);
+    int k = eachDimColumnValueSize.length + eachComplexDimColumnValueSize.length;
+    for (int i = 0; i < measures.size(); i++) {
+      DataType dataType = measures.get(i).getDataType();
+      if (dataType.equals(DataType.DECIMAL)) {
+        dimensionValueSize[k++] = -1;
+      } else {
+        dimensionValueSize[k++] = 8;
+      }
+    }
+    return dimensionValueSize;
+  }
+
   /**
    * @return the dimensionKeyGenerator
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 7df18db..f3c7067 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -97,8 +97,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChun
     int blockIndex = measureRawColumnChunk.getBlockletId();
     DataChunk dataChunk = measureColumnChunks.get(blockIndex);
     ValueEncoderMeta meta = dataChunk.getValueEncoderMeta().get(0);
-
-    ColumnPageCodec codec = strategy.newCodec(meta, -1, -1);
+    ColumnPageCodec codec = strategy.newCodec(meta);
     ColumnPage page = codec.decode(measureRawColumnChunk.getRawData().array(),
         measureRawColumnChunk.getOffSet(), dataChunk.getDataPageLength());
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index f2679ae..a1c1267 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -136,16 +136,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     byte[] encodedMeta = encoder_meta.get(0).array();
 
     ValueEncoderMeta meta = CarbonUtil.deserializeEncoderMetaV3(encodedMeta);
-    int scale = -1;
-    int precision = -1;
-    if (encoder_meta.size() > 1) {
-      ByteBuffer decimalInfo = encoder_meta.get(1);
-      scale = decimalInfo.getInt();
-      precision = decimalInfo.getInt();
-    }
-
-
-    ColumnPageCodec codec = strategy.newCodec(meta, scale, precision);
+    ColumnPageCodec codec = strategy.newCodec(meta);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 354aa38..bde9803 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -232,17 +232,9 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     assert (encoder_meta.size() > 0);
     byte[] encodedMeta = encoder_meta.get(0).array();
 
-    int scale = -1;
-    int precision = -1;
-    if (encoder_meta.size() > 1) {
-      ByteBuffer decimalInfo = encoder_meta.get(1);
-      scale = decimalInfo.getInt();
-      precision = decimalInfo.getInt();
-    }
-
     ColumnPageCodecMeta meta = new ColumnPageCodecMeta();
     meta.deserialize(encodedMeta);
-    ColumnPageCodec codec = strategy.newCodec(meta, scale, precision);
+    ColumnPageCodec codec = strategy.newCodec(meta);
     byte[] rawData = measureRawColumnChunk.getRawData().array();
     return codec.decode(rawData, copyPoint, measureColumnChunk.data_page_length);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 245e95b..3912f45 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -326,7 +326,7 @@ public abstract class ColumnPage {
         break;
       case DECIMAL:
         putDecimal(rowId, (BigDecimal) value);
-        statsCollector.update(((BigDecimal) value).unscaledValue().longValue());
+        statsCollector.update((BigDecimal) value);
         break;
       case BYTE_ARRAY:
         putBytes(rowId, (byte[]) value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
index 53c565d..b122615 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -54,7 +54,7 @@ public abstract class EncodingStrategy {
   /**
    * create codec based on the page data type and statistics contained by ValueEncoderMeta
    */
-  public ColumnPageCodec newCodec(ValueEncoderMeta meta, int scale, int precision) {
+  public ColumnPageCodec newCodec(ValueEncoderMeta meta) {
     if (meta instanceof ColumnPageCodecMeta) {
       ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta;
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
index 5439a29..2440e33 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsCollector.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page.statistics;
 
+import java.math.BigDecimal;
+
 public interface ColumnPageStatsCollector {
   void updateNull(int rowId);
   void update(byte value);
@@ -24,6 +26,7 @@ public interface ColumnPageStatsCollector {
   void update(int value);
   void update(long value);
   void update(double value);
+  void update(BigDecimal value);
   void update(byte[] value);
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index af40f03..294d699 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -32,6 +32,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   private int minInt, maxInt;
   private long minLong, maxLong;
   private double minDouble, maxDouble;
+  private BigDecimal minDecimal, maxDecimal;
   private int scale, precision;
 
   // scale of the double value
@@ -40,6 +41,9 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   // The index of the rowId whose value is null, will be set to 1
   private BitSet nullBitSet;
 
+  private boolean isFirst = true;
+  private BigDecimal zeroDecimal;
+
   // this is for encode flow
   public static PrimitivePageStatsCollector newInstance(DataType dataType, int pageSize, int
       scale, int precision) {
@@ -78,8 +82,8 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         instance.decimal = meta.getDecimal();
         break;
       case DECIMAL:
-        instance.minLong = (long) meta.getMinValue();
-        instance.maxLong = (long) meta.getMaxValue();
+        instance.minDecimal = (BigDecimal) meta.getMinValue();
+        instance.maxDecimal = (BigDecimal) meta.getMaxValue();
         instance.decimal = meta.getDecimal();
         instance.scale = meta.getScale();
         instance.precision = meta.getPrecision();
@@ -90,7 +94,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) {
     PrimitivePageStatsCollector instance =
-        new PrimitivePageStatsCollector(meta.getType(), 0, meta.getScale(), meta.getPrecision());
+        new PrimitivePageStatsCollector(meta.getType(), 0, -1, -1);
     // set min max from meta
     switch (meta.getType()) {
       case BYTE:
@@ -115,11 +119,11 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         instance.decimal = meta.getDecimal();
         break;
       case DECIMAL:
-        instance.minDouble = (double) meta.getMinValue();
-        instance.maxDouble = (double) meta.getMaxValue();
+        instance.minDecimal = (BigDecimal) meta.getMinValue();
+        instance.maxDecimal = (BigDecimal) meta.getMaxValue();
         instance.decimal = meta.getDecimal();
-        instance.scale = meta.getScale();
-        instance.precision = meta.getPrecision();
+        instance.scale = -1;
+        instance.precision = -1;
         break;
     }
     return instance;
@@ -151,8 +155,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
         decimal = 0;
         break;
       case DECIMAL:
-        minLong = Long.MAX_VALUE;
-        maxLong = Long.MIN_VALUE;
+        this.zeroDecimal = BigDecimal.ZERO;
         decimal = scale;
         this.scale = scale;
         this.precision = precision;
@@ -179,6 +182,16 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       case DOUBLE:
         update(0d);
         break;
+      case DECIMAL:
+        if (isFirst) {
+          maxDecimal = zeroDecimal;
+          minDecimal = zeroDecimal;
+          isFirst = false;
+        } else {
+          maxDecimal = (maxDecimal.compareTo(zeroDecimal) > 0) ? maxDecimal : zeroDecimal;
+          minDecimal = (minDecimal.compareTo(zeroDecimal) < 0) ? minDecimal : zeroDecimal;
+        }
+        break;
     }
   }
 
@@ -239,6 +252,18 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   }
 
   @Override
+  public void update(BigDecimal decimalValue) {
+    if (isFirst) {
+      maxDecimal = decimalValue;
+      minDecimal = decimalValue;
+      isFirst = false;
+    } else {
+      maxDecimal = (decimalValue.compareTo(maxDecimal) > 0) ? decimalValue : maxDecimal;
+      minDecimal = (decimalValue.compareTo(maxDecimal) < 0) ? decimalValue : minDecimal;
+    }
+  }
+
+  @Override
   public void update(byte[] value) {
   }
 
@@ -278,7 +303,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       case DOUBLE:
         return minDouble;
       case DECIMAL:
-        return minLong;
+        return minDecimal;
     }
     return null;
   }
@@ -297,7 +322,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       case DOUBLE:
         return maxDouble;
       case DECIMAL:
-        return maxLong;
+        return maxDecimal;
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
index 0fe5960..dffd9ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page.statistics;
 
+import java.math.BigDecimal;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -64,6 +65,11 @@ public class VarLengthPageStatsCollector implements ColumnPageStatsCollector {
   }
 
   @Override
+  public void update(BigDecimal value) {
+
+  }
+
+  @Override
   public void update(byte[] value) {
     if (min == null && max == null) {
       min = value;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 680852d..4b5be11 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -118,7 +118,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private void loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties,
       String filePath) {
-    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
     DataMapSchema[] schema = unsafeMemoryDMStore.getSchema();
     for (int index = 0; index < blockletList.size(); index++) {
@@ -182,7 +182,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
     // Index key
     indexSchemas.add(new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY));
-    int[] minMaxLen = segmentProperties.getEachDimColumnValueSize();
+    int[] minMaxLen = segmentProperties.getColumnsValueSize();
     // do it 2 times, one for min and one for max.
     for (int k = 0; k < 2; k++) {
       DataMapSchema[] mapSchemas = new DataMapSchema[minMaxLen.length];
@@ -229,7 +229,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     }
     List<Blocklet> blocklets = new ArrayList<>();
     Comparator<DataMapRow> comparator =
-        new BlockletDMComparator(segmentProperties.getEachDimColumnValueSize(),
+        new BlockletDMComparator(segmentProperties.getColumnsValueSize(),
             segmentProperties.getNumberOfSortColumns(),
             segmentProperties.getNumberOfNoDictSortColumns());
     List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
index 6322670..591899e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
@@ -18,12 +18,14 @@
 package org.apache.carbondata.core.metadata;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * It holds metadata for one column page
@@ -36,6 +38,10 @@ public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializabl
 
   private DataType targetDataType;
 
+  private int scale;
+
+  private int precision;
+
   public static final char BYTE_VALUE_MEASURE = 'c';
   public static final char SHORT_VALUE_MEASURE = 'j';
   public static final char INT_VALUE_MEASURE = 'k';
@@ -171,17 +177,22 @@ public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializabl
         buffer.putDouble((Double) 0d); // unique value is obsoleted, maintain for compatibility
         break;
       case DECIMAL:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + (CarbonCommonConstants
-                .INT_SIZE_IN_BYTE * 3)
-                + 3);
+        byte[] maxAsBytes = getMaxAsBytes();
+        byte[] minAsBytes = getMinAsBytes();
+        byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
+        buffer = ByteBuffer.allocate(maxAsBytes.length + minAsBytes.length + unique.length
+            + 3 * CarbonCommonConstants.SHORT_SIZE_IN_BYTE
+            + CarbonCommonConstants.INT_SIZE_IN_BYTE * 3 + 3);
         buffer.putChar(getSrcDataTypeInChar());
-        buffer.putLong((Long) getMaxValue());
-        buffer.putLong((Long) getMinValue());
-        buffer.putLong((Long) 0L); // unique value is obsoleted, maintain for compatibility
+        buffer.putShort((short) maxAsBytes.length);
+        buffer.put(maxAsBytes);
+        buffer.putShort((short)minAsBytes.length);
+        buffer.put(minAsBytes);
+        // unique value is obsoleted, maintain for compatibility
+        buffer.putShort((short) unique.length);
+        buffer.put(unique);
         buffer.putInt(getScale());
         buffer.putInt(getPrecision());
-
         break;
     }
     buffer.putInt(getDecimal());
@@ -201,9 +212,13 @@ public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializabl
         buffer.getDouble(); // for non exist value which is obsoleted, it is backward compatibility;
         break;
       case BIG_DECIMAL_MEASURE:
-        this.setMaxValue(buffer.getLong());
-        this.setMinValue(buffer.getLong());
-        buffer.getLong();
+        byte[] max = new byte[buffer.getShort()];
+        buffer.get(max);
+        this.setMaxValue(DataTypeUtil.byteToBigDecimal(max));
+        byte[] min = new byte[buffer.getShort()];
+        buffer.get(min);
+        this.setMinValue(DataTypeUtil.byteToBigDecimal(min));
+        buffer.get(new byte[buffer.getShort()]);
         this.setScale(buffer.getInt());
         this.setPrecision(buffer.getInt());
         break;
@@ -274,6 +289,7 @@ public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializabl
         b.flip();
         return b.array();
       case DECIMAL:
+        return DataTypeUtil.bigDecimalToByte((BigDecimal)value);
       case BYTE_ARRAY:
         return new byte[8];
       default:
@@ -281,4 +297,19 @@ public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializabl
     }
   }
 
+  public int getScale() {
+    return scale;
+  }
+
+  public void setScale(int scale) {
+    this.scale = scale;
+  }
+
+  public int getPrecision() {
+    return precision;
+  }
+
+  public void setPrecision(int precision) {
+    this.precision = precision;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
index f1ebbd3..741b999 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
@@ -49,10 +49,6 @@ public class ValueEncoderMeta implements Serializable {
 
   private byte dataTypeSelected;
 
-  private int scale;
-
-  private int precision;
-
   public Object getMaxValue() {
     return maxValue;
   }
@@ -113,20 +109,4 @@ public class ValueEncoderMeta implements Serializable {
   public void setDataTypeSelected(byte dataTypeSelected) {
     this.dataTypeSelected = dataTypeSelected;
   }
-
-  public int getScale() {
-    return scale;
-  }
-
-  public void setScale(int scale) {
-    this.scale = scale;
-  }
-
-  public int getPrecision() {
-    return precision;
-  }
-
-  public void setPrecision(int precision) {
-    this.precision = precision;
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
index 008d908..ce137ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
@@ -38,7 +38,7 @@ public class ColumnFilterInfo implements Serializable {
    */
   private List<byte[]> noDictionaryFilterValuesList;
 
-  private List<byte[]> measuresFilterValuesList;
+  private List<Object> measuresFilterValuesList;
 
   public List<byte[]> getNoDictionaryFilterValuesList() {
     return noDictionaryFilterValuesList;
@@ -78,11 +78,11 @@ public class ColumnFilterInfo implements Serializable {
     this.implicitColumnFilterList = implicitColumnFilterList;
   }
 
-  public List<byte[]> getMeasuresFilterValuesList() {
+  public List<Object> getMeasuresFilterValuesList() {
     return measuresFilterValuesList;
   }
 
-  public void setMeasuresFilterValuesList(List<byte[]> measuresFilterValuesList) {
+  public void setMeasuresFilterValuesList(List<Object> measuresFilterValuesList) {
     this.measuresFilterValuesList = measuresFilterValuesList;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 6d531ae..9752e1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
@@ -92,7 +91,10 @@ import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public final class FilterUtil {
   private static final LogService LOGGER =
@@ -420,7 +422,7 @@ public final class FilterUtil {
       throw new FilterUnsupportedException("Unsupported Filter condition: " + result, ex);
     }
 
-    Comparator<byte[]> filterNoDictValueComaparator = new Comparator<byte[]>() {
+    java.util.Comparator<byte[]> filterNoDictValueComaparator = new java.util.Comparator<byte[]>() {
 
       @Override public int compare(byte[] filterMember1, byte[] filterMember2) {
         // TODO Auto-generated method stub
@@ -450,36 +452,27 @@ public final class FilterUtil {
   public static ColumnFilterInfo getMeasureValKeyMemberForFilter(
       List<String> evaluateResultListFinal, boolean isIncludeFilter, DataType dataType,
       CarbonMeasure carbonMeasure) throws FilterUnsupportedException {
-    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    List<Object> filterValuesList = new ArrayList<>(20);
     String result = null;
     try {
       int length = evaluateResultListFinal.size();
       for (int i = 0; i < length; i++) {
         result = evaluateResultListFinal.get(i);
         if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(result)) {
-          filterValuesList.add(new byte[0]);
+          filterValuesList.add(null);
           continue;
         }
-        // TODO have to understand what method to be used for measures.
-        // filterValuesList
-        //  .add(DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(result, dataType));
 
         filterValuesList
-            .add(DataTypeUtil.getMeasureByteArrayBasedOnDataTypes(result, dataType, carbonMeasure));
+            .add(DataTypeUtil.getMeasureValueBasedOnDataType(result, dataType, carbonMeasure));
 
       }
     } catch (Throwable ex) {
       throw new FilterUnsupportedException("Unsupported Filter condition: " + result, ex);
     }
 
-    Comparator<byte[]> filterMeasureComaparator = new Comparator<byte[]>() {
-
-      @Override public int compare(byte[] filterMember1, byte[] filterMember2) {
-        // TODO Auto-generated method stub
-        return ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterMember1, filterMember2);
-      }
-
-    };
+    SerializableComparator filterMeasureComaparator =
+        Comparator.getComparatorByDataTypeForMeasure(dataType);
     Collections.sort(filterValuesList, filterMeasureComaparator);
     ColumnFilterInfo columnFilterInfo = null;
     if (filterValuesList.size() > 0) {
@@ -614,7 +607,7 @@ public final class FilterUtil {
 
   private static void sortFilterModelMembers(final ColumnExpression columnExpression,
       List<String> evaluateResultListFinal) {
-    Comparator<String> filterActualValueComaparator = new Comparator<String>() {
+    java.util.Comparator<String> filterActualValueComaparator = new java.util.Comparator<String>() {
 
       @Override public int compare(String filterMember1, String filterMember2) {
         return compareFilterMembersBasedOnActualDataType(filterMember1, filterMember2,
@@ -735,12 +728,7 @@ public final class FilterUtil {
    * @return
    */
   public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo,
-      CarbonDimension carbonDimension, CarbonMeasure carbonMeasure,
-      SegmentProperties segmentProperties) {
-    if (null != carbonMeasure) {
-      return columnFilterInfo.getMeasuresFilterValuesList()
-          .toArray((new byte[columnFilterInfo.getMeasuresFilterValuesList().size()][]));
-    }
+      CarbonDimension carbonDimension, SegmentProperties segmentProperties) {
     if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
       return columnFilterInfo.getNoDictionaryFilterValuesList()
           .toArray((new byte[columnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
@@ -1149,14 +1137,25 @@ public final class FilterUtil {
       DimColumnExecuterFilterInfo dimColumnExecuterInfo, CarbonMeasure measures,
       MeasureColumnExecuterFilterInfo msrColumnExecuterInfo) {
     if (null != measures) {
-      byte[][] keysBasedOnFilter = getKeyArray(filterValues, null, measures, segmentProperties);
+      DataTypeConverterImpl converter = new DataTypeConverterImpl();
+      Object[] keysBasedOnFilter = filterValues.getMeasuresFilterValuesList()
+          .toArray((new Object[filterValues.getMeasuresFilterValuesList().size()]));
+      for (int i = 0; i < keysBasedOnFilter.length; i++) {
+        if (keysBasedOnFilter[i] != null) {
+          keysBasedOnFilter[i] = DataTypeUtil
+              .getDataBasedOnDataType(keysBasedOnFilter[i].toString(), measures.getDataType(),
+                  converter);
+        }
+      }
       msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
     } else {
-      byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, null, segmentProperties);
+      byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, segmentProperties);
       dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
     }
   }
 
+
+
   /**
    * method will create a default end key in case of no end key is been derived using existing
    * filter or in case of non filter queries.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 21e8447..6601797 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
 import java.util.BitSet;
-import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
@@ -27,13 +26,14 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class ExcludeFilterExecuterImpl implements FilterExecuter {
 
@@ -140,24 +140,23 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
     // the filter values. The one that matches sets it Bitset.
     BitSet bitSet = new BitSet(numerOfRows);
     bitSet.flip(0, numerOfRows);
-    byte[][] filterValues = msrColumnExecutorInfo.getFilterKeys();
-    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i].length == 0) {
+      if (filterValues[i] == null) {
         BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.flip(j);
         }
         continue;
       }
-      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         // Check if filterValue[i] matches with measure Values.
         Object msrValue = DataTypeUtil
             .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
                 msrType, msrColumnEvaluatorInfo.getMeasure());
 
-        if (comparator.compare(msrValue, filter) == 0) {
+        if (comparator.compare(msrValue, filterValues[i]) == 0) {
           // This is a match.
           bitSet.flip(startIndex);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 0fa42ae..f848e07 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -17,10 +17,7 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
@@ -29,7 +26,6 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -37,6 +33,8 @@ import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class IncludeFilterExecuterImpl implements FilterExecuter {
 
@@ -47,6 +45,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
   protected SegmentProperties segmentProperties;
   protected boolean isDimensionPresentInCurrentBlock = false;
   protected boolean isMeasurePresentInCurrentBlock = false;
+  protected SerializableComparator comparator;
   /**
    * is dimension column data is natural sorted
    */
@@ -57,7 +56,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       boolean isMeasure) {
 
     this.segmentProperties = segmentProperties;
-    if (isMeasure == false) {
+    if (!isMeasure) {
       this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
       dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
       FilterUtil
@@ -71,6 +70,8 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     } else {
       this.msrColumnEvaluatorInfo = msrColumnEvaluatorInfo;
       msrColumnExecutorInfo = new MeasureColumnExecuterFilterInfo();
+      comparator =
+          Comparator.getComparatorByDataTypeForMeasure(getMeasureDataType(msrColumnEvaluatorInfo));
       FilterUtil
           .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
               null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
@@ -160,25 +161,24 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     // Get the measure values from the chunk. compare sequentially with the
     // the filter values. The one that matches sets it Bitset.
     BitSet bitSet = new BitSet(rowsInPage);
-    byte[][] filterValues = msrColumnExecutorInfo.getFilterKeys();
+    Object[] filterValues = msrColumnExecutorInfo.getFilterKeys();
 
-    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i].length == 0) {
+      if (filterValues[i] == null) {
         BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.set(j);
         }
         continue;
       }
-      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
       for (int startIndex = 0; startIndex < rowsInPage; startIndex++) {
         // Check if filterValue[i] matches with measure Values.
         Object msrValue = DataTypeUtil
             .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
                 msrType, msrColumnEvaluatorInfo.getMeasure());
 
-        if (comparator.compare(msrValue, filter) == 0) {
+        if (comparator.compare(msrValue, filterValues[i]) == 0) {
           // This is a match.
           bitSet.set(startIndex);
         }
@@ -274,7 +274,6 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
           isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
 
     } else if (isMeasurePresentInCurrentBlock) {
-      filterValues = msrColumnExecutorInfo.getFilterKeys();
       columnIndex = msrColumnEvaluatorInfo.getColumnIndex();
       // blockIndex =
       // segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex) + segmentProperties
@@ -282,7 +281,8 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       blockIndex =
           segmentProperties.getMeasuresOrdinalToBlockMapping().get(columnIndex) + segmentProperties
               .getLastDimensionColOrdinal();
-      isScanRequired = isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues,
+      isScanRequired = isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex],
+          msrColumnExecutorInfo.getFilterKeys(),
           msrColumnEvaluatorInfo.getType());
     }
 
@@ -314,40 +314,18 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     return isScanRequired;
   }
 
-  private boolean isScanRequired(byte[] maxValue, byte[] minValue, byte[][] filterValue,
+  private boolean isScanRequired(byte[] maxValue, byte[] minValue, Object[] filterValue,
       DataType dataType) {
+    Object maxObject = DataTypeUtil.getMeasureObjectFromDataType(maxValue, dataType);
+    Object minObject = DataTypeUtil.getMeasureObjectFromDataType(minValue, dataType);
     for (int i = 0; i < filterValue.length; i++) {
-      if (filterValue[i].length == 0 || maxValue.length == 0 || minValue.length == 0) {
-        return isScanRequired(maxValue, minValue, filterValue);
-      } else {
-        switch (dataType) {
-          case DOUBLE:
-            double maxValueDouble = ByteBuffer.wrap(maxValue).getDouble();
-            double minValueDouble = ByteBuffer.wrap(minValue).getDouble();
-            double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
-            if (filterValueDouble <= maxValueDouble && filterValueDouble >= minValueDouble) {
-              return true;
-            }
-            break;
-          case INT:
-          case SHORT:
-          case LONG:
-            long maxValueLong = ByteBuffer.wrap(maxValue).getLong();
-            long minValueLong = ByteBuffer.wrap(minValue).getLong();
-            long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
-            if (filterValueLong <= maxValueLong && filterValueLong >= minValueLong) {
-              return true;
-            }
-            break;
-          case DECIMAL:
-            BigDecimal maxDecimal = DataTypeUtil.byteToBigDecimal(maxValue);
-            BigDecimal minDecimal = DataTypeUtil.byteToBigDecimal(minValue);
-            BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
-            if (filterDecimal.compareTo(maxDecimal) <= 0
-                && filterDecimal.compareTo(minDecimal) >= 0) {
-              return true;
-            }
-        }
+      // TODO handle min and max for null values.
+      if (filterValue[i] == null) {
+        return true;
+      }
+      if (comparator.compare(filterValue[i], maxObject) <= 0
+          && comparator.compare(filterValue[i], minObject) >= 0) {
+        return true;
       }
     }
     return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
index cc7e837..a19e617 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
@@ -18,13 +18,13 @@ package org.apache.carbondata.core.scan.filter.executer;
 
 public class MeasureColumnExecuterFilterInfo {
 
-  byte[][] filterKeys;
+  Object[] filterKeys;
 
-  public void setFilterKeys(byte[][] filterKeys) {
+  public void setFilterKeys(Object[] filterKeys) {
     this.filterKeys = filterKeys;
   }
 
-  public byte[][] getFilterKeys() {
+  public Object[] getFilterKeys() {
     return filterKeys;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
index 8f3d2b1..d72b955 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
@@ -28,6 +28,9 @@ import org.apache.carbondata.core.scan.filter.ColumnFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 /**
  * Abstract class for restructure
@@ -93,14 +96,17 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
     boolean isDefaultValuePresentInFilterValues = false;
     ColumnFilterInfo filterValues = measureColumnResolvedFilterInfo.getFilterValues();
     CarbonMeasure measure = measureColumnResolvedFilterInfo.getMeasure();
-    byte[] defaultValue = measure.getDefaultValue();
-    if (null == defaultValue) {
+    SerializableComparator comparator =
+        Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
+    Object defaultValue = null;
+    if (null != measure.getDefaultValue()) {
       // default value for case where user gives is Null condition
-      defaultValue = new byte[0];
+      defaultValue = DataTypeUtil
+          .getMeasureObjectFromDataType(measure.getDefaultValue(), measure.getDataType());
     }
-    List<byte[]> measureFilterValuesList = filterValues.getMeasuresFilterValuesList();
-    for (byte[] filterValue : measureFilterValuesList) {
-      int compare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(defaultValue, filterValue);
+    List<Object> measureFilterValuesList = filterValues.getMeasuresFilterValuesList();
+    for (Object filterValue : measureFilterValuesList) {
+      int compare = comparator.compare(defaultValue, filterValue);
       if (compare == 0) {
         isDefaultValuePresentInFilterValues = true;
         break;
@@ -108,4 +114,5 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
     }
     return isDefaultValuePresentInFilterValues;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 47c854e..a1774dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -17,10 +17,7 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -37,7 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -45,9 +41,14 @@ import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
   private byte[][] filterRangeValues;
+  private Object[] msrFilterRangeValues;
+  private SerializableComparator comparator;
+
 
   /**
    * flag to check whether default values is present in the filter value list
@@ -58,12 +59,18 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      Object[] msrFilterRangeValues,
       SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
         null);
     this.filterRangeValues = filterRangeValues;
+    this.msrFilterRangeValues = msrFilterRangeValues;
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
-    if (isDimensionPresentInCurrentBlock[0] == true) {
+    if (isMeasurePresentInCurrentBlock[0]) {
+      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
+      comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
+    }
+    if (isDimensionPresentInCurrentBlock[0]) {
       isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
           && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
     }
@@ -91,9 +98,9 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       byte[] defaultValue = measure.getDefaultValue();
       if (null != defaultValue) {
-        for (int k = 0; k < filterRangeValues.length; k++) {
-          int maxCompare =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
+        for (int k = 0; k < msrFilterRangeValues.length; k++) {
+          int maxCompare = comparator.compare(msrFilterRangeValues[k],
+              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType()));
           if (maxCompare < 0) {
             isDefaultValuePresentInFilter = true;
             break;
@@ -111,7 +118,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       if (isMeasurePresentInCurrentBlock[0]) {
         maxValue = blockMaxValue[measureBlocksIndex[0] + lastDimensionColOrdinal];
         isScanRequired =
-            isScanRequired(maxValue, filterRangeValues, msrColEvalutorInfoList.get(0).getType());
+            isScanRequired(maxValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         maxValue = blockMaxValue[dimensionBlocksIndex[0]];
         isScanRequired = isScanRequired(maxValue, filterRangeValues);
@@ -144,35 +151,16 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     return isScanRequired;
   }
 
-  private boolean isScanRequired(byte[] maxValue, byte[][] filterValue,
+  private boolean isScanRequired(byte[] maxValue, Object[] filterValue,
       DataType dataType) {
+    Object value = DataTypeUtil.getMeasureObjectFromDataType(maxValue, dataType);
     for (int i = 0; i < filterValue.length; i++) {
-      if (filterValue[i].length == 0 || maxValue.length == 0) {
-        return isScanRequired(maxValue, filterValue);
+      // TODO handle min and max for null values.
+      if (filterValue[i] == null) {
+        return true;
       }
-      switch (dataType) {
-        case DOUBLE:
-          double maxValueDouble = ByteBuffer.wrap(maxValue).getDouble();
-          double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
-          if (filterValueDouble < maxValueDouble) {
-            return true;
-          }
-          break;
-        case INT:
-        case SHORT:
-        case LONG:
-          long maxValueLong = ByteBuffer.wrap(maxValue).getLong();
-          long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
-          if (filterValueLong < maxValueLong) {
-            return true;
-          }
-          break;
-        case DECIMAL:
-          BigDecimal maxDecimal = DataTypeUtil.byteToBigDecimal(maxValue);
-          BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
-          if (filterDecimal.compareTo(maxDecimal) < 0) {
-            return true;
-          }
+      if (comparator.compare(filterValue[i], value) < 0) {
+        return true;
       }
     }
     return false;
@@ -230,10 +218,11 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMaxValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues,
+          if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.msrFilterRangeValues,
               msrColEvalutorInfoList.get(0).getType())) {
-            int compare = ByteUtil.UnsafeComparer.INSTANCE
-                .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
+            int compare = comparator.compare(msrFilterRangeValues[0], DataTypeUtil
+                .getMeasureObjectFromDataType(rawColumnChunk.getMinValues()[i],
+                    msrColEvalutorInfoList.get(0).getType()));
             if (compare < 0) {
               BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
               bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
@@ -260,24 +249,23 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
   private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    byte[][] filterValues = this.filterRangeValues;
+    Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
-    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i].length == 0) {
+      if (filterValues[i] == null) {
         BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.set(j);
         }
         continue;
       }
-      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         Object msrValue = DataTypeUtil
             .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
                 msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
-        if (comparator.compare(msrValue, filter) > 0) {
+        if (comparator.compare(msrValue, filterValues[i]) > 0) {
           // This is a match.
           bitSet.set(startIndex);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 6b665b2..7823e34 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -17,10 +17,7 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -37,7 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -45,11 +41,14 @@ import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilterExecuterImpl {
 
   protected byte[][] filterRangeValues;
-
+  private Object[] msrFilterRangeValues;
+  private SerializableComparator comparator;
   /**
    * flag to check whether default values is present in the filter value list
    */
@@ -60,12 +59,16 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
-      SegmentProperties segmentProperties) {
+      Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
         null);
     this.filterRangeValues = filterRangeValues;
+    this.msrFilterRangeValues = msrFilterRangeValues;
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
-
+    if (isMeasurePresentInCurrentBlock[0]) {
+      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
+      comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
+    }
     if (isDimensionPresentInCurrentBlock[0] == true) {
       isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
           && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
@@ -94,9 +97,9 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       byte[] defaultValue = measure.getDefaultValue();
       if (null != defaultValue) {
-        for (int k = 0; k < filterRangeValues.length; k++) {
-          int maxCompare =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
+        for (int k = 0; k < msrFilterRangeValues.length; k++) {
+          int maxCompare = comparator.compare(msrFilterRangeValues[k],
+              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType()));
           if (maxCompare <= 0) {
             isDefaultValuePresentInFilter = true;
             break;
@@ -114,7 +117,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       if (isMeasurePresentInCurrentBlock[0]) {
         maxValue = blockMaxValue[measureBlocksIndex[0] + lastDimensionColOrdinal];
         isScanRequired =
-            isScanRequired(maxValue, filterRangeValues, msrColEvalutorInfoList.get(0).getType());
+            isScanRequired(maxValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         maxValue = blockMaxValue[dimensionBlocksIndex[0]];
         isScanRequired = isScanRequired(maxValue, filterRangeValues);
@@ -146,35 +149,16 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     return isScanRequired;
   }
 
-  private boolean isScanRequired(byte[] maxValue, byte[][] filterValue,
+  private boolean isScanRequired(byte[] maxValue, Object[] filterValue,
       DataType dataType) {
+    Object value = DataTypeUtil.getMeasureObjectFromDataType(maxValue, dataType);
     for (int i = 0; i < filterValue.length; i++) {
-      if (filterValue[i].length == 0 || maxValue.length == 0) {
-        return isScanRequired(maxValue, filterValue);
+      // TODO handle min and max for null values.
+      if (filterValue[i] == null) {
+        return true;
       }
-      switch (dataType) {
-        case DOUBLE:
-          double maxValueDouble = ByteBuffer.wrap(maxValue).getDouble();
-          double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
-          if (filterValueDouble <= maxValueDouble) {
-            return true;
-          }
-          break;
-        case INT:
-        case SHORT:
-        case LONG:
-          long maxValueLong = ByteBuffer.wrap(maxValue).getLong();
-          long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
-          if (filterValueLong <= maxValueLong) {
-            return true;
-          }
-          break;
-        case DECIMAL:
-          BigDecimal maxDecimal = DataTypeUtil.byteToBigDecimal(maxValue);
-          BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
-          if (filterDecimal.compareTo(maxDecimal) <= 0) {
-            return true;
-          }
+      if (comparator.compare(filterValue[i], value) <= 0) {
+        return true;
       }
     }
     return false;
@@ -233,10 +217,11 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMaxValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues,
+          if (isScanRequired(rawColumnChunk.getMaxValues()[i], this.msrFilterRangeValues,
               msrColEvalutorInfoList.get(0).getType())) {
-            int compare = ByteUtil.UnsafeComparer.INSTANCE
-                .compareTo(filterRangeValues[0], rawColumnChunk.getMinValues()[i]);
+            int compare = comparator.compare(msrFilterRangeValues[0], DataTypeUtil
+                .getMeasureObjectFromDataType(rawColumnChunk.getMinValues()[i],
+                    msrColEvalutorInfoList.get(0).getType()));
             if (compare <= 0) {
               BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
               bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
@@ -263,24 +248,23 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
   private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    byte[][] filterValues = this.filterRangeValues;
+    Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
-    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i].length == 0) {
+      if (filterValues[i] == null) {
         BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.set(j);
         }
         continue;
       }
-      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         Object msrValue = DataTypeUtil
             .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
                 msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
-        if (comparator.compare(msrValue, filter) >= 0) {
+        if (comparator.compare(msrValue, filterValues[i]) >= 0) {
           // This is a match.
           bitSet.set(startIndex);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 6d6ca19..422a099 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -17,10 +17,7 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -47,9 +43,13 @@ import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilterExecuterImpl {
   protected byte[][] filterRangeValues;
+  protected Object[] msrFilterRangeValues;
+  protected SerializableComparator comparator;
 
   /**
    * flag to check whether default values is present in the filter value list
@@ -60,13 +60,18 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
-      SegmentProperties segmentProperties) {
+      Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
         null);
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
     this.filterRangeValues = filterRangeValues;
+    this.msrFilterRangeValues = msrFilterRangeValues;
+    if (isMeasurePresentInCurrentBlock[0]) {
+      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
+      comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
+    }
     ifDefaultValueMatchesFilter();
-    if (isDimensionPresentInCurrentBlock[0] == true) {
+    if (isDimensionPresentInCurrentBlock[0]) {
       isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
           && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
     }
@@ -93,9 +98,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       byte[] defaultValue = measure.getDefaultValue();
       if (null != defaultValue) {
-        for (int k = 0; k < filterRangeValues.length; k++) {
-          int maxCompare =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
+        for (int k = 0; k < msrFilterRangeValues.length; k++) {
+          int maxCompare = comparator.compare(msrFilterRangeValues[k],
+              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType()));
           if (maxCompare >= 0) {
             isDefaultValuePresentInFilter = true;
             break;
@@ -113,7 +118,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       if (isMeasurePresentInCurrentBlock[0]) {
         minValue = blockMinValue[measureBlocksIndex[0] + lastDimensionColOrdinal];
         isScanRequired =
-            isScanRequired(minValue, filterRangeValues, msrColEvalutorInfoList.get(0).getType());
+            isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         minValue = blockMinValue[dimensionBlocksIndex[0]];
         isScanRequired = isScanRequired(minValue, filterRangeValues);
@@ -144,35 +149,17 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     return isScanRequired;
   }
 
-  private boolean isScanRequired(byte[] minValue, byte[][] filterValue,
+  private boolean isScanRequired(byte[] minValue, Object[] filterValue,
       DataType dataType) {
+    Object value =
+        DataTypeUtil.getMeasureObjectFromDataType(minValue, dataType);
     for (int i = 0; i < filterValue.length; i++) {
-      if (filterValue[i].length == 0 || minValue.length == 0) {
-        return isScanRequired(minValue, filterValue);
+      // TODO handle min and max for null values.
+      if (filterValue[i] == null) {
+        return true;
       }
-      switch (dataType) {
-        case DOUBLE:
-          double minValueDouble = ByteBuffer.wrap(minValue).getDouble();
-          double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
-          if (filterValueDouble >= minValueDouble) {
-            return true;
-          }
-          break;
-        case INT:
-        case SHORT:
-        case LONG:
-          long minValueLong = ByteBuffer.wrap(minValue).getLong();
-          long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
-          if (filterValueLong >= minValueLong) {
-            return true;
-          }
-          break;
-        case DECIMAL:
-          BigDecimal minDecimal = DataTypeUtil.byteToBigDecimal(minValue);
-          BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
-          if (filterDecimal.compareTo(minDecimal) >= 0) {
-            return true;
-          }
+      if (comparator.compare(filterValue[i], value) >= 0) {
+        return true;
       }
     }
     return false;
@@ -222,7 +209,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMinValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues,
+          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.msrFilterRangeValues,
               msrColEvalutorInfoList.get(0).getType())) {
             BitSet bitSet =
                 getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
@@ -244,24 +231,23 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
   private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    byte[][] filterValues = this.filterRangeValues;
+    Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
-    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i].length == 0) {
+      if (filterValues[i] == null) {
         BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.set(j);
         }
         continue;
       }
-      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         Object msrValue = DataTypeUtil
             .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
                 msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
-        if (comparator.compare(msrValue, filter) <= 0) {
+        if (comparator.compare(msrValue, filterValues[i]) <= 0) {
           // This is a match.
           bitSet.set(startIndex);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4e835095/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 98e4a78..4f5f1e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -17,10 +17,7 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
@@ -47,9 +43,13 @@ import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
   private byte[][] filterRangeValues;
+  private Object[] msrFilterRangeValues;
+  private SerializableComparator comparator;
 
   /**
    * flag to check whether default values is present in the filter value list
@@ -60,11 +60,16 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
-      SegmentProperties segmentProperties) {
+      Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
         null);
     this.filterRangeValues = filterRangeValues;
+    this.msrFilterRangeValues = msrFilterRangeValues;
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
+    if (isMeasurePresentInCurrentBlock[0]) {
+      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
+      comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
+    }
     ifDefaultValueMatchesFilter();
     if (isDimensionPresentInCurrentBlock[0] == true) {
       isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
@@ -93,9 +98,11 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       byte[] defaultValue = measure.getDefaultValue();
       if (null != defaultValue) {
-        for (int k = 0; k < filterRangeValues.length; k++) {
+        for (int k = 0; k < msrFilterRangeValues.length; k++) {
+          Object convertedValue =
+              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType());
           int maxCompare =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
+              comparator.compare(msrFilterRangeValues[k], convertedValue);
           if (maxCompare > 0) {
             isDefaultValuePresentInFilter = true;
             break;
@@ -113,7 +120,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       if (isMeasurePresentInCurrentBlock[0]) {
         minValue = blockMinValue[measureBlocksIndex[0] + lastDimensionColOrdinal];
         isScanRequired =
-            isScanRequired(minValue, filterRangeValues, msrColEvalutorInfoList.get(0).getType());
+            isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
       } else {
         minValue = blockMinValue[dimensionBlocksIndex[0]];
         isScanRequired = isScanRequired(minValue, filterRangeValues);
@@ -145,35 +152,16 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     return isScanRequired;
   }
 
-  private boolean isScanRequired(byte[] minValue, byte[][] filterValue,
+  private boolean isScanRequired(byte[] minValue, Object[] filterValue,
       DataType dataType) {
+    Object value = DataTypeUtil.getMeasureObjectFromDataType(minValue, dataType);
     for (int i = 0; i < filterValue.length; i++) {
-      if (filterValue[i].length == 0 || minValue.length == 0) {
-        return isScanRequired(minValue, filterValue);
+      // TODO handle min and max for null values.
+      if (filterValue[i] == null) {
+        return true;
       }
-      switch (dataType) {
-        case DOUBLE:
-          double minValueDouble = ByteBuffer.wrap(minValue).getDouble();
-          double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
-          if (filterValueDouble > minValueDouble) {
-            return true;
-          }
-          break;
-        case INT:
-        case SHORT:
-        case LONG:
-          long minValueLong = ByteBuffer.wrap(minValue).getLong();
-          long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
-          if (filterValueLong > minValueLong) {
-            return true;
-          }
-          break;
-        case DECIMAL:
-          BigDecimal minDecimal = DataTypeUtil.byteToBigDecimal(minValue);
-          BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
-          if (filterDecimal.compareTo(minDecimal) > 0) {
-            return true;
-          }
+      if (comparator.compare(filterValue[i], value) > 0) {
+        return true;
       }
     }
     return false;
@@ -223,7 +211,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
       for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
         if (rawColumnChunk.getMinValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues,
+          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.msrFilterRangeValues,
               msrColEvalutorInfoList.get(0).getType())) {
             BitSet bitSet =
                 getFilteredIndexesForMeasures(rawColumnChunk.convertToMeasureColDataChunk(i),
@@ -245,24 +233,23 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
   private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    byte[][] filterValues = this.filterRangeValues;
+    Object[] filterValues = this.msrFilterRangeValues;
     DataType msrType = msrColEvalutorInfoList.get(0).getType();
-    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
     for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i].length == 0) {
+      if (filterValues[i] == null) {
         BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
         for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
           bitSet.set(j);
         }
         continue;
       }
-      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
       for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
         Object msrValue = DataTypeUtil
             .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
                 msrType, msrColEvalutorInfoList.get(0).getMeasure());
 
-        if (comparator.compare(msrValue, filter) < 0) {
+        if (comparator.compare(msrValue, filterValues[i]) < 0) {
           // This is a match.
           bitSet.set(startIndex);
         }