You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:15 UTC

[28/50] [abbrv] incubator-carbondata git commit: [CARBONDATA-13] Direct surrogate key range filters not able to prune blocks (#721)

[CARBONDATA-13] Direct surrogate key range filters not able to prune blocks (#721)

* When Range filters(>,<,>=,<=) is applied Direct Surrogate Column block and blocklet pruning is supported
* Binary search inside block let for range filters.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/462eb393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/462eb393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/462eb393

Branch: refs/heads/master
Commit: 462eb393baf99d3b0a7f49869f5a3d95f0e2fd9e
Parents: 9d846e4
Author: sujith71955 <su...@gmail.com>
Authored: Mon Jun 27 22:34:51 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Mon Jun 27 22:34:51 2016 +0530

----------------------------------------------------------------------
 .../org/carbondata/core/util/CarbonUtil.java    |  45 +++++-
 .../executer/ExcludeFilterExecuterImpl.java     |   4 +-
 .../executer/IncludeFilterExecuterImpl.java     |   4 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  | 147 ++++++++++++++++++-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java | 139 +++++++++++++++++-
 ...velRangeLessThanEqualFilterExecuterImpl.java | 137 ++++++++++++++++-
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 141 +++++++++++++++++-
 .../RowLevelRangeTypeExecuterFacory.java        |  13 +-
 .../filter/resolver/AndFilterResolverImpl.java  |   5 +-
 .../resolver/ConditionalFilterResolverImpl.java |  19 +--
 .../filter/resolver/FilterResolverIntf.java     |   5 +-
 .../resolver/LogicalFilterResolverImpl.java     |   3 +-
 .../RowLevelRangeFilterResolverImpl.java        |  55 ++++++-
 .../filters/FilterExpressionProcessor.java      |  15 +-
 .../filters/measurefilter/util/FilterUtil.java  |   3 +-
 integration/spark/src/test/resources/data2.csv  |   6 +-
 .../HighCardinalityDataTypesTestCase.scala      |   4 +
 .../filterexpr/FilterProcessorTestCase.scala    |  78 +++++++---
 18 files changed, 745 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
index 886e136..12cbb6b 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
@@ -888,7 +888,50 @@ public final class CarbonUtil {
         return currentIndex;
       }
     }
-    return -1;
+    return -(low + 1);
+  }
+
+  /**
+   * Method will identify the value which is lesser than the pivot element
+   * on which range filter is been applied.
+   *
+   * @param currentIndex
+   * @param dimColumnDataChunk
+   * @param compareValue
+   * @return index value
+   */
+  public static int nextLesserValueToTarget(int currentIndex,
+      FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue) {
+    while (currentIndex - 1 >= 0 && ByteUtil.UnsafeComparer.INSTANCE
+        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
+            (currentIndex - 1) * compareValue.length, compareValue.length, compareValue, 0,
+            compareValue.length) >= 0) {
+      --currentIndex;
+    }
+
+    return --currentIndex;
+  }
+
+  /**
+   * Method will identify the value which is greater than the pivot element
+   * on which range filter is been applied.
+   *
+   * @param currentIndex
+   * @param dimColumnDataChunk
+   * @param compareValue
+   * @param numerOfRows
+   * @return index value
+   */
+  public static int nextGreaterValueToTarget(int currentIndex,
+      FixedLengthDimensionDataChunk dimColumnDataChunk, byte[] compareValue, int numerOfRows) {
+    while (currentIndex + 1 < numerOfRows && ByteUtil.UnsafeComparer.INSTANCE
+        .compareTo(dimColumnDataChunk.getCompleteDataChunk(),
+            (currentIndex + 1) * compareValue.length, compareValue.length, compareValue, 0,
+            compareValue.length) <= 0) {
+      ++currentIndex;
+    }
+
+    return ++currentIndex;
   }
 
   public static int[] getUnCompressColumnIndex(int totalLength, byte[] columnIndexData,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/ExcludeFilterExecuterImpl.java
index 9de8396..f2c6de3 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/ExcludeFilterExecuterImpl.java
@@ -140,7 +140,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
       startKey = CarbonUtil
           .getFirstIndexUsingBinarySearch(dimColumnDataChunk, startIndex, numerOfRows - 1,
               filterValues[i]);
-      if (startKey == -1) {
+      if (startKey < 0) {
         continue;
       }
       bitSet.flip(columnIndex[startKey]);
@@ -175,7 +175,7 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
       startKey = CarbonUtil
           .getFirstIndexUsingBinarySearch(dimColumnDataChunk, startIndex, numerOfRows - 1,
               filterValues[k]);
-      if (startKey == -1) {
+      if (startKey < 0) {
         continue;
       }
       bitSet.flip(startKey);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/IncludeFilterExecuterImpl.java
index a15733c..5123ce7 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/IncludeFilterExecuterImpl.java
@@ -131,7 +131,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
       start = CarbonUtil
           .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
               filterValues[i]);
-      if (start == -1) {
+      if (start < 0) {
         continue;
       }
       bitSet.set(columnIndex[start]);
@@ -168,7 +168,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
         start = CarbonUtil.getFirstIndexUsingBinarySearch(
             (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
             filterValues[k]);
-        if (start == -1) {
+        if (start < 0) {
           continue;
         }
         bitSet.set(start);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 88290d9..571f046 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -22,20 +22,30 @@ import java.util.BitSet;
 import java.util.List;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.carbon.processor.BlocksChunkHolder;
 import org.carbondata.query.expression.Expression;
+import org.carbondata.query.expression.exception.FilterUnsupportedException;
 import org.carbondata.query.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.carbondata.query.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 
 public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
   private byte[][] filterRangeValues;
+  private SegmentProperties segmentProperties;
 
   public RowLevelRangeGrtThanFiterExecuterImpl(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
     this.filterRangeValues = filterRangeValues;
+    this.segmentProperties = segmentProperties;
   }
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
@@ -49,7 +59,6 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       // so filter-max should be negative
       int maxCompare =
           ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
-
       // if any filter value is in range than this block needs to be
       // scanned means always less than block max range.
       if (maxCompare < 0) {
@@ -63,4 +72,138 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     return bitSet;
 
   }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all members
+   * will be considered for applying range filters. this method will be called if the
+   * column is not supported by default so column index mapping  will be present for
+   * accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i]);
+      start = CarbonUtil
+          .nextGreaterValueToTarget(start, (FixedLengthDimensionDataChunk) dimensionColumnDataChunk,
+              filterValues[i], numerOfRows);
+      // Logic will handle the case where the range filter member is not present in block
+      // in this case the binary search will return the index from where the bit sets will be
+      // set inorder to apply filters. this is greater than filter so the range will be taken
+      // from the next element which is greater than filter member.
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Method will compare the tentative index value after binary search, this tentative
+        // index needs to be compared by the filter member if its > filter then from that
+        // index the bitset will be considered for filtering process.
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            > 0) {
+          start = start + 1;
+        }
+      }
+
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(columnIndex[j]);
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all
+   * members will be considered for applying range filters. this method will
+   * be called if the column is sorted default so column index
+   * mapping will be present for accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k]);
+        start = CarbonUtil.nextGreaterValueToTarget(start,
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k], numerOfRows);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary search, this tentative
+          // index needs to be compared by the filter member if its > filter then from that
+          // index the bitset will be considered for filtering process.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
+            start = start + 1;
+          }
+        }
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 4d7d962..e390b8d 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -22,21 +22,31 @@ import java.util.BitSet;
 import java.util.List;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.carbon.processor.BlocksChunkHolder;
 import org.carbondata.query.expression.Expression;
