You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/07/06 14:42:12 UTC

[47/50] [abbrv] carbondata git commit: Measure Filter implementation

Measure Filter implementation

Measure Implementation for Include and Exclude Filter

RowLevel Measure Implementation

RowLevel LessThan, LessThanEqual, GreaterThan and GreaterThanEqual implementation for measures

Rectify Datatype Conversion Measure

Restructure Changes for Measure

Rebase to Branch-1.1

Handle DataType comparison and rectify rebase errors

Review Comments Implementation


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/17db292a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/17db292a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/17db292a

Branch: refs/heads/branch-1.1
Commit: 17db292a0d52057d3296e96d18e7dd700e9f274c
Parents: d4adc09
Author: sounakr <so...@gmail.com>
Authored: Tue Jun 20 22:52:36 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Jul 3 15:18:20 2017 +0530

----------------------------------------------------------------------
 .../core/datastore/block/SegmentProperties.java |   7 +
 .../schema/table/column/CarbonColumn.java       |   7 +
 .../core/scan/expression/ColumnExpression.java  |  21 ++
 .../conditional/ConditionalExpression.java      |   2 +-
 .../logical/BinaryLogicalExpression.java        |   4 +-
 .../core/scan/filter/ColumnFilterInfo.java      |  88 ++++++
 .../core/scan/filter/DimColumnFilterInfo.java   |  78 -----
 .../scan/filter/FilterExpressionProcessor.java  |  70 ++++-
 .../carbondata/core/scan/filter/FilterUtil.java | 281 +++++++++++++++----
 .../ExcludeColGroupFilterExecuterImpl.java      |   2 +-
 .../executer/ExcludeFilterExecuterImpl.java     | 156 ++++++++--
 .../IncludeColGroupFilterExecuterImpl.java      |   2 +-
 .../executer/IncludeFilterExecuterImpl.java     | 241 +++++++++++++---
 .../MeasureColumnExecuterFilterInfo.java        |  30 ++
 .../executer/RestructureEvaluatorImpl.java      |  34 ++-
 .../RestructureExcludeFilterExecutorImpl.java   |  17 +-
 .../RestructureIncludeFilterExecutorImpl.java   |  17 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  24 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  | 237 ++++++++++++----
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java | 239 ++++++++++++----
 ...velRangeLessThanEqualFilterExecuterImpl.java | 205 +++++++++++---
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 207 +++++++++++---
 .../resolver/ConditionalFilterResolverImpl.java | 105 ++++---
 .../filter/resolver/FilterResolverIntf.java     |   9 +
 .../resolver/LogicalFilterResolverImpl.java     |   4 +
 .../resolver/RowLevelFilterResolverImpl.java    |   3 +-
 .../RowLevelRangeFilterResolverImpl.java        |  97 +++++--
 .../resolverinfo/ColumnResolvedFilterInfo.java  |  22 ++
 .../DimColumnResolvedFilterInfo.java            |  22 +-
 .../MeasureColumnResolvedFilterInfo.java        |  98 ++++++-
 .../TrueConditionalResolverImpl.java            |   2 +-
 .../visitor/CustomTypeDictionaryVisitor.java    |  17 +-
 .../visitor/DictionaryColumnVisitor.java        |  11 +-
 .../visitor/FilterInfoTypeVisitorFactory.java   |  16 +-
 .../visitor/MeasureColumnVisitor.java           |  77 +++++
 .../visitor/NoDictionaryTypeVisitor.java        |  10 +-
 .../visitor/RangeDictionaryColumnVisitor.java   |  10 +-
 .../visitor/RangeDirectDictionaryVisitor.java   |  10 +-
 .../visitor/RangeNoDictionaryTypeVisitor.java   |  10 +-
 .../visitor/ResolvedFilterInfoVisitorIntf.java  |   5 +-
 .../carbondata/core/scan/model/QueryModel.java  |  18 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   8 +
 .../carbondata/core/util/DataTypeUtil.java      |  67 +++++
 .../core/scan/filter/FilterUtilTest.java        |   9 +-
 .../ExpressionWithNullTestCase.scala            |  10 +-
 .../spark/sql/SparkUnknownExpression.scala      |   2 +-
 .../spark/sql/SparkUnknownExpression.scala      |   2 +-
 .../vectorreader/AddColumnTestCases.scala       |   4 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  14 +
 50 files changed, 2114 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 3bc208d..c08e570 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -151,6 +151,8 @@ public class SegmentProperties {
    */
   private ColumnGroupModel colGroupModel;
 
+  private int lastDimensionColOrdinal;
+
   public SegmentProperties(List<ColumnSchema> columnsInTable, int[] columnCardinality) {
     dimensions = new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     complexDimensions =
@@ -392,6 +394,7 @@ public class SegmentProperties {
       }
       counter++;
     }
+    lastDimensionColOrdinal = dimensonOrdinal;
     dimColumnsCardinality = new int[cardinalityIndexForNormalDimensionColumn.size()];
     complexDimColumnCardinality = new int[cardinalityIndexForComplexDimensionColumn.size()];
     int index = 0;
@@ -797,4 +800,8 @@ public class SegmentProperties {
     return CarbonUtil.getMeasureFromCurrentBlock(this.measures, columnId);
   }
 
+  public int getLastDimensionColOrdinal() {
+    return lastDimensionColOrdinal;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index 11dc21b..17ffb5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -139,6 +139,13 @@ public class CarbonColumn implements Serializable {
   }
 
   /**
+   * @return true if column is measure, otherwise false
+   */
+  public Boolean isMeasure() {
+    return !isDimesion();
+  }
+
+  /**
    * return the visibility
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
index 69fa9d6..a9fea79 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.scan.expression;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 
@@ -31,12 +32,16 @@ public class ColumnExpression extends LeafExpression {
 
   private boolean isDimension;
 
+  private boolean isMeasure;
+
   private int colIndex = -1;
 
   private DataType dataType;
 
   private CarbonDimension dimension;
 
+  private CarbonMeasure measure;
+
   private CarbonColumn carbonColumn;
 
   public ColumnExpression(String columnName, DataType dataType) {
@@ -53,6 +58,14 @@ public class ColumnExpression extends LeafExpression {
     this.dimension = dimension;
   }
 
+  public CarbonMeasure getMeasure() {
+    return measure;
+  }
+
+  public void setMeasure(CarbonMeasure measure) {
+    this.measure = measure;
+  }
+
   public String getColumnName() {
     return columnName;
   }
@@ -69,6 +82,14 @@ public class ColumnExpression extends LeafExpression {
     this.isDimension = isDimension;
   }
 
+  public boolean isMeasure() {
+    return isMeasure;
+  }
+
+  public void setMeasure(boolean isMeasure) {
+    this.isMeasure = isMeasure;
+  }
+
   public int getColIndex() {
     return colIndex;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
index 265fa32..d7b940c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
@@ -28,7 +28,7 @@ public interface ConditionalExpression {
   // traversing the tree
   List<ColumnExpression> getColumnList();
 
-  boolean isSingleDimension();
+  boolean isSingleColumn();
 
   List<ExpressionResult> getLiterals();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
index 1228793..1a5b6a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
@@ -80,11 +80,11 @@ public abstract class BinaryLogicalExpression extends BinaryExpression {
     }
   }
 
-  public boolean isSingleDimension() {
+  public boolean isSingleColumn() {
     List<ColumnExpression> listOfExp =
         new ArrayList<ColumnExpression>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     getColumnList(this, listOfExp);
-    if (listOfExp.size() == 1 && listOfExp.get(0).isDimension()) {
+    if (listOfExp.size() == 1 && (listOfExp.get(0).isDimension() || listOfExp.get(0).isMeasure())) {
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
new file mode 100644
index 0000000..008d908
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class ColumnFilterInfo implements Serializable { // holder of filter value lists
+
+  private static final long serialVersionUID = 8181578747306832771L;
+
+  private boolean isIncludeFilter; // presumably false marks an exclude filter; TODO confirm
+
+  private List<Integer> filterList; // presumably dictionary surrogate keys; TODO confirm
+
+  /**
+   * Implicit column filter values to be used for block and blocklet pruning
+   */
+  private List<String> implicitColumnFilterList;
+  private List<Integer> excludeFilterList; // Integer values kept separately from filterList
+  /**
+   * Filter values for no-dictionary columns, kept as byte arrays.
+   */
+  private List<byte[]> noDictionaryFilterValuesList;
+
+  private List<byte[]> measuresFilterValuesList; // filter values for measures, as byte arrays
+
+  public List<byte[]> getNoDictionaryFilterValuesList() {
+    return noDictionaryFilterValuesList;
+  }
+
+  public boolean isIncludeFilter() {
+    return isIncludeFilter;
+  }
+
+  public void setIncludeFilter(boolean isIncludeFilter) {
+    this.isIncludeFilter = isIncludeFilter;
+  }
+
+  public List<Integer> getFilterList() {
+    return filterList;
+  }
+
+  public void setFilterList(List<Integer> filterList) {
+    this.filterList = filterList;
+  }
+
+  public void setFilterListForNoDictionaryCols(List<byte[]> noDictionaryFilterValuesList) {
+    this.noDictionaryFilterValuesList = noDictionaryFilterValuesList;
+  } // setter counterpart of getNoDictionaryFilterValuesList(), despite the name
+
+  public List<Integer> getExcludeFilterList() {
+    return excludeFilterList;
+  }
+  public void setExcludeFilterList(List<Integer> excludeFilterList) {
+    this.excludeFilterList = excludeFilterList;
+  }
+  public List<String> getImplicitColumnFilterList() {
+    return implicitColumnFilterList;
+  }
+
+  public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) {
+    this.implicitColumnFilterList = implicitColumnFilterList;
+  }
+
+  public List<byte[]> getMeasuresFilterValuesList() {
+    return measuresFilterValuesList;
+  }
+
+  public void setMeasuresFilterValuesList(List<byte[]> measuresFilterValuesList) {
+    this.measuresFilterValuesList = measuresFilterValuesList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java
deleted file mode 100644
index 16c6965..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.scan.filter;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class DimColumnFilterInfo implements Serializable {
-
-  private static final long serialVersionUID = 8181578747306832771L;
-
-  private boolean isIncludeFilter;
-
-  private List<Integer> filterList;
-
-  /**
-   * Implicit column filter values to be used for block and blocklet pruning
-   */
-  private List<String> implicitColumnFilterList;
-  private List<Integer> excludeFilterList;
-  /**
-   * maintain the no dictionary filter values list.
-   */
-  private List<byte[]> noDictionaryFilterValuesList;
-
-  public List<byte[]> getNoDictionaryFilterValuesList() {
-    return noDictionaryFilterValuesList;
-  }
-
-  public boolean isIncludeFilter() {
-    return isIncludeFilter;
-  }
-
-  public void setIncludeFilter(boolean isIncludeFilter) {
-    this.isIncludeFilter = isIncludeFilter;
-  }
-
-  public List<Integer> getFilterList() {
-    return filterList;
-  }
-
-  public void setFilterList(List<Integer> filterList) {
-    this.filterList = filterList;
-  }
-
-  public void setFilterListForNoDictionaryCols(List<byte[]> noDictionaryFilterValuesList) {
-    this.noDictionaryFilterValuesList = noDictionaryFilterValuesList;
-  }
-
-  public List<Integer> getExcludeFilterList() {
-    return excludeFilterList;
-  }
-  public void setExcludeFilterList(List<Integer> excludeFilterList) {
-    this.excludeFilterList = excludeFilterList;
-  }
-  public List<String> getImplicitColumnFilterList() {
-    return implicitColumnFilterList;
-  }
-
-  public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) {
-    this.implicitColumnFilterList = implicitColumnFilterList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 264c0ae..80d0bc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -259,11 +259,34 @@ public class FilterExpressionProcessor implements FilterProcessor {
         return new TrueConditionalResolverImpl(expression, false, false, tableIdentifier);
       case EQUALS:
         currentCondExpression = (BinaryConditionalExpression) expression;
-        if (currentCondExpression.isSingleDimension()
+        if (currentCondExpression.isSingleColumn()
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.ARRAY
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.STRUCT) {
+
+          if (currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure()) {
+            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
+                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
+                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
+                    || FilterUtil
+                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
+              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, true,
+                  tableIdentifier);
+            }
+            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
+                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.GREATERTHAN_EQUALTO
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.LESSTHAN_EQUALTO) {
+              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, true,
+                  tableIdentifier);
+            }
+            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
+                tableIdentifier,
+                currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
+          }
           // getting new dim index.
           if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
               .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
@@ -287,20 +310,44 @@ public class FilterExpressionProcessor implements FilterProcessor {
             }
           }
           return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
-              tableIdentifier);
+              tableIdentifier,
+              currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
 
         }
         break;
       case RANGE:
         return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
-            tableIdentifier);
+            tableIdentifier, false);
       case NOT_EQUALS:
         currentCondExpression = (BinaryConditionalExpression) expression;
-        if (currentCondExpression.isSingleDimension()
+        if (currentCondExpression.isSingleColumn()
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.ARRAY
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.STRUCT) {
+
+          if (currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure()) {
+            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
+                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
+                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
+                    || FilterUtil
+                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
+              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, false,
+                  tableIdentifier);
+            }
+            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
+                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.GREATERTHAN_EQUALTO
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.LESSTHAN_EQUALTO) {
+              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, false,
+                  tableIdentifier);
+            }
+            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
+                tableIdentifier, true);
+          }
+
           if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
               .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
               .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
@@ -322,31 +369,32 @@ public class FilterExpressionProcessor implements FilterProcessor {
             }
 
             return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
-                tableIdentifier);
+                tableIdentifier, false);
           }
           return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
-              tableIdentifier);
+              tableIdentifier, false);
         }
         break;
 
       default:
         if (expression instanceof ConditionalExpression) {
           condExpression = (ConditionalExpression) expression;
-          if (condExpression.isSingleDimension()
+          if (condExpression.isSingleColumn()
               && condExpression.getColumnList().get(0).getCarbonColumn().getDataType()
               != DataType.ARRAY
               && condExpression.getColumnList().get(0).getCarbonColumn().getDataType()
               != DataType.STRUCT) {
             condExpression = (ConditionalExpression) expression;
-            if (condExpression.getColumnList().get(0).getCarbonColumn()
+            if ((condExpression.getColumnList().get(0).getCarbonColumn()
                 .hasEncoding(Encoding.DICTIONARY) && !condExpression.getColumnList().get(0)
-                .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-              return new ConditionalFilterResolverImpl(expression, true, true, tableIdentifier);
+                .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY))
+                || (condExpression.getColumnList().get(0).getCarbonColumn().isMeasure())) {
+              return new ConditionalFilterResolverImpl(expression, true, true, tableIdentifier,
+                  condExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
             }
           }
         }
     }
     return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 73387db..9bdf7f2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
@@ -70,6 +71,7 @@ import org.apache.carbondata.core.scan.filter.executer.ExcludeFilterExecuterImpl
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.IncludeColGroupFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.IncludeFilterExecuterImpl;
+import org.apache.carbondata.core.scan.filter.executer.MeasureColumnExecuterFilterInfo;
 import org.apache.carbondata.core.scan.filter.executer.OrFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.RestructureExcludeFilterExecutorImpl;
@@ -85,6 +87,7 @@ import org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolver
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -117,10 +120,12 @@ public final class FilterUtil {
       switch (filterExecuterType) {
         case INCLUDE:
           return getIncludeFilterExecuter(
-              filterExpressionResolverTree.getDimColResolvedFilterInfo(), segmentProperties);
+              filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+              filterExpressionResolverTree.getMsrColResolvedFilterInfo(), segmentProperties);
         case EXCLUDE:
           return getExcludeFilterExecuter(
-              filterExpressionResolverTree.getDimColResolvedFilterInfo(), segmentProperties);
+              filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+              filterExpressionResolverTree.getMsrColResolvedFilterInfo(), segmentProperties);
         case OR:
           return new OrFilterExecuterImpl(
               createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
@@ -180,9 +185,27 @@ public final class FilterUtil {
    * @return
    */
   private static FilterExecuter getIncludeFilterExecuter(
-      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties) {
-
-    if (dimColResolvedFilterInfo.getDimension().isColumnar()) {
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      MeasureColumnResolvedFilterInfo msrColResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
+    if (null != msrColResolvedFilterInfo) { // measure filter: both branches return here
+      CarbonMeasure measuresFromCurrentBlock = segmentProperties
+          .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+      if (null != measuresFromCurrentBlock) {
+        // set measure, column index and data type on the copy from the current block's measure
+        MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
+            msrColResolvedFilterInfo.getCopyObject();
+        msrColResolvedFilterInfoCopyObject.setMeasure(measuresFromCurrentBlock);
+        msrColResolvedFilterInfoCopyObject.setColumnIndex(measuresFromCurrentBlock.getOrdinal());
+        msrColResolvedFilterInfoCopyObject.setType(measuresFromCurrentBlock.getDataType());
+        return new IncludeFilterExecuterImpl(null, msrColResolvedFilterInfoCopyObject,
+            segmentProperties, true);
+      } else { // lookup by column id returned null: use the restructure executor
+        return new RestructureIncludeFilterExecutorImpl(dimColResolvedFilterInfo,
+            msrColResolvedFilterInfo, segmentProperties, true);
+      }
+    }
+    if (null != dimColResolvedFilterInfo && dimColResolvedFilterInfo.getDimension().isColumnar()) {
       CarbonDimension dimensionFromCurrentBlock =
           segmentProperties.getDimensionFromCurrentBlock(dimColResolvedFilterInfo.getDimension());
       if (null != dimensionFromCurrentBlock) {
@@ -191,10 +214,11 @@ public final class FilterUtil {
             dimColResolvedFilterInfo.getCopyObject();
         dimColResolvedFilterInfoCopyObject.setDimension(dimensionFromCurrentBlock);
         dimColResolvedFilterInfoCopyObject.setColumnIndex(dimensionFromCurrentBlock.getOrdinal());
-        return new IncludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, segmentProperties);
+        return new IncludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, null,
+            segmentProperties, false);
       } else {
         return new RestructureIncludeFilterExecutorImpl(dimColResolvedFilterInfo,
-            segmentProperties);
+            msrColResolvedFilterInfo, segmentProperties, false);
       }
     } else {
       return new IncludeColGroupFilterExecuterImpl(dimColResolvedFilterInfo, segmentProperties);
@@ -209,9 +233,29 @@ public final class FilterUtil {
    * @return
    */
   private static FilterExecuter getExcludeFilterExecuter(
-      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties) {
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      MeasureColumnResolvedFilterInfo msrColResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
 
-    if (dimColResolvedFilterInfo.getDimension().isColumnar()) {
+    if (null != msrColResolvedFilterInfo) {
+      CarbonMeasure measuresFromCurrentBlock = segmentProperties
+          .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+      if (null != measuresFromCurrentBlock) {
+        // update measure, column index and data type according to the measure in current block
+        MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
+            msrColResolvedFilterInfo.getCopyObject();
+        msrColResolvedFilterInfoCopyObject.setMeasure(measuresFromCurrentBlock);
+        msrColResolvedFilterInfoCopyObject.setColumnIndex(measuresFromCurrentBlock.getOrdinal());
+        msrColResolvedFilterInfoCopyObject.setType(measuresFromCurrentBlock.getDataType());
+        return new ExcludeFilterExecuterImpl(null, msrColResolvedFilterInfoCopyObject,
+            segmentProperties, true);
+      } else {
+        return new RestructureExcludeFilterExecutorImpl(dimColResolvedFilterInfo,
+            msrColResolvedFilterInfo, segmentProperties, true);
+      }
+    }
+    if ((null != dimColResolvedFilterInfo) && (dimColResolvedFilterInfo.getDimension()
+        .isColumnar())) {
       CarbonDimension dimensionFromCurrentBlock =
           segmentProperties.getDimensionFromCurrentBlock(dimColResolvedFilterInfo.getDimension());
       if (null != dimensionFromCurrentBlock) {
@@ -220,10 +264,11 @@ public final class FilterUtil {
             dimColResolvedFilterInfo.getCopyObject();
         dimColResolvedFilterInfoCopyObject.setDimension(dimensionFromCurrentBlock);
         dimColResolvedFilterInfoCopyObject.setColumnIndex(dimensionFromCurrentBlock.getOrdinal());
-        return new ExcludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, segmentProperties);
+        return new ExcludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, null,
+            segmentProperties, false);
       } else {
         return new RestructureExcludeFilterExecutorImpl(dimColResolvedFilterInfo,
-            segmentProperties);
+            msrColResolvedFilterInfo, segmentProperties, false);
       }
     } else {
       return new ExcludeColGroupFilterExecuterImpl(dimColResolvedFilterInfo, segmentProperties);
@@ -349,13 +394,13 @@ public final class FilterUtil {
 
   /**
    * This method will get the no dictionary data based on filters and same
-   * will be in DimColumnFilterInfo
+   * will be in ColumnFilterInfo
    *
    * @param evaluateResultListFinal
    * @param isIncludeFilter
-   * @return DimColumnFilterInfo
+   * @return ColumnFilterInfo
    */
-  public static DimColumnFilterInfo getNoDictionaryValKeyMemberForFilter(
+  public static ColumnFilterInfo getNoDictionaryValKeyMemberForFilter(
       List<String> evaluateResultListFinal, boolean isIncludeFilter) {
     List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
     for (String result : evaluateResultListFinal) {
@@ -371,9 +416,9 @@ public final class FilterUtil {
 
     };
     Collections.sort(filterValuesList, filterNoDictValueComaparator);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
     if (filterValuesList.size() > 0) {
-      columnFilterInfo = new DimColumnFilterInfo();
+      columnFilterInfo = new ColumnFilterInfo();
       columnFilterInfo.setIncludeFilter(isIncludeFilter);
       columnFilterInfo.setFilterListForNoDictionaryCols(filterValuesList);
 
@@ -382,6 +427,55 @@ public final class FilterUtil {
   }
 
   /**
+   * Converts the evaluated filter literals of a measure column into their byte[] form and
+   * wraps them, sorted, in a ColumnFilterInfo.
+   *
+   * @param evaluateResultListFinal filter literals as strings; a literal equal to
+   *        MEMBER_DEFAULT_VAL is stored as an empty byte[]
+   * @param isIncludeFilter flag stored on the returned ColumnFilterInfo
+   * @param dataType data type handed to the byte[] conversion
+   * @param carbonMeasure measure handed to the byte[] conversion
+   * @return the populated ColumnFilterInfo, or null when there are no filter values
+   * @throws FilterUnsupportedException when processing a literal fails
+   */
+  public static ColumnFilterInfo getMeasureValKeyMemberForFilter(
+      List<String> evaluateResultListFinal, boolean isIncludeFilter, DataType dataType,
+      CarbonMeasure carbonMeasure) throws FilterUnsupportedException {
+    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    String result = null;
+    try {
+      int length = evaluateResultListFinal.size();
+      for (int i = 0; i < length; i++) {
+        result = evaluateResultListFinal.get(i);
+        if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(result)) {
+          // the default member is represented by an empty byte[]
+          filterValuesList.add(new byte[0]);
+          continue;
+        }
+        filterValuesList
+            .add(DataTypeUtil.getMeasureByteArrayBasedOnDataTypes(result, dataType, carbonMeasure));
+      }
+    } catch (Exception ex) {
+      // Exception, not Throwable: JVM Errors must propagate instead of being wrapped
+      throw new FilterUnsupportedException("Unsupported Filter condition: " + result, ex);
+    }
+    // sort the encoded values with ByteUtil.UnsafeComparer before handing them over
+    Comparator<byte[]> filterMeasureComaparator = new Comparator<byte[]>() {
+      @Override public int compare(byte[] filterMember1, byte[] filterMember2) {
+        return ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterMember1, filterMember2);
+      }
+    };
+    Collections.sort(filterValuesList, filterMeasureComaparator);
+    ColumnFilterInfo columnFilterInfo = null;
+    if (filterValuesList.size() > 0) {
+      columnFilterInfo = new ColumnFilterInfo();
+      columnFilterInfo.setIncludeFilter(isIncludeFilter);
+      columnFilterInfo.setMeasuresFilterValuesList(filterValuesList);
+    }
+    return columnFilterInfo;
+  }
+
+  /**
    * Method will prepare the  dimfilterinfo instance by resolving the filter
    * expression value to its respective surrogates.
    *
@@ -392,7 +486,7 @@ public final class FilterUtil {
    * @return
    * @throws IOException
    */
-  public static DimColumnFilterInfo getFilterValues(AbsoluteTableIdentifier tableIdentifier,
+  public static ColumnFilterInfo getFilterValues(AbsoluteTableIdentifier tableIdentifier,
       ColumnExpression columnExpression, List<String> evaluateResultList, boolean isIncludeFilter)
       throws IOException {
     Dictionary forwardDictionary = null;
@@ -417,7 +511,7 @@ public final class FilterUtil {
    * @param isIncludeFilter
    * @return
    */
-  private static DimColumnFilterInfo getFilterValues(ColumnExpression columnExpression,
+  private static ColumnFilterInfo getFilterValues(ColumnExpression columnExpression,
       List<String> evaluateResultList, Dictionary forwardDictionary, boolean isIncludeFilter) {
     sortFilterModelMembers(columnExpression, evaluateResultList);
     List<Integer> surrogates =
@@ -425,9 +519,9 @@ public final class FilterUtil {
     // Reading the dictionary value from cache.
     getDictionaryValue(evaluateResultList, forwardDictionary, surrogates);
     Collections.sort(surrogates);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
     if (surrogates.size() > 0) {
-      columnFilterInfo = new DimColumnFilterInfo();
+      columnFilterInfo = new ColumnFilterInfo();
       columnFilterInfo.setIncludeFilter(isIncludeFilter);
       columnFilterInfo.setFilterList(surrogates);
     }
@@ -455,11 +549,11 @@ public final class FilterUtil {
    * @param expression
    * @param columnExpression
    * @param isIncludeFilter
-   * @return DimColumnFilterInfo
+   * @return ColumnFilterInfo
    * @throws FilterUnsupportedException
    * @throws IOException
    */
-  public static DimColumnFilterInfo getFilterListForAllValues(
+  public static ColumnFilterInfo getFilterListForAllValues(
       AbsoluteTableIdentifier tableIdentifier, Expression expression,
       final ColumnExpression columnExpression, boolean isIncludeFilter)
       throws IOException, FilterUnsupportedException {
@@ -524,11 +618,11 @@ public final class FilterUtil {
    * @return
    * @throws FilterUnsupportedException
    */
-  public static DimColumnFilterInfo getFilterListForRS(Expression expression,
+  public static ColumnFilterInfo getFilterListForRS(Expression expression,
       ColumnExpression columnExpression, String defaultValues, int defaultSurrogate)
       throws FilterUnsupportedException {
     List<Integer> filterValuesList = new ArrayList<Integer>(20);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
     // List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
     List<String> evaluateResultListFinal = new ArrayList<String>(20);
     // KeyGenerator keyGenerator =
@@ -550,7 +644,7 @@ public final class FilterUtil {
         }
       }
       if (filterValuesList.size() > 0) {
-        columnFilterInfo = new DimColumnFilterInfo();
+        columnFilterInfo = new ColumnFilterInfo();
         columnFilterInfo.setFilterList(filterValuesList);
       }
     } catch (FilterIllegalMemberException e) {
@@ -571,12 +665,12 @@ public final class FilterUtil {
    * @return
    * @throws FilterUnsupportedException
    */
-  public static DimColumnFilterInfo getFilterListForAllMembersRS(Expression expression,
+  public static ColumnFilterInfo getFilterListForAllMembersRS(Expression expression,
       ColumnExpression columnExpression, String defaultValues, int defaultSurrogate,
       boolean isIncludeFilter) throws FilterUnsupportedException {
     List<Integer> filterValuesList = new ArrayList<Integer>(20);
     List<String> evaluateResultListFinal = new ArrayList<String>(20);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
 
     // KeyGenerator keyGenerator =
     // KeyGeneratorFactory.getKeyGenerator(new int[] { defaultSurrogate });
@@ -602,7 +696,7 @@ public final class FilterUtil {
     if (null == defaultValues) {
       defaultValues = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
     }
-    columnFilterInfo = new DimColumnFilterInfo();
+    columnFilterInfo = new ColumnFilterInfo();
     for (int i = 0; i < evaluateResultListFinal.size(); i++) {
       if (evaluateResultListFinal.get(i).equals(defaultValues)) {
         filterValuesList.add(defaultSurrogate);
@@ -617,16 +711,21 @@ public final class FilterUtil {
    * Below method will be used to covert the filter surrogate keys
    * to mdkey
    *
-   * @param dimColumnFilterInfo
+   * @param columnFilterInfo
    * @param carbonDimension
    * @param segmentProperties
    * @return
    */
-  public static byte[][] getKeyArray(DimColumnFilterInfo dimColumnFilterInfo,
-      CarbonDimension carbonDimension, SegmentProperties segmentProperties) {
+  public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo,
+      CarbonDimension carbonDimension, CarbonMeasure carbonMeasure,
+      SegmentProperties segmentProperties) {
+    if (null != carbonMeasure) {
+      return columnFilterInfo.getMeasuresFilterValuesList()
+          .toArray((new byte[columnFilterInfo.getMeasuresFilterValuesList().size()][]));
+    }
     if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
-      return dimColumnFilterInfo.getNoDictionaryFilterValuesList()
-          .toArray((new byte[dimColumnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
+      return columnFilterInfo.getNoDictionaryFilterValuesList()
+          .toArray((new byte[columnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
     }
     KeyGenerator blockLevelKeyGenerator = segmentProperties.getDimensionKeyGenerator();
     int[] dimColumnsCardinality = segmentProperties.getDimColumnsCardinality();
@@ -634,10 +733,10 @@ public final class FilterUtil {
     List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
     Arrays.fill(keys, 0);
     int keyOrdinalOfDimensionFromCurrentBlock = carbonDimension.getKeyOrdinal();
-    if (null != dimColumnFilterInfo) {
+    if (null != columnFilterInfo) {
       int[] rangesForMaskedByte =
           getRangesForMaskedByte(keyOrdinalOfDimensionFromCurrentBlock, blockLevelKeyGenerator);
-      for (Integer surrogate : dimColumnFilterInfo.getFilterList()) {
+      for (Integer surrogate : columnFilterInfo.getFilterList()) {
         try {
           if (surrogate <= dimColumnsCardinality[keyOrdinalOfDimensionFromCurrentBlock]) {
             keys[keyOrdinalOfDimensionFromCurrentBlock] = surrogate;
@@ -690,7 +789,7 @@ public final class FilterUtil {
    * @param startKeyList
    * @return long[] start key
    */
-  public static void getStartKey(Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+  public static void getStartKey(Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       SegmentProperties segmentProperties, long[] startKey, List<long[]> startKeyList) {
     for (int i = 0; i < startKey.length; i++) {
       // The min surrogate key is 1, set it as the init value for starkey of each column level
@@ -719,17 +818,17 @@ public final class FilterUtil {
       DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
       SegmentProperties segmentProperties,
       SortedMap<Integer, byte[]> setOfStartKeyByteArray) {
-    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+    Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter =
         dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
     // step 1
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
       if (!entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
-        List<DimColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
+        List<ColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
         if (null == listOfDimColFilterInfo) {
           continue;
         }
         boolean isExcludePresent = false;
-        for (DimColumnFilterInfo info : listOfDimColFilterInfo) {
+        for (ColumnFilterInfo info : listOfDimColFilterInfo) {
           if (!info.isIncludeFilter()) {
             isExcludePresent = true;
           }
@@ -782,17 +881,17 @@ public final class FilterUtil {
       SegmentProperties segmentProperties,
       SortedMap<Integer, byte[]> setOfEndKeyByteArray) {
 
-    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+    Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter =
         dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
     // step 1
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
       if (!entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
-        List<DimColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
+        List<ColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
         if (null == listOfDimColFilterInfo) {
           continue;
         }
         boolean isExcludePresent = false;
-        for (DimColumnFilterInfo info : listOfDimColFilterInfo) {
+        for (ColumnFilterInfo info : listOfDimColFilterInfo) {
           if (!info.isIncludeFilter()) {
             isExcludePresent = true;
           }
@@ -854,15 +953,15 @@ public final class FilterUtil {
    * @param startKey
    */
   private static void getStartKeyWithFilter(
-      Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+      Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       SegmentProperties segmentProperties, long[] startKey, List<long[]> startKeyList) {
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
-      List<DimColumnFilterInfo> values = entry.getValue();
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<ColumnFilterInfo> values = entry.getValue();
       if (null == values || !entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
         continue;
       }
       boolean isExcludePresent = false;
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (!info.isIncludeFilter()) {
           isExcludePresent = true;
         }
@@ -879,7 +978,7 @@ public final class FilterUtil {
         continue;
       }
       int keyOrdinalOfDimensionFromCurrentBlock = dimensionFromCurrentBlock.getKeyOrdinal();
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (startKey[keyOrdinalOfDimensionFromCurrentBlock] < info.getFilterList().get(0)) {
           startKey[keyOrdinalOfDimensionFromCurrentBlock] = info.getFilterList().get(0);
         }
@@ -890,7 +989,7 @@ public final class FilterUtil {
     }
   }
 
-  public static void getEndKey(Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+  public static void getEndKey(Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       long[] endKey, SegmentProperties segmentProperties,
       List<long[]> endKeyList) {
 
@@ -919,15 +1018,15 @@ public final class FilterUtil {
   }
 
   private static void getEndKeyWithFilter(
-      Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+      Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       SegmentProperties segmentProperties, long[] endKey, List<long[]> endKeyList) {
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
-      List<DimColumnFilterInfo> values = entry.getValue();
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<ColumnFilterInfo> values = entry.getValue();
       if (null == values || !entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
         continue;
       }
       boolean isExcludeFilterPresent = false;
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (!info.isIncludeFilter()) {
           isExcludeFilterPresent = true;
         }
@@ -944,7 +1043,7 @@ public final class FilterUtil {
         continue;
       }
       int keyOrdinalOfDimensionFromCurrentBlock = dimensionFromCurrentBlock.getKeyOrdinal();
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (endKey[keyOrdinalOfDimensionFromCurrentBlock] > info.getFilterList()
             .get(info.getFilterList().size() - 1)) {
           endKey[keyOrdinalOfDimensionFromCurrentBlock] =
@@ -1023,12 +1122,17 @@ public final class FilterUtil {
    * @param dimension
    * @param dimColumnExecuterInfo
    */
-  public static void prepareKeysFromSurrogates(DimColumnFilterInfo filterValues,
+  public static void prepareKeysFromSurrogates(ColumnFilterInfo filterValues,
       SegmentProperties segmentProperties, CarbonDimension dimension,
-      DimColumnExecuterFilterInfo dimColumnExecuterInfo) {
-    byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, segmentProperties);
-    dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
-
+      DimColumnExecuterFilterInfo dimColumnExecuterInfo, CarbonMeasure measures,
+      MeasureColumnExecuterFilterInfo msrColumnExecuterInfo) {
+    if (null != measures) {
+      byte[][] keysBasedOnFilter = getKeyArray(filterValues, null, measures, segmentProperties);
+      msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+    } else {
+      byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, null, segmentProperties);
+      dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+    }
   }
 
   /**
@@ -1474,4 +1578,61 @@ public final class FilterUtil {
       }
     }
   }
+
+  /**
+   * Maps a measure data type (SHORT, INT, LONG, DOUBLE or DECIMAL) to its value comparator.
+   * @param dataType data type of the measure values to compare
+   * @return a new Long-, Double- or BigDecimalComparator matching the data type
+   * @throws IllegalArgumentException for any other data type
+   */
+  public static Comparator getComparatorByDataTypeForMeasure(DataType dataType) {
+    switch (dataType) {
+      case DECIMAL:
+        return new BigDecimalComparator();
+      case DOUBLE:
+        return new DoubleComparator();
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongComparator();
+      default:
+        throw new IllegalArgumentException("Unsupported data type");
+    }
+  }
+
+
+  /**
+   * Compares boxed doubles numerically; 0.0 equals -0.0, NaN equals only NaN and sorts last.
+   */
+  static class DoubleComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      double left = (double) key1;
+      double right = (double) key2;
+      if (left < right || left > right) {
+        return left < right ? -1 : 1;
+      }
+      return left == right ? 0 : Double.compare(left, right); // unequal here means a NaN
+    }
+  }
+
+  /**
+   * Compares two boxed long measure values numerically, returning -1, 0 or 1.
+   */
+  static class LongComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      long left = (long) key1;
+      long right = (long) key2;
+      if (left == right) {
+        return 0;
+      }
+      return left < right ? -1 : 1;
+    }
+  }
+
+  /** Compares two measure values by casting both to BigDecimal and calling compareTo. */
+  static class BigDecimalComparator implements Comparator<Object> {
+    @Override public int compare(Object key1, Object key2) {
+      return ((BigDecimal) key1).compareTo((BigDecimal) key2);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
index fb3a582..b56c8a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
@@ -49,7 +49,7 @@ public class ExcludeColGroupFilterExecuterImpl extends ExcludeFilterExecuterImpl
    */
   public ExcludeColGroupFilterExecuterImpl(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
       SegmentProperties segmentProperties) {
-    super(dimColResolvedFilterInfo, segmentProperties);
+    super(dimColResolvedFilterInfo, null, segmentProperties, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 23209ed..a716a8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -18,56 +18,147 @@ package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class ExcludeFilterExecuterImpl implements FilterExecuter {
 
   protected DimColumnResolvedFilterInfo dimColEvaluatorInfo;
   protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo;
+  protected MeasureColumnExecuterFilterInfo msrColumnExecutorInfo;
   protected SegmentProperties segmentProperties;
   /**
    * is dimension column data is natural sorted
    */
   private boolean isNaturalSorted;
+
   public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
-      SegmentProperties segmentProperties) {
-    this.dimColEvaluatorInfo = dimColEvaluatorInfo;
-    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+      MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
+      boolean isMeasure) {
     this.segmentProperties = segmentProperties;
-    FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), segmentProperties,
-        dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
-    isNaturalSorted = dimColEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColEvaluatorInfo
-        .getDimension().isSortColumn();
+    if (!isMeasure) {
+      this.dimColEvaluatorInfo = dimColEvaluatorInfo;
+      dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+
+      FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), segmentProperties,
+          dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo, null, null);
+      isNaturalSorted =
+          dimColEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColEvaluatorInfo
+              .getDimension().isSortColumn();
+    } else {
+      this.msrColumnEvaluatorInfo = msrColumnEvaluatorInfo;
+      msrColumnExecutorInfo = new MeasureColumnExecuterFilterInfo();
+      FilterUtil
+          .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
+              null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
+    }
+
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    if (null != dimColumnExecuterInfo) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      DimensionRawColumnChunk dimensionRawColumnChunk =
+          blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+      DimensionColumnDataChunk[] dimensionColumnDataChunks =
+          dimensionRawColumnChunk.convertToDimColDataChunks();
+      BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+      for (int i = 0; i < dimensionColumnDataChunks.length; i++) {
+        BitSet bitSet = getFilteredIndexes(dimensionColumnDataChunks[i],
+            dimensionRawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+
+      return bitSetGroup;
+    } else if (null != msrColumnExecutorInfo) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      MeasureRawColumnChunk measureRawColumnChunk =
+          blockChunkHolder.getMeasureRawDataChunk()[blockIndex];
+      MeasureColumnDataChunk[] measureColumnDataChunks =
+          measureRawColumnChunk.convertToMeasureColDataChunks();
+      BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
+      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      for (int i = 0; i < measureColumnDataChunks.length; i++) {
+        BitSet bitSet =
+            getFilteredIndexes(measureColumnDataChunks[i], measureRawColumnChunk.getRowCount()[i],
+                msrType);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+      return bitSetGroup;
     }
-    DimensionRawColumnChunk dimensionRawColumnChunk =
-        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
-    DimensionColumnDataChunk[] dimensionColumnDataChunks =
-        dimensionRawColumnChunk.convertToDimColDataChunks();
-    BitSetGroup bitSetGroup =
-        new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
-    for (int i = 0; i < dimensionColumnDataChunks.length; i++) {
-      BitSet bitSet = getFilteredIndexes(dimensionColumnDataChunks[i],
-          dimensionRawColumnChunk.getRowCount()[i]);
-      bitSetGroup.setBitSet(bitSet, i);
+    return null;
+  }
+
+  /** Returns the filter's measure type if SHORT, INT, LONG or DECIMAL, otherwise DOUBLE. */
+  private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
+    DataType resolvedType = msrColumnEvaluatorInfo.getType();
+    switch (resolvedType) {
+      case SHORT:
+      case INT:
+      case LONG:
+      case DECIMAL:
+        return resolvedType;
+      default:
+        // any other type falls back to DOUBLE
+        return DataType.DOUBLE;
     }
+  }
 
-    return bitSetGroup;
+  /**
+   * Measure variant of the exclude filter: starts with every row selected and clears each
+   * row whose value equals one of the filter values (an empty filter value means null rows).
+   */
+  protected BitSet getFilteredIndexes(MeasureColumnDataChunk measureColumnDataChunk,
+      int numerOfRows, DataType msrType) {
+    BitSet bitSet = new BitSet(numerOfRows);
+    bitSet.flip(0, numerOfRows);
+    byte[][] filterValues = msrColumnExecutorInfo.getFilterKeys();
+    Comparator comparator = FilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    for (int i = 0; i < filterValues.length; i++) {
+      // an empty filter value stands for null: drop the rows flagged in the null bitset
+      if (filterValues[i].length == 0) {
+        BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+          bitSet.clear(j);
+        }
+        continue;
+      }
+      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
+      for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
+        Object msrValue = DataTypeUtil
+            .getMeasureObjectBasedOnDataType(measureColumnDataChunk, startIndex,
+                 msrColumnEvaluatorInfo.getMeasure());
+        if (comparator.compare(msrValue, filter) == 0) {
+          // clear, not flip: a row matched by two filter values must stay excluded
+          bitSet.clear(startIndex);
+        }
+      }
+    }
+    return bitSet;
   }
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimColumnDataChunk,
@@ -150,11 +241,20 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   }
 
   @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    if (null != dimColumnExecuterInfo) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+    } else if (null != msrColumnExecutorInfo) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
index c64f498..45831e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -53,7 +53,7 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl
    */
   public IncludeColGroupFilterExecuterImpl(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
       SegmentProperties segmentProperties) {
-    super(dimColResolvedFilterInfo, segmentProperties);
+    super(dimColResolvedFilterInfo, null, segmentProperties, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 8704496..394e561 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -17,22 +17,32 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class IncludeFilterExecuterImpl implements FilterExecuter {
 
   protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
   protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo;
+  protected MeasureColumnExecuterFilterInfo msrColumnExecutorInfo;
   protected SegmentProperties segmentProperties;
   /**
    * is dimension column data is natural sorted
@@ -40,42 +50,136 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
   private boolean isNaturalSorted;
 
   public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
-      SegmentProperties segmentProperties) {
-    this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+      MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
+      boolean isMeasure) {
+
     this.segmentProperties = segmentProperties;
-    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
-    FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
-        segmentProperties, dimColumnEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
-    isNaturalSorted =
-        dimColumnEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColumnEvaluatorInfo
-            .getDimension().isSortColumn();
+    if (isMeasure == false) {
+      this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+      dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+      FilterUtil
+          .prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(), segmentProperties,
+              dimColumnEvaluatorInfo.getDimension(), dimColumnExecuterInfo, null, null);
+      isNaturalSorted =
+          dimColumnEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColumnEvaluatorInfo
+              .getDimension().isSortColumn();
+
+    } else {
+      this.msrColumnEvaluatorInfo = msrColumnEvaluatorInfo;
+      msrColumnExecutorInfo = new MeasureColumnExecuterFilterInfo();
+      FilterUtil
+          .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
+              null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
+
+    }
+
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColumnEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
-    }
-    DimensionRawColumnChunk dimensionRawColumnChunk =
-        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
-    BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
-    for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
-      if (dimensionRawColumnChunk.getMaxValues() != null) {
-        if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
-            dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys())) {
+    if (null != dimColumnExecuterInfo) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      DimensionRawColumnChunk dimensionRawColumnChunk =
+          blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+      BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+      for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
+        if (dimensionRawColumnChunk.getMaxValues() != null) {
+          if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
+              dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys())) {
+            BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+                dimensionRawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        } else {
           BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
               dimensionRawColumnChunk.getRowCount()[i]);
           bitSetGroup.setBitSet(bitSet, i);
         }
-      } else {
-        BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
-            dimensionRawColumnChunk.getRowCount()[i]);
-        bitSetGroup.setBitSet(bitSet, i);
+      }
+      return bitSetGroup;
+    } else if (null != msrColumnExecutorInfo) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      MeasureRawColumnChunk measureRawColumnChunk =
+          blockChunkHolder.getMeasureRawDataChunk()[blockIndex];
+      BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
+      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      for (int i = 0; i < measureRawColumnChunk.getPagesCount(); i++) {
+        if (measureRawColumnChunk.getMaxValues() != null) {
+          if (isScanRequired(measureRawColumnChunk.getMaxValues()[i],
+              measureRawColumnChunk.getMinValues()[i], msrColumnExecutorInfo.getFilterKeys(),
+              msrColumnEvaluatorInfo.getType())) {
+            BitSet bitSet =
+                getFilteredIndexesForMeasures(measureRawColumnChunk.convertToMeasureColDataChunk(i),
+                    measureRawColumnChunk.getRowCount()[i], msrType);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        } else {
+          BitSet bitSet =
+              getFilteredIndexesForMeasures(measureRawColumnChunk.convertToMeasureColDataChunk(i),
+                  measureRawColumnChunk.getRowCount()[i], msrType);
+          bitSetGroup.setBitSet(bitSet, i);
+        }
+      }
+      return bitSetGroup;
+    }
+    return null;
+  }
+
+  private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
+    switch (msrColumnEvaluatorInfo.getType()) {
+      case SHORT:
+        return DataType.SHORT;
+      case INT:
+        return DataType.INT;
+      case LONG:
+        return DataType.LONG;
+      case DECIMAL:
+        return DataType.DECIMAL;
+      default:
+        return DataType.DOUBLE;
+    }
+  }
+
+  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
+      int rowsInPage, DataType msrType) {
+    // Algorithm: read each measure value from the chunk and compare it
+    // sequentially with the filter values. Every row whose value matches a
+    // filter value has its bit set in the BitSet.
+    BitSet bitSet = new BitSet(rowsInPage);
+    byte[][] filterValues = msrColumnExecutorInfo.getFilterKeys();
+
+    Comparator comparator = FilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    for (int i = 0; i < filterValues.length; i++) {
+      if (filterValues[i].length == 0) {
+        BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+          bitSet.set(j);
+        }
+        continue;
+      }
+      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
+      for (int startIndex = 0; startIndex < rowsInPage; startIndex++) {
+        // Check if filterValue[i] matches with measure Values.
+        Object msrValue = DataTypeUtil
+            .getMeasureObjectBasedOnDataType(measureColumnDataChunk, startIndex,
+                 msrColumnEvaluatorInfo.getMeasure());
+
+        if (comparator.compare(msrValue, filter) == 0) {
+          // This is a match.
+          bitSet.set(startIndex);
+        }
       }
     }
-    return bitSetGroup;
+    return bitSet;
   }
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -152,12 +256,28 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
 
   public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
     BitSet bitSet = new BitSet(1);
-    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
-    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+    byte[][] filterValues = null;
+    int columnIndex = 0;
+    int blockIndex = 0;
+    boolean isScanRequired = false;
+
+    if (null != dimColumnExecuterInfo) {
+      filterValues = dimColumnExecuterInfo.getFilterKeys();
+      columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+      blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+      isScanRequired =
+          isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
+
+    } else if (null != msrColumnExecutorInfo) {
+      filterValues = msrColumnExecutorInfo.getFilterKeys();
+      columnIndex = msrColumnEvaluatorInfo.getColumnIndex();
+      blockIndex =
+          segmentProperties.getMeasuresOrdinalToBlockMapping().get(columnIndex) + segmentProperties
+              .getLastDimensionColOrdinal();
+      isScanRequired = isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues,
+          msrColumnEvaluatorInfo.getType());
+    }
 
-    boolean isScanRequired =
-        isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
     if (isScanRequired) {
       bitSet.set(0);
     }
@@ -186,12 +306,61 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] maxValue, byte[] minValue, byte[][] filterValue,
+      DataType dataType) {
+    for (int i = 0; i < filterValue.length; i++) {
+      if (filterValue[i].length == 0 || maxValue.length == 0 || minValue.length == 0) {
+        return isScanRequired(maxValue, minValue, filterValue);
+      } else {
+        switch (dataType) {
+          case DOUBLE:
+            double maxValueDouble = ByteBuffer.wrap(maxValue).getDouble();
+            double minValueDouble = ByteBuffer.wrap(minValue).getDouble();
+            double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
+            if (filterValueDouble <= maxValueDouble && filterValueDouble >= minValueDouble) {
+              return true;
+            }
+            break;
+          case INT:
+          case SHORT:
+          case LONG:
+            long maxValueLong = ByteBuffer.wrap(maxValue).getLong();
+            long minValueLong = ByteBuffer.wrap(minValue).getLong();
+            long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
+            if (filterValueLong <= maxValueLong && filterValueLong >= minValueLong) {
+              return true;
+            }
+            break;
+          case DECIMAL:
+            BigDecimal maxDecimal = DataTypeUtil.byteToBigDecimal(maxValue);
+            BigDecimal minDecimal = DataTypeUtil.byteToBigDecimal(minValue);
+            BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
+            if (filterDecimal.compareTo(maxDecimal) <= 0
+                && filterDecimal.compareTo(minDecimal) >= 0) {
+              return true;
+            }
+        }
+      }
+    }
+    return false;
+  }
+
+
   @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColumnEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    if (null != dimColumnExecuterInfo) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+    } else if (null != msrColumnExecutorInfo) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
new file mode 100644
index 0000000..cc7e837
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+public class MeasureColumnExecuterFilterInfo {
+
+  byte[][] filterKeys;
+
+  public void setFilterKeys(byte[][] filterKeys) {
+    this.filterKeys = filterKeys;
+  }
+
+  public byte[][] getFilterKeys() {
+    return filterKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
index 65184fb..8f3d2b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
@@ -23,8 +23,10 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.scan.filter.DimColumnFilterInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.filter.ColumnFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -42,7 +44,7 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
   protected boolean isDimensionDefaultValuePresentInFilterValues(
       DimColumnResolvedFilterInfo dimColumnEvaluatorInfo) {
     boolean isDefaultValuePresentInFilterValues = false;
-    DimColumnFilterInfo filterValues = dimColumnEvaluatorInfo.getFilterValues();
+    ColumnFilterInfo filterValues = dimColumnEvaluatorInfo.getFilterValues();
     CarbonDimension dimension = dimColumnEvaluatorInfo.getDimension();
     byte[] defaultValue = dimension.getDefaultValue();
     if (!dimension.hasEncoding(Encoding.DICTIONARY)) {
@@ -78,4 +80,32 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
     }
     return isDefaultValuePresentInFilterValues;
   }
+
+  /**
+   * This method will check whether a default value for the non-existing column is present
+   * in the filter values list
+   *
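+   * @param measureColumnResolvedFilterInfo resolved filter info of the measure column
+   * @return true if the default value of the measure is one of the filter values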
+   */
+  protected boolean isMeasureDefaultValuePresentInFilterValues(
+      MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo) {
+    boolean isDefaultValuePresentInFilterValues = false;
+    ColumnFilterInfo filterValues = measureColumnResolvedFilterInfo.getFilterValues();
+    CarbonMeasure measure = measureColumnResolvedFilterInfo.getMeasure();
+    byte[] defaultValue = measure.getDefaultValue();
+    if (null == defaultValue) {
+      // use an empty default value for the case where the user gives an IS NULL condition
+      defaultValue = new byte[0];
+    }
+    List<byte[]> measureFilterValuesList = filterValues.getMeasuresFilterValuesList();
+    for (byte[] filterValue : measureFilterValuesList) {
+      int compare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(defaultValue, filterValue);
+      if (compare == 0) {
+        isDefaultValuePresentInFilterValues = true;
+        break;
+      }
+    }
+    return isDefaultValuePresentInFilterValues;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/17db292a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
index 2954c40..8e06894 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
@@ -22,13 +22,16 @@ import java.util.BitSet;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
 public class RestructureExcludeFilterExecutorImpl extends RestructureEvaluatorImpl {
 
   protected DimColumnResolvedFilterInfo dimColEvaluatorInfo;
+  protected MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo;
   protected SegmentProperties segmentProperties;
+  protected boolean isMeasure;
 
   /**
    * flag to check whether filter values contain the default value applied on the dimension column
@@ -37,11 +40,19 @@ public class RestructureExcludeFilterExecutorImpl extends RestructureEvaluatorIm
   protected boolean isDefaultValuePresentInFilterValues;
 
   public RestructureExcludeFilterExecutorImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
-      SegmentProperties segmentProperties) {
+      MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo,
+      SegmentProperties segmentProperties, boolean isMeasure) {
     this.dimColEvaluatorInfo = dimColEvaluatorInfo;
+    this.measureColumnResolvedFilterInfo = measureColumnResolvedFilterInfo;
     this.segmentProperties = segmentProperties;
-    isDefaultValuePresentInFilterValues =
-        isDimensionDefaultValuePresentInFilterValues(dimColEvaluatorInfo);
+    this.isMeasure = isMeasure;
+    if (isMeasure) {
+      isDefaultValuePresentInFilterValues =
+          isMeasureDefaultValuePresentInFilterValues(measureColumnResolvedFilterInfo);
+    } else {
+      isDefaultValuePresentInFilterValues =
+          isDimensionDefaultValuePresentInFilterValues(dimColEvaluatorInfo);
+    }
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {