You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/09 07:19:13 UTC

[1/2] carbondata git commit: Like Filter Pushdown

Repository: carbondata
Updated Branches:
  refs/heads/master 2c1265ddb -> b16ab636c


Like Filter Pushdown

No Dictionary Handling in Greater and Less Than Expression


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a5b92876
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a5b92876
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a5b92876

Branch: refs/heads/master
Commit: a5b92876eac8a2b64ebcd1a6d9607b59e321a601
Parents: 2c1265d
Author: sounakr <so...@gmail.com>
Authored: Thu May 4 10:16:19 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Tue May 9 15:18:11 2017 +0800

----------------------------------------------------------------------
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  80 ++++++-------
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  75 ++++++------
 ...velRangeLessThanEqualFilterExecuterImpl.java | 112 ++++++++----------
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 115 +++++++++----------
 .../resources/hiverangenodictionarycompare.csv  |  20 ++++
 .../detailquery/RangeFilterTestCase.scala       |  84 +++++++++++++-
 .../spark/sql/optimizer/CarbonFilters.scala     | 103 +++++++++++++----
 .../execution/CarbonLateDecodeStrategy.scala    |   2 +
 .../spark/sql/optimizer/CarbonFilters.scala     |  24 ++++
 9 files changed, 383 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 97535a6..6f8651a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -116,11 +115,8 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
           .createBitSetGroupWithDefaultValue(blockChunkHolder.getDataBlock().numberOfPages(),
               numberOfRows, true);
     }
-    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
-      return super.applyFilter(blockChunkHolder);
-    }
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimensionBlocksIndex[0]);
+    int blockIndex =
+        segmentProperties.getDimensionOrdinalToBlockMapping().get(dimensionBlocksIndex[0]);
     if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
       blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
@@ -154,10 +150,8 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
       int numerOfRows) {
-    if (dimensionColumnDataChunk.isExplicitSorted()
-        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      return setFilterdIndexToBitSetWithColumnIndex(
-          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    if (dimensionColumnDataChunk.isExplicitSorted()) {
+      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows);
     }
     return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
   }
@@ -173,7 +167,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
    * @return BitSet.
    */
   private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+      DimensionColumnDataChunk dimensionColumnDataChunk, int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     int start = 0;
     int last = 0;
@@ -234,42 +228,40 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      int start = 0;
-      int last = 0;
-      int startIndex = 0;
-      byte[][] filterValues = this.filterRangeValues;
-      for (int k = 0; k < filterValues.length; k++) {
-        start = CarbonUtil.getFirstIndexUsingBinarySearch(
-            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-            filterValues[k], true);
-        if (start >= 0) {
-          start = CarbonUtil.nextGreaterValueToTarget(start,
-              (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k],
-              numerOfRows);
-        }
-        if (start < 0) {
-          start = -(start + 1);
-          if (start == numerOfRows) {
-            start = start - 1;
-          }
-          // Method will compare the tentative index value after binary search, this tentative
-          // index needs to be compared by the filter member if its > filter then from that
-          // index the bitset will be considered for filtering process.
-          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
-            start = start + 1;
-          }
-        }
-        last = start;
-        for (int j = start; j < numerOfRows; j++) {
-          bitSet.set(j);
-          last++;
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int k = 0; k < filterValues.length; k++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[k], true);
+      if (start >= 0) {
+        start = CarbonUtil
+            .nextGreaterValueToTarget(start, dimensionColumnDataChunk, filterValues[k],
+                numerOfRows);
+      }
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
         }
-        startIndex = last;
-        if (startIndex >= numerOfRows) {
-          break;
+        // Method will compare the tentative index value after binary search, this tentative
+        // index needs to be compared by the filter member if its > filter then from that
+        // index the bitset will be considered for filtering process.
+        if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
+          start = start + 1;
         }
       }
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(j);
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
     }
     return bitSet;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 6eb2a6f..fbc9b30 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -117,11 +116,8 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
           .createBitSetGroupWithDefaultValue(blockChunkHolder.getDataBlock().numberOfPages(),
               numberOfRows, true);
     }
