You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:31 UTC

[44/50] [abbrv] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
index 71567ea,0000000..ff6ecd2
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/IncludeFilterExecuterImpl.java
@@@ -1,224 -1,0 +1,224 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.core.carbon.datastore.block.SegmentProperties;
 +import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 +import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 +import org.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 +import org.carbondata.core.util.ByteUtil;
 +import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.scan.filter.FilterUtil;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.processor.BlocksChunkHolder;
 +
 +public class IncludeFilterExecuterImpl implements FilterExecuter {
 +
 +  protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
 +  protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
 +  protected SegmentProperties segmentProperties;
 +
 +  public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
 +      SegmentProperties segmentProperties) {
 +    this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
 +    this.segmentProperties = segmentProperties;
 +    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
 +    FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
 +        segmentProperties.getDimensionKeyGenerator(), dimColumnEvaluatorInfo.getDimension(),
 +        dimColumnExecuterInfo);
 +
 +  }
 +
 +  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) {
 +    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
 +        .get(dimColumnEvaluatorInfo.getColumnIndex());
 +    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
 +      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
 +          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
 +    }
 +    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
 +        blockChunkHolder.getDataBlock().nodeSize());
 +  }
 +
 +  protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
 +      int numerOfRows) {
 +    if (dimensionColumnDataChunk.getAttributes().isNoDictionary()
 +        && dimensionColumnDataChunk instanceof VariableLengthDimensionDataChunk) {
 +      return setDirectKeyFilterIndexToBitSet(
 +          (VariableLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
 +    } else if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
 +        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
 +      return setFilterdIndexToBitSetWithColumnIndex(
 +          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
 +    }
 +
 +    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
 +  }
 +
 +  private BitSet setDirectKeyFilterIndexToBitSet(
 +      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
 +    BitSet bitSet = new BitSet(numerOfRows);
 +    List<byte[]> listOfColumnarKeyBlockDataForNoDictionaryVals =
 +        dimensionColumnDataChunk.getCompleteDataChunk();
 +    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
 +    int[] columnIndexArray = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
 +    int[] columnReverseIndexArray =
 +        dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse();
 +    for (int i = 0; i < filterValues.length; i++) {
 +      byte[] filterVal = filterValues[i];
 +      if (null != listOfColumnarKeyBlockDataForNoDictionaryVals) {
 +        if (null != columnIndexArray) {
 +          for (int index : columnIndexArray) {
 +            byte[] noDictionaryVal =
 +                listOfColumnarKeyBlockDataForNoDictionaryVals.get(columnReverseIndexArray[index]);
 +            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
 +              bitSet.set(index);
 +            }
 +          }
 +        } else if (null != columnReverseIndexArray) {
 +          for (int index : columnReverseIndexArray) {
 +            byte[] noDictionaryVal =
 +                listOfColumnarKeyBlockDataForNoDictionaryVals.get(columnReverseIndexArray[index]);
 +            if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterVal, noDictionaryVal) == 0) {
 +              bitSet.set(index);
 +            }
 +          }
 +        } else {
 +          for (int index = 0;
 +               index < listOfColumnarKeyBlockDataForNoDictionaryVals.size(); index++) {
 +            if (ByteUtil.UnsafeComparer.INSTANCE
 +                .compareTo(filterVal, listOfColumnarKeyBlockDataForNoDictionaryVals.get(index))
 +                == 0) {
 +              bitSet.set(index);
 +            }
 +          }
 +        }
 +      }
 +    }
 +    return bitSet;
 +
 +  }
 +
 +  private BitSet setFilterdIndexToBitSetWithColumnIndex(
 +      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
 +    BitSet bitSet = new BitSet(numerOfRows);
 +    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
 +    int start = 0;
 +    int last = 0;
 +    int startIndex = 0;
 +    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
 +    for (int i = 0; i < filterValues.length; i++) {
 +      start = CarbonUtil
 +          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-               filterValues[i]);
-       if (start == -1) {
++              filterValues[i], false);
++      if (start < 0) {
 +        continue;
 +      }
 +      bitSet.set(columnIndex[start]);
 +      last = start;
 +      for (int j = start + 1; j < numerOfRows; j++) {
 +        if (ByteUtil.UnsafeComparer.INSTANCE
 +            .compareTo(dimensionColumnDataChunk.getCompleteDataChunk(), j * filterValues[i].length,
 +                filterValues[i].length, filterValues[i], 0, filterValues[i].length) == 0) {
 +          bitSet.set(columnIndex[j]);
 +          last++;
 +        } else {
 +          break;
 +        }
 +      }
 +      startIndex = last;
 +      if (startIndex >= numerOfRows) {
 +        break;
 +      }
 +    }
 +    return bitSet;
 +  }
 +
 +  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
 +      int numerOfRows) {
 +    BitSet bitSet = new BitSet(numerOfRows);
 +    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
 +      FixedLengthDimensionDataChunk fixedDimensionChunk =
 +          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk;
 +      int start = 0;
 +      int last = 0;
 +      int startIndex = 0;
 +      byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
 +      for (int k = 0; k < filterValues.length; k++) {
 +        start = CarbonUtil.getFirstIndexUsingBinarySearch(
 +            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
-             filterValues[k]);
-         if (start == -1) {
++            filterValues[k], false);
++        if (start < 0) {
 +          continue;
 +        }
 +        bitSet.set(start);
 +        last = start;
 +        for (int j = start + 1; j < numerOfRows; j++) {
 +          if (ByteUtil.UnsafeComparer.INSTANCE
 +              .compareTo(fixedDimensionChunk.getCompleteDataChunk(), j * filterValues[k].length,
 +                  filterValues[k].length, filterValues[k], 0, filterValues[k].length) == 0) {
 +            bitSet.set(j);
 +            last++;
 +          } else {
 +            break;
 +          }
 +        }
 +        startIndex = last;
 +        if (startIndex >= numerOfRows) {
 +          break;
 +        }
 +      }
 +    }
 +    return bitSet;
 +  }
 +
 +  public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
 +    BitSet bitSet = new BitSet(1);
 +    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
 +    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
 +    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
 +
 +    boolean isScanRequired = false;
 +    for (int k = 0; k < filterValues.length; k++) {
 +      // filter value should be in range of max and min value i.e
 +      // max>filtervalue>min
 +      // so filter-max should be negative
 +      int maxCompare =
 +          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMaxVal[blockIndex]);
 +      // and filter-min should be positive
 +      int minCompare =
 +          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blkMinVal[blockIndex]);
 +
 +      // if any filter value is in range than this block needs to be
 +      // scanned
 +      if (maxCompare <= 0 && minCompare >= 0) {
 +        isScanRequired = true;
 +        break;
 +      }
 +    }
 +    if (isScanRequired) {
 +      bitSet.set(0);
 +    }
 +    return bitSet;
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 0463978,0000000..9199907
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@@ -1,331 -1,0 +1,379 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
 +import java.nio.ByteBuffer;
 +import java.nio.charset.Charset;
 +import java.util.ArrayList;
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.common.logging.LogService;
 +import org.carbondata.common.logging.LogServiceFactory;
 +import org.carbondata.core.cache.dictionary.Dictionary;
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
++import org.carbondata.core.carbon.datastore.block.SegmentProperties;
 +import org.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
 +import org.carbondata.core.carbon.metadata.datatype.DataType;
 +import org.carbondata.core.carbon.metadata.encoder.Encoding;
 +import org.carbondata.core.constants.CarbonCommonConstants;
