You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/04/05 13:55:58 UTC

carbondata git commit: [CARBONDATA-2277] Fix for filter of default values on all datatypes

Repository: carbondata
Updated Branches:
  refs/heads/master 280a4003a -> b52f1571d


[CARBONDATA-2277] Fix for filter of default values on all datatypes

1. Added handling of filter keys for direct dictionary columns when filtering on default values.
2. For no-dictionary columns, changed the code to get the correct byte values of default values.

This closes #2102


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b52f1571
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b52f1571
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b52f1571

Branch: refs/heads/master
Commit: b52f1571da554cb375b3fcd9b9fb6fff0fd7d9d6
Parents: 280a400
Author: Jatin <ja...@knoldus.in>
Authored: Sun Mar 25 13:45:24 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Apr 5 19:28:32 2018 +0530

----------------------------------------------------------------------
 .../scan/executor/util/RestructureUtil.java     |   2 +-
 .../carbondata/core/scan/filter/FilterUtil.java |  75 ++-
 .../executer/RestructureEvaluatorImpl.java      |  20 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |   7 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |   6 +-
 ...velRangeLessThanEqualFilterExecuterImpl.java |   7 +-
 ...RowLevelRangeLessThanFilterExecuterImpl.java | 499 +++++++++++++++++++
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 498 ------------------
 .../RowLevelRangeTypeExecuterFacory.java        |  98 ----
 .../RowLevelRangeTypeExecuterFactory.java       |  98 ++++
 .../RowLevelRangeFilterResolverImpl.java        |   2 +
 .../carbondata/core/util/DataTypeUtil.java      |   1 +
 .../AlterTableValidationTestCase.scala          | 109 +++-
 13 files changed, 791 insertions(+), 631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index d7247b2..9c1f2f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -194,7 +194,7 @@ public class RestructureUtil {
    * @param defaultValue
    * @return
    */
-  private static Object getDirectDictionaryDefaultValue(DataType dataType, byte[] defaultValue) {
+  public static Object getDirectDictionaryDefaultValue(DataType dataType, byte[] defaultValue) {
     Object directDictionaryDefaultValue = null;
     if (!isDefaultValueNull(defaultValue)) {
       DirectDictionaryGenerator directDictionaryGenerator =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index a397355..c91eb2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -88,7 +89,7 @@ import org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterI
 import org.apache.carbondata.core.scan.filter.executer.RestructureExcludeFilterExecutorImpl;
 import org.apache.carbondata.core.scan.filter.executer.RestructureIncludeFilterExecutorImpl;
 import org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl;
-import org.apache.carbondata.core.scan.filter.executer.RowLevelRangeTypeExecuterFacory;
+import org.apache.carbondata.core.scan.filter.executer.RowLevelRangeTypeExecuterFactory;
 import org.apache.carbondata.core.scan.filter.executer.TrueFilterExecutor;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
@@ -167,7 +168,7 @@ public final class FilterUtil {
         case ROWLEVEL_LESSTHAN_EQUALTO:
         case ROWLEVEL_GREATERTHAN_EQUALTO:
         case ROWLEVEL_GREATERTHAN:
-          return RowLevelRangeTypeExecuterFacory
+          return RowLevelRangeTypeExecuterFactory
               .getRowLevelRangeTypeExecuter(filterExecuterType, filterExpressionResolverTree,
                   segmentProperties);
         case RANGE:
@@ -834,27 +835,9 @@ public final class FilterUtil {
     return columnFilterInfo;
   }
 
-  /**
-   * Below method will be used to covert the filter surrogate keys
-   * to mdkey
-   *
-   * @param columnFilterInfo
-   * @param carbonDimension
-   * @param segmentProperties
-   * @return
-   */
-  public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo,
-      CarbonDimension carbonDimension, SegmentProperties segmentProperties,  boolean isExclude) {
-    if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
-      return columnFilterInfo.getNoDictionaryFilterValuesList()
-          .toArray((new byte[columnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
-    }
-    KeyGenerator blockLevelKeyGenerator = segmentProperties.getDimensionKeyGenerator();
-    int[] dimColumnsCardinality = segmentProperties.getDimColumnsCardinality();
-    int[] keys = new int[blockLevelKeyGenerator.getDimCount()];
-    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
-    Arrays.fill(keys, 0);
-    int keyOrdinalOfDimensionFromCurrentBlock = carbonDimension.getKeyOrdinal();
+  private static byte[][] getFilterValuesInBytes(ColumnFilterInfo columnFilterInfo,
+      boolean isExclude, KeyGenerator blockLevelKeyGenerator, int[] dimColumnsCardinality,
+      int[] keys, List<byte[]> filterValuesList, int keyOrdinalOfDimensionFromCurrentBlock) {
     if (null != columnFilterInfo) {
       int[] rangesForMaskedByte =
           getRangesForMaskedByte(keyOrdinalOfDimensionFromCurrentBlock, blockLevelKeyGenerator);
@@ -881,7 +864,53 @@ public final class FilterUtil {
       }
     }
     return filterValuesList.toArray(new byte[filterValuesList.size()][]);
+  }
+
+  /**
+   * This method will be used to get the filter key array list for blocks which do not contain
+   * the filter column when the column encoding is Direct Dictionary
+   *
+   * @param columnFilterInfo filter values to convert; forwarded to getFilterValuesInBytes
+   * @param isExclude forwarded unchanged to getFilterValuesInBytes
+   * @return the byte[][] produced by getFilterValuesInBytes with a one-dimension key generator
+   */
+  public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo, boolean isExclude) {
+    int[] dimColumnsCardinality = new int[] { Integer.MAX_VALUE };
+    int[] dimensionBitLength =
+        CarbonUtil.getDimensionBitLength(dimColumnsCardinality, new int[] { 1 });
+    KeyGenerator blockLevelKeyGenerator = new MultiDimKeyVarLengthGenerator(dimensionBitLength);
+    int[] keys = new int[blockLevelKeyGenerator.getDimCount()];
+    Arrays.fill(keys, 0);
+    int keyOrdinalOfDimensionFromCurrentBlock = 0;
+    List<byte[]> filterValuesList =
+        new ArrayList<byte[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    return getFilterValuesInBytes(columnFilterInfo, isExclude, blockLevelKeyGenerator,
+        dimColumnsCardinality, keys, filterValuesList, keyOrdinalOfDimensionFromCurrentBlock);
+  }
 
+  /**
+   * Below method will be used to convert the filter surrogate keys to mdkey
+   *
+   * @param columnFilterInfo filter values to be converted
+   * @param carbonDimension dimension being filtered; supplies the encoding and key ordinal
+   * @param segmentProperties supplies the dimension key generator and column cardinalities
+   * @param isExclude forwarded unchanged to getFilterValuesInBytes
+   * @return the no-dictionary filter values as-is, otherwise the result of getFilterValuesInBytes
+   */
+  public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo,
+      CarbonDimension carbonDimension, SegmentProperties segmentProperties,  boolean isExclude) {
+    if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+      return columnFilterInfo.getNoDictionaryFilterValuesList()
+          .toArray((new byte[columnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
+    }
+    KeyGenerator blockLevelKeyGenerator = segmentProperties.getDimensionKeyGenerator();
+    int[] dimColumnsCardinality = segmentProperties.getDimColumnsCardinality();
+    int[] keys = new int[blockLevelKeyGenerator.getDimCount()];
+    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    Arrays.fill(keys, 0);
+    int keyOrdinalOfDimensionFromCurrentBlock = carbonDimension.getKeyOrdinal();
+    return getFilterValuesInBytes(columnFilterInfo, isExclude, blockLevelKeyGenerator,
+        dimColumnsCardinality, keys, filterValuesList, keyOrdinalOfDimensionFromCurrentBlock);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
index f3c066a..e1432b0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
@@ -21,15 +21,17 @@ import java.nio.charset.Charset;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.filter.ColumnFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.comparator.Comparator;
 import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
@@ -74,7 +76,17 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
       // 3 cases: is NUll, is Not Null and filter on default value of newly added column
       int defaultSurrogateValueToCompare = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
       if (null != defaultValue) {
-        defaultSurrogateValueToCompare++;
+        if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+              .getDirectDictionaryGenerator(dimension.getDataType());
+          if (directDictionaryGenerator != null) {
+            String value =
+                new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+            defaultSurrogateValueToCompare = Integer.parseInt(value);
+          }
+        } else {
+          defaultSurrogateValueToCompare++;
+        }
       }
       List<Integer> filterList = null;
       if (filterValues.isIncludeFilter() && !filterValues.isOptimized()) {
@@ -109,8 +121,8 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
     Object defaultValue = null;
     if (null != measure.getDefaultValue()) {
       // default value for case where user gives is Null condition
-      defaultValue = DataTypeUtil
-          .getMeasureObjectFromDataType(measure.getDefaultValue(), measure.getDataType());
+      defaultValue = RestructureUtil
+          .getMeasureDefaultValue(measure.getColumnSchema(), measure.getDefaultValue());
     }
     List<Object> measureFilterValuesList = filterValues.getMeasuresFilterValuesList();
     for (Object filterValue : measureFilterValuesList) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index e35bb8a..be871d4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
@@ -66,7 +67,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
-    if (isMeasurePresentInCurrentBlock[0]) {
+    if (!msrColEvalutorInfoList.isEmpty()) {
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
     }
@@ -100,7 +101,9 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
       if (null != defaultValue) {
         for (int k = 0; k < msrFilterRangeValues.length; k++) {
           int maxCompare = comparator.compare(msrFilterRangeValues[k],
-              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType()));
+              RestructureUtil.getMeasureDefaultValue(measure.getColumnSchema(),
+                  measure.getDefaultValue()));
+
           if (maxCompare < 0) {
             isDefaultValuePresentInFilter = true;
             break;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index d48abf9..a3359be 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
@@ -66,7 +67,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
-    if (isMeasurePresentInCurrentBlock[0]) {
+    if (!msrColEvalutorInfoList.isEmpty()) {
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
     }
@@ -100,7 +101,8 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
       if (null != defaultValue) {
         for (int k = 0; k < msrFilterRangeValues.length; k++) {
           int maxCompare = comparator.compare(msrFilterRangeValues[k],
-              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType()));
+              RestructureUtil.getMeasureDefaultValue(measure.getColumnSchema(),
+                  measure.getDefaultValue()));
           if (maxCompare <= 0) {
             isDefaultValuePresentInFilter = true;
             break;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index d89a488..0c268c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
@@ -68,7 +69,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
-    if (isMeasurePresentInCurrentBlock[0]) {
+    if (!msrColEvalutorInfoList.isEmpty()) {
       CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
       comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
     }
@@ -102,7 +103,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
       if (null != defaultValue) {
         for (int k = 0; k < msrFilterRangeValues.length; k++) {
           int maxCompare = comparator.compare(msrFilterRangeValues[k],
-              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType()));
+              RestructureUtil.getMeasureDefaultValue(measure.getColumnSchema(),
+                  measure.getDefaultValue()));
+
           if (maxCompare >= 0) {
             isDefaultValuePresentInFilter = true;
             break;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
new file mode 100644
index 0000000..c5ed77d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
+import org.apache.carbondata.core.util.BitSetGroup;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
+
+public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecuterImpl {
+  private byte[][] filterRangeValues;
+  private Object[] msrFilterRangeValues;
+  private SerializableComparator comparator;
+
+  /**
+   * flag to check whether default values is present in the filter value list
+   */
+  private boolean isDefaultValuePresentInFilter;
+  private int lastDimensionColOrdinal = 0;
+  public RowLevelRangeLessThanFilterExecuterImpl(
+      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
+      Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+        null);
+    this.filterRangeValues = filterRangeValues;
+    this.msrFilterRangeValues = msrFilterRangeValues;
+    lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
+    if (!msrColEvalutorInfoList.isEmpty()) {
+      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
+      comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
+    }
+    ifDefaultValueMatchesFilter();
+    if (isDimensionPresentInCurrentBlock[0]) {
+      isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
+          && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
+    }
+  }
+
+  /**
+   * Flags isDefaultValuePresentInFilter if the absent column's default is below any filter value
+   */
+  private void ifDefaultValueMatchesFilter() {
+    if (!dimColEvaluatorInfoList.isEmpty() && !isDimensionPresentInCurrentBlock[0]) {
+      CarbonDimension dimension = this.dimColEvaluatorInfoList.get(0).getDimension();
+      byte[] defaultValue = dimension.getDefaultValue();
+      if (null != defaultValue) {
+        for (int k = 0; k < filterRangeValues.length; k++) {
+          int maxCompare =
+              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
+          if (maxCompare > 0) {
+            isDefaultValuePresentInFilter = true;
+            break;
+          }
+        }
+      }
+    } else if (!msrColEvalutorInfoList.isEmpty() && !isMeasurePresentInCurrentBlock[0]) {
+      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
+      byte[] defaultValue = measure.getDefaultValue();
+      if (null != defaultValue) {
+        // the converted default value does not depend on k, so convert it once before the loop
+        Object convertedValue =
+            RestructureUtil.getMeasureDefaultValue(measure.getColumnSchema(), defaultValue);
+        for (int k = 0; k < msrFilterRangeValues.length; k++) {
+          int maxCompare = comparator.compare(msrFilterRangeValues[k], convertedValue);
+          if (maxCompare > 0) {
+            isDefaultValuePresentInFilter = true;
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+    BitSet bitSet = new BitSet(1);
+    byte[] minValue = null;
+    boolean isScanRequired = false;
+    if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
+      if (isMeasurePresentInCurrentBlock[0]) {
+        minValue = blockMinValue[measureChunkIndex[0] + lastDimensionColOrdinal];
+        isScanRequired =
+            isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
+      } else {
+        minValue = blockMinValue[dimensionChunkIndex[0]];
+        isScanRequired = isScanRequired(minValue, filterRangeValues);
+      }
+    } else {
+      isScanRequired = isDefaultValuePresentInFilter;
+    }
+    if (isScanRequired) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+
+
+  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
+    boolean isScanRequired = false;
+    for (int k = 0; k < filterValues.length; k++) {
+      // positive when the filter value is greater than the block min value
+      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue);
+
+      // this is a strict less-than filter, so the block needs to be scanned
+      // only when its min value is strictly less than at least one of the
+      // applied filter values
+      if (minCompare > 0) {
+        isScanRequired = true;
+        break;
+      }
+    }
+    return isScanRequired;
+  }
+
+  private boolean isScanRequired(byte[] minValue, Object[] filterValue,
+      DataType dataType) {
+    Object value = DataTypeUtil.getMeasureObjectFromDataType(minValue, dataType);
+    for (int i = 0; i < filterValue.length; i++) {
+      // TODO handle min and max for null values.
+      if (filterValue[i] == null) {
+        return true;
+      }
+      if (comparator.compare(filterValue[i], value) > 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public BitSetGroup applyFilter(RawBlockletColumnChunks rawBlockletColumnChunks,
+      boolean useBitsetPipeLine) throws IOException {
+    // select all rows if neither the dimension nor the measure exists in the current block
+    if (!isDimensionPresentInCurrentBlock[0] && !isMeasurePresentInCurrentBlock[0]) {
+      int numberOfRows = rawBlockletColumnChunks.getDataBlock().numRows();
+      return FilterUtil
+          .createBitSetGroupWithDefaultValue(rawBlockletColumnChunks.getDataBlock().numberOfPages(),
+              numberOfRows, true);
+    }
+    if (isDimensionPresentInCurrentBlock[0]) {
+      int chunkIndex =
+          segmentProperties.getDimensionOrdinalToChunkMapping().get(dimensionChunkIndex[0]);
+      if (null == rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex]) {
+        rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex] =
+            rawBlockletColumnChunks.getDataBlock().readDimensionChunk(
+                rawBlockletColumnChunks.getFileReader(), chunkIndex);
+      }
+      DimensionRawColumnChunk rawColumnChunk =
+          rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex];
+      BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+      for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+        if (rawColumnChunk.getMinValues() != null) {
+          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
+            BitSet bitSet = getFilteredIndexes(rawColumnChunk.decodeColumnPage(i),
+                rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        } else {
+          BitSet bitSet = getFilteredIndexes(rawColumnChunk.decodeColumnPage(i),
+              rawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
+        }
+      }
+      return bitSetGroup;
+    } else {
+      int chunkIndex =
+          segmentProperties.getMeasuresOrdinalToChunkMapping().get(measureChunkIndex[0]);
+      if (null == rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) {
+        rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex] =
+            rawBlockletColumnChunks.getDataBlock().readMeasureChunk(
+                rawBlockletColumnChunks.getFileReader(), chunkIndex);
+      }
+      MeasureRawColumnChunk rawColumnChunk =
+          rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex];
+      BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
+      for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
+        if (rawColumnChunk.getMinValues() != null) {
+          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.msrFilterRangeValues,
+              msrColEvalutorInfoList.get(0).getType())) {
+            BitSet bitSet =
+                getFilteredIndexesForMeasures(rawColumnChunk.decodeColumnPage(i),
+                    rawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        } else {
+          BitSet bitSet =
+              getFilteredIndexesForMeasures(rawColumnChunk.decodeColumnPage(i),
+                  rawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
+        }
+      }
+      return bitSetGroup;
+    }
+  }
+
+  @Override // Row-level form of this filter: true when the row's value is less than the filter value.
+  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    if (isDimensionPresentInCurrentBlock[0]) { // dimension filter: the row value is compared as raw bytes
+      byte[] col =
+          (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
+      return ByteUtil.compare(filterRangeValues[0], col) > 0; // only the first filter value is consulted
+    }
+
+    if (isMeasurePresentInCurrentBlock[0]) { // measure filter: row slot = measure ordinal + dimOrdinalMax
+      Object col =
+          value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
+      return comparator.compare(msrFilterRangeValues[0], col) > 0; // only the first filter value is consulted
+    }
+    return false; // neither presence flag is set: the row is rejected
+  }
+
+  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
+      int numerOfRows) { // returns the rows of one measure page that satisfy value < filter value
+    BitSet bitSet = new BitSet(numerOfRows);
+    Object[] filterValues = this.msrFilterRangeValues;
+    DataType msrType = msrColEvalutorInfoList.get(0).getType(); // drives comparator and value decoding
+    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
+    BitSet nullBitSet = columnPage.getNullBits(); // rows the page flags as null
+    for (int i = 0; i < filterValues.length; i++) {
+      if (filterValues[i] == null) { // null filter value: select the rows flagged null, compare nothing
+        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+          bitSet.set(j);
+        }
+        continue;
+      }
+      for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
+        if (!nullBitSet.get(startIndex)) { // null rows are never compared with a non-null filter value
+          Object msrValue = DataTypeUtil
+              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
+                  msrType, msrColEvalutorInfoList.get(0).getMeasure());
+
+          if (comparator.compare(msrValue, filterValues[i]) < 0) {
+            // Row value is strictly less than this filter value: select the row.
+            bitSet.set(startIndex);
+          }
+        }
+      }
+    }
+    return bitSet;
+  }
+
+  private BitSet getFilteredIndexes(DimensionColumnPage dimensionColumnPage,
+      int numerOfRows) { // returns the rows of one dimension page that satisfy value < filter value
+    byte[] defaultValue = null; // bytes of the member the helpers skip; stays null if no branch applies
+    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(
+              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(null); // key generated for null
+      CarbonDimension currentBlockDimension =
+          segmentProperties.getDimensions().get(dimensionChunkIndex[0]);
+      if (currentBlockDimension.isSortColumn()) { // sort column: key encoded via the sort-column generator
+        defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
+            this.segmentProperties.getSortColumnsGenerator());
+      } else { // any other column: plain byte form of the key
+        defaultValue = ByteUtil.toBytes(key);
+      }
+    } else if (dimColEvaluatorInfoList.get(0).getDimension().getDataType() != DataTypes.STRING) {
+      defaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY; // non-STRING column without direct dictionary
+    }
+    BitSet bitSet = null;
+    if (dimensionColumnPage.isExplicitSorted()) { // explicitly sorted page: rows resolved via inverted index
+      bitSet = setFilterdIndexToBitSetWithColumnIndex(dimensionColumnPage, numerOfRows,
+          defaultValue);
+    } else { // otherwise row positions are used directly
+      bitSet = setFilterdIndexToBitSet(dimensionColumnPage, numerOfRows, defaultValue);
+    }
+    if (dimensionColumnPage.isNoDicitionaryColumn()) { // presumably clears rows equal to the constant - confirm
+      FilterUtil.removeNullValues(dimensionColumnPage, bitSet,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+    }
+    return bitSet;
+  }
+
+  /**
+   * Selects the rows of an explicitly sorted page whose value is less than a filter value.
+   * The page is binary-searched in sorted order and every selected position is mapped back
+   * to its row id with getInvertedIndex. When defaultValue is not null, the sorted entries
+   * up to and including that value are skipped and can never be selected.
+   * @param dimensionColumnPage page to search
+   * @param numerOfRows number of rows in the page
+   * @param defaultValue bytes of the default (null) member to skip, or null to skip nothing
+   * @return BitSet with one bit set per selected row id.
+   */
+  private BitSet setFilterdIndexToBitSetWithColumnIndex(
+      DimensionColumnPage dimensionColumnPage, int numerOfRows,
+      byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    int start = 0;
+    int last = 0;
+    int startIndex = 0;
+    int skip = 0;
+    byte[][] filterValues = this.filterRangeValues;
+
+    // Work out how many leading sorted entries to skip because they hold the default (null) value.
+    if (null != defaultValue) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
+              defaultValue, true);
+      if (start < 0) {
+        skip = -(start + 1);
+        // the skipped range covers the whole page: nothing can be selected
+        if (skip == numerOfRows) {
+          return bitSet;
+        }
+      } else {
+        // start is the last index holding the default value (inclusive),
+        // so the first selectable entry is one position past it
+        skip = start + 1;
+      }
+      startIndex = skip;
+    }
+
+    for (int i = 0; i < filterValues.length; i++) {
+      start = CarbonUtil
+          .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
+              filterValues[i], false);
+      if (start >= 0) {
+        // A non-negative result is handled as "filter value located at start". This is a strict
+        // less-than filter, so entries equal to the filter value must stay unselected:
+        // nextLesserValueToTarget is used to move start to an entry below the filter value
+        // (NOTE(review): its result when no lesser entry exists is not visible here - confirm).
+        start =
+            CarbonUtil.nextLesserValueToTarget(start, dimensionColumnPage, filterValues[i]);
+      }
+      if (start < 0) {
+        start = -(start + 1);
+        if (start >= numerOfRows) {
+          start = start - 1;
+        }
+        // A negative start is decoded with -(start + 1) and pulled back when it lies past the
+        // last row. If the entry at that position is greater than the filter value, start
+        // moves one entry back so that it rests on the last entry to select.
+        if (ByteUtil.compare(filterValues[i],
+            dimensionColumnPage.getChunkData(dimensionColumnPage.getInvertedIndex(start)))
+            < 0) {
+          start = start - 1;
+        }
+      }
+      last = start;
+      for (int j = start; j >= skip; j--) { // select sorted positions start..skip as row ids
+        bitSet.set(dimensionColumnPage.getInvertedIndex(j));
+        last--;
+      }
+      startIndex = last;
+      if (startIndex >= 0) { // NOTE(review): setFilterdIndexToBitSet tests startIndex <= 0 here - confirm
+        break;
+      }
+    }
+    return bitSet;
+  }
+
+  /**
+   * Selects the rows of a page that is not explicitly sorted whose value is less than a
+   * filter value; row positions are used directly, without an inverted-index lookup.
+   * When isNaturalSorted is set the page is binary-searched, otherwise every row is
+   * compared against every filter value.
+   * @param dimensionColumnPage page to scan
+   * @param numerOfRows number of rows in the page
+   * @param defaultValue bytes of the default (null) member to skip in the sorted path, or null
+   * @return BitSet with one bit set per selected row.
+   */
+  private BitSet setFilterdIndexToBitSet(DimensionColumnPage dimensionColumnPage,
+      int numerOfRows, byte[] defaultValue) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    byte[][] filterValues = this.filterRangeValues;
+    // Binary search is only valid when the column data is stored in sorted order.
+    if (isNaturalSorted) {
+      int start = 0;
+      int last = 0;
+      int startIndex = 0;
+      int skip = 0;
+      // Work out how many leading rows to skip because they hold the default (null) value.
+      if (null != defaultValue) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex,
+                numerOfRows - 1, defaultValue, true);
+        if (start < 0) {
+          skip = -(start + 1);
+          // the skipped range covers the whole page: nothing can be selected
+          if (skip == numerOfRows) {
+            return bitSet;
+          }
+        } else {
+          // start is the last index holding the default value (inclusive),
+          // so the first selectable row is one position past it
+          skip = start + 1;
+        }
+        startIndex = skip;
+      }
+      for (int k = 0; k < filterValues.length; k++) {
+        start = CarbonUtil
+            .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex,
+                numerOfRows - 1, filterValues[k], false);
+        if (start >= 0) { // value located: step to a row below it, equal rows must stay unselected
+          start =
+              CarbonUtil.nextLesserValueToTarget(start, dimensionColumnPage, filterValues[k]);
+        }
+        if (start < 0) {
+          start = -(start + 1);
+
+          if (start >= numerOfRows) {
+            start = numerOfRows - 1;
+          }
+          // A negative start is decoded with -(start + 1) and clamped to the last row. If the
+          // row at that position is greater than the filter value, start moves one row back
+          // so that it rests on the last row to select.
+          if (ByteUtil.compare(filterValues[k], dimensionColumnPage.getChunkData(start)) < 0) {
+            start = start - 1;
+          }
+        }
+        last = start;
+        for (int j = start; j >= skip; j--) { // select rows start..skip by position
+          bitSet.set(j);
+          last--;
+        }
+        startIndex = last;
+        if (startIndex <= 0) { // NOTE(review): the inverted-index variant tests startIndex >= 0 - confirm
+          break;
+        }
+      }
+    } else { // data not naturally sorted: linear scan, defaultValue is not consulted on this path
+      for (int k = 0; k < filterValues.length; k++) {
+        for (int i = 0; i < numerOfRows; i++) {
+          if (ByteUtil.compare(dimensionColumnPage.getChunkData(i), filterValues[k]) < 0) {
+            bitSet.set(i);
+          }
+        }
+      }
+    }
+    return bitSet;
+  }
+
+  @Override // Ensures the raw chunk of the filter column is loaded into rawBlockletColumnChunks.
+  public void readColumnChunks(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException {
+    if (isDimensionPresentInCurrentBlock[0]) {
+      if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        super.readColumnChunks(rawBlockletColumnChunks); // non-DICTIONARY dimension: superclass reads too
+      }
+      int chunkIndex = dimensionChunkIndex[0];
+      if (null == rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex]) {
+        rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex] = // filled only while still null
+            rawBlockletColumnChunks.getDataBlock().readDimensionChunk(
+                rawBlockletColumnChunks.getFileReader(), chunkIndex);
+      }
+    } else if (isMeasurePresentInCurrentBlock[0]) {
+      int chunkIndex = measureChunkIndex[0]; // NOTE(review): applyFilter maps this via getMeasuresOrdinalToChunkMapping() - confirm
+      if (null == rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) {
+        rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex] = // filled only while still null
+            rawBlockletColumnChunks.getDataBlock().readMeasureChunk(
+                rawBlockletColumnChunks.getFileReader(), chunkIndex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
deleted file mode 100644
index a00c7db..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.filter.executer;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
-import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
-import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
-import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
-import org.apache.carbondata.core.util.BitSetGroup;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.comparator.Comparator;
-import org.apache.carbondata.core.util.comparator.SerializableComparator;
-
-public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecuterImpl {
-  private byte[][] filterRangeValues;
-  private Object[] msrFilterRangeValues;
-  private SerializableComparator comparator;
-
-  /**
-   * flag to check whether default values is present in the filter value list
-   */
-  private boolean isDefaultValuePresentInFilter;
-  private int lastDimensionColOrdinal = 0;
-  public RowLevelRangeLessThanFiterExecuterImpl(
-      List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
-      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
-      AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
-      Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
-    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
-        null);
-    this.filterRangeValues = filterRangeValues;
-    this.msrFilterRangeValues = msrFilterRangeValues;
-    lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal();
-    if (isMeasurePresentInCurrentBlock[0]) {
-      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
-      comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType());
-    }
-    ifDefaultValueMatchesFilter();
-    if (isDimensionPresentInCurrentBlock[0]) {
-      isNaturalSorted = dimColEvaluatorInfoList.get(0).getDimension().isUseInvertedIndex()
-          && dimColEvaluatorInfoList.get(0).getDimension().isSortColumn();
-    }
-  }
-
-  /**
-   * This method will check whether default value is present in the given filter values
-   */
-  private void ifDefaultValueMatchesFilter() {
-    if (!dimColEvaluatorInfoList.isEmpty() && !isDimensionPresentInCurrentBlock[0]) {
-      CarbonDimension dimension = this.dimColEvaluatorInfoList.get(0).getDimension();
-      byte[] defaultValue = dimension.getDefaultValue();
-      if (null != defaultValue) {
-        for (int k = 0; k < filterRangeValues.length; k++) {
-          int maxCompare =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterRangeValues[k], defaultValue);
-          if (maxCompare > 0) {
-            isDefaultValuePresentInFilter = true;
-            break;
-          }
-        }
-      }
-    } else if (!msrColEvalutorInfoList.isEmpty() && !isMeasurePresentInCurrentBlock[0]) {
-      CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure();
-      byte[] defaultValue = measure.getDefaultValue();
-      if (null != defaultValue) {
-        for (int k = 0; k < msrFilterRangeValues.length; k++) {
-          Object convertedValue =
-              DataTypeUtil.getMeasureObjectFromDataType(defaultValue, measure.getDataType());
-          int maxCompare =
-              comparator.compare(msrFilterRangeValues[k], convertedValue);
-          if (maxCompare > 0) {
-            isDefaultValuePresentInFilter = true;
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
-    BitSet bitSet = new BitSet(1);
-    byte[] minValue = null;
-    boolean isScanRequired = false;
-    if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) {
-      if (isMeasurePresentInCurrentBlock[0]) {
-        minValue = blockMinValue[measureChunkIndex[0] + lastDimensionColOrdinal];
-        isScanRequired =
-            isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType());
-      } else {
-        minValue = blockMinValue[dimensionChunkIndex[0]];
-        isScanRequired = isScanRequired(minValue, filterRangeValues);
-      }
-    } else {
-      isScanRequired = isDefaultValuePresentInFilter;
-    }
-    if (isScanRequired) {
-      bitSet.set(0);
-    }
-    return bitSet;
-  }
-
-
-  private boolean isScanRequired(byte[] blockMinValue, byte[][] filterValues) {
-    boolean isScanRequired = false;
-    for (int k = 0; k < filterValues.length; k++) {
-      // and filter-min should be positive
-      int minCompare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValues[k], blockMinValue);
-
-      // if any filter applied is not in range of min and max of block
-      // then since its a less than equal to fiter validate whether the block
-      // min range is less than equal to applied filter member
-      if (minCompare > 0) {
-        isScanRequired = true;
-        break;
-      }
-    }
-    return isScanRequired;
-  }
-
-  private boolean isScanRequired(byte[] minValue, Object[] filterValue,
-      DataType dataType) {
-    Object value = DataTypeUtil.getMeasureObjectFromDataType(minValue, dataType);
-    for (int i = 0; i < filterValue.length; i++) {
-      // TODO handle min and max for null values.
-      if (filterValue[i] == null) {
-        return true;
-      }
-      if (comparator.compare(filterValue[i], value) > 0) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public BitSetGroup applyFilter(RawBlockletColumnChunks rawBlockletColumnChunks,
-      boolean useBitsetPipeLine) throws IOException {
-    // select all rows if dimension does not exists in the current block
-    if (!isDimensionPresentInCurrentBlock[0] && !isMeasurePresentInCurrentBlock[0]) {
-      int numberOfRows = rawBlockletColumnChunks.getDataBlock().numRows();
-      return FilterUtil
-          .createBitSetGroupWithDefaultValue(rawBlockletColumnChunks.getDataBlock().numberOfPages(),
-              numberOfRows, true);
-    }
-    if (isDimensionPresentInCurrentBlock[0]) {
-      int chunkIndex =
-          segmentProperties.getDimensionOrdinalToChunkMapping().get(dimensionChunkIndex[0]);
-      if (null == rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex]) {
-        rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex] =
-            rawBlockletColumnChunks.getDataBlock().readDimensionChunk(
-                rawBlockletColumnChunks.getFileReader(), chunkIndex);
-      }
-      DimensionRawColumnChunk rawColumnChunk =
-          rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex];
-      BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
-      for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
-        if (rawColumnChunk.getMinValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
-            BitSet bitSet = getFilteredIndexes(rawColumnChunk.decodeColumnPage(i),
-                rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          }
-        } else {
-          BitSet bitSet = getFilteredIndexes(rawColumnChunk.decodeColumnPage(i),
-              rawColumnChunk.getRowCount()[i]);
-          bitSetGroup.setBitSet(bitSet, i);
-        }
-      }
-      return bitSetGroup;
-    } else {
-      int chunkIndex =
-          segmentProperties.getMeasuresOrdinalToChunkMapping().get(measureChunkIndex[0]);
-      if (null == rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) {
-        rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex] =
-            rawBlockletColumnChunks.getDataBlock().readMeasureChunk(
-                rawBlockletColumnChunks.getFileReader(), chunkIndex);
-      }
-      MeasureRawColumnChunk rawColumnChunk =
-          rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex];
-      BitSetGroup bitSetGroup = new BitSetGroup(rawColumnChunk.getPagesCount());
-      for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
-        if (rawColumnChunk.getMinValues() != null) {
-          if (isScanRequired(rawColumnChunk.getMinValues()[i], this.msrFilterRangeValues,
-              msrColEvalutorInfoList.get(0).getType())) {
-            BitSet bitSet =
-                getFilteredIndexesForMeasures(rawColumnChunk.decodeColumnPage(i),
-                    rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          }
-        } else {
-          BitSet bitSet =
-              getFilteredIndexesForMeasures(rawColumnChunk.decodeColumnPage(i),
-                  rawColumnChunk.getRowCount()[i]);
-          bitSetGroup.setBitSet(bitSet, i);
-        }
-      }
-      return bitSetGroup;
-    }
-  }
-
-  @Override
-  public boolean applyFilter(RowIntf value, int dimOrdinalMax)
-      throws FilterUnsupportedException, IOException {
-    if (isDimensionPresentInCurrentBlock[0]) {
-      byte[] col =
-          (byte[]) value.getVal(dimColEvaluatorInfoList.get(0).getDimension().getOrdinal());
-      return ByteUtil.compare(filterRangeValues[0], col) > 0;
-    }
-
-    if (isMeasurePresentInCurrentBlock[0]) {
-      Object col =
-          value.getVal(msrColEvalutorInfoList.get(0).getMeasure().getOrdinal() + dimOrdinalMax);
-      return comparator.compare(msrFilterRangeValues[0], col) > 0;
-    }
-    return false;
-  }
-
-  private BitSet getFilteredIndexesForMeasures(ColumnPage columnPage,
-      int numerOfRows) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    Object[] filterValues = this.msrFilterRangeValues;
-    DataType msrType = msrColEvalutorInfoList.get(0).getType();
-    SerializableComparator comparator = Comparator.getComparatorByDataTypeForMeasure(msrType);
-    BitSet nullBitSet = columnPage.getNullBits();
-    for (int i = 0; i < filterValues.length; i++) {
-      if (filterValues[i] == null) {
-        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
-          bitSet.set(j);
-        }
-        continue;
-      }
-      for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
-        if (!nullBitSet.get(startIndex)) {
-          Object msrValue = DataTypeUtil
-              .getMeasureObjectBasedOnDataType(columnPage, startIndex,
-                  msrType, msrColEvalutorInfoList.get(0).getMeasure());
-
-          if (comparator.compare(msrValue, filterValues[i]) < 0) {
-            // This is a match.
-            bitSet.set(startIndex);
-          }
-        }
-      }
-    }
-    return bitSet;
-  }
-
-  private BitSet getFilteredIndexes(DimensionColumnPage dimensionColumnPage,
-      int numerOfRows) {
-    byte[] defaultValue = null;
-    if (dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-          .getDirectDictionaryGenerator(
-              dimColEvaluatorInfoList.get(0).getDimension().getDataType());
-      int key = directDictionaryGenerator.generateDirectSurrogateKey(null);
-      CarbonDimension currentBlockDimension =
-          segmentProperties.getDimensions().get(dimensionChunkIndex[0]);
-      if (currentBlockDimension.isSortColumn()) {
-        defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
-            this.segmentProperties.getSortColumnsGenerator());
-      } else {
-        defaultValue = ByteUtil.toBytes(key);
-      }
-    } else if (dimColEvaluatorInfoList.get(0).getDimension().getDataType() != DataTypes.STRING) {
-      defaultValue = CarbonCommonConstants.EMPTY_BYTE_ARRAY;
-    }
-    BitSet bitSet = null;
-    if (dimensionColumnPage.isExplicitSorted()) {
-      bitSet = setFilterdIndexToBitSetWithColumnIndex(dimensionColumnPage, numerOfRows,
-          defaultValue);
-    } else {
-      bitSet = setFilterdIndexToBitSet(dimensionColumnPage, numerOfRows, defaultValue);
-    }
-    if (dimensionColumnPage.isNoDicitionaryColumn()) {
-      FilterUtil.removeNullValues(dimensionColumnPage, bitSet,
-          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
-    }
-    return bitSet;
-  }
-
-  /**
-   * Method will scan the block and finds the range start index from which all members
-   * will be considered for applying range filters. this method will be called if the
-   * column is not supported by default so column index mapping  will be present for
-   * accesing the members from the block.
-   *
-   * @param dimensionColumnPage
-   * @param numerOfRows
-   * @return BitSet.
-   */
-  private BitSet setFilterdIndexToBitSetWithColumnIndex(
-      DimensionColumnPage dimensionColumnPage, int numerOfRows,
-      byte[] defaultValue) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    int start = 0;
-    int last = 0;
-    int startIndex = 0;
-    int skip = 0;
-    byte[][] filterValues = this.filterRangeValues;
-
-    //find the number of default values to skip the null value in case of direct dictionary
-    if (null != defaultValue) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
-              defaultValue, true);
-      if (start < 0) {
-        skip = -(start + 1);
-        // end of block
-        if (skip == numerOfRows) {
-          return bitSet;
-        }
-      } else {
-        // as start will be last index of null value inclusive
-        // so adding 1 to skip last null value
-        skip = start + 1;
-      }
-      startIndex = skip;
-    }
-
-    for (int i = 0; i < filterValues.length; i++) {
-      start = CarbonUtil
-          .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex, numerOfRows - 1,
-              filterValues[i], false);
-      if (start >= 0) {
-        // Logic will handle the case where the range filter member is not present in block
-        // in this case the binary search will return the index from where the bit sets will be
-        // set inorder to apply filters. this is Lesser than filter so the range will be taken
-        // from the prev element which is Lesser than filter member.
-        start =
-            CarbonUtil.nextLesserValueToTarget(start, dimensionColumnPage, filterValues[i]);
-      }
-      if (start < 0) {
-        start = -(start + 1);
-        if (start >= numerOfRows) {
-          start = start - 1;
-        }
-        // When negative value of start is returned from getFirstIndexUsingBinarySearch the Start
-        // will be pointing to the next consecutive position. So compare it again and point to the
-        // previous value returned from getFirstIndexUsingBinarySearch.
-        if (ByteUtil.compare(filterValues[i],
-            dimensionColumnPage.getChunkData(dimensionColumnPage.getInvertedIndex(start)))
-            < 0) {
-          start = start - 1;
-        }
-      }
-      last = start;
-      for (int j = start; j >= skip; j--) {
-        bitSet.set(dimensionColumnPage.getInvertedIndex(j));
-        last--;
-      }
-      startIndex = last;
-      if (startIndex >= 0) {
-        break;
-      }
-    }
-    return bitSet;
-  }
-
-  /**
-   * Method will scan the block and finds the range start index from which all
-   * members will be considered for applying range filters. this method will
-   * be called if the column is sorted default so column index
-   * mapping will be present for accesing the members from the block.
-   *
-   * @param dimensionColumnPage
-   * @param numerOfRows
-   * @return BitSet.
-   */
-  private BitSet setFilterdIndexToBitSet(DimensionColumnPage dimensionColumnPage,
-      int numerOfRows, byte[] defaultValue) {
-    BitSet bitSet = new BitSet(numerOfRows);
-    byte[][] filterValues = this.filterRangeValues;
-    // binary search can only be applied if column is sorted
-    if (isNaturalSorted) {
-      int start = 0;
-      int last = 0;
-      int startIndex = 0;
-      int skip = 0;
-      //find the number of default values to skip the null value in case of direct dictionary
-      if (null != defaultValue) {
-        start = CarbonUtil
-            .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex,
-                numerOfRows - 1, defaultValue, true);
-        if (start < 0) {
-          skip = -(start + 1);
-          // end of block
-          if (skip == numerOfRows) {
-            return bitSet;
-          }
-        } else {
-          // as start will be last index of null value inclusive
-          // so adding 1 to skip last null value
-          skip = start + 1;
-        }
-        startIndex = skip;
-      }
-      for (int k = 0; k < filterValues.length; k++) {
-        start = CarbonUtil
-            .getFirstIndexUsingBinarySearch(dimensionColumnPage, startIndex,
-                numerOfRows - 1, filterValues[k], false);
-        if (start >= 0) {
-          start =
-              CarbonUtil.nextLesserValueToTarget(start, dimensionColumnPage, filterValues[k]);
-        }
-        if (start < 0) {
-          start = -(start + 1);
-
-          if (start >= numerOfRows) {
-            start = numerOfRows - 1;
-          }
-          // When negative value of start is returned from getFirstIndexUsingBinarySearch the Start
-          // will be pointing to the next consecutive position. So compare it again and point to the
-          // previous value returned from getFirstIndexUsingBinarySearch.
-          if (ByteUtil.compare(filterValues[k], dimensionColumnPage.getChunkData(start)) < 0) {
-            start = start - 1;
-          }
-        }
-        last = start;
-        for (int j = start; j >= skip; j--) {
-          bitSet.set(j);
-          last--;
-        }
-        startIndex = last;
-        if (startIndex <= 0) {
-          break;
-        }
-      }
-    } else {
-      for (int k = 0; k < filterValues.length; k++) {
-        for (int i = 0; i < numerOfRows; i++) {
-          if (ByteUtil.compare(dimensionColumnPage.getChunkData(i), filterValues[k]) < 0) {
-            bitSet.set(i);
-          }
-        }
-      }
-    }
-    return bitSet;
-  }
-
-  @Override
-  public void readColumnChunks(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException {
-    if (isDimensionPresentInCurrentBlock[0]) {
-      if (!dimColEvaluatorInfoList.get(0).getDimension().hasEncoding(Encoding.DICTIONARY)) {
-        super.readColumnChunks(rawBlockletColumnChunks);
-      }
-      int chunkIndex = dimensionChunkIndex[0];
-      if (null == rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex]) {
-        rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex] =
-            rawBlockletColumnChunks.getDataBlock().readDimensionChunk(
-                rawBlockletColumnChunks.getFileReader(), chunkIndex);
-      }
-    } else if (isMeasurePresentInCurrentBlock[0]) {
-      int chunkIndex = measureChunkIndex[0];
-      if (null == rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) {
-        rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex] =
-            rawBlockletColumnChunks.getDataBlock().readMeasureChunk(
-                rawBlockletColumnChunks.getFileReader(), chunkIndex);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
deleted file mode 100644
index 3469a54..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFacory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.scan.filter.executer;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
-
-public class RowLevelRangeTypeExecuterFacory {
-
-  private RowLevelRangeTypeExecuterFacory() {
-
-  }
-
-  /**
-   * The method returns the Row Level Range fiter type instance based on
-   * filter tree resolver type.
-   *
-   * @param filterExpressionResolverTree
-   * @param segmentProperties
-   * @return the generator instance
-   */
-  public static RowLevelFilterExecuterImpl getRowLevelRangeTypeExecuter(
-      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree,
-      SegmentProperties segmentProperties) {
-    switch (filterExecuterType) {
-
-      case ROWLEVEL_LESSTHAN:
-        return new RowLevelRangeLessThanFiterExecuterImpl(
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getDimColEvaluatorInfoList(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMsrColEvalutorInfoList(),
-            filterExpressionResolverTree.getFilterExpression(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues(segmentProperties),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-            .getMeasureFilterRangeValues(), segmentProperties);
-      case ROWLEVEL_LESSTHAN_EQUALTO:
-        return new RowLevelRangeLessThanEqualFilterExecuterImpl(
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getDimColEvaluatorInfoList(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMsrColEvalutorInfoList(),
-            filterExpressionResolverTree.getFilterExpression(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues(segmentProperties),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMeasureFilterRangeValues(), segmentProperties);
-      case ROWLEVEL_GREATERTHAN_EQUALTO:
-        return new RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getDimColEvaluatorInfoList(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMsrColEvalutorInfoList(),
-            filterExpressionResolverTree.getFilterExpression(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues(segmentProperties),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMeasureFilterRangeValues(), segmentProperties);
-      case ROWLEVEL_GREATERTHAN:
-        return new RowLevelRangeGrtThanFiterExecuterImpl(
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getDimColEvaluatorInfoList(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMsrColEvalutorInfoList(),
-            filterExpressionResolverTree.getFilterExpression(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getFilterRangeValues(segmentProperties),
-            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
-                .getMeasureFilterRangeValues(), segmentProperties);
-      default:
-        // Scenario wont come logic must break
-        return null;
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFactory.java
new file mode 100644
index 0000000..3aeb71f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeTypeExecuterFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
+
+public class RowLevelRangeTypeExecuterFactory {
+
+  private RowLevelRangeTypeExecuterFactory() {
+    // static factory only: the private constructor keeps the class from being instantiated
+  }
+
+  /**
+   * Creates the row level range filter executer for the given executer type.
+   *
+   * @param filterExecuterType one of the four ROWLEVEL_* range types handled by the switch
+   * @param filterExpressionResolverTree resolver, cast to RowLevelRangeFilterResolverImpl
+   * @param segmentProperties passed to getFilterRangeValues and to the executer constructor
+   * @return the matching executer, or null when the type is not a row level range type
+   */
+  public static RowLevelFilterExecuterImpl getRowLevelRangeTypeExecuter(
+      FilterExecuterType filterExecuterType, FilterResolverIntf filterExpressionResolverTree,
+      SegmentProperties segmentProperties) {
+    switch (filterExecuterType) {
+
+      case ROWLEVEL_LESSTHAN:
+        return new RowLevelRangeLessThanFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+            .getMeasureFilterRangeValues(), segmentProperties);
+      case ROWLEVEL_LESSTHAN_EQUALTO:
+        return new RowLevelRangeLessThanEqualFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMeasureFilterRangeValues(), segmentProperties);
+      case ROWLEVEL_GREATERTHAN_EQUALTO:
+        return new RowLevelRangeGrtrThanEquaToFilterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMeasureFilterRangeValues(), segmentProperties);
+      case ROWLEVEL_GREATERTHAN:
+        return new RowLevelRangeGrtThanFiterExecuterImpl(
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getDimColEvaluatorInfoList(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMsrColEvalutorInfoList(),
+            filterExpressionResolverTree.getFilterExpression(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree).getTableIdentifier(),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getFilterRangeValues(segmentProperties),
+            ((RowLevelRangeFilterResolverImpl) filterExpressionResolverTree)
+                .getMeasureFilterRangeValues(), segmentProperties);
+      default:
+        // Not expected to be reached; any other executer type yields null
+        return null;
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
index e85e9d1..c83e662 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelRangeFilterResolverImpl.java
@@ -91,6 +91,8 @@ public class RowLevelRangeFilterResolverImpl extends ConditionalFilterResolverIm
       if (null != dimensionFromCurrentBlock) {
         return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(),
             dimensionFromCurrentBlock, segmentProperties, false);
+      } else {
+        return FilterUtil.getKeyArray(this.dimColEvaluatorInfoList.get(0).getFilterValues(), false);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index a4d6094..25f0099 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -729,6 +729,7 @@ public final class DataTypeUtil {
               .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         } else {
           try {
+            timeStampformatter.remove();
             Date dateToStr = timeStampformatter.get().parse(data);
             return ByteUtil.toBytes(dateToStr.getTime());
           } catch (ParseException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b52f1571/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 5957abe..a86bf17 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -42,6 +42,7 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
     sql("drop table if exists allKeyCol")
     sql("drop table if exists testalterwithboolean")
     sql("drop table if exists testalterwithbooleanwithoutdefaultvalue")
+    sql("drop table if exists test")
 
 
     // clean data folder
@@ -353,6 +354,10 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
       "alter table default.restructure add columns(designation int) TBLPROPERTIES" +
       "('DEFAULT.VALUE.designation'='67890')")
     checkAnswer(sql("select distinct(designation) from restructure"), Row(67890))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
   }
 
   test("test change datatype of int and decimal column") {
@@ -584,6 +589,108 @@ test("test alter command for boolean data type with correct default measure valu
       "alter table testalterwithbooleanwithoutdefaultvalue add columns(booleanfield boolean)")
     checkAnswer(sql("select * from testalterwithbooleanwithoutdefaultvalue"),Seq(Row(1,"anubhav",null)))
   }
+  test("test alter command for filter on default values on date datatype") {
+    sql("drop table if exists test")
+    sql(
+      "create table test(id int,vin string,phonenumber long,area string,salary int,country " +
+      "string,longdate date) stored by 'carbondata'")
+    sql("insert into test select 1,'String1',12345,'area',20,'country','2017-02-12'")
+    sql("alter table test add columns (c3 date) TBLPROPERTIES('DEFAULT.VALUE.c3' = '1993-01-01')")
+    sql("alter table test add columns (c4 date)")
+    sql("insert into test select 2,'String1',12345,'area',20,'country','2017-02-12','1994-01-01','1994-01-01'")
+    sql("insert into test select 3,'String1',12345,'area',20,'country','2017-02-12','1995-01-01','1995-01-01'")
+    sql("insert into test select 4,'String1',12345,'area',20,'country','2017-02-12','1996-01-01','1996-01-01'")
+    checkAnswer(sql("select id from test where c3='1993-01-01'"), Seq(Row(1)))
+    checkAnswer(sql("select id from test where c3!='1993-01-01'"), Seq(Row(2),Row(3),Row(4)))
+    checkAnswer(sql("select id from test where c3<'1995-01-01'"), Seq(Row(1), Row(2)))
+    checkAnswer(sql("select id from test where c3>'1994-01-01'"), Seq(Row(3), Row(4)))
+    checkAnswer(sql("select id from test where c3>='1995-01-01'"), Seq(Row(3), Row(4)))
+    checkAnswer(sql("select id from test where c3<='1994-01-01'"), Seq(Row(1), Row(2)))
+    checkAnswer(sql("select id from test where c4 IS NULL"), Seq(Row(1)))
+    checkAnswer(sql("select id from test where c4 IS NOT NULL"), Seq(Row(2),Row(3),Row(4)))
+  }
+
+  test("test alter command for filter on default values on timestamp datatype") {
+    def testFilterWithDefaultValue(flag: Boolean) = {
+      try {
+        CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            "yyyy/MM/dd HH:mm:ss")
+        sql("drop table if exists test")
+        sql(
+          "create table test(id int,vin string,phonenumber long,area string,salary int,country " +
+          "string,longdate date) stored by 'carbondata'")
+        sql("insert into test select 1,'String1',12345,'area',20,'country','2017-02-12'")
+        if (flag) {
+          sql(
+            "alter table test add columns (c3 timestamp) TBLPROPERTIES('DEFAULT.VALUE.c3' = " +
+            "'1996/01/01 11:11:11', 'DICTIONARY_INCLUDE' = 'c3')")
+        } else {
+          sql(
+            "alter table test add columns (c3 timestamp) TBLPROPERTIES('DEFAULT.VALUE.c3' = " +
+            "'1996/01/01 11:11:11')")
+        }
+        sql("alter table test add columns (c4 timestamp)")
+        sql(
+          "insert into test select 2,'String1',12345,'area',20,'country','2017-02-12','1994/01/01 10:10:10','1994/01/01 10:10:10'")
+        sql(
+          "insert into test select 3,'String1',12345,'area',20,'country','2017-02-12','1995/01/01 11:11:11','1995/01/01 11:11:11'")
+        sql(
+          "insert into test select 4,'String1',12345,'area',20,'country','2017-02-12','1996/01/01 10:10:10','1996/01/01 10:10:10'")
+        checkAnswer(sql("select id from test where c3='1996-01-01 11:11:11'"), Seq(Row(1)))
+        checkAnswer(sql("select id from test where c3!='1996-01-01 11:11:11'"), Seq(Row(2),Row(3),Row(4)))
+        checkAnswer(sql("select id from test where c3<'1995-01-01 11:11:11'"), Seq(Row(2)))
+        checkAnswer(sql("select id from test where c3>'1994-01-02 11:11:11'"),
+          Seq(Row(3), Row(4), Row(1)))
+        checkAnswer(sql("select id from test where c3>='1995-01-01 11:11:11'"),
+          Seq(Row(3), Row(4), Row(1)))
+        checkAnswer(sql("select id from test where c3<='1995-01-02 11:11:11'"), Seq(Row(2), Row(3)))
+        checkAnswer(sql("select id from test where c4 IS NULL"), Seq(Row(1)))
+        checkAnswer(sql("select id from test where c4 IS NOT NULL"), Seq(Row(2),Row(3),Row(4)))
+      }
+      finally {
+        CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      }
+    }
+
+    testFilterWithDefaultValue(false)
+    testFilterWithDefaultValue(true)
+
+  }
+
+  test("test alter command for filter on default values on int") {
+    def testFilterWithDefaultValue(flag: Boolean) = {
+      sql("drop table if exists test")
+      sql(
+        "create table test(id int,vin string,phonenumber long,area string,salary int,country " +
+        "string,longdate date) stored by 'carbondata'")
+      sql("insert into test select 1,'String1',12345,'area',5000,'country','2017/02/12'")
+      if (flag) {
+        sql(s"alter table test add columns (c3 int) TBLPROPERTIES('DEFAULT.VALUE.c3' = '23', " +
+            s"'DICTIONARY_INCLUDE'='c3')")
+      } else {
+        sql(s"alter table test add columns (c3 int) TBLPROPERTIES('DEFAULT.VALUE.c3' = '23')")
+      }
+      sql(s"alter table test add columns (c4 int)")
+      sql("insert into test select 2,'String1',12345,'area',5000,'country','2017/02/12',25,25")
+      sql("insert into test select 3,'String1',12345,'area',5000,'country','2017/02/12',35,35")
+      sql("insert into test select 4,'String1',12345,'area',5000,'country','2017/02/12',45,45")
+      checkAnswer(sql("select id from test where c3=23"), Seq(Row(1)))
+      checkAnswer(sql("select id from test where c3!=23"), Seq(Row(2), Row(3), Row(4)))
+      checkAnswer(sql("select id from test where c3<34"), Seq(Row(1), Row(2)))
+      checkAnswer(sql("select id from test where c3>24"), Seq(Row(2), Row(3), Row(4)))
+      checkAnswer(sql("select id from test where c3>=35"), Seq(Row(3), Row(4)))
+      checkAnswer(sql("select id from test where c3<=35"), Seq(Row(1), Row(2), Row(3)))
+      checkAnswer(sql("select id from test where c4 IS NULL"), Seq(Row(1)))
+      checkAnswer(sql("select id from test where c4 IS NOT NULL"), Seq(Row(2),Row(3),Row(4)))
+    }
+
+    testFilterWithDefaultValue(true)
+    testFilterWithDefaultValue(false)
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS restructure")
     sql("drop table if exists table1")
@@ -599,6 +706,6 @@ test("test alter command for boolean data type with correct default measure valu
     sql("drop table if exists allKeyCol")
     sql("drop table if exists testalterwithboolean")
     sql("drop table if exists testalterwithbooleanwithoutdefaultvalue")
-
+    sql("drop table if exists test")
   }
 }