-    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
-      return super.applyFilter(blockChunkHolder);
-    }
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimensionBlocksIndex[0]);
+    int blockIndex =
+        segmentProperties.getDimensionOrdinalToBlockMapping().get(dimensionBlocksIndex[0]);
     if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
       blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
@@ -155,10 +151,8 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
 
   private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
       int numerOfRows) {
-    if (dimensionColumnDataChunk.isExplicitSorted()
-        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      return setFilterdIndexToBitSetWithColumnIndex(
-          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    if (dimensionColumnDataChunk.isExplicitSorted()) {
+      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows);
     }
     return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
   }
@@ -174,7 +168,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
    * @return BitSet.
    */
   private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+      DimensionColumnDataChunk dimensionColumnDataChunk, int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
     int start = 0;
     int last = 0;
@@ -194,7 +188,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
         // index the bitset will be considered for filtering process.
         if (ByteUtil.compare(filterValues[i],
             dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
-            >= 0) {
+            > 0) {
           start = start + 1;
         }
       }
@@ -224,39 +218,36 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
       int numerOfRows) {
     BitSet bitSet = new BitSet(numerOfRows);
-    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      int start = 0;
-      int last = 0;
-      int startIndex = 0;
-      byte[][] filterValues = this.filterRangeValues;
-      for (int k = 0; k < filterValues.length; k++) {
-        start = CarbonUtil.getFirstIndexUsingBinarySearch(
-            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-            filterValues[k], false);
-        if (start < 0) {
-          start = -(start + 1);
-          if (start == numerOfRows) {
-            start = start - 1;
-          }
-          // Method will compare the tentative index value after binary search, this tentative
-          // index needs to be compared by the filter member if its >= filter then from that
-          // index the bitset will be considered for filtering process.
-          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
-              >= 0) {
-            start = start + 1;
-          }
-        }
-
-        last = start;
-        for (int j = start; j < numerOfRows; j++) {
-          bitSet.set(j);
-          last++;
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int k = 0; k < filterValues.length; k++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[k], false);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
         }
-        startIndex = last;
-        if (startIndex >= numerOfRows) {
-          break;
+        // Method will compare the tentative index value after binary search, this tentative
+        // index needs to be compared by the filter member if its >= filter then from that
+        // index the bitset will be considered for filtering process.
+        if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
+          start = start + 1;
         }
       }
+
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(j);
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
     }
     return bitSet;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 306becf..99f5700 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -118,11 +117,8 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
           .createBitSetGroupWithDefaultValue(blockChunkHolder.getDataBlock().numberOfPages(),
               numberOfRows, true);
     }
-    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
-      return super.applyFilter(blockChunkHolder);
-    }
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimensionBlocksIndex[0]);
+    int blockIndex =
+        segmentProperties.getDimensionOrdinalToBlockMapping().get(dimensionBlocksIndex[0]);
     if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
       blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
@@ -159,12 +155,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
           this.segmentProperties.getDimensionKeyGenerator());
     }
-    if (dimensionColumnDataChunk.isExplicitSorted()
-        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-
-      return setFilterdIndexToBitSetWithColumnIndex(
-          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
-
+    if (dimensionColumnDataChunk.isExplicitSorted()) {
+      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows,
+          defaultValue);
     }
     return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
   }
@@ -180,7 +173,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
    * @return BitSet.
    */
   private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
+      DimensionColumnDataChunk dimensionColumnDataChunk, int numerOfRows,
       byte[] defaultValue) {
     BitSet bitSet = new BitSet(numerOfRows);
     int start = 0;
@@ -213,12 +206,12 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
         if (start >= numerOfRows) {
           start = start - 1;
         }
-        // Method will compare the tentative index value after binary search, this tentative
-        // index needs to be compared by the filter member if its >= filter then from that
-        // index the bitset will be considered for filtering process.
+        // When negative value of start is returned from getFirstIndexUsingBinarySearch the Start
+        // will be pointing to the next consecutive position. So compare it again and point to the
+        // previous value returned from getFirstIndexUsingBinarySearch.
         if (ByteUtil.compare(filterValues[i],
             dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
-            <= 0) {
+            < 0) {
           start = start - 1;
         }
       }
@@ -249,55 +242,52 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
       int numerOfRows, byte[] defaultValue) {
     BitSet bitSet = new BitSet(numerOfRows);
-    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      int start = 0;
-      int last = 0;
-      int startIndex = 0;
-      byte[][] filterValues = this.filterRangeValues;
-      int skip = 0;
-      //find the number of default values to skip the null value in case of direct dictionary
-      if (null != defaultValue) {
-        start = CarbonUtil.getFirstIndexUsingBinarySearch(
-            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-            defaultValue, true);
-        if (start < 0) {
-          skip = -(start + 1);
-          // end of block
-          if (skip == numerOfRows) {
-            return bitSet;
-          }
-        } else {
-          skip = start;
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    int skip = 0;
+    //find the number of default values to skip the null value in case of direct dictionary
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              defaultValue, true);
+      if (start < 0) {
+        skip = -(start + 1);
+        // end of block
+        if (skip == numerOfRows) {
+          return bitSet;
         }
-        startIndex = skip;
+      } else {
+        skip = start;
       }
-      for (int k = 0; k < filterValues.length; k++) {
-        start = CarbonUtil.getFirstIndexUsingBinarySearch(
-            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-            filterValues[k], true);
-        if (start < 0) {
-          start = -(start + 1);
-          if (start >= numerOfRows) {
-            start = start - 1;
-          }
-          // Method will compare the tentative index value after binary search, this tentative
-          // index needs to be compared by the filter member if its <= filter then from that
-          // index the bitset will be considered for filtering process.
-          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
-              <= 0) {
-            start = start - 1;
-          }
-        }
-        last = start;
-        for (int j = start; j >= skip; j--) {
-          bitSet.set(j);
-          last--;
+      startIndex = skip;
+    }
+    for (int k = 0; k < filterValues.length; k++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[k], true);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start >= numerOfRows) {
+          start = start - 1;
         }
-        startIndex = last;
-        if (startIndex <= 0) {
-          break;
+        // When negative value of start is returned from getFirstIndexUsingBinarySearch the Start
+        // will be pointing to the next consecutive position. So compare it again and point to the
+        // previous value returned from getFirstIndexUsingBinarySearch.
+        if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
+          start = start - 1;
         }
       }
+      last = start;
+      for (int j = start; j >= skip; j--) {
+        bitSet.set(j);
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
     }
     return bitSet;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index f2a49d9..1883607 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -118,11 +117,8 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
           .createBitSetGroupWithDefaultValue(blockChunkHolder.getDataBlock().numberOfPages(),
               numberOfRows, true);
     }
-    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
-      return super.applyFilter(blockChunkHolder);
-    }
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimensionBlocksIndex[0]);
+    int blockIndex =
+        segmentProperties.getDimensionOrdinalToBlockMapping().get(dimensionBlocksIndex[0]);
     if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
       blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
           .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
@@ -159,10 +155,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
       defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
           this.segmentProperties.getDimensionKeyGenerator());
     }