++import org.carbondata.core.keygenerator.KeyGenException;
 +import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 +import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 +import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.scan.executor.exception.QueryExecutionException;
++import org.carbondata.scan.executor.infos.KeyStructureInfo;
++import org.carbondata.scan.executor.util.QueryUtil;
 +import org.carbondata.scan.expression.Expression;
++import org.carbondata.scan.expression.exception.FilterIllegalMemberException;
 +import org.carbondata.scan.expression.exception.FilterUnsupportedException;
 +import org.carbondata.scan.filter.FilterUtil;
 +import org.carbondata.scan.filter.GenericQueryType;
 +import org.carbondata.scan.filter.intf.RowImpl;
 +import org.carbondata.scan.filter.intf.RowIntf;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 +import org.carbondata.scan.processor.BlocksChunkHolder;
 +import org.carbondata.scan.util.DataTypeUtil;
 +
 +public class RowLevelFilterExecuterImpl implements FilterExecuter {
 +
 +  private static final LogService LOGGER =
 +      LogServiceFactory.getLogService(RowLevelFilterExecuterImpl.class.getName());
 +  protected List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList;
 +  protected List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList;
 +  protected Expression exp;
 +  protected AbsoluteTableIdentifier tableIdentifier;
++  protected SegmentProperties segmentProperties;
++  /**
++   * it has index at which given dimension is stored in file
++   */
++  private int[] blocksIndex;
 +
 +  public RowLevelFilterExecuterImpl(List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
 +      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-       AbsoluteTableIdentifier tableIdentifier) {
++      AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties) {
 +    this.dimColEvaluatorInfoList = dimColEvaluatorInfoList;
++    this.segmentProperties = segmentProperties;
++    this.blocksIndex = new int[dimColEvaluatorInfoList.size()];
++    for (int i=0;i<dimColEvaluatorInfoList.size();i++) {
++      this.blocksIndex[i] = segmentProperties.getDimensionOrdinalToBlockMapping()
++              .get(dimColEvaluatorInfoList.get(i).getColumnIndex());
++    }
 +    if (null == msrColEvalutorInfoList) {
 +      this.msrColEvalutorInfoList = new ArrayList<MeasureColumnResolvedFilterInfo>(20);
 +    } else {
 +      this.msrColEvalutorInfoList = msrColEvalutorInfoList;
 +    }
 +    this.exp = exp;
 +    this.tableIdentifier = tableIdentifier;
 +  }
 +
 +  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
 +      throws FilterUnsupportedException {
-     for (DimColumnResolvedFilterInfo dimColumnEvaluatorInfo : dimColEvaluatorInfoList) {
++    for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) {
++      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
 +      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
 +          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
-         if (null == blockChunkHolder.getDimensionDataChunk()[dimColumnEvaluatorInfo
-             .getColumnIndex()]) {
-           blockChunkHolder.getDimensionDataChunk()[dimColumnEvaluatorInfo.getColumnIndex()] =
-               blockChunkHolder.getDataBlock().getDimensionChunk(blockChunkHolder.getFileReader(),
-                   dimColumnEvaluatorInfo.getColumnIndex());
++        if (null == blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]]) {
++          blockChunkHolder.getDimensionDataChunk()[blocksIndex[i]] = blockChunkHolder.getDataBlock()
++              .getDimensionChunk(blockChunkHolder.getFileReader(), blocksIndex[i]);
 +        }
 +      } else {
 +        GenericQueryType complexType = dimColumnEvaluatorInfo.getComplexTypesWithBlockStartIndex()
-             .get(dimColumnEvaluatorInfo.getColumnIndex());
++            .get(blocksIndex[i]);
 +        complexType.fillRequiredBlockData(blockChunkHolder);
 +      }
 +    }
 +
 +    // CHECKSTYLE:OFF Approval No:Approval-V1R2C10_001
 +    if (null != msrColEvalutorInfoList) {
 +      for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
 +        if (msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice() && null == blockChunkHolder
 +            .getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]) {
 +          blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] =
 +              blockChunkHolder.getDataBlock().getMeasureChunk(blockChunkHolder.getFileReader(),
 +                  msrColumnEvalutorInfo.getColumnIndex());
 +        }
 +      }
 +    }
 +    // CHECKSTYLE:ON
 +
 +    int numberOfRows = blockChunkHolder.getDataBlock().nodeSize();
 +    BitSet set = new BitSet(numberOfRows);
 +    RowIntf row = new RowImpl();
