You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/07/31 11:49:17 UTC

[3/6] carbondata git commit: Measure Filter implementation

Measure Filter implementation

Measure Implementation for Include and Exclude Filter

RowLevel Measure Implementation

RowLevel Less LessThan Greater GreaterThan Implementation for measure

Rectify Datatype Conversion Measure

Rectify 1

Restructure Changes for Measure

Test Case Fixes


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/266c473b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/266c473b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/266c473b

Branch: refs/heads/master
Commit: 266c473bab10476addf09b64f0db22d48ee6924a
Parents: dd809ed
Author: sounakr <so...@gmail.com>
Authored: Tue Jun 20 22:52:36 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Sun Jul 30 11:59:47 2017 +0530

----------------------------------------------------------------------
 .../core/datastore/block/SegmentProperties.java |   7 +
 .../core/metadata/schema/table/CarbonTable.java |   3 +
 .../core/scan/expression/ColumnExpression.java  |  22 ++
 .../conditional/ConditionalExpression.java      |   2 +-
 .../logical/BinaryLogicalExpression.java        |   4 +-
 .../core/scan/filter/ColumnFilterInfo.java      |  88 +++++++
 .../core/scan/filter/DimColumnFilterInfo.java   |  78 ------
 .../scan/filter/FilterExpressionProcessor.java  |  70 +++++-
 .../carbondata/core/scan/filter/FilterUtil.java | 227 ++++++++++++-----
 .../ExcludeColGroupFilterExecuterImpl.java      |   2 +-
 .../executer/ExcludeFilterExecuterImpl.java     | 163 +++++++++---
 .../IncludeColGroupFilterExecuterImpl.java      |   2 +-
 .../executer/IncludeFilterExecuterImpl.java     | 250 ++++++++++++++++---
 .../MeasureColumnExecuterFilterInfo.java        |  30 +++
 .../executer/RestructureEvaluatorImpl.java      |  34 ++-
 .../RestructureExcludeFilterExecutorImpl.java   |  17 +-
 .../RestructureIncludeFilterExecutorImpl.java   |  17 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  24 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  | 238 ++++++++++++++----
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java | 240 ++++++++++++++----
 ...velRangeLessThanEqualFilterExecuterImpl.java | 206 ++++++++++++---
 .../RowLevelRangeLessThanFiterExecuterImpl.java | 208 ++++++++++++---
 .../filter/partition/PartitionFilterUtil.java   |   2 +
 .../resolver/ConditionalFilterResolverImpl.java | 113 ++++++---
 .../filter/resolver/FilterResolverIntf.java     |   9 +
 .../resolver/LogicalFilterResolverImpl.java     |   4 +
 .../resolver/RowLevelFilterResolverImpl.java    |   3 +-
 .../RowLevelRangeFilterResolverImpl.java        |  96 +++++--
 .../resolverinfo/ColumnResolvedFilterInfo.java  |  22 ++
 .../DimColumnResolvedFilterInfo.java            |  22 +-
 .../MeasureColumnResolvedFilterInfo.java        |  98 +++++++-
 .../TrueConditionalResolverImpl.java            |   2 +-
 .../visitor/CustomTypeDictionaryVisitor.java    |  17 +-
 .../visitor/DictionaryColumnVisitor.java        |  11 +-
 .../visitor/FilterInfoTypeVisitorFactory.java   |  16 +-
 .../visitor/MeasureColumnVisitor.java           |  77 ++++++
 .../visitor/NoDictionaryTypeVisitor.java        |  10 +-
 .../visitor/RangeDictionaryColumnVisitor.java   |  10 +-
 .../visitor/RangeDirectDictionaryVisitor.java   |  10 +-
 .../visitor/RangeNoDictionaryTypeVisitor.java   |  10 +-
 .../visitor/ResolvedFilterInfoVisitorIntf.java  |   5 +-
 .../carbondata/core/scan/model/QueryModel.java  |  18 +-
 .../apache/carbondata/core/util/ByteUtil.java   |   1 -
 .../apache/carbondata/core/util/CarbonUtil.java |   8 +
 .../carbondata/core/util/DataTypeUtil.java      | 139 +++++++++++
 .../core/scan/filter/FilterUtilTest.java        |   8 +-
 .../ExpressionWithNullTestCase.scala            |  10 +-
 .../spark/sql/SparkUnknownExpression.scala      |   2 +-
 .../spark/sql/SparkUnknownExpression.scala      |   2 +-
 49 files changed, 2135 insertions(+), 522 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index cd12c3b..02bd7bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -163,6 +163,8 @@ public class SegmentProperties {
 
   private int numberOfNoDictSortColumns = 0;
 
+  private int lastDimensionColOrdinal;
+
   public SegmentProperties(List<ColumnSchema> columnsInTable, int[] columnCardinality) {
     dimensions = new ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     complexDimensions =
@@ -409,6 +411,7 @@ public class SegmentProperties {
       }
       counter++;
     }
+    lastDimensionColOrdinal = dimensonOrdinal;
     dimColumnsCardinality = new int[cardinalityIndexForNormalDimensionColumn.size()];
     complexDimColumnCardinality = new int[cardinalityIndexForComplexDimensionColumn.size()];
     int index = 0;
@@ -837,4 +840,8 @@ public class SegmentProperties {
   public int getNumberOfDictSortColumns() {
     return this.numberOfSortColumns - this.numberOfNoDictSortColumns;
   }
+
+  public int getLastDimensionColOrdinal() {
+    return lastDimensionColOrdinal;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 01b3022..07d856c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -124,6 +124,8 @@ public class CarbonTable implements Serializable {
    */
   private int numberOfNoDictSortColumns;
 
+  private int lastDimensionColumnOrdinal;
+
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -256,6 +258,7 @@ public class CarbonTable implements Serializable {
                  columnSchema.getSchemaOrdinal()));
       }
     }
+    lastDimensionColumnOrdinal = dimensionOrdinal;
     fillVisibleDimensions(tableSchema.getTableName());
     fillVisibleMeasures(tableSchema.getTableName());
     addImplicitDimension(dimensionOrdinal, implicitDimensions);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
index d27d789..981efb5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.scan.expression;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 
@@ -31,12 +32,16 @@ public class ColumnExpression extends LeafExpression {
 
   private boolean isDimension;
 
+  private boolean isMeasure;
+
   private int colIndex = -1;
 
   private DataType dataType;
 
   private CarbonDimension dimension;
 
+  private CarbonMeasure measure;
+
   private CarbonColumn carbonColumn;
 
   public ColumnExpression(String columnName, DataType dataType) {
@@ -53,6 +58,14 @@ public class ColumnExpression extends LeafExpression {
     this.dimension = dimension;
   }
 
+  public CarbonMeasure getMeasure() {
+    return measure;
+  }
+
+  public void setMeasure(CarbonMeasure measure) {
+    this.measure = measure;
+  }
+
   public String getColumnName() {
     return columnName;
   }
@@ -69,6 +82,14 @@ public class ColumnExpression extends LeafExpression {
     this.isDimension = isDimension;
   }
 
+  public boolean isMeasure() {
+    return isMeasure;
+  }
+
+  public void setMeasure(boolean isMeasure) {
+    this.isMeasure = isMeasure;
+  }
+
   public int getColIndex() {
     return colIndex;
   }
@@ -110,4 +131,5 @@ public class ColumnExpression extends LeafExpression {
   @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) {
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
index 265fa32..d7b940c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ConditionalExpression.java
@@ -28,7 +28,7 @@ public interface ConditionalExpression {
   // traversing the tree
   List<ColumnExpression> getColumnList();
 
-  boolean isSingleDimension();
+  boolean isSingleColumn();
 
   List<ExpressionResult> getLiterals();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
index 1228793..1a5b6a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/BinaryLogicalExpression.java
@@ -80,11 +80,11 @@ public abstract class BinaryLogicalExpression extends BinaryExpression {
     }
   }
 
-  public boolean isSingleDimension() {
+  public boolean isSingleColumn() {
     List<ColumnExpression> listOfExp =
         new ArrayList<ColumnExpression>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     getColumnList(this, listOfExp);
-    if (listOfExp.size() == 1 && listOfExp.get(0).isDimension()) {
+    if (listOfExp.size() == 1 && (listOfExp.get(0).isDimension() || listOfExp.get(0).isMeasure())) {
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
new file mode 100644
index 0000000..008d908
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.filter;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class ColumnFilterInfo implements Serializable {
+
+  private static final long serialVersionUID = 8181578747306832771L;
+
+  private boolean isIncludeFilter;
+
+  private List<Integer> filterList;
+
+  /**
+   * Implicit column filter values to be used for block and blocklet pruning
+   */
+  private List<String> implicitColumnFilterList;
+  private List<Integer> excludeFilterList;
+  /**
+   * maintain the no dictionary filter values list.
+   */
+  private List<byte[]> noDictionaryFilterValuesList;
+
+  private List<byte[]> measuresFilterValuesList;
+
+  public List<byte[]> getNoDictionaryFilterValuesList() {
+    return noDictionaryFilterValuesList;
+  }
+
+  public boolean isIncludeFilter() {
+    return isIncludeFilter;
+  }
+
+  public void setIncludeFilter(boolean isIncludeFilter) {
+    this.isIncludeFilter = isIncludeFilter;
+  }
+
+  public List<Integer> getFilterList() {
+    return filterList;
+  }
+
+  public void setFilterList(List<Integer> filterList) {
+    this.filterList = filterList;
+  }
+
+  public void setFilterListForNoDictionaryCols(List<byte[]> noDictionaryFilterValuesList) {
+    this.noDictionaryFilterValuesList = noDictionaryFilterValuesList;
+  }
+
+  public List<Integer> getExcludeFilterList() {
+    return excludeFilterList;
+  }
+  public void setExcludeFilterList(List<Integer> excludeFilterList) {
+    this.excludeFilterList = excludeFilterList;
+  }
+  public List<String> getImplicitColumnFilterList() {
+    return implicitColumnFilterList;
+  }
+
+  public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) {
+    this.implicitColumnFilterList = implicitColumnFilterList;
+  }
+
+  public List<byte[]> getMeasuresFilterValuesList() {
+    return measuresFilterValuesList;
+  }
+
+  public void setMeasuresFilterValuesList(List<byte[]> measuresFilterValuesList) {
+    this.measuresFilterValuesList = measuresFilterValuesList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java
deleted file mode 100644
index 16c6965..0000000
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/DimColumnFilterInfo.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.scan.filter;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class DimColumnFilterInfo implements Serializable {
-
-  private static final long serialVersionUID = 8181578747306832771L;
-
-  private boolean isIncludeFilter;
-
-  private List<Integer> filterList;
-
-  /**
-   * Implicit column filter values to be used for block and blocklet pruning
-   */
-  private List<String> implicitColumnFilterList;
-  private List<Integer> excludeFilterList;
-  /**
-   * maintain the no dictionary filter values list.
-   */
-  private List<byte[]> noDictionaryFilterValuesList;
-
-  public List<byte[]> getNoDictionaryFilterValuesList() {
-    return noDictionaryFilterValuesList;
-  }
-
-  public boolean isIncludeFilter() {
-    return isIncludeFilter;
-  }
-
-  public void setIncludeFilter(boolean isIncludeFilter) {
-    this.isIncludeFilter = isIncludeFilter;
-  }
-
-  public List<Integer> getFilterList() {
-    return filterList;
-  }
-
-  public void setFilterList(List<Integer> filterList) {
-    this.filterList = filterList;
-  }
-
-  public void setFilterListForNoDictionaryCols(List<byte[]> noDictionaryFilterValuesList) {
-    this.noDictionaryFilterValuesList = noDictionaryFilterValuesList;
-  }
-
-  public List<Integer> getExcludeFilterList() {
-    return excludeFilterList;
-  }
-  public void setExcludeFilterList(List<Integer> excludeFilterList) {
-    this.excludeFilterList = excludeFilterList;
-  }
-  public List<String> getImplicitColumnFilterList() {
-    return implicitColumnFilterList;
-  }
-
-  public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) {
-    this.implicitColumnFilterList = implicitColumnFilterList;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 02052aa..104ef56 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -415,11 +415,34 @@ public class FilterExpressionProcessor implements FilterProcessor {
         return new TrueConditionalResolverImpl(expression, false, false, tableIdentifier);
       case EQUALS:
         currentCondExpression = (BinaryConditionalExpression) expression;
-        if (currentCondExpression.isSingleDimension()
+        if (currentCondExpression.isSingleColumn()
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.ARRAY
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.STRUCT) {
+
+          if (currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure()) {
+            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
+                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
+                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
+                    || FilterUtil
+                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
+              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, true,
+                  tableIdentifier);
+            }
+            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
+                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.GREATERTHAN_EQUALTO
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.LESSTHAN_EQUALTO) {
+              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, true,
+                  tableIdentifier);
+            }
+            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
+                tableIdentifier,
+                currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
+          }
           // getting new dim index.
           if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
               .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
@@ -443,20 +466,44 @@ public class FilterExpressionProcessor implements FilterProcessor {
             }
           }
           return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
-              tableIdentifier);
+              tableIdentifier,
+              currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
 
         }
         break;
       case RANGE:
         return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
-            tableIdentifier);
+            tableIdentifier, false);
       case NOT_EQUALS:
         currentCondExpression = (BinaryConditionalExpression) expression;
-        if (currentCondExpression.isSingleDimension()
+        if (currentCondExpression.isSingleColumn()
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.ARRAY
             && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
             != DataType.STRUCT) {
+
+          if (currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure()) {
+            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
+                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
+                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
+                    || FilterUtil
+                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
+              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, false,
+                  tableIdentifier);
+            }
+            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
+                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.GREATERTHAN_EQUALTO
+                || currentCondExpression.getFilterExpressionType()
+                == ExpressionType.LESSTHAN_EQUALTO) {
+              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, false,
+                  tableIdentifier);
+            }
+            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
+                tableIdentifier, true);
+          }
+
           if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
               .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
               .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
@@ -478,31 +525,32 @@ public class FilterExpressionProcessor implements FilterProcessor {
             }
 
             return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
-                tableIdentifier);
+                tableIdentifier, false);
           }
           return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false,