-    if (dimensionColumnDataChunk.isExplicitSorted()
-        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      return setFilterdIndexToBitSetWithColumnIndex(
-          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
+    if (dimensionColumnDataChunk.isExplicitSorted()) {
+      return setFilterdIndexToBitSetWithColumnIndex(dimensionColumnDataChunk, numerOfRows,
+          defaultValue);
     }
     return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
   }
@@ -178,7 +173,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
    * @return BitSet.
    */
   private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
+      DimensionColumnDataChunk dimensionColumnDataChunk, int numerOfRows,
       byte[] defaultValue) {
     BitSet bitSet = new BitSet(numerOfRows);
     int start = 0;
@@ -218,9 +213,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
         if (start >= numerOfRows) {
           start = start - 1;
         }
-        // Method will compare the tentative index value after binary search, this tentative
-        // index needs to be compared by the filter member if its < filter then from that
-        // index the bitset will be considered for filtering process.
+        // When negative value of start is returned from getFirstIndexUsingBinarySearch the Start
+        // will be pointing to the next consecutive position. So compare it again and point to the
+        // previous value returned from getFirstIndexUsingBinarySearch.
         if (ByteUtil.compare(filterValues[i],
             dimensionColumnDataChunk.getChunkData(dimensionColumnDataChunk.getInvertedIndex(start)))
             < 0) {
@@ -253,59 +248,57 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
   private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
       int numerOfRows, byte[] defaultValue) {
     BitSet bitSet = new BitSet(numerOfRows);
-    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
-      int start = 0;
-      int last = 0;
-      int startIndex = 0;
-      int skip = 0;
-      byte[][] filterValues = this.filterRangeValues;
-      //find the number of default values to skip the null value in case of direct dictionary
-      if (null != defaultValue) {
-        start = CarbonUtil.getFirstIndexUsingBinarySearch(
-            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-            defaultValue, false);
-        if (start < 0) {
-          skip = -(start + 1);
-          // end of block
-          if (skip == numerOfRows) {
-            return bitSet;
-          }
-        } else {
-          skip = start;
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    int skip = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    //find the number of default values to skip the null value in case of direct dictionary
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              defaultValue, false);
+      if (start < 0) {
+        skip = -(start + 1);
+        // end of block
+        if (skip == numerOfRows) {
+          return bitSet;
         }
-        startIndex = skip;
+      } else {
+        skip = start;
       }
-      for (int k = 0; k < filterValues.length; k++) {
-        start = CarbonUtil.getFirstIndexUsingBinarySearch(
-            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-            filterValues[k], false);
-        if (start >= 0) {
-          start = CarbonUtil.nextLesserValueToTarget(start,
-              (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k]);
-        }
-        if (start < 0) {
-          start = -(start + 1);
+      startIndex = skip;
+    }
+    for (int k = 0; k < filterValues.length; k++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[k], false);
+      if (start >= 0) {
+        start =
+            CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[k]);
+      }
+      if (start < 0) {
+        start = -(start + 1);
 
-          if (start >= numerOfRows) {
-            start = numerOfRows - 1;
-          }
-          // Method will compare the tentative index value after binary search, this tentative
-          // index needs to be compared by the filter member if its < filter then from that
-          // index the bitset will be considered for filtering process.
-          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
-            start = start - 1;
-          }
-        }
-        last = start;
-        for (int j = start; j >= skip; j--) {
-          bitSet.set(j);
-          last--;
+        if (start >= numerOfRows) {
+          start = numerOfRows - 1;
         }
-        startIndex = last;
-        if (startIndex <= 0) {
-          break;
+        // When negative value of start is returned from getFirstIndexUsingBinarySearch the Start
+        // will be pointing to the next consecutive position. So compare it again and point to the
+        // previous value returned from getFirstIndexUsingBinarySearch.
+        if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
+          start = start - 1;
         }
       }
+      last = start;
+      for (int j = start; j >= skip; j--) {
+        bitSet.set(j);
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
     }
     return bitSet;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/integration/spark-common-test/src/test/resources/hiverangenodictionarycompare.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/hiverangenodictionarycompare.csv b/integration/spark-common-test/src/test/resources/hiverangenodictionarycompare.csv
new file mode 100644
index 0000000..cb3bf11
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/hiverangenodictionarycompare.csv
@@ -0,0 +1,20 @@
+11,100,SE,17-01-2007,1,developer,10,network,928478,17-02-2007,29-11-2016,96,96,5040
+12,101,SSE,29-05-2008,1,developer,11,protocol,928378,29-06-2008,30-12-2016,85,95,7124
+13,108,TPL,07-07-2009,2,tester,10,network,928478,07-08-2009,30-12-2016,88,99,9054
+14,109,SA,29-12-2010,3,manager,11,protocol,928278,29-01-2011,29-06-2016,77,92,11248
+15,11111,SSA,09-11-2011,1,developer,12,security,928375,09-12-2011,29-05-2016,99,91,13245
+16,107,SE,14-10-2012,1,developer,13,configManagement,928478,14-11-2012,29-12-2016,86,93,5040
+17,1111119,PL,22-09-2013,2,tester,12,security,928778,22-10-2013,15-11-2016,78,97,9574
+18,1111118,TL,15-08-2014,2,tester,14,Learning,928176,15-09-2014,29-05-2016,84,98,7245
+19,131,PL,12-05-2015,1,developer,10,network,928977,12-06-2015,12-11-2016,88,91,11254
+20,129,PM,01-12-2015,3,manager,14,Learning,928479,01-01-2016,30-11-2016,75,94,13547
+21,124,SE,17-01-2007,1,developer,10,network,928478,17-02-2007,29-11-2016,96,96,5040
+22,106,SSE,29-05-2008,1,developer,11,protocol,928378,29-06-2008,30-12-2016,85,95,7124
+23,122222,TPL,07-07-2009,2,tester,10,network,928478,07-08-2009,30-12-2016,88,99,9054
+24,13333,SA,29-12-2010,3,manager,11,protocol,928278,29-01-2011,29-06-2016,77,92,11248
+25,140,SSA,09-11-2011,1,developer,12,security,928375,09-12-2011,29-05-2016,99,91,13245
+26,149,SE,14-10-2012,1,developer,13,configManagement,928478,14-11-2012,29-12-2016,86,93,5040
+27,132,PL,22-09-2013,2,tester,12,security,928778,22-10-2013,15-11-2016,78,97,9574
+28,13888888,TL,15-08-2014,2,tester,14,Learning,928176,15-09-2014,29-05-2016,84,98,7245
+29,126,PL,12-05-2015,1,developer,10,network,928977,12-06-2015,12-11-2016,88,91,11254
+30,116,PM,01-12-2015,3,manager,14,Learning,928479,01-01-2016,30-11-2016,75,94,13547

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/RangeFilterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/RangeFilterTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/RangeFilterTestCase.scala
index 46b514a..a73c98a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/RangeFilterTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/RangeFilterTestCase.scala
@@ -133,7 +133,7 @@ class RangeFilterTestCase extends QueryTest with BeforeAndAfterAll {
     )
 
     sql(
-      s"load data local inpath '$resourcesPath/rangenodictionarycompare.csv' into table " +
+      s"load data local inpath '$resourcesPath/hiverangenodictionarycompare.csv' into table " +
       "NO_DICTIONARY_HIVE_8"
     );
 
@@ -507,6 +507,88 @@ class RangeFilterTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select empname from NO_DICTIONARY_HIVE_8 where empname > '10' and empno <= '14'"))
   }
 
+  test("Range with name comparision 11") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname like '1%'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname like '1%'"))
+  }
+
+
+  test("Range with name comparision 12") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname like '12%'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname like '12%'"))
+  }
+
+  test("Range with name comparision 13") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname like '11%'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname like '11%'"))
+  }
+
+  test("Range with name comparision 14") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname like '%1%'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname like '%1%'"))
+  }
+
+  test("Range with name comparision 15") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname like '1111%'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname like '1111%'"))
+  }
+  // Greater Than Less Than test cases
+  test("No Range with name comparision 1") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname > '11'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname > '11'"))
+  }
+
+  test("No Range with name comparision 2") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname >= '11'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname >= '11'"))
+  }
+
+  test("No Range with name comparision 3") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname < '126'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname < '126'"))
+  }
+
+  test("No Range with name comparision 4") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname <= '126'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname <= '126'"))
+  }
+
+  test("No Range with name comparision 5") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname > '107'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname > '107'"))
+  }
+
+
+  test("No Range with name comparision 6") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname >= '107'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname >= '107'"))
+  }
+
+
+  test("No Range with name comparision 7") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname < '107'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname < '107'"))
+  }
+
+
+  test("No Range with name comparision 8") {
+    checkAnswer(
+      sql("select empname from NO_DICTIONARY_CARBON_7 where empname <= '107'"),
+      sql("select empname from NO_DICTIONARY_HIVE_8 where empname <= '107'"))
+  }
+
 
   override def afterAll {
     sql("drop table if exists filtertestTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index cf8689c..47c8928 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -19,16 +19,16 @@ package org.apache.carbondata.spark
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, StartsWith, _}
 import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.conditional.{GreaterThanEqualToExpression, LessThanExpression, _}
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -100,7 +100,14 @@ object CarbonFilters {
           } yield {
             new OrExpression(lhsFilter, rhsFilter)
           }
-
+        case sources.StringStartsWith(name, value) if value.length > 0 =>
+          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value))
+          val maxValueLimit = value.substring(0, value.length - 1) +
+                              (value.charAt(value.length - 1).toInt + 1).toChar
+          val r = new LessThanExpression(
+            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
+          Some(new AndExpression(l, r))
         case _ => None
       }
     }