- 
-     // CHECKSTYLE:OFF Approval No:Approval-V1R2C10_007
++    boolean invalidRowsPresent = false;
 +    for (int index = 0; index < numberOfRows; index++) {
 +      try {
 +        createRow(blockChunkHolder, row, index);
-       } catch (QueryExecutionException e1) {
-         // TODO Auto-generated catch block
-         e1.printStackTrace();
++      } catch (QueryExecutionException e) {
++        FilterUtil.logError(e, invalidRowsPresent);
 +      }
++      Boolean rslt = false;
 +      try {
-         Boolean rslt = exp.evaluate(row).getBoolean();
-         if (null != rslt && rslt) {
-           set.set(index);
-         }
-       } catch (FilterUnsupportedException e) {
-         throw new FilterUnsupportedException(e.getMessage());
++        rslt = exp.evaluate(row).getBoolean();
++      }
++      // Any invalid member while evaluation shall be ignored, system will log the
++      // error only once since all rows the evaluation happens so inorder to avoid
++      // too much log inforation only once the log will be printed.
++      catch (FilterIllegalMemberException e) {
++        FilterUtil.logError(e, invalidRowsPresent);
++      }
++      if (null != rslt && rslt) {
++        set.set(index);
 +      }
 +    }
-     // CHECKSTYLE:ON
- 
 +    return set;
 +  }
 +
 +  /**
 +   * Method will read the members of particular dimension block and create
 +   * a row instance for further processing of the filters
 +   *
 +   * @param blockChunkHolder
 +   * @param row
 +   * @param index
 +   * @throws QueryExecutionException
 +   */
 +  private void createRow(BlocksChunkHolder blockChunkHolder, RowIntf row, int index)
 +      throws QueryExecutionException {
 +    Object[] record = new Object[dimColEvaluatorInfoList.size() + msrColEvalutorInfoList.size()];
 +    String memberString = null;
-     for (DimColumnResolvedFilterInfo dimColumnEvaluatorInfo : dimColEvaluatorInfoList) {
++    for (int i=0;i<dimColEvaluatorInfoList.size();i++) {
++      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i);
 +      if (dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.ARRAY
 +          && dimColumnEvaluatorInfo.getDimension().getDataType() != DataType.STRUCT) {
 +        if (!dimColumnEvaluatorInfo.isDimensionExistsInCurrentSilce()) {
 +          record[dimColumnEvaluatorInfo.getRowIndex()] = dimColumnEvaluatorInfo.getDefaultValue();
 +        }
 +        if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
-             && blockChunkHolder.getDimensionDataChunk()[dimColumnEvaluatorInfo
-             .getColumnIndex()] instanceof VariableLengthDimensionDataChunk) {
++            && blockChunkHolder
++            .getDimensionDataChunk()[blocksIndex[i]] instanceof VariableLengthDimensionDataChunk) {
 +
 +          VariableLengthDimensionDataChunk dimensionColumnDataChunk =
 +              (VariableLengthDimensionDataChunk) blockChunkHolder
-                   .getDimensionDataChunk()[dimColumnEvaluatorInfo.getColumnIndex()];
++                  .getDimensionDataChunk()[blocksIndex[i]];
 +          if (null != dimensionColumnDataChunk.getCompleteDataChunk()) {
 +            memberString =
 +                readMemberBasedOnNoDictionaryVal(dimColumnEvaluatorInfo, dimensionColumnDataChunk,
 +                    index);
 +            if (null != memberString) {
 +              if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
 +                memberString = null;
 +              }
 +            }
 +            record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
 +                .getDataBasedOnDataType(memberString,
 +                    dimColumnEvaluatorInfo.getDimension().getDataType());
 +          } else {
 +            continue;
 +          }
 +        } else {
 +          int dictionaryValue =
-               readSurrogatesFromColumnBlock(blockChunkHolder, index, dimColumnEvaluatorInfo);
++              readSurrogatesFromColumnBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
++                  blocksIndex[i]);
 +          Dictionary forwardDictionary = null;
 +          if (dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)
 +              && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
 +            memberString =
 +                getFilterActualValueFromDictionaryValue(dimColumnEvaluatorInfo, dictionaryValue,
 +                    forwardDictionary);
 +            record[dimColumnEvaluatorInfo.getRowIndex()] = DataTypeUtil
 +                .getDataBasedOnDataType(memberString,
 +                    dimColumnEvaluatorInfo.getDimension().getDataType());
 +          } else if (dimColumnEvaluatorInfo.getDimension()
 +              .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
 +
 +            Object member = getFilterActualValueFromDirectDictionaryValue(dimColumnEvaluatorInfo,
 +                dictionaryValue);
 +            record[dimColumnEvaluatorInfo.getRowIndex()] = member;
 +          }
 +        }
 +      }
 +    }
 +
 +    DataType msrType;
 +
 +    for (MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo : msrColEvalutorInfoList) {
 +      switch (msrColumnEvalutorInfo.getType()) {
 +        case LONG:
 +          msrType = DataType.LONG;
 +          break;
 +        case DECIMAL:
 +          msrType = DataType.DECIMAL;
 +          break;
 +        default:
 +          msrType = DataType.DOUBLE;
 +      }
 +      // if measure doesnt exist then set the default value.
 +      if (!msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice()) {
 +        record[msrColumnEvalutorInfo.getRowIndex()] = msrColumnEvalutorInfo.getDefaultValue();
 +      } else {
 +        Object msrValue;
 +        switch (msrType) {
 +          case LONG:
 +            msrValue =
 +                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
 +                    .getMeasureDataHolder().getReadableLongValueByIndex(index);
 +            break;
 +          case DECIMAL:
 +            msrValue =
 +                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
 +                    .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
 +            break;
 +          default:
 +            msrValue =
 +                blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
 +                    .getMeasureDataHolder().getReadableDoubleValueByIndex(index);
 +        }
-         record[msrColumnEvalutorInfo.getRowIndex()] = msrValue;
++        record[msrColumnEvalutorInfo.getRowIndex()] =
++            blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()]
++                .getNullValueIndexHolder().getBitSet().get(index) ? null : msrValue;
 +
 +      }
 +    }
 +    row.setValues(record);
 +  }
 +
 +  /**
 +   * method will read the actual data from the direct dictionary generator
 +   * by passing direct dictionary value.
 +   *
 +   * @param dimColumnEvaluatorInfo
 +   * @param dictionaryValue
 +   * @return
 +   */
 +  private Object getFilterActualValueFromDirectDictionaryValue(
 +      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int dictionaryValue) {
 +    Object memberString = null;
 +    DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
 +        .getDirectDictionaryGenerator(dimColumnEvaluatorInfo.getDimension().getDataType());
 +    if (null != directDictionaryGenerator) {
 +      memberString = directDictionaryGenerator.getValueFromSurrogate(dictionaryValue);
 +    }
 +    return memberString;
 +  }
 +
 +  /**
 +   * Read the actual filter member by passing the dictionary value from
 +   * the forward dictionary cache which which holds column wise cache
 +   *
 +   * @param dimColumnEvaluatorInfo
 +   * @param dictionaryValue
 +   * @param forwardDictionary
 +   * @return
 +   * @throws QueryExecutionException
 +   */
 +  private String getFilterActualValueFromDictionaryValue(
 +      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int dictionaryValue,
 +      Dictionary forwardDictionary) throws QueryExecutionException {
 +    String memberString;
 +    try {
 +      forwardDictionary = FilterUtil
 +          .getForwardDictionaryCache(tableIdentifier, dimColumnEvaluatorInfo.getDimension());
 +    } catch (QueryExecutionException e) {
 +      throw new QueryExecutionException(e);
 +    }
 +
 +    memberString = forwardDictionary.getDictionaryValueForKey(dictionaryValue);
 +    if (null != memberString) {
 +      if (memberString.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
 +        memberString = null;
 +      }
 +    }
 +    return memberString;
 +  }
 +
 +  /**
 +   * read the filter member dictionary data from the block corresponding to
 +   * applied filter column
 +   *
 +   * @param blockChunkHolder
 +   * @param index
 +   * @param dimColumnEvaluatorInfo
 +   * @return
 +   */
 +  private int readSurrogatesFromColumnBlock(BlocksChunkHolder blockChunkHolder, int index,
-       DimColumnResolvedFilterInfo dimColumnEvaluatorInfo) {
-     byte[] rawData =
-         blockChunkHolder.getDimensionDataChunk()[dimColumnEvaluatorInfo.getColumnIndex()]
-             .getChunkData(index);
-     ByteBuffer byteBuffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
-     int dictionaryValue = CarbonUtil.getSurrogateKey(rawData, byteBuffer);
-     return dictionaryValue;
++      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
++    if (dimColumnEvaluatorInfo.getDimension().isColumnar()) {
++      byte[] rawData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
++      ByteBuffer byteBuffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
++      int dictionaryValue = CarbonUtil.getSurrogateKey(rawData, byteBuffer);
++      return dictionaryValue;
++    } else {
++      return readSurrogatesFromColumnGroupBlock(blockChunkHolder, index, dimColumnEvaluatorInfo,
++          blockIndex);
++    }
++
++  }
++
++  /**
++   * @param blockChunkHolder
++   * @param index
++   * @param dimColumnEvaluatorInfo
++   * @return read surrogate of given row of given column group dimension
++   */
++  private int readSurrogatesFromColumnGroupBlock(BlocksChunkHolder blockChunkHolder, int index,
++      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo, int blockIndex) {
++    try {
++      KeyStructureInfo keyStructureInfo =
++          QueryUtil.getKeyStructureInfo(segmentProperties, dimColumnEvaluatorInfo);
++      byte[] colData = blockChunkHolder.getDimensionDataChunk()[blockIndex].getChunkData(index);
++      long[] result = keyStructureInfo.getKeyGenerator().getKeyArray(colData);
++      int colGroupId =
++          QueryUtil.getColumnGroupId(segmentProperties, dimColumnEvaluatorInfo.getColumnIndex());
++      int dictionaryValue = (int) result[segmentProperties
++          .getColumnGroupMdKeyOrdinal(colGroupId, dimColumnEvaluatorInfo.getColumnIndex())];
++      return dictionaryValue;
++    } catch (KeyGenException e) {
++      LOGGER.error(e);
++    }
++    return 0;
 +  }
 +
 +  /**
 +   * Reading the blocks for no dictionary data, in no dictionary case
 +   * directly the filter data will read, no need to scan the dictionary
 +   * or read the dictionary value.
 +   *
 +   * @param dimColumnEvaluatorInfo
 +   * @param dimensionColumnDataChunk
 +   * @param index
 +   * @return
 +   */
 +  private String readMemberBasedOnNoDictionaryVal(
 +      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
 +      VariableLengthDimensionDataChunk dimensionColumnDataChunk, int index) {
 +    byte[] noDictionaryVals;
 +    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse()) {
 +      // Getting the data for direct surrogates.
 +      noDictionaryVals = dimensionColumnDataChunk.getCompleteDataChunk()
 +          .get(dimensionColumnDataChunk.getAttributes().getInvertedIndexesReverse()[index]);
 +    } else {
 +      noDictionaryVals = dimensionColumnDataChunk.getCompleteDataChunk().get(index);
 +    }
 +    return new String(noDictionaryVals, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
 +  }
 +
 +  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
 +    BitSet bitSet = new BitSet(1);
 +    bitSet.set(0);
 +    return bitSet;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 2681ccc,0000000..0f2eda1
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@@ -1,66 -1,0 +1,208 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
++import org.carbondata.core.carbon.datastore.block.SegmentProperties;
++import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
++import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
++import org.carbondata.core.carbon.metadata.encoder.Encoding;
 +import org.carbondata.core.util.ByteUtil;
++import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.scan.expression.Expression;
++import org.carbondata.scan.expression.exception.FilterUnsupportedException;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
++import org.carbondata.scan.processor.BlocksChunkHolder;
 +
 +public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
 +  private byte[][] filterRangeValues;
 +
 +  public RowLevelRangeGrtThanFiterExecuterImpl(
 +      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
 +      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
-     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
++      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
++      SegmentProperties segmentProperties) {
++    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties);
 +    this.filterRangeValues = filterRangeValues;
 +  }
 +
 +  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
 +    BitSet bitSet = new BitSet(1);
 +    byte[][] filterValues = this.filterRangeValues;
 +    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
 +    boolean isScanRequired = false;
 +    for (int k = 0; k < filterValues.length; k++) {
 +      // filter value should be in range of max and min value i.e
 +      // max>filtervalue>min
 +      // so filter-max should be negative
 +      int maxCompare =
 +          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
- 
 +      // if any filter value is in range than this block needs to be
 +      // scanned means always less than block max range.
 +      if (maxCompare < 0) {
 +        isScanRequired = true;
 +        break;
 +      }
 +    }
 +    if (isScanRequired) {
 +      bitSet.set(0);
 +    }
 +    return bitSet;
 +
 +  }
++
++  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
++      throws FilterUnsupportedException {
++    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
++      return super.applyFilter(blockChunkHolder);
++    }
++    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
++        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
++    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
++      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
++          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
++    }
++    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
++        blockChunkHolder.getDataBlock().nodeSize());
++  }
++
++  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows) {
++    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
++        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      return setFilterdIndexToBitSetWithColumnIndex(
++          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
++    }
++    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all members
++   * will be considered for applying range filters. this method will be called if the
++   * column is not supported by default so column index mapping  will be present for
++   * accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSetWithColumnIndex(
++      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
++    int start = 0;
++    int last = 0;
++    int startIndex = 0;
++    byte[][] filterValues = this.filterRangeValues;
++    for (int i = 0; i < filterValues.length; i++) {
++      start = CarbonUtil
++          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++              filterValues[i], true);
++      if (start >= 0) {
++        start = CarbonUtil.nextGreaterValueToTarget(start,
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[i], numerOfRows);
++      }
++      // Logic will handle the case where the range filter member is not present in block
++      // in this case the binary search will return the index from where the bit sets will be
++      // set inorder to apply filters. this is greater than filter so the range will be taken
++      // from the next element which is greater than filter member.
++      if (start < 0) {
++        start = -(start + 1);
++        if (start == numerOfRows) {
++          start = start - 1;
++        }
++        // Method will compare the tentative index value after binary search, this tentative
++        // index needs to be compared by the filter member if its > filter then from that
++        // index the bitset will be considered for filtering process.
++        if (ByteUtil
++            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
++            > 0) {
++          start = start + 1;
++        }
++      }
++
++      last = start;
++      for (int j = start; j < numerOfRows; j++) {
++        bitSet.set(columnIndex[j]);
++        last++;
++      }
++      startIndex = last;
++      if (startIndex >= numerOfRows) {
++        break;
++      }
++    }
++
++    return bitSet;
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all
++   * members will be considered for applying range filters. this method will
++   * be called if the column is sorted default so column index
++   * mapping will be present for accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      int start = 0;
++      int last = 0;
++      int startIndex = 0;
++      byte[][] filterValues = this.filterRangeValues;
++      for (int k = 0; k < filterValues.length; k++) {
++        start = CarbonUtil.getFirstIndexUsingBinarySearch(
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++            filterValues[k], true);
++        start = CarbonUtil.nextGreaterValueToTarget(start,
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k], numerOfRows);
++        if (start < 0) {
++          start = -(start + 1);
++          if (start == numerOfRows) {
++            start = start - 1;
++          }
++          // Method will compare the tentative index value after binary search, this tentative
++          // index needs to be compared by the filter member if its > filter then from that
++          // index the bitset will be considered for filtering process.
++          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) > 0) {
++            start = start + 1;
++          }
++        }
++        last = start;
++        for (int j = start; j < numerOfRows; j++) {
++          bitSet.set(j);
++          last++;
++        }
++        startIndex = last;
++        if (startIndex >= numerOfRows) {
++          break;
++        }
++      }
++    }
++    return bitSet;
++  }
++
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index ef11b4a,0000000..e715261
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@@ -1,66 -1,0 +1,199 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
++import org.carbondata.core.carbon.datastore.block.SegmentProperties;
++import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
++import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
++import org.carbondata.core.carbon.metadata.encoder.Encoding;
 +import org.carbondata.core.util.ByteUtil;
