Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/15 07:09:13 UTC

[28/52] [partial] incubator-carbondata git commit: Renamed packages to org.apache.carbondata and fixed errors

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
new file mode 100644
index 0000000..edceb82
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.executor.util.QueryUtil;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+
+/**
+ * It checks whether a filter is required on the given block and, if required,
+ * does a linear search on the block data and sets the bitset.
+ */
+public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(IncludeColGroupFilterExecuterImpl.class.getName());
+
+  /**
+   * @param dimColResolvedFilterInfo
+   * @param segmentProperties
+   */
+  public IncludeColGroupFilterExecuterImpl(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
+    super(dimColResolvedFilterInfo, segmentProperties);
+  }
+
+  /**
+   * It fills the BitSet with the row indexes that match the filter keys.
+   */
+  protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+
+    try {
+      KeyStructureInfo keyStructureInfo = getKeyStructureInfo();
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      for (int i = 0; i < filterValues.length; i++) {
+        byte[] filterVal = filterValues[i];
+        for (int rowId = 0; rowId < numerOfRows; rowId++) {
+          byte[] colData = new byte[keyStructureInfo.getMaskByteRanges().length];
+          dimensionColumnDataChunk.fillChunkData(colData, 0, rowId, keyStructureInfo);
+          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, colData) == 0) {
+            bitSet.set(rowId);
+          }
+        }
+      }
+
+    } catch (Exception e) {
+      LOGGER.error(e);
+    }
+
+    return bitSet;
+  }
+
+  /**
+   * Builds the key structure info required for extracting column data from the
+   * column group chunk.
+   *
+   * @return KeyStructureInfo used to extract the column group data
+   * @throws KeyGenException
+   */
+  private KeyStructureInfo getKeyStructureInfo() throws KeyGenException {
+    int colGrpId = getColumnGroupId(dimColumnEvaluatorInfo.getColumnIndex());
+    KeyGenerator keyGenerator = segmentProperties.getColumnGroupAndItsKeygenartor().get(colGrpId);
+    List<Integer> mdKeyOrdinal = new ArrayList<Integer>();
+    mdKeyOrdinal.add(getMdkeyOrdinal(dimColumnEvaluatorInfo.getColumnIndex(), colGrpId));
+    int[] maskByteRanges = QueryUtil.getMaskedByteRangeBasedOrdinal(mdKeyOrdinal, keyGenerator);
+    byte[] maxKey = QueryUtil.getMaxKeyBasedOnOrinal(mdKeyOrdinal, keyGenerator);
+    int[] maksedByte = QueryUtil.getMaskedByte(keyGenerator.getKeySizeInBytes(), maskByteRanges);
+    KeyStructureInfo restructureInfos = new KeyStructureInfo();
+    restructureInfos.setKeyGenerator(keyGenerator);
+    restructureInfos.setMaskByteRanges(maskByteRanges);
+    restructureInfos.setMaxKey(maxKey);
+    restructureInfos.setMaskedBytes(maksedByte);
+    return restructureInfos;
+  }
+
+  /**
+   * Checks whether a scan is required on the given block based on its min and max values.
+   */
+  public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+    int[] cols = getAllColumns(columnIndex);
+    byte[] maxValue = getMinMaxData(cols, blkMaxVal[blockIndex], columnIndex);
+    byte[] minValue = getMinMaxData(cols, blkMinVal[blockIndex], columnIndex);
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // the filter value should lie within the block's min/max range, i.e.
+      // min <= filterValue <= max,
+      // so compare(filter, max) should be <= 0
+      int maxCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], maxValue);
+      // and compare(filter, min) should be >= 0
+      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], minValue);
+
+      // if any filter value is in range, then this block needs to be
+      // scanned
+      if (maxCompare <= 0 && minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+  /**
+   * It extracts the min or max data for the given column from the stored
+   * column group min/max value.
+   *
+   * @param colGrpColumns ordinals of the columns in the column group
+   * @param minMaxData    stored min or max bytes of the column group
+   * @param columnIndex   ordinal of the column to extract
+   * @return min or max bytes of the given column
+   */
+  private byte[] getMinMaxData(int[] colGrpColumns, byte[] minMaxData, int columnIndex) {
+    int startIndex = 0;
+    int endIndex = 0;
+    if (null != colGrpColumns) {
+      for (int i = 0; i < colGrpColumns.length; i++) {
+        int colGrpId = getColumnGroupId(colGrpColumns[i]);
+        int mdKeyOrdinal = getMdkeyOrdinal(colGrpColumns[i], colGrpId);
+        int[] byteRange = getKeyGenerator(colGrpId).getKeyByteOffsets(mdKeyOrdinal);
+        int colSize = 0;
+        for (int j = byteRange[0]; j <= byteRange[1]; j++) {
+          colSize++;
+        }
+        if (colGrpColumns[i] == columnIndex) {
+          endIndex = startIndex + colSize;
+          break;
+        }
+        startIndex += colSize;
+      }
+    }
+    byte[] data = new byte[endIndex - startIndex];
+    System.arraycopy(minMaxData, startIndex, data, 0, data.length);
+    return data;
+  }
+
+  /**
+   * It returns the column group which contains the provided column ordinal.
+   *
+   * @param columnIndex
+   * @return column group array, or null if the ordinal is not part of any group
+   */
+  private int[] getAllColumns(int columnIndex) {
+    int[][] colGroups = segmentProperties.getColumnGroups();
+    for (int i = 0; i < colGroups.length; i++) {
+      if (QueryUtil.searchInArray(colGroups[i], columnIndex)) {
+        return colGroups[i];
+      }
+    }
+    return null;
+  }
+
+  private int getMdkeyOrdinal(int ordinal, int colGrpId) {
+    return segmentProperties.getColumnGroupMdKeyOrdinal(colGrpId, ordinal);
+  }
+
+  private int getColumnGroupId(int ordinal) {
+    int[][] columnGroups = segmentProperties.getColumnGroups();
+    int colGrpId = -1;
+    for (int i = 0; i < columnGroups.length; i++) {
+      if (columnGroups[i].length > 1) {
+        colGrpId++;
+        if (QueryUtil.searchInArray(columnGroups[i], ordinal)) {
+          break;
+        }
+      }
+    }
+    return colGrpId;
+  }
+
+  public KeyGenerator getKeyGenerator(int colGrpId) {
+    return segmentProperties.getColumnGroupAndItsKeygenartor().get(colGrpId);
+  }
+}
\ No newline at end of file
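
For reference, the block pruning in isScanRequired() above reduces to checking whether any filter key falls inside the block's [min, max] range. A minimal standalone sketch of that idea (not part of this commit; a plain unsigned lexicographic byte comparison stands in for ByteUtil.UnsafeComparer):

public class MinMaxPruningSketch {

  // Unsigned lexicographic byte comparison, analogous to ByteUtil.UnsafeComparer.
  static int compare(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int x = a[i] & 0xFF;
      int y = b[i] & 0xFF;
      if (x != y) {
        return x - y;
      }
    }
    return a.length - b.length;
  }

  // A block must be scanned if any filter key lies within [blockMin, blockMax].
  static boolean isScanRequired(byte[][] filterKeys, byte[] blockMax, byte[] blockMin) {
    for (byte[] key : filterKeys) {
      // compare(key, max) <= 0 and compare(key, min) >= 0  <=>  min <= key <= max
      if (compare(key, blockMax) <= 0 && compare(key, blockMin) >= 0) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    byte[] blockMin = {1};
    byte[] blockMax = {9};
    System.out.println(isScanRequired(new byte[][] {{5}}, blockMax, blockMin));   // true
    System.out.println(isScanRequired(new byte[][] {{12}}, blockMax, blockMin));  // false
  }
}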

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
new file mode 100644
index 0000000..8e645b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class IncludeFilterExecuterImpl implements FilterExecuter {
+
+  protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
+  protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected SegmentProperties segmentProperties;
+
+  public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
+      SegmentProperties segmentProperties) {
+    this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+    this.segmentProperties = segmentProperties;
+    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+    FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
+        segmentProperties.getDimensionKeyGenerator(), dimColumnEvaluatorInfo.getDimension(),
+        dimColumnExecuterInfo);
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) {
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColumnEvaluatorInfo.getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (dimensionColumnDataChunk.getAttributes().isNoDictionary()
+        && dimensionColumnDataChunk instanceof VariableLengthDimensionDataChunk) {
+      return setDirectKeyFilterIndexToBitSet(
+          (VariableLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    } else if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  private BitSet setDirectKeyFilterIndexToBitSet(
+      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    List<byte[]> listOfColumnarKeyBlockDataForNoDictionaryVals =
+        dimensionColumnDataChunk.getCompleteDataChunk();
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int[] columnIndexArray = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int[] columnReverseIndexArray =
+        dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse();
+    for (int i = 0; i < filterValues.length; i++) {
+      byte[] filterVal = filterValues[i];
+      if (null != listOfColumnarKeyBlockDataForNoDictionaryVals) {
+        if (null != columnIndexArray) {
+          for (int index : columnIndexArray) {
+            byte[] noDictionaryVal =
+                listOfColumnarKeyBlockDataForNoDictionaryVals.get(columnReverseIndexArray[index]);
+            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
+              bitSet.set(index);
+            }
+          }
+        } else if (null != columnReverseIndexArray) {
+          for (int index : columnReverseIndexArray) {
+            byte[] noDictionaryVal =
+                listOfColumnarKeyBlockDataForNoDictionaryVals.get(columnReverseIndexArray[index]);
+            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
+              bitSet.set(index);
+            }
+          }
+        } else {
+          for (int index = 0;
+               index < listOfColumnarKeyBlockDataForNoDictionaryVals.size(); index++) {
+            if (ByteUtil.UnsafeComparer.INSTANCE
+                .compareTo(filterVal, listOfColumnarKeyBlockDataForNoDictionaryVals.get(index))
+                == 0) {
+              bitSet.set(index);
+            }
+          }
+        }
+      }
+    }
+    return bitSet;
+
+  }
+
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      if (start < 0) {
+        continue;
+      }
+      bitSet.set(columnIndex[start]);
+      last = start;
+      for (int j = start + 1; j < numerOfRows; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(dimensionColumnDataChunk.getCompleteDataChunk(), j * filterValues[i].length,
+                filterValues[i].length, filterValues[i], 0, filterValues[i].length) == 0) {
+          bitSet.set(columnIndex[j]);
+          last++;
+        } else {
+          break;
+        }
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      FixedLengthDimensionDataChunk fixedDimensionChunk =
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk;
+      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int j = 0; j < numerOfRows; j++) {
+          if (ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(fixedDimensionChunk.getCompleteDataChunk(), j * filterValues[k].length,
+                  filterValues[k].length, filterValues[k], 0, filterValues[k].length) == 0) {
+            bitSet.set(j);
+          }
+        }
+      }
+    }
+    return bitSet;
+  }
+
+  public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // the filter value should lie within the block's min/max range, i.e.
+      // min <= filterValue <= max,
+      // so compare(filter, max) should be <= 0
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMaxVal[blockIndex]);
+      // and compare(filter, min) should be >= 0
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMinVal[blockIndex]);
+
+      // if any filter value is in range, then this block needs to be
+      // scanned
+      if (maxCompare <= 0 && minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+}
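
A minimal sketch (not part of this commit) of the idea behind setFilterdIndexToBitSetWithColumnIndex() above: the column data is stored in sorted order, a binary search locates the run of values equal to the filter key, and each sorted position is mapped back to its physical row id through the inverted index. The int data and Arrays.binarySearch are simplifying assumptions standing in for the fixed-length byte chunks and CarbonUtil.getFirstIndexUsingBinarySearch:

import java.util.Arrays;
import java.util.BitSet;

public class InvertedIndexScanSketch {

  public static void main(String[] args) {
    // Column values stored in sorted order; invertedIndex[i] is the original
    // row id of the i-th sorted value.
    int[] sortedValues = {10, 10, 20, 30, 30, 30};
    int[] invertedIndex = {3, 5, 0, 1, 2, 4};
    int filterValue = 30;

    BitSet bitSet = new BitSet(sortedValues.length);
    int start = Arrays.binarySearch(sortedValues, filterValue);
    if (start >= 0) {
      // Walk left to the first occurrence, then mark every member of the equal run.
      while (start > 0 && sortedValues[start - 1] == filterValue) {
        start--;
      }
      for (int j = start; j < sortedValues.length && sortedValues[j] == filterValue; j++) {
        bitSet.set(invertedIndex[j]);  // set the original row id, not the sorted position
      }
    }
    System.out.println(bitSet);  // {1, 2, 4}
  }
}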

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/OrFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/OrFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/OrFilterExecuterImpl.java
new file mode 100644
index 0000000..8fd5431
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/OrFilterExecuterImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class OrFilterExecuterImpl implements FilterExecuter {
+
+  private FilterExecuter leftExecuter;
+  private FilterExecuter rightExecuter;
+
+  public OrFilterExecuterImpl(FilterExecuter leftExecuter, FilterExecuter rightExecuter) {
+    this.leftExecuter = leftExecuter;
+    this.rightExecuter = rightExecuter;
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    BitSet leftFilters = leftExecuter.applyFilter(blockChunkHolder);
+    BitSet rightFilters = rightExecuter.applyFilter(blockChunkHolder);
+    leftFilters.or(rightFilters);
+
+    return leftFilters;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet leftFilters = leftExecuter.isScanRequired(blockMaxValue, blockMinValue);
+    BitSet rightFilters = rightExecuter.isScanRequired(blockMaxValue, blockMinValue);
+    leftFilters.or(rightFilters);
+    return leftFilters;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RestructureFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RestructureFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RestructureFilterExecuterImpl.java
new file mode 100644
index 0000000..84d7898
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RestructureFilterExecuterImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+
+public class RestructureFilterExecuterImpl implements FilterExecuter {
+
+  DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+
+  public RestructureFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnResolvedFilterInfo,
+      KeyGenerator blockKeyGenerator) {
+    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+    FilterUtil
+        .prepareKeysFromSurrogates(dimColumnResolvedFilterInfo.getFilterValues(), blockKeyGenerator,
+            dimColumnResolvedFilterInfo.getDimension(), dimColumnExecuterInfo);
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blocksChunkHolder) {
+    BitSet bitSet = new BitSet(blocksChunkHolder.getDataBlock().nodeSize());
+    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
+    if (null != filterValues && filterValues.length > 0) {
+      bitSet.set(0, blocksChunkHolder.getDataBlock().nodeSize());
+    }
+    return bitSet;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    bitSet.set(0);
+    return bitSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
new file mode 100644
index 0000000..00f80bb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.executor.util.QueryUtil;
+import org.apache.carbondata.scan.expression.Expression;
+import org.apache.carbondata.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.filter.intf.RowImpl;
+import org.apache.carbondata.scan.filter.intf.RowIntf;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class RowLevelFilterExecuterImpl implements FilterExecuter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowLevelFilterExecuterImpl.class.getName());
+  protected List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList;
+  protected List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList;
+  protected Expression exp;
+  protected AbsoluteTableIdentifier tableIdentifier;
+  protected SegmentProperties segmentProperties;
+  /**
+   * it holds the index at which the given dimension is stored in the file
+   */
+  private int[] blocksIndex;
+
+  private Map<Integer, GenericQueryType> complexDimensionInfoMap;
+
+  public RowLevelFilterExecuterImpl(List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties,
+      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+    this.dimColEvaluatorInfoList = dimColEvaluatorInfoList;
+    this.segmentProperties = segmentProperties;
+    this.blocksIndex = new int[dimColEvaluatorInfoList.size()];
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      this.blocksIndex[i] = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColEvaluatorInfoList.get(i).getColumnIndex());
+    }
+    if (null == msrColEvalutorInfoList) {
+      this.msrColEvalutorInfoList = new ArrayList<MeasureColumnResolvedFilterInfo>(20);
+    } else {
+      this.msrColEvalutorInfoList = msrColEvalutorInfoList;
+    }
+    this.exp = exp;
+    this.tableIdentifier = tableIdentifier;
+    this.complexDimensionInfoMap = complexDimensionInfoMap;
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
+      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
+          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
+        if (null == blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]]) {
+          blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]] = blockChunkHolder.getDataBlock()
+              .getDimensionChunk(blockChunkHolder.getFileReader(), blocksIndex[i]);
+        }
+      } else {
+        GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
+        complexType.fillRequiredBlockData(blockChunkHolder);
+      }
+    }
+
+    // CHECKSTYLE:OFF Approval No:Approval-V1R2C10_001
+    if (null != msrColEvalutorInfoList) {
+      for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
+        if (msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice() && null == blockChunkHolder
+            .getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]) {
+          blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] =
+              blockChunkHolder.getDataBlock().getMeasureChunk(blockChunkHolder.getFileReader(),
+                  msrColumnEvalutorInfo.getColumnIndex());
+        }
+      }
+    }
+    // CHECKSTYLE:ON
+
+    int numberOfRows = blockChunkHolder.getDataBlock().nodeSize();
+    BitSet set = new BitSet(numberOfRows);
+    RowIntf row = new RowImpl();
+    boolean invalidRowsPresent = false;
+    for (int index = 0; index < numberOfRows; index++) {
+      try {
+        createRow(blockChunkHolder, row, index);
+      } catch (QueryExecutionException e) {
+        FilterUtil.logError(e, invalidRowsPresent);
+      }
+      Boolean rslt = false;
+      try {
+        rslt = exp.evaluate(row).getBoolean();
+      }
+      // Any invalid member encountered during evaluation is ignored; since the
+      // evaluation happens for every row, the error is logged only once in order
+      // to avoid flooding the log.
+      catch (FilterIllegalMemberException e) {
+        FilterUtil.logError(e, invalidRowsPresent);
+      }
+      if (null != rslt && rslt) {
+        set.set(index);
+      }
+    }
+    return set;
+  }
+
+  /**
+   * Method will read the members of a particular dimension block and create
+   * a row instance for further processing of the filters.
+   *
+   * @param blockChunkHolder
+   * @param row
+   * @param index
+   * @throws QueryExecutionException
+   */
+  private void createRow(BlocksChunkHolder blockChunkHolder, RowIntf row, int index)
+      throws QueryExecutionException {
+    Object[] record = new Object[dimColEvaluatorInfoList.size() + msrColEvalutorInfoList.size()];
+    String memberString = null;
+    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
+      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
+          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
+        if (!dimColumnEvaluatorInfo.isDimensionExistsInCurrentSilce()) {
+          record[dimColumnEvaluatorInfo.getRowIndex()] = dimColumnEvaluatorInfo.getDefaultValue();
+        }
+        if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
+            && blockChunkHolder
+            .getDimensionDataChunk()[blocksIndex[i]] instanceof VariableLengthDimensionDataChunk) {
+
+          VariableLengthDimensionDataChunk dimensionColumnDataChunk =
+              (VariableLengthDimensionDataChunk) blockChunkHolder
+                  .getDimensionDataChunk()[blocksIndex[i]];
+          if (null != dimensionColumnDataChunk.getCompleteDataChunk()) {
+            memberString =
+                readMemberBasedOnNoDictionaryVal(dimColumnEvaluatorInfo, dimensionColumnDataChunk,
+                    index);
+            if (null != memberString) {
+              if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+                memberString = null;
+              }
+            }
+            record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
+                .getDataBasedOnDataType(memberString,
+                    dimColumnEvaluatorInfo.getDimension().getDataType());
+          } else {
+            continue;
+          }
+        } else {
+          int dictionaryValue =
+              readSurrogatesFromColumnBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
+                  blocksIndex[i]);
+          Dictionary forwardDictionary = null;
+          if (dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
+              && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+            memberString =
+                getFilterActualValueFromDictionaryValue(dimColumnEvaluatorInfo, dictionaryValue,
+                    forwardDictionary);
+            record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
+                .getDataBasedOnDataType(memberString,
+                    dimColumnEvaluatorInfo.getDimension().getDataType());
+          } else if (dimColumnEvaluatorInfo.getDimension()
+              .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+
+            Object member = getFilterActualValueFromDirectDictionaryValue(dimColumnEvaluatorInfo,
+                dictionaryValue);
+            record[dimColumnEvaluatorInfo.getRowIndex()] = member;
+          }
+        }
+      } else {
+        try {
+          GenericQueryType complexType = complexDimensionInfoMap.get(blocksIndex[i]);
+          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+          DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
+          complexType
+              .parseBlocksAndReturnComplexColumnByteArray(blockChunkHolder.getDimensionDataChunk(),
+                  index, dataOutputStream);
+          record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
+              .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+          byteStream.close();
+        } catch (IOException e) {
+          LOGGER.info(e.getMessage());
+        }
+      }
+    }
+
+    DataType msrType;
+
+    for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
+      switch (msrColumnEvalutorInfo.getType()) {
+        case INT:
+        case LONG:
+          msrType = DataType.LONG;
+          break;
+        case DECIMAL:
+          msrType = DataType.DECIMAL;
+          break;
+        default:
+          msrType = DataType.DOUBLE;
+      }
+      // if the measure doesn't exist then set the default value.
+      if (!msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice()) {
+        record[msrColumnEvalutorInfo.getRowIndex()] = msrColumnEvalutorInfo.getDefaultValue();
+      } else {
+        Object msrValue;
+        switch (msrType) {
+          case INT:
+          case LONG:
+            msrValue =
+                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                    .getMeasureDataHolder().getReadableLongValueByIndex(index);
+            break;
+          case DECIMAL:
+            msrValue =
+                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                    .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+            break;
+          default:
+            msrValue =
+                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                    .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+        }
+        record[msrColumnEvalutorInfo.getRowIndex()] =
+            blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
+                .getNullValueIndexHolder().getBitSet().get(index) ? null : msrValue;
+
+      }
+    }
+    row.setValues(record);
+  }
+
+  /**
+   * Method will read the actual data from the direct dictionary generator
+   * by passing the direct dictionary value.
+   *
+   * @param dimColumnEvaluatorInfo
+   * @param dictionaryValue
+   * @return
+   */
+  private Object getFilterActualValueFromDirectDictionaryValue(
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int dictionaryValue) {
+    Object memberString = null;
+    DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+        .getDirectDictionaryGenerator(dimColumnEvaluatorInfo.getDimension().getDataType());
+    if (null != directDictionaryGenerator) {
+      memberString = directDictionaryGenerator.getValueFromSurrogate(dictionaryValue);
+    }
+    return memberString;
+  }
+
+  /**
+   * Read the actual filter member by passing the dictionary value to
+   * the forward dictionary cache, which holds a column-wise cache.
+   *
+   * @param dimColumnEvaluatorInfo
+   * @param dictionaryValue
+   * @param forwardDictionary
+   * @return
+   * @throws QueryExecutionException
+   */
+  private String getFilterActualValueFromDictionaryValue(
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int dictionaryValue,
+      Dictionary forwardDictionary) throws QueryExecutionException {
+    String memberString;
+    try {
+      forwardDictionary = FilterUtil
+          .getForwardDictionaryCache(tableIdentifier, dimColumnEvaluatorInfo.getDimension());
+    } catch (QueryExecutionException e) {
+      throw new QueryExecutionException(e);
+    }
+
+    memberString = forwardDictionary.getDictionaryValueForKey(dictionaryValue);
+    if (null != memberString) {
+      if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+        memberString = null;
+      }
+    }
+    return memberString;
+  }
+
+  /**
+   * Read the filter member dictionary data from the block corresponding to
+   * the applied filter column.
+   *
+   * @param blockChunkHolder
+   * @param index
+   * @param dimColumnEvaluatorInfo
+   * @return
+   */
+  private int readSurrogatesFromColumnBlock(BlocksChunkHolder blockChunkHolder, int index,
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
+    if (dimColumnEvaluatorInfo.getDimension().isColumnar()) {
+      byte[] rawData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
+      ByteBuffer byteBuffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      int dictionaryValue = CarbonUtil.getSurrogateKey(rawData, byteBuffer);
+      return dictionaryValue;
+    } else {
+      return readSurrogatesFromColumnGroupBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
+          blockIndex);
+    }
+
+  }
+
+  /**
+   * @param blockChunkHolder
+   * @param index
+   * @param dimColumnEvaluatorInfo
+   * @return the surrogate read for the given row of the given column group dimension
+   */
+  private int readSurrogatesFromColumnGroupBlock(BlocksChunkHolder blockChunkHolder, int index,
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
+    try {
+      KeyStructureInfo keyStructureInfo =
+          QueryUtil.getKeyStructureInfo(segmentProperties, dimColumnEvaluatorInfo);
+      byte[] colData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
+      long[] result = keyStructureInfo.getKeyGenerator().getKeyArray(colData);
+      int colGroupId =
+          QueryUtil.getColumnGroupId(segmentProperties, dimColumnEvaluatorInfo.getColumnIndex());
+      int dictionaryValue = (int) result[segmentProperties
+          .getColumnGroupMdKeyOrdinal(colGroupId, dimColumnEvaluatorInfo.getColumnIndex())];
+      return dictionaryValue;
+    } catch (KeyGenException e) {
+      LOGGER.error(e);
+    }
+    return 0;
+  }
+
+  /**
+   * Reads the blocks for no-dictionary data; in the no-dictionary case the
+   * filter data is read directly, so there is no need to scan the dictionary
+   * or read a dictionary value.
+   *
+   * @param dimColumnEvaluatorInfo
+   * @param dimensionColumnDataChunk
+   * @param index
+   * @return
+   */
+  private String readMemberBasedOnNoDictionaryVal(
+      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
+      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int index) {
+    byte[] noDictionaryVals;
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse()) {
+      // Getting the data for direct surrogates.
+      noDictionaryVals = dimensionColumnDataChunk.getCompleteDataChunk()
+          .get(dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse()[index]);
+    } else {
+      noDictionaryVals = dimensionColumnDataChunk.getCompleteDataChunk().get(index);
+    }
+    return new String(noDictionaryVals, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    bitSet.set(0);
+    return bitSet;
+  }
+}
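
A minimal sketch (not part of this commit) of the row-by-row evaluation loop in applyFilter() above, with a java.util.function.Predicate standing in for the resolved filter Expression and plain object arrays standing in for the rows assembled by createRow():

import java.util.BitSet;
import java.util.function.Predicate;

public class RowLevelFilterSketch {

  static BitSet applyFilter(Object[][] rows, Predicate<Object[]> expression) {
    BitSet set = new BitSet(rows.length);
    for (int index = 0; index < rows.length; index++) {
      try {
        if (expression.test(rows[index])) {
          set.set(index);
        }
      } catch (RuntimeException e) {
        // an invalid member is ignored, mirroring the FilterIllegalMemberException handling
      }
    }
    return set;
  }

  public static void main(String[] args) {
    Object[][] rows = { {"asia", 10L}, {"europe", 25L}, {"asia", 40L} };
    BitSet matches = applyFilter(rows, r -> "asia".equals(r[0]) && (Long) r[1] > 20L);
    System.out.println(matches);  // {2}
  }
}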

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
new file mode 100644
index 0000000..8a215dc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.expression.Expression;
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
+  private byte[][] filterRangeValues;
+
+  public RowLevelRangeGrtThanFiterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // for a 'greater than' filter the block is relevant only if the filter
+      // value is strictly less than the block's max value,
+      // i.e. compare(filter, max) < 0
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
+      // if any filter value is strictly less than the block max, then this
+      // block needs to be scanned
+      if (maxCompare < 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all members
+   * will be considered for applying range filters. This method is called when the
+   * column data is not stored in its natural sorted order, so a column index (inverted
+   * index) mapping is present for accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], true);
+      if (start >= 0) {
+        start = CarbonUtil.nextGreaterValueToTarget(start,
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[i], numerOfRows);
+      }
+      // Handles the case where the range filter member is not present in the block.
+      // In this case the binary search returns the insertion point from which the bits
+      // will be set. Since this is a 'greater than' filter, the range starts from the
+      // next element that is greater than the filter member.
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // Compare the tentative index obtained from the binary search against the
+        // filter member; if the filter value is still greater than the data at that
+        // index, start setting bits from the next index onwards.
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            > 0) {
+          start = start + 1;
+        }
+      }
+
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(columnIndex[j]);
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and find the range start index from which all
+   * members will be considered for applying range filters. This method is
+   * called when the column data is stored in its natural sorted order, so no
+   * column index mapping is needed for accessing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], true);
+        start = CarbonUtil.nextGreaterValueToTarget(start,
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k], numerOfRows);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // Compare the tentative index obtained from the binary search against the
+          // filter member; if the filter value is still greater than the data at that
+          // index, start setting bits from the next index onwards.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
+            start = start + 1;
+          }
+        }
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
+}
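
A minimal sketch (not part of this commit) of how the "greater than" range start is located in setFilterdIndexToBitSet() above: a binary search on sorted data, skipping past values equal to the filter member (the "next greater value" step), with the negative result handled the same way as the -(start + 1) insertion-point logic above. Arrays.binarySearch over ints is a simplifying assumption standing in for CarbonUtil's byte-array search:

import java.util.Arrays;
import java.util.BitSet;

public class GreaterThanRangeSketch {

  static BitSet greaterThan(int[] sortedColumn, int filterValue) {
    BitSet bitSet = new BitSet(sortedColumn.length);
    int start = Arrays.binarySearch(sortedColumn, filterValue);
    if (start >= 0) {
      // Filter member present: move past the run of equal values ("next greater value").
      while (start < sortedColumn.length && sortedColumn[start] == filterValue) {
        start++;
      }
    } else {
      // Filter member absent: the insertion point already is the first greater value.
      start = -(start + 1);
    }
    // Every row from the start index onwards satisfies the "greater than" filter.
    for (int j = start; j < sortedColumn.length; j++) {
      bitSet.set(j);
    }
    return bitSet;
  }

  public static void main(String[] args) {
    int[] column = {10, 20, 20, 30, 40};
    System.out.println(greaterThan(column, 20));  // {3, 4}
    System.out.println(greaterThan(column, 25));  // {3, 4}
  }
}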

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
new file mode 100644
index 0000000..808cb51
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.expression.Expression;
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilterExecuterImpl {
+
+  protected byte[][] filterRangeValues;
+
+  public RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // for a 'greater than or equal to' filter the block is relevant only if the
+      // filter value is less than or equal to the block's max value,
+      // i.e. compare(filter, max) <= 0
+      int maxCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
+      // if any filter value is less than or equal to the block max, then this
+      // block needs to be scanned
+      if (maxCompare <= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
+  }
+
+  /**
+   * Method scans the block and finds the range start index from which all members
+   * will be considered for applying the range filter. This variant is called when
+   * the column data carries an inverted index, so the column index mapping is used
+   * for accessing the members of the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet with the positions of the qualifying rows set.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // binary search returned an insertion point; if the filter value is still
+        // greater than or equal to the member at that tentative index, bits are set
+        // only from the next index onwards
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            >= 0) {
+          start = start + 1;
+        }
+      }
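+      // every member from the computed start index to the end of the sorted block
+      // satisfies the >= filter; mark its original row position via the inverted index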
+      last = start;
+      for (int j = start; j < numerOfRows; j++) {
+        bitSet.set(columnIndex[j]);
+        last++;
+      }
+      startIndex = last;
+      if (startIndex >= numerOfRows) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method scans the block and finds the range start index from which all
+   * members will be considered for applying the range filter. This variant is
+   * called when the column data is already stored in sorted order, so no column
+   * index mapping is needed for accessing the members of the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet with the positions of the qualifying rows set.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], false);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // binary search returned an insertion point; if the filter value is still
+          // greater than or equal to the member at that tentative index, bits are
+          // set only from the next index onwards
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
+              >= 0) {
+            start = start + 1;
+          }
+        }
+
+        last = start;
+        for (int j = start; j < numerOfRows; j++) {
+          bitSet.set(j);
+          last++;
+        }
+        startIndex = last;
+        if (startIndex >= numerOfRows) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
new file mode 100644
index 0000000..367ebbd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.expression.Expression;
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilterExecuterImpl {
+  protected byte[][] filterRangeValues;
+
+  public RowLevelRangeLessThanEqualFilterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // for a <= filter the block is relevant only if its min value does not exceed
+      // the filter value, i.e. (filterValue - blockMin) must be non-negative
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+
+      // if the block min is less than or equal to any applied filter value, the
+      // block may contain matching rows and must be scanned
+      if (minCompare >= 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
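+    // for direct dictionary columns a dedicated surrogate key is generated for null
+    // values; derive the masked key of the first non-null member so that the null
+    // entries at the start of the sorted block can be skipped later on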
+    byte[] defaultValue = null;
+    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(
+              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
+      defaultValue = FilterUtil.getMaskKey(key, dimColEvaluatorInfoList.get(0).getDimension(),
+          this.segmentProperties.getDimensionKeyGenerator());
+    }
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
+  }
+
+  /**
+   * Method scans the block and finds the range start index from which all members
+   * will be considered for applying the range filter. This variant is called when
+   * the column data carries an inverted index, so the column index mapping is used
+   * for accessing the members of the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @param defaultValue masked key of the first non-null member for direct dictionary columns
+   * @return BitSet with the positions of the qualifying rows set.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
+      byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int skip = 0;
+    int startIndex = 0;
+    byte[][] filterValues = this.filterRangeValues;
+    //find the number of default values to skip the null value in case of direct dictionary
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              defaultValue, true);
+      if (start < 0) {
+        skip = -(start + 1);
+        // end of block
+        if (skip == numerOfRows) {
+          return bitSet;
+        }
+      } else {
+        skip = start;
+      }
+      startIndex = skip;
+    }
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], true);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // binary search returned an insertion point; if the filter value is less
+        // than or equal to the member at that tentative index, bits are set only
+        // up to the previous position
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            <= 0) {
+          start = start - 1;
+        }
+      }
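+      // every member from the computed start index down to the first non-null row
+      // satisfies the <= filter; mark its original row position via the inverted index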
+      last = start;
+      for (int j = start; j >= skip; j--) {
+        bitSet.set(columnIndex[j]);
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method scans the block and finds the range start index from which all
+   * members will be considered for applying the range filter. This variant is
+   * called when the column data is already stored in sorted order, so no column
+   * index mapping is needed for accessing the members of the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @param defaultValue
+   * @return BitSet with the positions of the qualifying rows set.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows, byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      int skip = 0;
+      //find the number of default values to skip the null value in case of direct dictionary
+      if (null != defaultValue) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            defaultValue, true);
+        if (start < 0) {
+          skip = -(start + 1);
+          // end of block
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          skip = start;
+        }
+        startIndex = skip;
+      }
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], true);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start == numerOfRows) {
+            start = start - 1;
+          }
+          // binary search returned an insertion point; if the filter value is less
+          // than or equal to the member at that tentative index, bits are set only
+          // up to the previous position
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
+              <= 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= skip; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
new file mode 100644
index 0000000..af7f135
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.expression.Expression;
+import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.scan.filter.FilterUtil;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.scan.processor.BlocksChunkHolder;
+
+public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
+  private byte[][] filterRangeValues;
+
+  public RowLevelRangeLessThanFiterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[][] filterValues = this.filterRangeValues;
+    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // for a < filter the block is relevant only if its min value is strictly less
+      // than the filter value, i.e. (filterValue - blockMin) must be positive
+      int minCompare =
+          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
+
+      // if the block min is strictly less than any applied filter value, the block
+      // may contain matching rows and must be scanned
+      if (minCompare > 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+
+  }
+
+  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
+      throws FilterUnsupportedException {
+    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+      return super.applyFilter(blockChunkHolder);
+    }
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
+    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
+        blockChunkHolder.getDataBlock().nodeSize());
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows) {
+    byte[] defaultValue = null;
+    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(
+              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
+      defaultValue = FilterUtil.getMaskKey(key, dimColEvaluatorInfoList.get(0).getDimension(),
+          this.segmentProperties.getDimensionKeyGenerator());
+    }
+    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
+        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      return setFilterdIndexToBitSetWithColumnIndex(
+          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
+    }
+    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
+  }
+
+  /**
+   * Method scans the block and finds the range start index from which all members
+   * will be considered for applying the range filter. This variant is called when
+   * the column data carries an inverted index, so the column index mapping is used
+   * for accessing the members of the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @param defaultValue masked key of the first non-null member for direct dictionary columns
+   * @return BitSet with the positions of the qualifying rows set.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
+      byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    int skip = 0;
+    byte[][] filterValues = this.filterRangeValues;
+
+    //find the number of default values to skip the null value in case of direct dictionary
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              defaultValue, false);
+      if (start < 0) {
+        skip = -(start + 1);
+        // end of block
+        if (skip == numerOfRows) {
+          return bitSet;
+        }
+      } else {
+        skip = start;
+      }
+      startIndex = skip;
+    }
+
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      // the binary search may land on the filter member itself or on its insertion
+      // point; since this is a strict less-than filter, move the start index back to
+      // the previous element that is lesser than the filter member before setting bits
+      start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
+      if (start < 0) {
+        start = -(start + 1);
+        if (start == numerOfRows) {
+          start = start - 1;
+        }
+        // binary search returned an insertion point; if the filter value is less
+        // than the member at that tentative index, bits are set only up to the
+        // previous position
+        if (ByteUtil
+            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
+            < 0) {
+          start = start - 1;
+        }
+      }
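+      // every member from the adjusted start index down to the first non-null row is
+      // strictly less than the filter value; mark its original row position via the
+      // inverted index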
+      last = start;
+      for (int j = start; j >= skip; j--) {
+        bitSet.set(columnIndex[j]);
+        last--;
+      }
+      startIndex = last;
+      if (startIndex <= 0) {
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Method will scan the block and finds the range start index from which all
+   * members will be considered for applying range filters. this method will
+   * be called if the column is sorted default so column index
+   * mapping will be present for accesing the members from the block.
+   *
+   * @param dimensionColumnDataChunk
+   * @param numerOfRows
+   * @return BitSet.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
+      int numerOfRows, byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      int skip = 0;
+      byte[][] filterValues = this.filterRangeValues;
+      //find the number of default values to skip the null value in case of direct dictionary
+      if (null != defaultValue) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            defaultValue, false);
+        if (start < 0) {
+          skip = -(start + 1);
+          // end of block
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          skip = start;
+        }
+        startIndex = skip;
+      }
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil.getFirstIndexUsingBinarySearch(
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
+            filterValues[k], false);
+        start = CarbonUtil.nextLesserValueToTarget(start,
+            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k]);
+        if (start < 0) {
+          start = -(start + 1);
+          if (start >= numerOfRows) {
+            start = numerOfRows - 1;
+          }
+          // binary search returned an insertion point; if the filter value is less
+          // than the member at that tentative index, bits are set only up to the
+          // previous position
+          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= skip; j--) {
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) {
+          break;
+        }
+      }
+    }
+    return bitSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cd6a4ff3/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
new file mode 100644
index 0000000..4c46f6d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.filter.executer;
+
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.scan.filter.intf.FilterExecuterType;
+import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
+
+public class RowLevelRangeTypeExecuterFacory {
+
+  private RowLevelRangeTypeExecuterFacory() {
+  }
+
+  /**
+   * The method returns the row level range filter executer instance based on
+   * the filter tree resolver type.
+   *
+   * @param filterExecuterType
+   * @param filterExpressionResolverTree
+   * @param segmentProperties
+   * @return the executer instance
+   */
+  public static RowLevelFilterExecuterImpl getRowLevelRangeTypeExecuter(
+      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree,
+      SegmentProperties segmentProperties) {
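+    // every case below extracts the same inputs from the resolver node (dimension and
+    // measure evaluator infos, filter expression, table identifier and encoded filter
+    // range values); only the concrete executer class differs per range type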
+    switch (filterExecuterType) {
+
+      case ROWLEVEL_LESSTHAN:
+        return new RowLevelRangeLessThanFiterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      case ROWLEVEL_LESSTHAN_EQUALTO:
+        return new RowLevelRangeLessThanEqualFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      case ROWLEVEL_GREATERTHAN_EQUALTO:
+        return new RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      case ROWLEVEL_GREATERTHAN:
+        return new RowLevelRangeGrtThanFiterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties), segmentProperties);
+      default:
+        // no other row level range filter types exist, so this branch is not expected
+        return null;
+
+    }
+  }
+
+}