+import org.carbondata.query.expression.exception.FilterUnsupportedException;
 import org.carbondata.query.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.carbondata.query.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 
 public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilterExecuterImpl {
 
-  private byte[][] filterRangeValues;
+  protected byte[][] filterRangeValues;
+  private SegmentProperties segmentProperties;
 
   public RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
     this.filterRangeValues = filterRangeValues;
+    this.segmentProperties = segmentProperties;
   }
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
@@ -63,4 +73,129 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     return bitSet;
 
   }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all members
+   * will be considered for applying range filters. this method will be called if the
+   * column is not supported by default so column index mapping  will be present for
+   * accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i]);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Method will compare the tentative index value after binary search, this tentative
+        // index needs to be compared by the filter member if its >= filter then from that
+        // index the bitset will be considered for filtering process.
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            >= 0) {
+          start = start + 1;
+        }
+      }
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+
+        bitSet.set(columnIndex[j]);
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all
+   * members will be considered for applying range filters. this method will
+   * be called if the column is sorted default so column index
+   * mapping will be present for accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k]);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary search, this tentative
+          // index needs to be compared by the filter member if its >= filter then from that
+          // index the bitset will be considered for filtering process.
+          if (ByteUtil.compare(filterValues[k],dimensionColumnDataChunk.getChunkData(start))
+              >= 0) {
+            start = start + 1;
+          }
+        }
+
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index e5f10ef..1f33c8c 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -22,20 +22,30 @@ import java.util.BitSet;
 import java.util.List;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.carbon.processor.BlocksChunkHolder;
 import org.carbondata.query.expression.Expression;