++import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.scan.expression.Expression;
++import org.carbondata.scan.expression.exception.FilterUnsupportedException;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
++import org.carbondata.scan.processor.BlocksChunkHolder;
 +
 +public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilterExecuterImpl {
 +
-   private byte[][] filterRangeValues;
++  protected byte[][] filterRangeValues;
 +
 +  public RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
 +      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
 +      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
-     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
++      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
++      SegmentProperties segmentProperties) {
++    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties);
 +    this.filterRangeValues = filterRangeValues;
 +  }
 +
 +  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
 +    BitSet bitSet = new BitSet(1);
 +    byte[][] filterValues = this.filterRangeValues;
 +    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
 +    boolean isScanRequired = false;
 +    for (int k = 0; k < filterValues.length; k++) {
 +      // filter value should be in range of max and min value i.e
 +      // max>filtervalue>min
 +      // so filter-max should be negative
 +      int maxCompare =
 +          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMaxValue[columnIndex]);
 +      // if any filter value is in range than this block needs to be
 +      // scanned less than equal to max range.
 +      if (maxCompare <= 0) {
 +        isScanRequired = true;
 +        break;
 +      }
 +    }
 +    if (isScanRequired) {
 +      bitSet.set(0);
 +    }
 +    return bitSet;
 +
 +  }