@@ -197,6 +204,8 @@ object CarbonFilters {
           Some(sources.LessThanOrEqual(a.name, v))
         case LessThanOrEqual(Literal(v, t), a: Attribute) =>
           Some(sources.GreaterThanOrEqual(a.name, v))
+        case StartsWith(a: Attribute, Literal(v, t)) =>
+          Some(sources.StringStartsWith(a.name, v.toString))
 
         case others =>
           if (!or) {
@@ -211,13 +220,30 @@ object CarbonFilters {
     filters.flatMap(translate(_, false)).toArray
   }
 
+  def isCarbonSupportedDataTypes(expr: Expression): Boolean = {
+    expr.dataType match {
+      case StringType => true
+      case IntegerType => true
+      case LongType => true
+      case DoubleType => true
+      case FloatType => true
+      case BooleanType => true
+      case TimestampType => true
+      case ArrayType(_, _) => true
+      case StructType(_) => true
+      case DecimalType() => true
+      case _ => false
+    }
+  }
+
   def processExpression(exprs: Seq[Expression],
       attributesNeedToDecode: java.util.HashSet[AttributeReference],
       unprocessedExprs: ArrayBuffer[Expression],
       carbonTable: CarbonTable): Option[CarbonExpression] = {
     def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
       expr match {
-        case orFilter@ Or(left, right) =>
+        case orFilter@ Or(left, right)
+          if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
           val leftFilter = transformExpression(left, or = true)
           val rightFilter = transformExpression(right, or = true)
           if (leftFilter.isDefined && rightFilter.isDefined) {
@@ -232,7 +258,8 @@ object CarbonFilters {
             None
           }
 
-        case And(left, right) =>
+        case And(left, right) if (isCarbonSupportedDataTypes(left) &&
+                                  isCarbonSupportedDataTypes(right)) =>
           val leftFilter = transformExpression(left, or)
           val rightFilter = transformExpression(right, or)
           if (or) {
@@ -246,14 +273,16 @@ object CarbonFilters {
           }
 
 
-        case EqualTo(a: Attribute, l@Literal(v, t)) =>
+        case EqualTo(a: Attribute, l@Literal(v, t)) if (isCarbonSupportedDataTypes(a) &&
+                                                        isCarbonSupportedDataTypes(l)) =>
           Some(
             new EqualToExpression(
               transformExpression(a).get,
               transformExpression(l).get
             )
           )
-        case EqualTo(l@Literal(v, t), a: Attribute) =>
+        case EqualTo(l@Literal(v, t), a: Attribute) if (isCarbonSupportedDataTypes(l) &&
+                                                        isCarbonSupportedDataTypes(a)) =>
           Some(
             new EqualToExpression(
               transformExpression(a).get,
@@ -261,59 +290,71 @@ object CarbonFilters {
             )
           )
 
-        case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) if (isCarbonSupportedDataTypes(a) &&
+                                                             isCarbonSupportedDataTypes(l)) =>
           Some(
             new NotEqualsExpression(
               transformExpression(a).get,
               transformExpression(l).get
             )
           )
-        case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) if (isCarbonSupportedDataTypes(l) &&
+                                                             isCarbonSupportedDataTypes(a)) =>
           Some(
             new NotEqualsExpression(
               transformExpression(a).get,
               transformExpression(l).get
             )
           )
-        case IsNotNull(child: Attribute) =>
+        case IsNotNull(child: Attribute) if (isCarbonSupportedDataTypes(child)) =>
           Some(new NotEqualsExpression(transformExpression(child).get,
             transformExpression(Literal(null)).get, true))
-        case IsNull(child: Attribute) =>
+        case IsNull(child: Attribute) if (isCarbonSupportedDataTypes(child)) =>
           Some(new EqualToExpression(transformExpression(child).get,
             transformExpression(Literal(null)).get, true))
         case Not(In(a: Attribute, list))
-          if !list.exists(!_.isInstanceOf[Literal]) =>
+          if !list.exists(!_.isInstanceOf[Literal]) && isCarbonSupportedDataTypes(a) =>
           if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
             Some(new FalseExpression(transformExpression(a).get))
           } else {
             Some(new NotInExpression(transformExpression(a).get,
               new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
           }
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+                                       isCarbonSupportedDataTypes(a) =>
           Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+            new ListExpression(convertToJavaList(list
+              .map(transformExpression(_).get)))))
 
-        case GreaterThan(a: Attribute, l@Literal(v, t)) =>
+        case GreaterThan(a: Attribute, l@Literal(v, t))
+          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
           Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l@Literal(v, t), a: Attribute) =>
+        case GreaterThan(l@Literal(v, t), a: Attribute)
+          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
           Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
 
-        case LessThan(a: Attribute, l@Literal(v, t)) =>
+        case LessThan(a: Attribute, l@Literal(v, t))
+          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
           Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l@Literal(v, t), a: Attribute) =>
+        case LessThan(l@Literal(v, t), a: Attribute)
+          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
           Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
 
-        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t))
+          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
           Some(new GreaterThanEqualToExpression(transformExpression(a).get,
             transformExpression(l).get))