+import org.carbondata.query.expression.exception.FilterUnsupportedException;
 import org.carbondata.query.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.carbondata.query.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 
 public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilterExecuterImpl {
-  private byte[][] filterRangeValues;
+  protected byte[][] filterRangeValues;
+  private SegmentProperties segmentProperties;
 
   public RowLevelRangeLessThanEqualFilterExecuterImpl(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
     this.filterRangeValues = filterRangeValues;
+    this.segmentProperties = segmentProperties;
   }
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
@@ -63,4 +73,127 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
 
   }
 
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all members
+   * will be considered for applying range filters. this method will be called if the
+   * column is not supported by default so column index mapping  will be present for
+   * accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i]);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Method will compare the tentative index value after binary search, this tentative
+        // index needs to be compared by the filter member if its >= filter then from that
+        // index the bitset will be considered for filtering process.
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            <= 0) {
+          start = start - 1;
+        }
+      }
+      last = start;
+      for (int j = start; j >= 0; j--) {
+        bitSet.set(columnIndex[j]);
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all
+   * members will be considered for applying range filters. this method will
+   * be called if the column is sorted default so column index
+   * mapping will be present for accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k]);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary search, this tentative
+          // index needs to be compared by the filter member if its <= filter then from that
+          // index the bitset will be considered for filtering process.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
+              <= 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= 0; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 56f7393..9bb0420 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -22,20 +22,30 @@ import java.util.BitSet;
 import java.util.List;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.carbondata.core.util.ByteUtil;
+import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.query.carbon.processor.BlocksChunkHolder;
 import org.carbondata.query.expression.Expression;
+import org.carbondata.query.expression.exception.FilterUnsupportedException;
 import org.carbondata.query.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.carbondata.query.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 
 public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
   private byte[][] filterRangeValues;
+  private SegmentProperties segmentProperties;
 
   public RowLevelRangeLessThanFiterExecuterImpl(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
     this.filterRangeValues = filterRangeValues;
+    this.segmentProperties = segmentProperties;
   }
 
   @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
@@ -62,4 +72,133 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     return bitSet;
 
   }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all members
+   * will be considered for applying range filters. this method will be called if the
+   * column is not supported by default so column index mapping  will be present for
+   * accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i]);
+      // Logic will handle the case where the range filter member is not present in block
+      // in this case the binary search will return the index from where the bit sets will be
+      // set inorder to apply filters. this is Lesser than filter so the range will be taken
+      // from the prev element which is Lesser than filter member.
+      start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Method will compare the tentative index value after binary search, this tentative
+        // index needs to be compared by the filter member if its < filter then from that
+        // index the bitset will be considered for filtering process.
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            < 0) {
+          start = start - 1;
+        }
+      }
+      last = start;
+      for (int j = start; j >= 0; j--) {
+        bitSet.set(columnIndex[j]);
+        last--;
+      }
+      startIndex = last;
+      if (startIndex >= 0) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all
+   * members will be considered for applying range filters. this method will
+   * be called if the column is sorted default so column index
+   * mapping will be present for accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k]);
+        start = CarbonUtil.nextLesserValueToTarget(start,
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k]);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Method will compare the tentative index value after binary search, this tentative
+          // index needs to be compared by the filter member if its < filter then from that
+          // index the bitset will be considered for filtering process.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= 0; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeTypeExecuterFacory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeTypeExecuterFacory.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeTypeExecuterFacory.java
index 5a5f2d2..25e8a3f 100644
--- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeTypeExecuterFacory.java
+++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelRangeTypeExecuterFacory.java
@@ -18,6 +18,7 @@
  */
 package org.carbondata.query.filter.executer;
 
+import org.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.carbondata.query.carbonfilterinterface.FilterExecuterType;
 import org.carbondata.query.filter.resolver.FilterResolverIntf;
 import org.carbondata.query.filter.resolver.RowLevelRangeFilterResolverImpl;