++
++  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
++      throws FilterUnsupportedException {
++    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
++      return super.applyFilter(blockChunkHolder);
++    }
++    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
++        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
++    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
++      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
++          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
++    }
++    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
++        blockChunkHolder.getDataBlock().nodeSize());
++  }
++
++  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows) {
++    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
++        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      return setFilterdIndexToBitSetWithColumnIndex(
++          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows);
++    }
++    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows);
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all members
++   * will be considered for applying range filters. this method will be called if the
++   * column is not supported by default so column index mapping  will be present for
++   * accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSetWithColumnIndex(
++      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
++    int start = 0;
++    int last = 0;
++    int startIndex = 0;
++    byte[][] filterValues = this.filterRangeValues;
++    for (int i = 0; i < filterValues.length; i++) {
++      start = CarbonUtil
++          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++              filterValues[i], false);
++      if (start < 0) {
++        start = -(start + 1);
++        if (start == numerOfRows) {
++          start = start - 1;
++        }
++        // Method will compare the tentative index value after binary search, this tentative
++        // index needs to be compared by the filter member if its >= filter then from that
++        // index the bitset will be considered for filtering process.
++        if (ByteUtil
++            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
++            >= 0) {
++          start = start + 1;
++        }
++      }
++      last = start;
++      for (int j = start; j < numerOfRows; j++) {
++
++        bitSet.set(columnIndex[j]);
++        last++;
++      }
++      startIndex = last;
++      if (startIndex >= numerOfRows) {
++        break;
++      }
++    }
++    return bitSet;
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all
++   * members will be considered for applying range filters. this method will
++   * be called if the column is sorted default so column index
++   * mapping will be present for accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      int start = 0;
++      int last = 0;
++      int startIndex = 0;
++      byte[][] filterValues = this.filterRangeValues;
++      for (int k = 0; k < filterValues.length; k++) {
++        start = CarbonUtil.getFirstIndexUsingBinarySearch(
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++            filterValues[k], false);
++        if (start < 0) {
++          start = -(start + 1);
++          if (start == numerOfRows) {
++            start = start - 1;
++          }
++          // Method will compare the tentative index value after binary search, this tentative
++          // index needs to be compared by the filter member if its >= filter then from that
++          // index the bitset will be considered for filtering process.
++          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
++              >= 0) {
++            start = start + 1;
++          }
++        }
++
++        last = start;
++        for (int j = start; j < numerOfRows; j++) {
++          bitSet.set(j);
++          last++;
++        }
++        startIndex = last;
++        if (startIndex >= numerOfRows) {
++          break;
++        }
++      }
++    }
++    return bitSet;
++  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index afc1ccb,0000000..92efb0a
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@@ -1,66 -1,0 +1,247 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
++import org.carbondata.core.carbon.datastore.block.SegmentProperties;
++import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
++import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
++import org.carbondata.core.carbon.metadata.encoder.Encoding;
++import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
++import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 +import org.carbondata.core.util.ByteUtil;
++import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.scan.expression.Expression;
++import org.carbondata.scan.expression.exception.FilterUnsupportedException;
++import org.carbondata.scan.filter.FilterUtil;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
++import org.carbondata.scan.processor.BlocksChunkHolder;
 +
 +public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilterExecuterImpl {
-   private byte[][] filterRangeValues;
++  protected byte[][] filterRangeValues;
 +
 +  public RowLevelRangeLessThanEqualFilterExecuterImpl(
 +      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
 +      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
-     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
++      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
++      SegmentProperties segmentProperties) {
++    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties);
 +    this.filterRangeValues = filterRangeValues;
 +  }
 +
 +  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
 +    BitSet bitSet = new BitSet(1);
 +    byte[][] filterValues = this.filterRangeValues;
 +    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
 +    boolean isScanRequired = false;
 +    for (int k = 0; k < filterValues.length; k++) {
 +      // and filter-min should be positive
 +      int minCompare =
 +          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
 +
 +      // if any filter applied is not in range of min and max of block
 +      // then since its a less than equal to fiter validate whether the block
 +      // min range is less than equal to applied filter member
 +      if (minCompare >= 0) {
 +        isScanRequired = true;
 +        break;
 +      }
 +    }
 +    if (isScanRequired) {
 +      bitSet.set(0);
 +    }
 +    return bitSet;
 +
 +  }
 +
++  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
++      throws FilterUnsupportedException {
++    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
++      return super.applyFilter(blockChunkHolder);
++    }
++    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
++        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
++    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
++      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
++          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
++    }
++    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
++        blockChunkHolder.getDataBlock().nodeSize());
++  }
++
++  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows) {
++    byte[] defaultValue = null;
++    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
++      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
++          .getDirectDictionaryGenerator(
++              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
++      int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
++      defaultValue = FilterUtil.getMaskKey(key, dimColEvaluatorInfoList.get(0).getDimension(),
++          this.segmentProperties.getDimensionKeyGenerator());
++    }
++    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
++        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++
++      return setFilterdIndexToBitSetWithColumnIndex(
++          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
++
++    }
++    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all members
++   * will be considered for applying range filters. this method will be called if the
++   * column is not supported by default so column index mapping  will be present for
++   * accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSetWithColumnIndex(
++      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
++      byte[] defaultValue) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
++    int start = 0;
++    int last = 0;
++    int skip = 0;
++    int startIndex = 0;
++    byte[][] filterValues = this.filterRangeValues;
++    //find the number of default values to skip the null value in case of direct dictionary
++    if (null != defaultValue) {
++      start = CarbonUtil
++          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++              defaultValue, true);
++      if (start < 0) {
++        skip = -(start + 1);
++        // end of block
++        if (skip == numerOfRows) {
++          return bitSet;
++        }
++      } else {
++        skip = start;
++      }
++      startIndex = skip;
++    }
++    for (int i = 0; i < filterValues.length; i++) {
++      start = CarbonUtil
++          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++              filterValues[i], true);
++      if (start < 0) {
++        start = -(start + 1);
++        if (start == numerOfRows) {
++          start = start - 1;
++        }
++        // Method will compare the tentative index value after binary search, this tentative
++        // index needs to be compared by the filter member if its >= filter then from that
++        // index the bitset will be considered for filtering process.
++        if (ByteUtil
++            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
++            <= 0) {
++          start = start - 1;
++        }
++      }
++      last = start;
++      for (int j = start; j >= skip; j--) {
++        bitSet.set(columnIndex[j]);
++        last--;
++      }
++      startIndex = last;
++      if (startIndex <= 0) {
++        break;
++      }
++    }
++    return bitSet;
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all
++   * members will be considered for applying range filters. this method will
++   * be called if the column is sorted default so column index
++   * mapping will be present for accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @param defaultValue
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows, byte[] defaultValue) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      int start = 0;
++      int last = 0;
++      int startIndex = 0;
++      byte[][] filterValues = this.filterRangeValues;
++      int skip = 0;
++      //find the number of default values to skip the null value in case of direct dictionary
++      if (null != defaultValue) {
++        start = CarbonUtil.getFirstIndexUsingBinarySearch(
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++            defaultValue, true);
++        if (start < 0) {
++          skip = -(start + 1);
++          // end of block
++          if (skip == numerOfRows) {
++            return bitSet;
++          }
++        } else {
++          skip = start;
++        }
++        startIndex = skip;
++      }
++      for (int k = 0; k < filterValues.length; k++) {
++        start = CarbonUtil.getFirstIndexUsingBinarySearch(
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++            filterValues[k], true);
++        if (start < 0) {
++          start = -(start + 1);
++          if (start == numerOfRows) {
++            start = start - 1;
++          }
++          // Method will compare the tentative index value after binary search, this tentative
++          // index needs to be compared by the filter member if its <= filter then from that
++          // index the bitset will be considered for filtering process.
++          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start))
++              <= 0) {
++            start = start - 1;
++          }
++        }
++        last = start;
++        for (int j = start; j >= skip; j--) {
++          bitSet.set(j);
++          last--;
++        }
++        startIndex = last;
++        if (startIndex <= 0) {
++          break;
++        }
++      }
++    }
++    return bitSet;
++  }
++
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index d608cc6,0000000..46e7d1b
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@@ -1,65 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
++import org.carbondata.core.carbon.datastore.block.SegmentProperties;
++import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
++import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
++import org.carbondata.core.carbon.metadata.encoder.Encoding;
++import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
++import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 +import org.carbondata.core.util.ByteUtil;
++import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.scan.expression.Expression;
++import org.carbondata.scan.expression.exception.FilterUnsupportedException;
++import org.carbondata.scan.filter.FilterUtil;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
++import org.carbondata.scan.processor.BlocksChunkHolder;
 +
 +public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
 +  private byte[][] filterRangeValues;
 +
 +  public RowLevelRangeLessThanFiterExecuterImpl(
 +      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
 +      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues) {
-     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier);
++      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
++      SegmentProperties segmentProperties) {
++    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties);
 +    this.filterRangeValues = filterRangeValues;
 +  }
 +
 +  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
 +    BitSet bitSet = new BitSet(1);
 +    byte[][] filterValues = this.filterRangeValues;
 +    int columnIndex = this.dimColEvaluatorInfoList.get(0).getColumnIndex();
 +    boolean isScanRequired = false;
 +    for (int k = 0; k < filterValues.length; k++) {
 +      // and filter-min should be positive
 +      int minCompare =
 +          ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue[columnIndex]);
 +
 +      // if any filter applied is not in range of min and max of block
 +      // then since its a less than fiter validate whether the block
 +      // min range is less  than applied filter member
 +      if (minCompare > 0) {
 +        isScanRequired = true;
 +        break;
 +      }
 +    }
 +    if (isScanRequired) {
 +      bitSet.set(0);
 +    }
 +    return bitSet;
 +
 +  }
++
++  @Override public BitSet applyFilter(BlocksChunkHolder blockChunkHolder)
++      throws FilterUnsupportedException {
++    if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
++      return super.applyFilter(blockChunkHolder);
++    }
++    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
++        .get(dimColEvaluatorInfoList.get(0).getColumnIndex());
++    if (null == blockChunkHolder.getDimensionDataChunk()[blockIndex]) {
++      blockChunkHolder.getDimensionDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
++          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
++    }
++    return getFilteredIndexes(blockChunkHolder.getDimensionDataChunk()[blockIndex],
++        blockChunkHolder.getDataBlock().nodeSize());
++  }
++
++  private BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows) {
++    byte[] defaultValue = null;
++    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
++      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
++          .getDirectDictionaryGenerator(
++              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
++      int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
++      defaultValue = FilterUtil.getMaskKey(key, dimColEvaluatorInfoList.get(0).getDimension(),
++          this.segmentProperties.getDimensionKeyGenerator());
++    }
++    if (null != dimensionColumnDataChunk.getAttributes().getInvertedIndexes()
++        && dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      return setFilterdIndexToBitSetWithColumnIndex(
++          (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, numerOfRows, defaultValue);
++    }
++    return setFilterdIndexToBitSet(dimensionColumnDataChunk, numerOfRows, defaultValue);
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all members
++   * will be considered for applying range filters. this method will be called if the
++   * column is not supported by default so column index mapping  will be present for
++   * accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSetWithColumnIndex(
++      FixedLengthDimensionDataChunk dimensionColumnDataChunk, int numerOfRows,
++      byte[] defaultValue) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    int[] columnIndex = dimensionColumnDataChunk.getAttributes().getInvertedIndexes();
++    int start = 0;
++    int last = 0;
++    int startIndex = 0;
++    int skip = 0;
++    byte[][] filterValues = this.filterRangeValues;
++
++    //find the number of default values to skip the null value in case of direct dictionary
++    if (null != defaultValue) {
++      start = CarbonUtil
++          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++              defaultValue, false);
++      if (start < 0) {
++        skip = -(start + 1);
++        // end of block
++        if (skip == numerOfRows) {
++          return bitSet;
++        }
++      } else {
++        skip = start;
++      }
++      startIndex = skip;
++    }
++
++    for (int i = 0; i < filterValues.length; i++) {
++      start = CarbonUtil
++          .getFirstIndexUsingBinarySearch(dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++              filterValues[i], false);
++      // Logic will handle the case where the range filter member is not present in block
++      // in this case the binary search will return the index from where the bit sets will be
++      // set inorder to apply filters. this is Lesser than filter so the range will be taken
++      // from the prev element which is Lesser than filter member.
++      start = CarbonUtil.nextLesserValueToTarget(start, dimensionColumnDataChunk, filterValues[i]);
++      if (start < 0) {
++        start = -(start + 1);
++        if (start == numerOfRows) {
++          start = start - 1;
++        }
++        // Method will compare the tentative index value after binary search, this tentative
++        // index needs to be compared by the filter member if its < filter then from that
++        // index the bitset will be considered for filtering process.
++        if (ByteUtil
++            .compare(filterValues[i], dimensionColumnDataChunk.getChunkData(columnIndex[start]))
++            < 0) {
++          start = start - 1;
++        }
++      }
++      last = start;
++      for (int j = start; j >= skip; j--) {
++        bitSet.set(columnIndex[j]);
++        last--;
++      }
++      startIndex = last;
++      if (startIndex >= 0) {
++        break;
++      }
++    }
++    return bitSet;
++  }
++
++  /**
++   * Method will scan the block and finds the range start index from which all
++   * members will be considered for applying range filters. this method will
++   * be called if the column is sorted default so column index
++   * mapping will be present for accesing the members from the block.
++   *
++   * @param dimensionColumnDataChunk
++   * @param numerOfRows
++   * @return BitSet.
++   */
++  private BitSet setFilterdIndexToBitSet(DimensionColumnDataChunk dimensionColumnDataChunk,
++      int numerOfRows, byte[] defaultValue) {
++    BitSet bitSet = new BitSet(numerOfRows);
++    if (dimensionColumnDataChunk instanceof FixedLengthDimensionDataChunk) {
++      int start = 0;
++      int last = 0;
++      int startIndex = 0;
++      int skip = 0;
++      byte[][] filterValues = this.filterRangeValues;
++      //find the number of default values to skip the null value in case of direct dictionary
++      if (null != defaultValue) {
++        start = CarbonUtil.getFirstIndexUsingBinarySearch(
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++            defaultValue, false);
++        if (start < 0) {
++          skip = -(start + 1);
++          // end of block
++          if (skip == numerOfRows) {
++            return bitSet;
++          }
++        } else {
++          skip = start;
++        }
++        startIndex = skip;
++      }
++      for (int k = 0; k < filterValues.length; k++) {
++        start = CarbonUtil.getFirstIndexUsingBinarySearch(
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, startIndex, numerOfRows - 1,
++            filterValues[k], false);
++        start = CarbonUtil.nextLesserValueToTarget(start,
++            (FixedLengthDimensionDataChunk) dimensionColumnDataChunk, filterValues[k]);
++        if (start < 0) {
++          start = -(start + 1);
++          if (start >= numerOfRows) {
++            start = numerOfRows - 1;
++          }
++          // Method will compare the tentative index value after binary search, this tentative
++          // index needs to be compared by the filter member if its < filter then from that
++          // index the bitset will be considered for filtering process.
++          if (ByteUtil.compare(filterValues[k], dimensionColumnDataChunk.getChunkData(start)) < 0) {
++            start = start - 1;
++          }
++        }
++        last = start;
++        for (int j = start; j >= skip; j--) {
++          bitSet.set(j);
++          last--;
++        }
++        startIndex = last;
++        if (startIndex <= 0) {
++          break;
++        }
++      }
++    }
++    return bitSet;
++  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
index db2fa2d,0000000..e6eb6da
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
+++ b/core/src/main/java/org/carbondata/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
@@@ -1,90 -1,0 +1,93 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.executer;
 +
++import org.carbondata.core.carbon.datastore.block.SegmentProperties;
 +import org.carbondata.scan.filter.intf.FilterExecuterType;
 +import org.carbondata.scan.filter.resolver.FilterResolverIntf;
 +import org.carbondata.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
 +
 +public class RowLevelRangeTypeExecuterFacory {
 +
 +  private RowLevelRangeTypeExecuterFacory() {
 +
 +  }
 +
 +  /**
 +   * The method returns the Row Level Range fiter type instance based on
 +   * filter tree resolver type.
 +   *
 +   * @param filterExpressionResolverTree
++   * @param segmentProperties
 +   * @param dataType                     DataType
 +   * @return the generator instance
 +   */
 +  public static RowLevelFilterExecuterImpl getRowLevelRangeTypeExecuter(
-       FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree) {
++      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree,
++      SegmentProperties segmentProperties) {
 +    switch (filterExecuterType) {
 +
 +      case ROWLEVEL_LESSTHAN:
 +        return new RowLevelRangeLessThanFiterExecuterImpl(
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getDimColEvaluatorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getMsrColEvalutorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                 .getFilterRangeValues());
++                .getFilterRangeValues(segmentProperties), segmentProperties);
 +      case ROWLEVEL_LESSTHAN_EQUALTO:
 +        return new RowLevelRangeLessThanEqualFilterExecuterImpl(
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getDimColEvaluatorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getMsrColEvalutorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                 .getFilterRangeValues());
++                .getFilterRangeValues(segmentProperties), segmentProperties);
 +      case ROWLEVEL_GREATERTHAN_EQUALTO:
 +        return new RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getDimColEvaluatorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getMsrColEvalutorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                 .getFilterRangeValues());
++                .getFilterRangeValues(segmentProperties), segmentProperties);
 +      case ROWLEVEL_GREATERTHAN:
 +        return new RowLevelRangeGrtThanFiterExecuterImpl(
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getDimColEvaluatorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
 +                .getMsrColEvalutorInfoList(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getFilterExpression(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
 +            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                 .getFilterRangeValues());
++                .getFilterRangeValues(segmentProperties), segmentProperties);
 +      default:
 +        // Scenario wont come logic must break
 +        return null;
 +
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/core/src/main/java/org/carbondata/scan/filter/resolver/AndFilterResolverImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/resolver/AndFilterResolverImpl.java
index 37685c8,0000000..cde5c43
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/resolver/AndFilterResolverImpl.java
+++ b/core/src/main/java/org/carbondata/scan/filter/resolver/AndFilterResolverImpl.java
@@@ -1,51 -1,0 +1,52 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter.resolver;
 +
 +import java.util.SortedMap;
 +
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 +import org.carbondata.core.carbon.datastore.block.SegmentProperties;
++import org.carbondata.scan.executor.exception.QueryExecutionException;
 +import org.carbondata.scan.filter.intf.ExpressionType;
 +
 +public class AndFilterResolverImpl extends LogicalFilterResolverImpl {
 +
 +  /**
-    *
++   *i
 +   */
 +  private static final long serialVersionUID = -761688076874662001L;
 +
 +  public AndFilterResolverImpl(FilterResolverIntf leftEvalutor, FilterResolverIntf rightEvalutor,
 +      ExpressionType filterExpressionType) {
 +    super(leftEvalutor, rightEvalutor, filterExpressionType);
 +  }
 +
 +  @Override public void getStartKey(SegmentProperties segmentProperties, long[] startKeys,
 +      SortedMap<Integer, byte[]> noDicStartKeys) {
 +    leftEvalutor.getStartKey(segmentProperties, startKeys, noDicStartKeys);
 +    rightEvalutor.getStartKey(segmentProperties, startKeys, noDicStartKeys);
 +  }
 +
 +  @Override public void getEndKey(SegmentProperties segmentProperties,
 +      AbsoluteTableIdentifier tableIdentifier, long[] endKeys,
-       SortedMap<Integer, byte[]> noDicEndKeys) {
++      SortedMap<Integer, byte[]> noDicEndKeys) throws QueryExecutionException {
 +    leftEvalutor.getEndKey(segmentProperties, tableIdentifier, endKeys, noDicEndKeys);
 +    rightEvalutor.getEndKey(segmentProperties, tableIdentifier, endKeys, noDicEndKeys);
 +  }
 +}