-        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute) =>
+        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute)
+          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
           Some(new LessThanEqualToExpression(transformExpression(a).get,
             transformExpression(l).get))
 
-        case LessThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+        case LessThanOrEqual(a: Attribute, l@Literal(v, t))
+          if (isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l)) =>
           Some(new LessThanEqualToExpression(transformExpression(a).get,
             transformExpression(l).get))
-        case LessThanOrEqual(l@Literal(v, t), a: Attribute) =>
+        case LessThanOrEqual(l@Literal(v, t), a: Attribute)
+          if (isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a)) =>
           Some(new GreaterThanEqualToExpression(transformExpression(a).get,
             transformExpression(l).get))
 
@@ -323,6 +364,22 @@ object CarbonFilters {
               getActualCarbonDataType(name, carbonTable))))
         case Literal(name, dataType) => Some(new
             CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+        case StartsWith(left : Attribute, right@Literal(pattern, dataType)) if
+        pattern.toString.size > 0 &&
+        isCarbonSupportedDataTypes
+        (left) &&
+        isCarbonSupportedDataTypes
+        (right) =>
+          val l = new GreaterThanEqualToExpression(transformExpression(left).get,
+            transformExpression(right).get)
+          val maxValueLimit = pattern.toString.substring(0, pattern.toString.length - 1) +
+                              (pattern.toString.charAt(pattern.toString.length - 1).toInt + 1)
+                                .toChar
+          val r = new LessThanExpression(
+            transformExpression(left).get,
+            new CarbonLiteralExpression(maxValueLimit,
+              CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+          Some(new AndExpression(l, r))
         case others =>
           if (!or) {
             others.collect {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 595c173..346e105 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -491,6 +491,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
       case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         CastExpressionOptimization.checkIfCastCanBeRemove(c)
+      case StartsWith(a: Attribute, Literal(v, t)) =>
+        Some(sources.StringStartsWith(a.name, v.toString))
       case others => None
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a5b92876/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 88343c0..f8abd67 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -97,6 +97,14 @@ object CarbonFilters {
           } yield {
             new OrExpression(lhsFilter, rhsFilter)
           }
+        case sources.StringStartsWith(name, value) if value.length > 0 =>
+          val l = new GreaterThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value))
+          val maxValueLimit = value.substring(0, value.length - 1) +
+                              (value.charAt(value.length - 1).toInt + 1).toChar
+          val r = new LessThanExpression(
+            getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit))
+          Some(new AndExpression(l, r))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
         case _ => None
@@ -221,6 +229,8 @@ object CarbonFilters {
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
         case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
           CastExpressionOptimization.checkIfCastCanBeRemove(c)
+        case StartsWith(a: Attribute, Literal(v, t)) =>
+          Some(sources.StringStartsWith(a.name, v.toString))
         case c@Cast(a: Attribute, _) =>
           Some(CastExpr(c))
         case others =>
@@ -310,6 +320,20 @@ object CarbonFilters {
           CarbonScalaUtil.convertSparkToCarbonDataType(dataType))
       case Literal(name, dataType) => new
           CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType))
+      case StartsWith(left, right@Literal(pattern, dataType)) if pattern.toString.size > 0 &&
+                                                                 isCarbonSupportedDataTypes(left) &&
+                                                                 isCarbonSupportedDataTypes
+                                                                 (right) =>
+        val l = new GreaterThanEqualToExpression(transformExpression(left),
+          transformExpression(right))
+        val maxValueLimit = pattern.toString.substring(0, pattern.toString.length - 1) +
+                            (pattern.toString.charAt(pattern.toString.length - 1).toInt + 1)
+                              .toChar
+        val r = new LessThanExpression(
+          transformExpression(left),
+          new CarbonLiteralExpression(maxValueLimit,
+            CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+        new AndExpression(l, r)
       case StringTrim(child) => transformExpression(child)
       case _ =>
         new SparkUnknownExpression(expr.transform {


[2/2] carbondata git commit: [CARBONDATA-1019]Like Filter Pushdown This closes #878

Posted by ch...@apache.org.
[CARBONDATA-1019]Like Filter Pushdown This closes #878


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b16ab636
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b16ab636
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b16ab636

Branch: refs/heads/master
Commit: b16ab636cb42337bc047f65b5fe6a38bc4fffd70
Parents: 2c1265d a5b9287
Author: chenliang613 <ch...@huawei.com>
Authored: Tue May 9 15:19:00 2017 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Tue May 9 15:19:00 2017 +0800

----------------------------------------------------------------------
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  80 ++++++-------
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  75 ++++++------
 ...velRangeLessThanEqualFilterExecuterImpl.java | 112 ++++++++----------
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 115 +++++++++----------
 .../resources/hiverangenodictionarycompare.csv  |  20 ++++
 .../detailquery/RangeFilterTestCase.scala       |  84 +++++++++++++-
 .../spark/sql/optimizer/CarbonFilters.scala     | 103 +++++++++++++----
 .../execution/CarbonLateDecodeStrategy.scala    |   2 +
 .../spark/sql/optimizer/CarbonFilters.scala     |  24 ++++
 9 files changed, 383 insertions(+), 232 deletions(-)
----------------------------------------------------------------------