@@ -33,11 +34,13 @@ public class RowLevelRangeTypeExecuterFacory {
    * filter tree resolver type.
    *
    * @param filterExpressionResolverTree
+   * @param segmentProperties
    * @param dataType                     DataType
    * @return the generator instance
    */
   public static RowLevelFilterExecuterImpl getRowLevelRangeTypeExecuter(
-      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree) {
+      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree,
+      SegmentProperties segmentProperties) {
     switch (filterExecuterType) {
 
       case ROWLEVEL_LESSTHAN:
@@ -49,7 +52,7 @@ public class RowLevelRangeTypeExecuterFacory {
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues());
+                .getFilterRangeValues(segmentProperties), segmentProperties);
       case ROWLEVEL_LESSTHAN_EQUALTO:
         return new RowLevelRangeLessThanEqualFilterExecuterImpl(
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
@@ -59,7 +62,7 @@ public class RowLevelRangeTypeExecuterFacory {
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues());
+                .getFilterRangeValues(segmentProperties), segmentProperties);
       case ROWLEVEL_GREATERTHAN_EQUALTO:
         return new RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
@@ -69,7 +72,7 @@ public class RowLevelRangeTypeExecuterFacory {
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues());
+                .getFilterRangeValues(segmentProperties), segmentProperties);
       case ROWLEVEL_GREATERTHAN:
         return new RowLevelRangeGrtThanFiterExecuterImpl(
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
@@ -79,7 +82,7 @@ public class RowLevelRangeTypeExecuterFacory {
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
             ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues());
+                .getFilterRangeValues(segmentProperties), segmentProperties);
       default:
         // Scenario wont come logic must break
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/resolver/AndFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/AndFilterResolverImpl.java b/core/src/main/java/org/carbondata/query/filter/resolver/AndFilterResolverImpl.java
index 1bebae2..442b17e 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/AndFilterResolverImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/AndFilterResolverImpl.java
@@ -22,12 +22,13 @@ import java.util.SortedMap;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbonfilterinterface.ExpressionType;
 
 public class AndFilterResolverImpl extends LogicalFilterResolverImpl {
 
   /**
-   *
+   *i
    */
   private static final long serialVersionUID = -761688076874662001L;
 
@@ -44,7 +45,7 @@ public class AndFilterResolverImpl extends LogicalFilterResolverImpl {
 
   @Override public void getEndKey(SegmentProperties segmentProperties,
       AbsoluteTableIdentifier tableIdentifier, long[] endKeys,
-      SortedMap<Integer, byte[]> noDicEndKeys) {
+      SortedMap<Integer, byte[]> noDicEndKeys) throws QueryExecutionException {
     leftEvalutor.getEndKey(segmentProperties, tableIdentifier, endKeys, noDicEndKeys);
     rightEvalutor.getEndKey(segmentProperties, tableIdentifier, endKeys, noDicEndKeys);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/resolver/ConditionalFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/ConditionalFilterResolverImpl.java b/core/src/main/java/org/carbondata/query/filter/resolver/ConditionalFilterResolverImpl.java
index 59ef1be..af17609 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/ConditionalFilterResolverImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/ConditionalFilterResolverImpl.java
@@ -132,8 +132,7 @@ public class ConditionalFilterResolverImpl implements FilterResolverIntf {
       metadata.setColumnExpression(columnList.get(0));
       metadata.setExpression(exp);
       metadata.setIncludeFilter(isIncludeFilter);
-      if (!columnList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY) || columnList.get(0)
-          .getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      if (!columnList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
         dimColResolvedFilterInfo.populateFilterInfoBasedOnColumnType(
             FilterInfoTypeVisitorFactory.getResolvedFilterInfoVisitor(columnList.get(0)), metadata);
 
@@ -199,20 +198,16 @@ public class ConditionalFilterResolverImpl implements FilterResolverIntf {
    * method will get the start key based on the filter surrogates
    *
    * @return end IndexKey
+   * @throws QueryExecutionException
    */
   @Override public void getEndKey(SegmentProperties segmentProperties,
       AbsoluteTableIdentifier absoluteTableIdentifier, long[] endKeys,
-      SortedMap<Integer, byte[]> setOfEndKeyByteArray) {
+      SortedMap<Integer, byte[]> setOfEndKeyByteArray) throws QueryExecutionException {
     if (null == dimColResolvedFilterInfo.getEndIndexKey()) {
-      try {
-        FilterUtil.getEndKey(dimColResolvedFilterInfo.getDimensionResolvedFilterInstance(),
-            absoluteTableIdentifier, endKeys, segmentProperties);
-        FilterUtil.getEndKeyForNoDictionaryDimension(dimColResolvedFilterInfo, segmentProperties,
-            setOfEndKeyByteArray);
-      } catch (QueryExecutionException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+      FilterUtil.getEndKey(dimColResolvedFilterInfo.getDimensionResolvedFilterInstance(),
+          absoluteTableIdentifier, endKeys, segmentProperties);
+      FilterUtil.getEndKeyForNoDictionaryDimension(dimColResolvedFilterInfo, segmentProperties,
+          setOfEndKeyByteArray);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/resolver/FilterResolverIntf.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/FilterResolverIntf.java b/core/src/main/java/org/carbondata/query/filter/resolver/FilterResolverIntf.java
index a8f1df5..4537ba6 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/FilterResolverIntf.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/FilterResolverIntf.java
@@ -23,6 +23,7 @@ import java.util.SortedMap;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbonfilterinterface.FilterExecuterType;
 import org.carbondata.query.expression.Expression;
 import org.carbondata.query.expression.exception.FilterUnsupportedException;
@@ -81,9 +82,11 @@ public interface FilterResolverIntf extends Serializable {
    * @param setOfEndKeyByteArray
    * @param endKeys
    * @return
+   * @throws QueryExecutionException
    */
   void getEndKey(SegmentProperties segmentProperties, AbsoluteTableIdentifier tableIdentifier,
-      long[] endKeys, SortedMap<Integer, byte[]> setOfEndKeyByteArray);
+      long[] endKeys, SortedMap<Integer, byte[]> setOfEndKeyByteArray)
+      throws QueryExecutionException;
 
   /**
    * API will return the filter executer type which will be used to evaluate

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/resolver/LogicalFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/LogicalFilterResolverImpl.java b/core/src/main/java/org/carbondata/query/filter/resolver/LogicalFilterResolverImpl.java
index 8bf1395..46b03e4 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/LogicalFilterResolverImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/LogicalFilterResolverImpl.java
@@ -22,6 +22,7 @@ import java.util.SortedMap;
 
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbonfilterinterface.ExpressionType;
 import org.carbondata.query.carbonfilterinterface.FilterExecuterType;
 import org.carbondata.query.expression.Expression;
@@ -87,7 +88,7 @@ public class LogicalFilterResolverImpl implements FilterResolverIntf {
 
   @Override public void getEndKey(SegmentProperties segmentProperties,
       AbsoluteTableIdentifier tableIdentifier, long[] endKeys,
-      SortedMap<Integer, byte[]> setOfEndKeyByteArray) {
+      SortedMap<Integer, byte[]> setOfEndKeyByteArray) throws QueryExecutionException {
   }
 
   @Override public FilterExecuterType getFilterExecuterType() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filter/resolver/RowLevelRangeFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/carbondata/query/filter/resolver/RowLevelRangeFilterResolverImpl.java
index a9add19..edb4ece 100644
--- a/core/src/main/java/org/carbondata/query/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/carbondata/query/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -28,8 +28,11 @@ import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.carbondata.core.util.ByteUtil;
 import org.carbondata.query.carbon.executor.exception.QueryExecutionException;
 import org.carbondata.query.carbonfilterinterface.FilterExecuterType;
@@ -38,6 +41,7 @@ import org.carbondata.query.expression.Expression;
 import org.carbondata.query.expression.ExpressionResult;
 import org.carbondata.query.expression.conditional.BinaryConditionalExpression;
 import org.carbondata.query.expression.exception.FilterIllegalMemberException;
+import org.carbondata.query.expression.exception.FilterUnsupportedException;
 import org.carbondata.query.expression.logical.BinaryLogicalExpression;
 import org.carbondata.query.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.carbondata.query.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
@@ -67,19 +71,25 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
   }
 
   /**
-   * This method will return the filter values which is present in the range leve
+   * This method will return the filter values which is present in the range level
    * conditional expressions.
    *
    * @return
    */
-  public byte[][] getFilterRangeValues() {
-    List<byte[]> filterValuesList = new ArrayList<byte[]>();
-    if (null != dimColEvaluatorInfoList.get(0).getFilterValues()) {
-      filterValuesList =
+  public byte[][] getFilterRangeValues(SegmentProperties segmentProperties) {
+
+    if (null != dimColEvaluatorInfoList.get(0).getFilterValues() && !dimColEvaluatorInfoList.get(0)
+        .getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      List<byte[]> noDictFilterValuesList =
           dimColEvaluatorInfoList.get(0).getFilterValues().getNoDictionaryFilterValuesList();
-      return filterValuesList.toArray((new byte[filterValuesList.size()][]));
+      return noDictFilterValuesList.toArray((new byte[noDictFilterValuesList.size()][]));
+    } else if (null != dimColEvaluatorInfoList.get(0).getFilterValues() && dimColEvaluatorInfoList
+        .get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(),
+          this.dimColEvaluatorInfoList.get(0).getDimension(),
+          segmentProperties.getDimensionKeyGenerator());
     }
-    return filterValuesList.toArray((new byte[filterValuesList.size()][]));
+    return null;
 
   }
 
@@ -91,6 +101,7 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
   public void getStartKey(SegmentProperties segmentProperties, long[] startKey,
       SortedMap<Integer, byte[]> noDictStartKeys) {
     if (null == dimColEvaluatorInfoList.get(0).getStarIndexKey()) {
+      FilterUtil.getStartKey(dimColEvaluatorInfoList.get(0), segmentProperties, startKey);
       FilterUtil
           .getStartKeyForNoDictionaryDimension(dimColEvaluatorInfoList.get(0), segmentProperties,
               noDictStartKeys);
@@ -171,7 +182,11 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
           dimColumnEvaluatorInfo.setRowIndex(index++);
           dimColumnEvaluatorInfo.setDimension(columnExpression.getDimension());
           dimColumnEvaluatorInfo.setDimensionExistsInCurrentSilce(false);
-          filterInfo.setFilterListForNoDictionaryCols(getNoDictionaryRangeValues());
+          if (columnExpression.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            filterInfo.setFilterList(getDirectSurrogateValues(columnExpression));
+          } else {
+            filterInfo.setFilterListForNoDictionaryCols(getNoDictionaryRangeValues());
+          }
           filterInfo.setIncludeFilter(isIncludeFilter);
           dimColumnEvaluatorInfo.setFilterValues(filterInfo);
           dimColumnEvaluatorInfo
@@ -191,6 +206,30 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
     }
   }
 
+  private List<Integer> getDirectSurrogateValues(ColumnExpression columnExpression) {
+    List<ExpressionResult> listOfExpressionResults = new ArrayList<ExpressionResult>(20);
+    DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+        .getDirectDictionaryGenerator(columnExpression.getDimension().getDataType());
+
+    if (this.getFilterExpression() instanceof BinaryConditionalExpression) {
+      listOfExpressionResults =
+          ((BinaryConditionalExpression) this.getFilterExpression()).getLiterals();
+    }
+    List<Integer> filterValuesList = new ArrayList<Integer>(20);
+    try {
+      // if any filter member provided by user is invalid throw error else
+      // system can display inconsistent result.
+      for (ExpressionResult result : listOfExpressionResults) {
+        filterValuesList.add(directDictionaryGenerator
+            .generateDirectSurrogateKey(result.getString(),
+                CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+      }
+    } catch (FilterIllegalMemberException e) {
+      new FilterUnsupportedException(e);
+    }
+    return filterValuesList;
+  }
+
   /**
    * Method will return the DimColumnResolvedFilterInfo instance which consists
    * the mapping of the respective dimension and its surrogates involved in

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filters/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filters/FilterExpressionProcessor.java b/core/src/main/java/org/carbondata/query/filters/FilterExpressionProcessor.java
index eea27e9..dded529 100644
--- a/core/src/main/java/org/carbondata/query/filters/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/carbondata/query/filters/FilterExpressionProcessor.java
@@ -123,7 +123,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
     DataRefNode startBlock = blockFinder.findFirstDataBlock(btreeNode, searchStartKey);
     DataRefNode endBlock = blockFinder.findLastDataBlock(btreeNode, searchEndKey);
     FilterExecuter filterExecuter =
-            FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties());
+        FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties());
     while (startBlock != endBlock) {
       addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock,
           tableSegment.getSegmentProperties());
@@ -185,6 +185,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
    *
    * @param filterResolverTree
    * @param tableIdentifier
+   * @throws FilterUnsupportedException
    * @throws QueryExecutionException
    */
   private void traverseAndResolveTree(FilterResolverIntf filterResolverTree,
@@ -285,11 +286,6 @@ public class FilterExpressionProcessor implements FilterProcessor {
                 == ExpressionType.GREATERTHAN_EQUALTO
                 || currentCondExpression.getFilterExpressionType()
                 == ExpressionType.LESSTHAN_EQUALTO) {
-              if (currentCondExpression.getColumnList().get(0).getCarbonColumn()
-                  .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-                return new RowLevelFilterResolverImpl(expression, isExpressionResolve, true,
-                    tableIdentifier);
-              }
               return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, true,
                   tableIdentifier);
             }
@@ -308,10 +304,9 @@ public class FilterExpressionProcessor implements FilterProcessor {
           if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
               .hasEncoding(Encoding.DICTIONARY)) {
             if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
-                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight())
-                || (FilterUtil
-                    .checkIfExpressionContainsUnknownExp(currentCondExpression.getRight())
-                || FilterUtil
+                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
+                FilterUtil.checkIfExpressionContainsUnknownExp(currentCondExpression.getRight())
+                    || FilterUtil
                     .checkIfExpressionContainsUnknownExp(currentCondExpression.getLeft()))) {
               return new RowLevelFilterResolverImpl(expression, isExpressionResolve, false,
                   tableIdentifier);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java b/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
index 810cbaf..ee72f62 100644
--- a/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
+++ b/core/src/main/java/org/carbondata/query/filters/measurefilter/util/FilterUtil.java
@@ -133,7 +133,8 @@ public final class FilterUtil {
         case ROWLEVEL_GREATERTHAN_EQUALTO:
         case ROWLEVEL_GREATERTHAN:
           return RowLevelRangeTypeExecuterFacory
-              .getRowLevelRangeTypeExecuter(filterExecuterType, filterExpressionResolverTree);
+              .getRowLevelRangeTypeExecuter(filterExecuterType, filterExpressionResolverTree,
+                  segmentProperties);
         case ROWLEVEL:
         default:
           return new RowLevelFilterExecuterImpl(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/integration/spark/src/test/resources/data2.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/data2.csv b/integration/spark/src/test/resources/data2.csv
index 6187ac3..2e9200d 100644
--- a/integration/spark/src/test/resources/data2.csv
+++ b/integration/spark/src/test/resources/data2.csv
@@ -1,4 +1,4 @@
 ID,date,country,name,phonetype,serialname,salary
-4,21-01-2014,china,aaa4,phone2435,ASD66902,15003
-abc,,china,aaa5,phone2441,ASD90633,15004
-6,21-01-2014,china,aaa6,phone294,ASD59961,15005
+4,2014-01-21 00:00:00,china,aaa4,phone2435,ASD66902,15003
+abc,2014-01-22 00:00:00,china,aaa5,phone2441,ASD90633,15004
+6,2014-03-07 00:00:00,china,aaa6,phone294,ASD59961,15005

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/HighCardinalityDataTypesTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/HighCardinalityDataTypesTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/HighCardinalityDataTypesTestCase.scala
index 44195d2..65e125e 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/HighCardinalityDataTypesTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/detailquery/HighCardinalityDataTypesTestCase.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.sql.Row
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
 
 
 /**
@@ -76,6 +78,8 @@ class NO_DICTIONARY_COL_TestCase extends QueryTest with BeforeAndAfterAll {
       "name String, phonetype String, serialname String, salary Int) " +
         "STORED BY 'org.apache.carbondata.format' " +  "TBLPROPERTIES('DICTIONARY_EXCLUDE'='ID')"
     )
+        CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
     sql(
       s"LOAD DATA LOCAL INPATH './src/test/resources/data2.csv' INTO TABLE filtertestTable OPTIONS"+
         s"('DELIMITER'= ',', " +

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462eb393/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
index afbda24..c272e2a 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
@@ -40,38 +40,31 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists filtertestTables")
     sql("drop table if exists filtertestTablesWithDecimal")
     sql("drop table if exists filtertestTablesWithNull")
-    sql("drop table if exists filterWithTimeStamp")
+    sql("drop table if exists filterTimestampDataType")
     sql("drop table if exists noloadtable")
     sql("CREATE TABLE filtertestTables (ID int, date Timestamp, country String, " +
       "name String, phonetype String, serialname String, salary int) " +
-      "STORED BY 'org.apache.carbondata.format'"
+        "STORED BY 'org.apache.carbondata.format'"
     )
     sql("CREATE TABLE noloadtable (ID int, date Timestamp, country String, " +
       "name String, phonetype String, serialname String, salary int) " +
       "STORED BY 'org.apache.carbondata.format'"
     )
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss")
-
-    sql("CREATE TABLE filterWithTimeStamp (ID int, date Timestamp, country String, " +
+     CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss")
+        
+     sql("CREATE TABLE filterTimestampDataType (ID int, date Timestamp, country String, " +
       "name String, phonetype String, serialname String, salary int) " +
-      "STORED BY 'org.apache.carbondata.format'"
+        "STORED BY 'org.apache.carbondata.format'"
     )
+       CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss")
     sql(
       s"LOAD DATA LOCAL INPATH './src/test/resources/data2_DiffTimeFormat.csv' INTO TABLE " +
-        s"filterWithTimeStamp " +
+        s"filterTimestampDataType " +
         s"OPTIONS('DELIMITER'= ',', " +
         s"'FILEHEADER'= '')"
     )
-
-    test("Time stamp filter with diff time format for load ") {
-      checkAnswer(
-        sql("select date  from filterWithTimeStamp where date > '2014-07-10 00:00:00'"),
-        Seq(Row(Timestamp.valueOf("2014-07-20 00:00:00.0")),
-          Row(Timestamp.valueOf("2014-07-25 00:00:00.0"))
-        )
-      )
-    }
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
     sql(
@@ -83,7 +76,7 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
       "CREATE TABLE filtertestTablesWithDecimal (ID decimal, date Timestamp, country " +
         "String, " +
         "name String, phonetype String, serialname String, salary int) " +
-        "STORED BY 'org.apache.carbondata.format'"
+      "STORED BY 'org.apache.carbondata.format'"
     )
     sql(
       s"LOAD DATA LOCAL INPATH './src/test/resources/dataDiff.csv' INTO TABLE " +
@@ -95,10 +88,10 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
       "CREATE TABLE filtertestTablesWithNull (ID int, date Timestamp, country " +
         "String, " +
         "name String, phonetype String, serialname String,salary int) " +
-        "STORED BY 'org.apache.carbondata.format'"
+      "STORED BY 'org.apache.carbondata.format'"
     )
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
     sql(
       s"LOAD DATA LOCAL INPATH './src/test/resources/data2.csv' INTO TABLE " +
         s"filtertestTablesWithNull " +
@@ -107,14 +100,21 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
     )
   }
 
-
+    
   test("Is not null filter") {
     checkAnswer(
       sql("select id from filtertestTablesWithNull " + "where id is not null"),
       Seq(Row(4), Row(6))
     )
   }
-  test("Multi column with invalid member filter") {
+  
+    test("Between  filter") {
+    checkAnswer(
+      sql("select date from filtertestTablesWithNull " + " where date between '2014-01-20 00:00:00' and '2014-01-28 00:00:00'"),
+      Seq(Row(Timestamp.valueOf("2014-01-21 00:00:00")), Row(Timestamp.valueOf("2014-01-22 00:00:00")))
+    )
+  }
+    test("Multi column with invalid member filter") {
     checkAnswer(
       sql("select id from filtertestTablesWithNull " + "where id = salary"),
       Seq()
@@ -168,7 +168,7 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row(1, "china"))
     )
   }
-
+  
   test("filter query over table having no data") {
     checkAnswer(
       sql("select * from noloadtable " + "where country='china' and name='aaa1'"),
@@ -176,6 +176,38 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+
+    
+     test("Time stamp filter with diff time format for load greater") {
+    checkAnswer(
+      sql("select date  from filterTimestampDataType where date > '2014-07-10 00:00:00'"),
+      Seq(Row(Timestamp.valueOf("2014-07-20 00:00:00.0")),
+        Row(Timestamp.valueOf("2014-07-25 00:00:00.0"))
+      )
+    )
+  }
+    test("Time stamp filter with diff time format for load less") {
+    checkAnswer(
+      sql("select date  from filterTimestampDataType where date < '2014-07-20 00:00:00'"),
+      Seq(Row(Timestamp.valueOf("2014-07-10 00:00:00.0"))
+      )
+    )
+  }
+   test("Time stamp filter with diff time format for load less than equal") {
+    checkAnswer(
+      sql("select date  from filterTimestampDataType where date <= '2014-07-20 00:00:00'"),
+      Seq(Row(Timestamp.valueOf("2014-07-10 00:00:00.0")),Row(Timestamp.valueOf("2014-07-20 00:00:00.0"))
+      )
+    )
+  }
+      test("Time stamp filter with diff time format for load greater than equal") {
+    checkAnswer(
+      sql("select date  from filterTimestampDataType where date >= '2014-07-20 00:00:00'"),
+      Seq(Row(Timestamp.valueOf("2014-07-20 00:00:00.0")),Row(Timestamp.valueOf("2014-07-25 00:00:00.0"))
+      )
+    )
+  }
+
   override def afterAll {
     sql("drop table noloadtable")
     CarbonProperties.getInstance()