-              tableIdentifier);
+              tableIdentifier, false);
         }
         break;
 
       default:
         if (expression instanceof ConditionalExpression) {
           condExpression = (ConditionalExpression) expression;
-          if (condExpression.isSingleDimension()
+          if (condExpression.isSingleColumn()
               && condExpression.getColumnList().get(0).getCarbonColumn().getDataType()
               != DataType.ARRAY
               && condExpression.getColumnList().get(0).getCarbonColumn().getDataType()
               != DataType.STRUCT) {
             condExpression = (ConditionalExpression) expression;
-            if (condExpression.getColumnList().get(0).getCarbonColumn()
+            if ((condExpression.getColumnList().get(0).getCarbonColumn()
                 .hasEncoding(Encoding.DICTIONARY) && !condExpression.getColumnList().get(0)
-                .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-              return new ConditionalFilterResolverImpl(expression, true, true, tableIdentifier);
+                .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY))
+                || (currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure())) {
+              return new ConditionalFilterResolverImpl(expression, true, true, tableIdentifier,
+                  currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
             }
           }
         }
     }
     return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index b06789e..6d531ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
@@ -70,6 +71,7 @@ import org.apache.carbondata.core.scan.filter.executer.ExcludeFilterExecuterImpl
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.IncludeColGroupFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.IncludeFilterExecuterImpl;
+import org.apache.carbondata.core.scan.filter.executer.MeasureColumnExecuterFilterInfo;
 import org.apache.carbondata.core.scan.filter.executer.OrFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.RestructureExcludeFilterExecutorImpl;
@@ -85,6 +87,7 @@ import org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolver
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -117,10 +120,12 @@ public final class FilterUtil {
       switch (filterExecuterType) {
         case INCLUDE:
           return getIncludeFilterExecuter(
-              filterExpressionResolverTree.getDimColResolvedFilterInfo(), segmentProperties);
+              filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+              filterExpressionResolverTree.getMsrColResolvedFilterInfo(), segmentProperties);
         case EXCLUDE:
           return getExcludeFilterExecuter(
-              filterExpressionResolverTree.getDimColResolvedFilterInfo(), segmentProperties);
+              filterExpressionResolverTree.getDimColResolvedFilterInfo(),
+              filterExpressionResolverTree.getMsrColResolvedFilterInfo(), segmentProperties);
         case OR:
           return new OrFilterExecuterImpl(
               createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
@@ -180,9 +185,27 @@ public final class FilterUtil {
    * @return
    */
   private static FilterExecuter getIncludeFilterExecuter(
-      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties) {
-
-    if (dimColResolvedFilterInfo.getDimension().isColumnar()) {
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      MeasureColumnResolvedFilterInfo msrColResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
+    if (null != msrColResolvedFilterInfo && msrColResolvedFilterInfo.getMeasure().isColumnar()) {
+      CarbonMeasure measuresFromCurrentBlock = segmentProperties
+          .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+      if (null != measuresFromCurrentBlock) {
+        // update dimension and column index according to the dimension position in current block
+        MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
+            msrColResolvedFilterInfo.getCopyObject();
+        msrColResolvedFilterInfoCopyObject.setMeasure(measuresFromCurrentBlock);
+        msrColResolvedFilterInfoCopyObject.setColumnIndex(measuresFromCurrentBlock.getOrdinal());
+        msrColResolvedFilterInfoCopyObject.setType(measuresFromCurrentBlock.getDataType());
+        return new IncludeFilterExecuterImpl(null, msrColResolvedFilterInfoCopyObject,
+            segmentProperties, true);
+      } else {
+        return new RestructureIncludeFilterExecutorImpl(dimColResolvedFilterInfo,
+            msrColResolvedFilterInfo, segmentProperties, true);
+      }
+    }
+    if (null != dimColResolvedFilterInfo && dimColResolvedFilterInfo.getDimension().isColumnar()) {
       CarbonDimension dimensionFromCurrentBlock =
           segmentProperties.getDimensionFromCurrentBlock(dimColResolvedFilterInfo.getDimension());
       if (null != dimensionFromCurrentBlock) {
@@ -191,10 +214,11 @@ public final class FilterUtil {
             dimColResolvedFilterInfo.getCopyObject();
         dimColResolvedFilterInfoCopyObject.setDimension(dimensionFromCurrentBlock);
         dimColResolvedFilterInfoCopyObject.setColumnIndex(dimensionFromCurrentBlock.getOrdinal());
-        return new IncludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, segmentProperties);
+        return new IncludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, null,
+            segmentProperties, false);
       } else {
         return new RestructureIncludeFilterExecutorImpl(dimColResolvedFilterInfo,
-            segmentProperties);
+            msrColResolvedFilterInfo, segmentProperties, false);
       }
     } else {
       return new IncludeColGroupFilterExecuterImpl(dimColResolvedFilterInfo, segmentProperties);
@@ -209,9 +233,29 @@ public final class FilterUtil {
    * @return
    */
   private static FilterExecuter getExcludeFilterExecuter(
-      DimColumnResolvedFilterInfo dimColResolvedFilterInfo, SegmentProperties segmentProperties) {
+      DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
+      MeasureColumnResolvedFilterInfo msrColResolvedFilterInfo,
+      SegmentProperties segmentProperties) {
 
-    if (dimColResolvedFilterInfo.getDimension().isColumnar()) {
+    if (null != msrColResolvedFilterInfo && msrColResolvedFilterInfo.getMeasure().isColumnar()) {
+      CarbonMeasure measuresFromCurrentBlock = segmentProperties
+          .getMeasureFromCurrentBlock(msrColResolvedFilterInfo.getMeasure().getColumnId());
+      if (null != measuresFromCurrentBlock) {
+        // update dimension and column index according to the dimension position in current block
+        MeasureColumnResolvedFilterInfo msrColResolvedFilterInfoCopyObject =
+            msrColResolvedFilterInfo.getCopyObject();
+        msrColResolvedFilterInfoCopyObject.setMeasure(measuresFromCurrentBlock);
+        msrColResolvedFilterInfoCopyObject.setColumnIndex(measuresFromCurrentBlock.getOrdinal());
+        msrColResolvedFilterInfoCopyObject.setType(measuresFromCurrentBlock.getDataType());
+        return new ExcludeFilterExecuterImpl(null, msrColResolvedFilterInfoCopyObject,
+            segmentProperties, true);
+      } else {
+        return new RestructureExcludeFilterExecutorImpl(dimColResolvedFilterInfo,
+            msrColResolvedFilterInfo, segmentProperties, true);
+      }
+    }
+    if ((null != dimColResolvedFilterInfo) && (dimColResolvedFilterInfo.getDimension()
+        .isColumnar())) {
       CarbonDimension dimensionFromCurrentBlock =
           segmentProperties.getDimensionFromCurrentBlock(dimColResolvedFilterInfo.getDimension());
       if (null != dimensionFromCurrentBlock) {
@@ -220,10 +264,11 @@ public final class FilterUtil {
             dimColResolvedFilterInfo.getCopyObject();
         dimColResolvedFilterInfoCopyObject.setDimension(dimensionFromCurrentBlock);
         dimColResolvedFilterInfoCopyObject.setColumnIndex(dimensionFromCurrentBlock.getOrdinal());
-        return new ExcludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, segmentProperties);
+        return new ExcludeFilterExecuterImpl(dimColResolvedFilterInfoCopyObject, null,
+            segmentProperties, false);
       } else {
         return new RestructureExcludeFilterExecutorImpl(dimColResolvedFilterInfo,
-            segmentProperties);
+            msrColResolvedFilterInfo, segmentProperties, false);
       }
     } else {
       return new ExcludeColGroupFilterExecuterImpl(dimColResolvedFilterInfo, segmentProperties);
@@ -349,13 +394,13 @@ public final class FilterUtil {
 
   /**
    * This method will get the no dictionary data based on filters and same
-   * will be in DimColumnFilterInfo
+   * will be in ColumnFilterInfo
    *
    * @param evaluateResultListFinal
    * @param isIncludeFilter
-   * @return DimColumnFilterInfo
+   * @return ColumnFilterInfo
    */
-  public static DimColumnFilterInfo getNoDictionaryValKeyMemberForFilter(
+  public static ColumnFilterInfo getNoDictionaryValKeyMemberForFilter(
       List<String> evaluateResultListFinal, boolean isIncludeFilter, DataType dataType)
       throws FilterUnsupportedException {
     List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
@@ -384,9 +429,9 @@ public final class FilterUtil {
 
     };
     Collections.sort(filterValuesList, filterNoDictValueComaparator);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
     if (filterValuesList.size() > 0) {
-      columnFilterInfo = new DimColumnFilterInfo();
+      columnFilterInfo = new ColumnFilterInfo();
       columnFilterInfo.setIncludeFilter(isIncludeFilter);
       columnFilterInfo.setFilterListForNoDictionaryCols(filterValuesList);
 
@@ -395,6 +440,58 @@ public final class FilterUtil {
   }
 
   /**
+   * This method will get the no dictionary data based on filters and same
+   * will be in ColumnFilterInfo
+   *
+   * @param evaluateResultListFinal
+   * @param isIncludeFilter
+   * @return ColumnFilterInfo
+   */
+  public static ColumnFilterInfo getMeasureValKeyMemberForFilter(
+      List<String> evaluateResultListFinal, boolean isIncludeFilter, DataType dataType,
+      CarbonMeasure carbonMeasure) throws FilterUnsupportedException {
+    List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
+    String result = null;
+    try {
+      int length = evaluateResultListFinal.size();
+      for (int i = 0; i < length; i++) {
+        result = evaluateResultListFinal.get(i);
+        if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(result)) {
+          filterValuesList.add(new byte[0]);
+          continue;
+        }
+        // TODO have to understand what method to be used for measures.
+        // filterValuesList
+        //  .add(DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(result, dataType));
+
+        filterValuesList
+            .add(DataTypeUtil.getMeasureByteArrayBasedOnDataTypes(result, dataType, carbonMeasure));
+
+      }
+    } catch (Throwable ex) {
+      throw new FilterUnsupportedException("Unsupported Filter condition: " + result, ex);
+    }
+
+    Comparator<byte[]> filterMeasureComaparator = new Comparator<byte[]>() {
+
+      @Override public int compare(byte[] filterMember1, byte[] filterMember2) {
+        // TODO Auto-generated method stub
+        return ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterMember1, filterMember2);
+      }
+
+    };
+    Collections.sort(filterValuesList, filterMeasureComaparator);
+    ColumnFilterInfo columnFilterInfo = null;
+    if (filterValuesList.size() > 0) {
+      columnFilterInfo = new ColumnFilterInfo();
+      columnFilterInfo.setIncludeFilter(isIncludeFilter);
+      columnFilterInfo.setMeasuresFilterValuesList(filterValuesList);
+
+    }
+    return columnFilterInfo;
+  }
+
+  /**
    * Method will prepare the  dimfilterinfo instance by resolving the filter
    * expression value to its respective surrogates.
    *
@@ -405,7 +502,7 @@ public final class FilterUtil {
    * @return
    * @throws IOException
    */
-  public static DimColumnFilterInfo getFilterValues(AbsoluteTableIdentifier tableIdentifier,
+  public static ColumnFilterInfo getFilterValues(AbsoluteTableIdentifier tableIdentifier,
       ColumnExpression columnExpression, List<String> evaluateResultList, boolean isIncludeFilter)
       throws IOException {
     Dictionary forwardDictionary = null;
@@ -430,7 +527,7 @@ public final class FilterUtil {
    * @param isIncludeFilter
    * @return
    */
-  private static DimColumnFilterInfo getFilterValues(ColumnExpression columnExpression,
+  private static ColumnFilterInfo getFilterValues(ColumnExpression columnExpression,
       List<String> evaluateResultList, Dictionary forwardDictionary, boolean isIncludeFilter) {
     sortFilterModelMembers(columnExpression, evaluateResultList);
     List<Integer> surrogates =
@@ -438,9 +535,9 @@ public final class FilterUtil {
     // Reading the dictionary value from cache.
     getDictionaryValue(evaluateResultList, forwardDictionary, surrogates);
     Collections.sort(surrogates);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
     if (surrogates.size() > 0) {
-      columnFilterInfo = new DimColumnFilterInfo();
+      columnFilterInfo = new ColumnFilterInfo();
       columnFilterInfo.setIncludeFilter(isIncludeFilter);
       columnFilterInfo.setFilterList(surrogates);
     }
@@ -468,11 +565,11 @@ public final class FilterUtil {
    * @param expression
    * @param columnExpression
    * @param isIncludeFilter
-   * @return DimColumnFilterInfo
+   * @return ColumnFilterInfo
    * @throws FilterUnsupportedException
    * @throws IOException
    */
-  public static DimColumnFilterInfo getFilterListForAllValues(
+  public static ColumnFilterInfo getFilterListForAllValues(
       AbsoluteTableIdentifier tableIdentifier, Expression expression,
       final ColumnExpression columnExpression, boolean isIncludeFilter)
       throws IOException, FilterUnsupportedException {
@@ -539,11 +636,11 @@ public final class FilterUtil {
    * @return
    * @throws FilterUnsupportedException
    */
-  public static DimColumnFilterInfo getFilterListForRS(Expression expression,
+  public static ColumnFilterInfo getFilterListForRS(Expression expression,
       ColumnExpression columnExpression, String defaultValues, int defaultSurrogate)
       throws FilterUnsupportedException {
     List<Integer> filterValuesList = new ArrayList<Integer>(20);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
     // List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
     List<String> evaluateResultListFinal = new ArrayList<String>(20);
     // KeyGenerator keyGenerator =
@@ -565,7 +662,7 @@ public final class FilterUtil {
         }
       }
       if (filterValuesList.size() > 0) {
-        columnFilterInfo = new DimColumnFilterInfo();
+        columnFilterInfo = new ColumnFilterInfo();
         columnFilterInfo.setFilterList(filterValuesList);
       }
     } catch (FilterIllegalMemberException e) {
@@ -586,12 +683,12 @@ public final class FilterUtil {
    * @return
    * @throws FilterUnsupportedException
    */
-  public static DimColumnFilterInfo getFilterListForAllMembersRS(Expression expression,
+  public static ColumnFilterInfo getFilterListForAllMembersRS(Expression expression,
       ColumnExpression columnExpression, String defaultValues, int defaultSurrogate,
       boolean isIncludeFilter) throws FilterUnsupportedException {
     List<Integer> filterValuesList = new ArrayList<Integer>(20);
     List<String> evaluateResultListFinal = new ArrayList<String>(20);
-    DimColumnFilterInfo columnFilterInfo = null;
+    ColumnFilterInfo columnFilterInfo = null;
 
     // KeyGenerator keyGenerator =
     // KeyGeneratorFactory.getKeyGenerator(new int[] { defaultSurrogate });
@@ -617,7 +714,7 @@ public final class FilterUtil {
     if (null == defaultValues) {
       defaultValues = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
     }
-    columnFilterInfo = new DimColumnFilterInfo();
+    columnFilterInfo = new ColumnFilterInfo();
     for (int i = 0; i < evaluateResultListFinal.size(); i++) {
       if (evaluateResultListFinal.get(i).equals(defaultValues)) {
         filterValuesList.add(defaultSurrogate);
@@ -632,16 +729,21 @@ public final class FilterUtil {
    * Below method will be used to covert the filter surrogate keys
    * to mdkey
    *
-   * @param dimColumnFilterInfo
+   * @param columnFilterInfo
    * @param carbonDimension
    * @param segmentProperties
    * @return
    */
-  public static byte[][] getKeyArray(DimColumnFilterInfo dimColumnFilterInfo,
-      CarbonDimension carbonDimension, SegmentProperties segmentProperties) {
+  public static byte[][] getKeyArray(ColumnFilterInfo columnFilterInfo,
+      CarbonDimension carbonDimension, CarbonMeasure carbonMeasure,
+      SegmentProperties segmentProperties) {
+    if (null != carbonMeasure) {
+      return columnFilterInfo.getMeasuresFilterValuesList()
+          .toArray((new byte[columnFilterInfo.getMeasuresFilterValuesList().size()][]));
+    }
     if (!carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
-      return dimColumnFilterInfo.getNoDictionaryFilterValuesList()
-          .toArray((new byte[dimColumnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
+      return columnFilterInfo.getNoDictionaryFilterValuesList()
+          .toArray((new byte[columnFilterInfo.getNoDictionaryFilterValuesList().size()][]));
     }
     KeyGenerator blockLevelKeyGenerator = segmentProperties.getDimensionKeyGenerator();
     int[] dimColumnsCardinality = segmentProperties.getDimColumnsCardinality();
@@ -649,10 +751,10 @@ public final class FilterUtil {
     List<byte[]> filterValuesList = new ArrayList<byte[]>(20);
     Arrays.fill(keys, 0);
     int keyOrdinalOfDimensionFromCurrentBlock = carbonDimension.getKeyOrdinal();
-    if (null != dimColumnFilterInfo) {
+    if (null != columnFilterInfo) {
       int[] rangesForMaskedByte =
           getRangesForMaskedByte(keyOrdinalOfDimensionFromCurrentBlock, blockLevelKeyGenerator);
-      for (Integer surrogate : dimColumnFilterInfo.getFilterList()) {
+      for (Integer surrogate : columnFilterInfo.getFilterList()) {
         try {
           if (surrogate <= dimColumnsCardinality[keyOrdinalOfDimensionFromCurrentBlock]) {
             keys[keyOrdinalOfDimensionFromCurrentBlock] = surrogate;
@@ -705,7 +807,7 @@ public final class FilterUtil {
    * @param startKeyList
    * @return long[] start key
    */
-  public static void getStartKey(Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+  public static void getStartKey(Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       SegmentProperties segmentProperties, long[] startKey, List<long[]> startKeyList) {
     for (int i = 0; i < startKey.length; i++) {
       // The min surrogate key is 1, set it as the init value for starkey of each column level
@@ -734,17 +836,17 @@ public final class FilterUtil {
       DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
       SegmentProperties segmentProperties,
       SortedMap<Integer, byte[]> setOfStartKeyByteArray) {
-    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+    Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter =
         dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
     // step 1
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
       if (!entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
-        List<DimColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
+        List<ColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
         if (null == listOfDimColFilterInfo) {
           continue;
         }
         boolean isExcludePresent = false;
-        for (DimColumnFilterInfo info : listOfDimColFilterInfo) {
+        for (ColumnFilterInfo info : listOfDimColFilterInfo) {
           if (!info.isIncludeFilter()) {
             isExcludePresent = true;
           }
@@ -797,17 +899,17 @@ public final class FilterUtil {
       SegmentProperties segmentProperties,
       SortedMap<Integer, byte[]> setOfEndKeyByteArray) {
 
-    Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter =
+    Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter =
         dimColResolvedFilterInfo.getDimensionResolvedFilterInstance();
     // step 1
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
       if (!entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
-        List<DimColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
+        List<ColumnFilterInfo> listOfDimColFilterInfo = entry.getValue();
         if (null == listOfDimColFilterInfo) {
           continue;
         }
         boolean isExcludePresent = false;
-        for (DimColumnFilterInfo info : listOfDimColFilterInfo) {
+        for (ColumnFilterInfo info : listOfDimColFilterInfo) {
           if (!info.isIncludeFilter()) {
             isExcludePresent = true;
           }
@@ -869,15 +971,15 @@ public final class FilterUtil {
    * @param startKey
    */
   private static void getStartKeyWithFilter(
-      Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+      Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       SegmentProperties segmentProperties, long[] startKey, List<long[]> startKeyList) {
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
-      List<DimColumnFilterInfo> values = entry.getValue();
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<ColumnFilterInfo> values = entry.getValue();
       if (null == values || !entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
         continue;
       }
       boolean isExcludePresent = false;
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (!info.isIncludeFilter()) {
           isExcludePresent = true;
         }
@@ -894,7 +996,7 @@ public final class FilterUtil {
         continue;
       }
       int keyOrdinalOfDimensionFromCurrentBlock = dimensionFromCurrentBlock.getKeyOrdinal();
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (keyOrdinalOfDimensionFromCurrentBlock < startKey.length) {
           if (startKey[keyOrdinalOfDimensionFromCurrentBlock] < info.getFilterList().get(0)) {
             startKey[keyOrdinalOfDimensionFromCurrentBlock] = info.getFilterList().get(0);
@@ -907,7 +1009,7 @@ public final class FilterUtil {
     }
   }
 
-  public static void getEndKey(Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+  public static void getEndKey(Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       long[] endKey, SegmentProperties segmentProperties,
       List<long[]> endKeyList) {
 
@@ -936,15 +1038,15 @@ public final class FilterUtil {
   }
 
   private static void getEndKeyWithFilter(
-      Map<CarbonDimension, List<DimColumnFilterInfo>> dimensionFilter,
+      Map<CarbonDimension, List<ColumnFilterInfo>> dimensionFilter,
       SegmentProperties segmentProperties, long[] endKey, List<long[]> endKeyList) {
-    for (Map.Entry<CarbonDimension, List<DimColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
-      List<DimColumnFilterInfo> values = entry.getValue();
+    for (Map.Entry<CarbonDimension, List<ColumnFilterInfo>> entry : dimensionFilter.entrySet()) {
+      List<ColumnFilterInfo> values = entry.getValue();
       if (null == values || !entry.getKey().hasEncoding(Encoding.DICTIONARY)) {
         continue;
       }
       boolean isExcludeFilterPresent = false;
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (!info.isIncludeFilter()) {
           isExcludeFilterPresent = true;
         }
@@ -961,7 +1063,7 @@ public final class FilterUtil {
         continue;
       }
       int keyOrdinalOfDimensionFromCurrentBlock = dimensionFromCurrentBlock.getKeyOrdinal();
-      for (DimColumnFilterInfo info : values) {
+      for (ColumnFilterInfo info : values) {
         if (keyOrdinalOfDimensionFromCurrentBlock < endKey.length) {
           if (endKey[keyOrdinalOfDimensionFromCurrentBlock] > info.getFilterList()
               .get(info.getFilterList().size() - 1)) {
@@ -1042,12 +1144,17 @@ public final class FilterUtil {
    * @param dimension
    * @param dimColumnExecuterInfo
    */
-  public static void prepareKeysFromSurrogates(DimColumnFilterInfo filterValues,
+  public static void prepareKeysFromSurrogates(ColumnFilterInfo filterValues,
       SegmentProperties segmentProperties, CarbonDimension dimension,
-      DimColumnExecuterFilterInfo dimColumnExecuterInfo) {
-    byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, segmentProperties);
-    dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
-
+      DimColumnExecuterFilterInfo dimColumnExecuterInfo, CarbonMeasure measures,
+      MeasureColumnExecuterFilterInfo msrColumnExecuterInfo) {
+    if (null != measures) {
+      byte[][] keysBasedOnFilter = getKeyArray(filterValues, null, measures, segmentProperties);
+      msrColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+    } else {
+      byte[][] keysBasedOnFilter = getKeyArray(filterValues, dimension, null, segmentProperties);
+      dimColumnExecuterInfo.setFilterKeys(keysBasedOnFilter);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
index fb3a582..b56c8a0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeColGroupFilterExecuterImpl.java
@@ -49,7 +49,7 @@ public class ExcludeColGroupFilterExecuterImpl extends ExcludeFilterExecuterImpl
    */
   public ExcludeColGroupFilterExecuterImpl(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
       SegmentProperties segmentProperties) {
-    super(dimColResolvedFilterInfo, segmentProperties);
+    super(dimColResolvedFilterInfo, null, segmentProperties, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
index 23209ed..21e8447 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ExcludeFilterExecuterImpl.java
@@ -18,56 +18,152 @@ package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class ExcludeFilterExecuterImpl implements FilterExecuter {
 
   protected DimColumnResolvedFilterInfo dimColEvaluatorInfo;
   protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo;
+  protected MeasureColumnExecuterFilterInfo msrColumnExecutorInfo;
   protected SegmentProperties segmentProperties;
+  protected boolean isDimensionPresentInCurrentBlock = false;
+  protected boolean isMeasurePresentInCurrentBlock = false;
   /**
    * is dimension column data is natural sorted
    */
-  private boolean isNaturalSorted;
+  private boolean isNaturalSorted = false;
+
   public ExcludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
-      SegmentProperties segmentProperties) {
-    this.dimColEvaluatorInfo = dimColEvaluatorInfo;
-    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+      MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
+      boolean isMeasure) {
     this.segmentProperties = segmentProperties;
-    FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), segmentProperties,
-        dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
-    isNaturalSorted = dimColEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColEvaluatorInfo
-        .getDimension().isSortColumn();
+    if (isMeasure == false) {
+      this.dimColEvaluatorInfo = dimColEvaluatorInfo;
+      dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+
+      FilterUtil.prepareKeysFromSurrogates(dimColEvaluatorInfo.getFilterValues(), segmentProperties,
+          dimColEvaluatorInfo.getDimension(), dimColumnExecuterInfo, null, null);
+      isDimensionPresentInCurrentBlock = true;
+      isNaturalSorted =
+          dimColEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColEvaluatorInfo
+              .getDimension().isSortColumn();
+    } else {
+      this.msrColumnEvaluatorInfo = msrColumnEvaluatorInfo;
+      msrColumnExecutorInfo = new MeasureColumnExecuterFilterInfo();
+      FilterUtil
+          .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
+              null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
+      isMeasurePresentInCurrentBlock = true;
+    }
+
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    if (isDimensionPresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      DimensionRawColumnChunk dimensionRawColumnChunk =
+          blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+      DimensionColumnDataChunk[] dimensionColumnDataChunks =
+          dimensionRawColumnChunk.convertToDimColDataChunks();
+      BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+      for (int i = 0; i < dimensionColumnDataChunks.length; i++) {
+        BitSet bitSet = getFilteredIndexes(dimensionColumnDataChunks[i],
+            dimensionRawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+
+      return bitSetGroup;
+    } else if (isMeasurePresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      MeasureRawColumnChunk measureRawColumnChunk =
+          blockChunkHolder.getMeasureRawDataChunk()[blockIndex];
+      MeasureColumnDataChunk[] measureColumnDataChunks =
+          measureRawColumnChunk.convertToMeasureColDataChunks();
+      BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
+      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      for (int i = 0; i < measureColumnDataChunks.length; i++) {
+        BitSet bitSet =
+            getFilteredIndexes(measureColumnDataChunks[i], measureRawColumnChunk.getRowCount()[i],
+                msrType);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+      return bitSetGroup;
     }
-    DimensionRawColumnChunk dimensionRawColumnChunk =
-        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
-    DimensionColumnDataChunk[] dimensionColumnDataChunks =
-        dimensionRawColumnChunk.convertToDimColDataChunks();
-    BitSetGroup bitSetGroup =
-        new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
-    for (int i = 0; i < dimensionColumnDataChunks.length; i++) {
-      BitSet bitSet = getFilteredIndexes(dimensionColumnDataChunks[i],
-          dimensionRawColumnChunk.getRowCount()[i]);
-      bitSetGroup.setBitSet(bitSet, i);
+    return null;
+  }
+
+  private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
+    switch (msrColumnEvaluatorInfo.getType()) {
+      case SHORT:
+        return DataType.SHORT;
+      case INT:
+        return DataType.INT;
+      case LONG:
+        return DataType.LONG;
+      case DECIMAL:
+        return DataType.DECIMAL;
+      default:
+        return DataType.DOUBLE;
     }
+  }
 
-    return bitSetGroup;
+  protected BitSet getFilteredIndexes(MeasureColumnDataChunk measureColumnDataChunk,
+      int numerOfRows, DataType msrType) {
+    // Here the algorithm is
+    // Get the measure values from the chunk. compare sequentially with the
+    // the filter values. The one that matches sets it Bitset.
+    BitSet bitSet = new BitSet(numerOfRows);
+    bitSet.flip(0, numerOfRows);
+    byte[][] filterValues = msrColumnExecutorInfo.getFilterKeys();
+    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    for (int i = 0; i < filterValues.length; i++) {
+      if (filterValues[i].length == 0) {
+        BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+          bitSet.flip(j);
+        }
+        continue;
+      }
+      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
+      for (int startIndex = 0; startIndex < numerOfRows; startIndex++) {
+        // Check if filterValue[i] matches with measure Values.
+        Object msrValue = DataTypeUtil
+            .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
+                msrType, msrColumnEvaluatorInfo.getMeasure());
+
+        if (comparator.compare(msrValue, filter) == 0) {
+          // This is a match.
+          bitSet.flip(startIndex);
+        }
+      }
+    }
+    return bitSet;
   }
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimColumnDataChunk,
@@ -150,11 +246,20 @@ public class ExcludeFilterExecuterImpl implements FilterExecuter {
   }
 
   @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    if (isDimensionPresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+    } else if (isMeasurePresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
index c64f498..45831e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -53,7 +53,7 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl
    */
   public IncludeColGroupFilterExecuterImpl(DimColumnResolvedFilterInfo dimColResolvedFilterInfo,
       SegmentProperties segmentProperties) {
-    super(dimColResolvedFilterInfo, segmentProperties);
+    super(dimColResolvedFilterInfo, null, segmentProperties, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index a874835..0fa42ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -17,65 +17,174 @@
 package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.partition.PartitionFilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 public class IncludeFilterExecuterImpl implements FilterExecuter {
 
   protected DimColumnResolvedFilterInfo dimColumnEvaluatorInfo;
   protected DimColumnExecuterFilterInfo dimColumnExecuterInfo;
+  protected MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo;
+  protected MeasureColumnExecuterFilterInfo msrColumnExecutorInfo;
   protected SegmentProperties segmentProperties;
+  protected boolean isDimensionPresentInCurrentBlock = false;
+  protected boolean isMeasurePresentInCurrentBlock = false;
   /**
    * is dimension column data is natural sorted
    */
-  private boolean isNaturalSorted;
+  private boolean isNaturalSorted = false;
 
   public IncludeFilterExecuterImpl(DimColumnResolvedFilterInfo dimColumnEvaluatorInfo,
-      SegmentProperties segmentProperties) {
-    this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+      MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo, SegmentProperties segmentProperties,
+      boolean isMeasure) {
+
     this.segmentProperties = segmentProperties;
-    dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
-    FilterUtil.prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(),
-        segmentProperties, dimColumnEvaluatorInfo.getDimension(), dimColumnExecuterInfo);
-    isNaturalSorted =
-        dimColumnEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColumnEvaluatorInfo
-            .getDimension().isSortColumn();
+    if (isMeasure == false) {
+      this.dimColumnEvaluatorInfo = dimColumnEvaluatorInfo;
+      dimColumnExecuterInfo = new DimColumnExecuterFilterInfo();
+      FilterUtil
+          .prepareKeysFromSurrogates(dimColumnEvaluatorInfo.getFilterValues(), segmentProperties,
+              dimColumnEvaluatorInfo.getDimension(), dimColumnExecuterInfo, null, null);
+      isDimensionPresentInCurrentBlock = true;
+      isNaturalSorted =
+          dimColumnEvaluatorInfo.getDimension().isUseInvertedIndex() && dimColumnEvaluatorInfo
+              .getDimension().isSortColumn();
+
+    } else {
+      this.msrColumnEvaluatorInfo = msrColumnEvaluatorInfo;
+      msrColumnExecutorInfo = new MeasureColumnExecuterFilterInfo();
+      FilterUtil
+          .prepareKeysFromSurrogates(msrColumnEvaluatorInfo.getFilterValues(), segmentProperties,
+              null, null, msrColumnEvaluatorInfo.getMeasure(), msrColumnExecutorInfo);
+      isMeasurePresentInCurrentBlock = true;
+
+    }
+
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColumnEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
-    }
-    DimensionRawColumnChunk dimensionRawColumnChunk =
-        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
-    BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
-    for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
-      if (dimensionRawColumnChunk.getMaxValues() != null) {
-        if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
-            dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys())) {
+    if (isDimensionPresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      DimensionRawColumnChunk dimensionRawColumnChunk =
+          blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+      BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+      for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
+        if (dimensionRawColumnChunk.getMaxValues() != null) {
+          if (isScanRequired(dimensionRawColumnChunk.getMaxValues()[i],
+              dimensionRawColumnChunk.getMinValues()[i], dimColumnExecuterInfo.getFilterKeys())) {
+            BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+                dimensionRawColumnChunk.getRowCount()[i]);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        } else {
           BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
               dimensionRawColumnChunk.getRowCount()[i]);
           bitSetGroup.setBitSet(bitSet, i);
         }
-      } else {
-        BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
-            dimensionRawColumnChunk.getRowCount()[i]);
-        bitSetGroup.setBitSet(bitSet, i);
       }
+      return bitSetGroup;
+    } else if (isMeasurePresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+      MeasureRawColumnChunk measureRawColumnChunk =
+          blockChunkHolder.getMeasureRawDataChunk()[blockIndex];
+      BitSetGroup bitSetGroup = new BitSetGroup(measureRawColumnChunk.getPagesCount());
+      DataType msrType = getMeasureDataType(msrColumnEvaluatorInfo);
+      for (int i = 0; i < measureRawColumnChunk.getPagesCount(); i++) {
+        if (measureRawColumnChunk.getMaxValues() != null) {
+          if (isScanRequired(measureRawColumnChunk.getMaxValues()[i],
+              measureRawColumnChunk.getMinValues()[i], msrColumnExecutorInfo.getFilterKeys(),
+              msrColumnEvaluatorInfo.getType())) {
+            BitSet bitSet =
+                getFilteredIndexesForMeasures(measureRawColumnChunk.convertToMeasureColDataChunk(i),
+                    measureRawColumnChunk.getRowCount()[i], msrType);
+            bitSetGroup.setBitSet(bitSet, i);
+          }
+        } else {
+          BitSet bitSet =
+              getFilteredIndexesForMeasures(measureRawColumnChunk.convertToMeasureColDataChunk(i),
+                  measureRawColumnChunk.getRowCount()[i], msrType);
+          bitSetGroup.setBitSet(bitSet, i);
+        }
+      }
+      return bitSetGroup;
     }
-    return bitSetGroup;
+    return null;
+  }
+
+  private DataType getMeasureDataType(MeasureColumnResolvedFilterInfo msrColumnEvaluatorInfo) {
+    switch (msrColumnEvaluatorInfo.getType()) {
+      case SHORT:
+        return DataType.SHORT;
+      case INT:
+        return DataType.INT;
+      case LONG:
+        return DataType.LONG;
+      case DECIMAL:
+        return DataType.DECIMAL;
+      default:
+        return DataType.DOUBLE;
+    }
+  }
+
+  private BitSet getFilteredIndexesForMeasures(MeasureColumnDataChunk measureColumnDataChunk,
+      int rowsInPage, DataType msrType) {
+    // Here the algorithm is
+    // Get the measure values from the chunk. compare sequentially with the
+    // the filter values. The one that matches sets it Bitset.
+    BitSet bitSet = new BitSet(rowsInPage);
+    byte[][] filterValues = msrColumnExecutorInfo.getFilterKeys();
+
+    Comparator comparator = PartitionFilterUtil.getComparatorByDataTypeForMeasure(msrType);
+    for (int i = 0; i < filterValues.length; i++) {
+      if (filterValues[i].length == 0) {
+        BitSet nullBitSet = measureColumnDataChunk.getNullValueIndexHolder().getBitSet();
+        for (int j = nullBitSet.nextSetBit(0); j >= 0; j = nullBitSet.nextSetBit(j + 1)) {
+          bitSet.set(j);
+        }
+        continue;
+      }
+      Object filter = DataTypeUtil.getMeasureObjectFromDataType(filterValues[i], msrType);
+      for (int startIndex = 0; startIndex < rowsInPage; startIndex++) {
+        // Check if filterValue[i] matches with measure Values.
+        Object msrValue = DataTypeUtil
+            .getMeasureObjectBasedOnDataType(measureColumnDataChunk.getColumnPage(), startIndex,
+                msrType, msrColumnEvaluatorInfo.getMeasure());
+
+        if (comparator.compare(msrValue, filter) == 0) {
+          // This is a match.
+          bitSet.set(startIndex);
+        }
+      }
+    }
+    return bitSet;
   }
 
   protected BitSet getFilteredIndexes(DimensionColumnDataChunk dimensionColumnDataChunk,
@@ -152,12 +261,31 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
 
   public BitSet isScanRequired(byte[][] blkMaxVal, byte[][] blkMinVal) {
     BitSet bitSet = new BitSet(1);
-    byte[][] filterValues = dimColumnExecuterInfo.getFilterKeys();
-    int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+    byte[][] filterValues = null;
+    int columnIndex = 0;
+    int blockIndex = 0;
+    boolean isScanRequired = false;
+
+    if (isDimensionPresentInCurrentBlock == true) {
+      filterValues = dimColumnExecuterInfo.getFilterKeys();
+      columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
+      blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
+      isScanRequired =
+          isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
+
+    } else if (isMeasurePresentInCurrentBlock) {
+      filterValues = msrColumnExecutorInfo.getFilterKeys();
+      columnIndex = msrColumnEvaluatorInfo.getColumnIndex();
+      // blockIndex =
+      // segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex) + segmentProperties
+      //         .getLastDimensionColOrdinal();
+      blockIndex =
+          segmentProperties.getMeasuresOrdinalToBlockMapping().get(columnIndex) + segmentProperties
+              .getLastDimensionColOrdinal();
+      isScanRequired = isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues,
+          msrColumnEvaluatorInfo.getType());
+    }
 
-    boolean isScanRequired =  blockIndex >= blkMaxVal.length ||
-        isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
     if (isScanRequired) {
       bitSet.set(0);
     }
@@ -186,12 +314,60 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
     return isScanRequired;
   }
 
+  private boolean isScanRequired(byte[] maxValue, byte[] minValue, byte[][] filterValue,
+      DataType dataType) {
+    for (int i = 0; i < filterValue.length; i++) {
+      if (filterValue[i].length == 0 || maxValue.length == 0 || minValue.length == 0) {
+        return isScanRequired(maxValue, minValue, filterValue);
+      } else {
+        switch (dataType) {
+          case DOUBLE:
+            double maxValueDouble = ByteBuffer.wrap(maxValue).getDouble();
+            double minValueDouble = ByteBuffer.wrap(minValue).getDouble();
+            double filterValueDouble = ByteBuffer.wrap(filterValue[i]).getDouble();
+            if (filterValueDouble <= maxValueDouble && filterValueDouble >= minValueDouble) {
+              return true;
+            }
+            break;
+          case INT:
+          case SHORT:
+          case LONG:
+            long maxValueLong = ByteBuffer.wrap(maxValue).getLong();
+            long minValueLong = ByteBuffer.wrap(minValue).getLong();
+            long filterValueLong = ByteBuffer.wrap(filterValue[i]).getLong();
+            if (filterValueLong <= maxValueLong && filterValueLong >= minValueLong) {
+              return true;
+            }
+            break;
+          case DECIMAL:
+            BigDecimal maxDecimal = DataTypeUtil.byteToBigDecimal(maxValue);
+            BigDecimal minDecimal = DataTypeUtil.byteToBigDecimal(minValue);
+            BigDecimal filterDecimal = DataTypeUtil.byteToBigDecimal(filterValue[i]);
+            if (filterDecimal.compareTo(maxDecimal) <= 0
+                && filterDecimal.compareTo(minDecimal) >= 0) {
+              return true;
+            }
+        }
+      }
+    }
+    return false;
+  }
+
   @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
-    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
-        .get(dimColumnEvaluatorInfo.getColumnIndex());
-    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
-      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
-          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    if (isDimensionPresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+          .get(dimColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
+    } else if (isMeasurePresentInCurrentBlock == true) {
+      int blockIndex = segmentProperties.getMeasuresOrdinalToBlockMapping()
+          .get(msrColumnEvaluatorInfo.getColumnIndex());
+      if (null == blockChunkHolder.getMeasureRawDataChunk()[blockIndex]) {
+        blockChunkHolder.getMeasureRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+            .getMeasureChunk(blockChunkHolder.getFileReader(), blockIndex);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
new file mode 100644
index 0000000..cc7e837
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/MeasureColumnExecuterFilterInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+public class MeasureColumnExecuterFilterInfo {
+
+  byte[][] filterKeys;
+
+  public void setFilterKeys(byte[][] filterKeys) {
+    this.filterKeys = filterKeys;
+  }
+
+  public byte[][] getFilterKeys() {
+    return filterKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
index 65184fb..8f3d2b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java
@@ -23,8 +23,10 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.scan.filter.DimColumnFilterInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.filter.ColumnFilterInfo;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -42,7 +44,7 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
   protected boolean isDimensionDefaultValuePresentInFilterValues(
       DimColumnResolvedFilterInfo dimColumnEvaluatorInfo) {
     boolean isDefaultValuePresentInFilterValues = false;
-    DimColumnFilterInfo filterValues = dimColumnEvaluatorInfo.getFilterValues();
+    ColumnFilterInfo filterValues = dimColumnEvaluatorInfo.getFilterValues();
     CarbonDimension dimension = dimColumnEvaluatorInfo.getDimension();
     byte[] defaultValue = dimension.getDefaultValue();
     if (!dimension.hasEncoding(Encoding.DICTIONARY)) {
@@ -78,4 +80,32 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter {
     }
     return isDefaultValuePresentInFilterValues;
   }
+
+  /**
+   * This method will check whether a default value for the non-existing column is present
+   * in the filter values list
+   *
+   * @param measureColumnResolvedFilterInfo
+   * @return
+   */
+  protected boolean isMeasureDefaultValuePresentInFilterValues(
+      MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo) {
+    boolean isDefaultValuePresentInFilterValues = false;
+    ColumnFilterInfo filterValues = measureColumnResolvedFilterInfo.getFilterValues();
+    CarbonMeasure measure = measureColumnResolvedFilterInfo.getMeasure();
+    byte[] defaultValue = measure.getDefaultValue();
+    if (null == defaultValue) {
+      // default value for case where user gives is Null condition
+      defaultValue = new byte[0];
+    }
+    List<byte[]> measureFilterValuesList = filterValues.getMeasuresFilterValuesList();
+    for (byte[] filterValue : measureFilterValuesList) {
+      int compare = ByteUtil.UnsafeComparer.INSTANCE.compareTo(defaultValue, filterValue);
+      if (compare == 0) {
+        isDefaultValuePresentInFilterValues = true;
+        break;
+      }
+    }
+    return isDefaultValuePresentInFilterValues;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/266c473b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
index 2954c40..8e06894 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureExcludeFilterExecutorImpl.java
@@ -22,13 +22,16 @@ import java.util.BitSet;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
 import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
 import org.apache.carbondata.core.util.BitSetGroup;
 
 public class RestructureExcludeFilterExecutorImpl extends RestructureEvaluatorImpl {
 
   protected DimColumnResolvedFilterInfo dimColEvaluatorInfo;
+  protected MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo;
   protected SegmentProperties segmentProperties;
+  protected boolean isMeasure;
 
   /**
    * flag to check whether filter values contain the default value applied on the dimension column
@@ -37,11 +40,19 @@ public class RestructureExcludeFilterExecutorImpl extends RestructureEvaluatorIm
   protected boolean isDefaultValuePresentInFilterValues;
 
   public RestructureExcludeFilterExecutorImpl(DimColumnResolvedFilterInfo dimColEvaluatorInfo,
-      SegmentProperties segmentProperties) {
+      MeasureColumnResolvedFilterInfo measureColumnResolvedFilterInfo,
+      SegmentProperties segmentProperties, boolean isMeasure) {
     this.dimColEvaluatorInfo = dimColEvaluatorInfo;
+    this.measureColumnResolvedFilterInfo = measureColumnResolvedFilterInfo;
     this.segmentProperties = segmentProperties;
-    isDefaultValuePresentInFilterValues =
-        isDimensionDefaultValuePresentInFilterValues(dimColEvaluatorInfo);
+    this.isMeasure = isMeasure;
+    if (isMeasure) {
+      isDefaultValuePresentInFilterValues =
+          isMeasureDefaultValuePresentInFilterValues(measureColumnResolvedFilterInfo);
+    } else {
+      isDefaultValuePresentInFilterValues =
+          isDimensionDefaultValuePresentInFilterValues(dimColEvaluatorInfo);
+    }
   }
 
   @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {