Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:43 UTC

[45/47] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master

Merge remote-tracking branch 'carbon_master/master' into apache/master

Conflicts:
	core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
	core/src/main/java/org/carbondata/query/aggregator/impl/CountAggregator.java
	core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java
	core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java
	core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java
	core/src/main/java/org/carbondata/scan/executor/util/QueryUtil.java
	core/src/main/java/org/carbondata/scan/filter/FilterExpressionProcessor.java
	core/src/main/java/org/carbondata/scan/filter/FilterUtil.java
	core/src/main/java/org/carbondata/scan/filter/resolver/AndFilterResolverImpl.java
	core/src/main/java/org/carbondata/scan/filter/resolver/LogicalFilterResolverImpl.java
	core/src/main/java/org/carbondata/scan/filter/resolver/resolverinfo/visitor/DictionaryColumnVisitor.java
	integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
	integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
	integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
	integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
	integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
	integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
	integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
	integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
	integration/spark/src/test/scala/org/carbondata/spark/testsuite/directdictionary/TimestampDataTypeDirectDictionaryTestCase.scala
	processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
	processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
	processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/50dfdf6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/50dfdf6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/50dfdf6c

Branch: refs/heads/master
Commit: 50dfdf6c8eea672665fe99c5ac4603107a435209
Parents: 10ed89a b82a960
Author: ravipesala <ra...@gmail.com>
Authored: Mon Aug 1 12:58:17 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Aug 1 12:58:17 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/ColumnIdentifier.java           |   4 +
 .../carbon/datastore/SegmentTaskIndexStore.java |   2 +-
 .../metadata/schema/table/CarbonTable.java      |  48 ++--
 .../schema/table/column/CarbonDimension.java    |   4 +
 .../core/carbon/path/CarbonStorePath.java       |   2 +-
 .../core/carbon/path/CarbonTablePath.java       |  18 +-
 .../carbon/querystatistics/QueryStatistic.java  |  12 +
 .../core/constants/CarbonCommonConstants.java   |  29 ++
 .../org/carbondata/core/load/BlockDetails.java  |   8 +-
 .../core/load/LoadMetadataDetails.java          |  20 ++
 .../core/util/CarbonLoadStatisticsDummy.java    |  12 +-
 .../core/util/CarbonLoadStatisticsImpl.java     |  65 ++---
 .../org/carbondata/core/util/DataTypeUtil.java  |  67 +++++
 .../carbondata/core/util/LoadStatistics.java    |   6 +-
 .../aggregator/impl/CountStarAggregator.java    |  51 ++++
 .../scan/executor/util/QueryUtil.java           |  54 ++--
 .../scan/filter/FilterExpressionProcessor.java  |  19 +-
 .../org/carbondata/scan/filter/FilterUtil.java  |  32 +++
 .../filter/resolver/AndFilterResolverImpl.java  |   4 +-
 .../resolver/LogicalFilterResolverImpl.java     |  10 +-
 .../visitor/CustomTypeDictionaryVisitor.java    |   8 +
 .../visitor/DictionaryColumnVisitor.java        |   4 +-
 .../examples/ComplexTypeExample.scala           |   6 +-
 .../hadoop/test/util/StoreCreator.java          |   2 +-
 .../spark/sql/common/util/CsvCompare.scala      |   1 -
 .../spark/merger/CompactionCallable.java        |  36 +--
 .../carbondata/spark/load/CarbonLoaderUtil.java |  22 +-
 .../spark/merger/CarbonDataMergerUtil.java      |  84 ++++--
 .../spark/sql/CarbonDatasourceRelation.scala    |  24 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 216 ++++++++-------
 .../spark/sql/SparkUnknownExpression.scala      |  24 +-
 .../execution/command/carbonTableSchema.scala   |  32 ++-
 .../spark/sql/hive/CarbonMetastoreCatalog.scala |  27 +-
 .../scala/org/apache/spark/util/FileUtils.scala |   6 +-
 .../org/apache/spark/util/SplitUtils.scala      |   6 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |  65 +++--
 .../spark/rdd/CarbonDataRDDFactory.scala        | 264 ++++++++++---------
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  37 ++-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 183 +++++++------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  51 +++-
 .../org/carbondata/spark/rdd/Compactor.scala    |  79 +++---
 .../spark/tasks/DictionaryWriterTask.scala      |  56 ++--
 .../carbondata/spark/util/CarbonScalaUtil.scala |  13 +
 .../spark/util/GlobalDictionaryUtil.scala       |  16 +-
 integration/spark/src/test/resources/array1.csv |   2 +
 .../resources/complexTypeDecimalNestedHive.csv  |   8 +
 .../spark/src/test/resources/datadelimiter.csv  |  11 +
 .../src/test/resources/datanullmeasurecol.csv   |   3 +
 .../spark/src/test/resources/datasample.csv     |   1 +
 .../resources/decimalBoundaryDataCarbon.csv     |  12 +
 .../test/resources/decimalBoundaryDataHive.csv  |  11 +
 .../test/resources/emptyDimensionDataHive.csv   |  20 ++
 .../spark/src/test/resources/struct_all.csv     |   4 +
 .../src/test/resources/structusingstruct.csv    |   2 +
 .../spark/src/test/resources/timestampdata.csv  |   2 +
 .../apache/spark/sql/TestCarbonSqlParser.scala  |  72 ++---
 ...plexPrimitiveTimestampDirectDictionary.scala |  12 +
 .../complexType/TestComplexTypeQuery.scala      |  52 +++-
 .../dataload/TestLoadDataWithJunkChars.scala    |  43 +++
 .../dataload/TestLoadDataWithNullMeasures.scala |   1 +
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  52 +++-
 .../createtable/TestCreateTableSyntax.scala     |  29 +-
 .../DataCompactionCardinalityBoundryTest.scala  |   6 +-
 .../datacompaction/DataCompactionTest.scala     |   6 +-
 .../MajorCompactionIgnoreInMinorTest.scala      | 130 +++++++++
 .../MajorCompactionStopsAfterCompaction.scala   | 125 +++++++++
 .../TestDataWithDicExcludeAndInclude.scala      |  22 +-
 .../dataload/TestLoadDataWithHiveSyntax.scala   |  59 ++++-
 .../dataload/TestLoadDataWithNoMeasure.scala    |   2 +-
 .../TestLoadDataWithNotProperInputFile.scala    |   6 +-
 .../NoDictionaryColumnTestCase.scala            |   6 +-
 ...estampDataTypeDirectDictionaryTestCase.scala |  49 +++-
 ...TypeDirectDictionaryWithNoDictTestCase.scala |   6 +-
 .../TimestampDataTypeNullDataTest.scala         |   2 +-
 .../filterexpr/AllDataTypesTestCaseFilter.scala |  14 +-
 .../filterexpr/CountStarTestCase.scala          |  72 +++++
 .../NullMeasureValueTestCaseFilter.scala        |   7 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |   2 +-
 .../util/GlobalDictionaryUtilTestCase.scala     |   4 +-
 .../carbondata/lcm/locks/CarbonLockFactory.java |  27 +-
 .../org/carbondata/lcm/locks/HdfsFileLock.java  |  25 +-
 .../org/carbondata/lcm/locks/LocalFileLock.java |  16 +-
 .../carbondata/lcm/locks/ZooKeeperLocking.java  |  49 +++-
 .../lcm/status/SegmentStatusManager.java        |   2 -
 .../processing/csvload/GraphExecutionUtil.java  |  31 +++
 .../csvreaderstep/CustomDataStream.java         | 108 ++++++++
 .../processing/csvreaderstep/CustomReader.java  | 157 -----------
 .../csvreaderstep/UnivocityCsvParser.java       |  13 +-
 .../processing/datatypes/PrimitiveDataType.java |  28 +-
 .../processing/mdkeygen/MDKeyGenStep.java       |   2 +-
 .../processing/mdkeygen/MDKeyGenStepMeta.java   |   3 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |   2 +-
 .../CarbonCSVBasedDimSurrogateKeyGen.java       |  21 ++
 .../csvbased/CarbonCSVBasedSeqGenMeta.java      |   3 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  72 ++++-
 .../FileStoreSurrogateKeyGenForCSV.java         |   3 +-
 .../util/CarbonDataProcessorUtil.java           |   7 +-
 .../processing/util/CarbonSchemaParser.java     |   1 +
 98 files changed, 2240 insertions(+), 914 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
index 765dc60,d0037e1..995bf17
--- a/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/DataTypeUtil.java
@@@ -285,61 -155,90 +285,128 @@@ public final class DataTypeUtil 
        return null;
      }
      try {
 +      switch (dataType) {
 +        case DOUBLE:
 +          return data;
 +        case LONG:
 +          return data;
 +        case DECIMAL:
 +          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
 +          scala.math.BigDecimal scalaDecVal = new scala.math.BigDecimal(javaDecVal);
 +          org.apache.spark.sql.types.Decimal decConverter =
 +              new org.apache.spark.sql.types.Decimal();
 +          return decConverter.set(scalaDecVal);
 +        default:
 +          return data;
 +      }
 +    } catch (NumberFormatException ex) {
 +      LOGGER.error("Problem while converting data type" + data);
 +      return null;
 +    }
 +
 +  }
 +
 +  /**
 +   * This method will validate a given string value against its datatype
 +   *
 +   * @param value    value to parse
 +   * @param dataType datatype for that value
 +   * @return true if the value is valid for the given datatype, false otherwise
 +   */
 +  public static boolean validateColumnValueForItsDataType(String value, DataType dataType) {
 +    try {
        Object parsedValue = null;
 -      switch (actualDataType) {
 +      // validation is not done for the timestamp datatype because a direct dictionary
 +      // is generated for timestamps and no dictionary file is created for such columns
 +      switch (dataType) {
 +        case DECIMAL:
 +          parsedValue = new BigDecimal(value);
 +          break;
          case INT:
 -          parsedValue = Integer.parseInt(data);
 +          parsedValue = Integer.parseInt(value);
            break;
          case LONG:
 -          parsedValue = Long.parseLong(data);
 +          parsedValue = Long.valueOf(value);
 +          break;
 +        case DOUBLE:
 +          parsedValue = Double.valueOf(value);
            break;
          default:
 -          return data;
 +          return true;
        }
 -      if(null != parsedValue) {
 -        return data;
 +      if (null != parsedValue) {
 +        return true;
        }
 -      return null;
 -    } catch (NumberFormatException ex) {
 -      return null;
 +      return false;
 +    } catch (Exception e) {
 +      return false;
      }
    }
+ 
+   /**
+    * This method will normalize a given string value according to its data type
+    *
+    * @param value     value to parse
+    * @param dimension dimension to get data type and precision and scale in case of decimal
+    *                  data type
+    * @return the normalized value if parsing succeeds, null otherwise
+    */
+   public static String normalizeColumnValueForItsDataType(String value, CarbonDimension dimension) {
+     try {
+       Object parsedValue = null;
+       // validation is not done for the timestamp datatype because a direct dictionary
+       // is generated for timestamps and no dictionary file is created for such columns
+       switch (dimension.getDataType()) {
+         case DECIMAL:
+           return parseStringToBigDecimal(value, dimension);
+         case INT:
+         case LONG:
+           parsedValue = normalizeIntAndLongValues(value, dimension.getDataType());
+           break;
+         case DOUBLE:
+           parsedValue = Double.parseDouble(value);
+           break;
+         default:
+           return value;
+       }
+       if (null != parsedValue) {
+         return value;
+       }
+       return null;
+     } catch (Exception e) {
+       return null;
+     }
+   }
+ 
+   /**
+    * This method will parse a value to its datatype if the datatype is decimal,
+    * else it will return the value passed
+    *
+    * @param value     value to be parsed
+    * @param dimension dimension carrying the data type, precision and scale
+    * @return the parsed value, or null if parsing fails
+    */
+   public static String parseValue(String value, CarbonDimension dimension) {
+     try {
+       switch (dimension.getDataType()) {
+         case DECIMAL:
+           return parseStringToBigDecimal(value, dimension);
+         default:
+           return value;
+       }
+     } catch (Exception e) {
+       return null;
+     }
+   }
+ 
+   private static String parseStringToBigDecimal(String value, CarbonDimension dimension) {
+     BigDecimal bigDecimal = new BigDecimal(value)
+         .setScale(dimension.getColumnSchema().getScale(), RoundingMode.HALF_UP);
+     BigDecimal normalizedValue =
+         normalizeDecimalValue(bigDecimal, dimension.getColumnSchema().getPrecision());
+     if (null != normalizedValue) {
+       return normalizedValue.toString();
+     }
+     return null;
+   }
  }
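
For context on the decimal handling above: parseStringToBigDecimal first rounds the incoming string to the column's scale (HALF_UP) and then checks the result against the configured precision via normalizeDecimalValue, returning null when the value does not fit. Below is a minimal standalone sketch of that behavior, assuming normalizeDecimalValue simply rejects values whose digit count exceeds the precision (its body is not part of this diff):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public final class DecimalNormalizeSketch {

      // Assumed behavior: reject values whose total digit count exceeds the
      // column precision (normalizeDecimalValue itself is not shown in the diff).
      static BigDecimal normalizeDecimalValue(BigDecimal value, int precision) {
        return value.precision() > precision ? null : value;
      }

      // Mirrors parseStringToBigDecimal: round to the column scale, then
      // enforce the column precision.
      static String normalize(String value, int precision, int scale) {
        BigDecimal rounded = new BigDecimal(value).setScale(scale, RoundingMode.HALF_UP);
        BigDecimal normalized = normalizeDecimalValue(rounded, precision);
        return normalized == null ? null : normalized.toString();
      }

      public static void main(String[] args) {
        System.out.println(normalize("12.345", 5, 2));  // 12.35  (4 digits, fits)
        System.out.println(normalize("12345.6", 5, 2)); // null   (12345.60 has 7 digits)
      }
    }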

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/core/src/main/java/org/carbondata/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/executor/util/QueryUtil.java
index 3b978d7,0000000..8fee45a
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/carbondata/scan/executor/util/QueryUtil.java
@@@ -1,931 -1,0 +1,947 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.executor.util;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.TreeSet;
 +
 +import org.carbondata.core.cache.Cache;
 +import org.carbondata.core.cache.CacheProvider;
 +import org.carbondata.core.cache.CacheType;
 +import org.carbondata.core.cache.dictionary.Dictionary;
 +import org.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 +import org.carbondata.core.carbon.CarbonTableIdentifier;
 +import org.carbondata.core.carbon.datastore.block.SegmentProperties;
 +import org.carbondata.core.carbon.metadata.CarbonMetadata;
 +import org.carbondata.core.carbon.metadata.datatype.DataType;
 +import org.carbondata.core.carbon.metadata.encoder.Encoding;
 +import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 +import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 +import org.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 +import org.carbondata.core.constants.CarbonCommonConstants;
 +import org.carbondata.core.keygenerator.KeyGenException;
 +import org.carbondata.core.keygenerator.KeyGenerator;
 +import org.carbondata.core.util.CarbonUtil;
 +import org.carbondata.core.util.CarbonUtilException;
 +import org.carbondata.scan.complextypes.ArrayQueryType;
 +import org.carbondata.scan.complextypes.PrimitiveQueryType;
 +import org.carbondata.scan.complextypes.StructQueryType;
 +import org.carbondata.scan.executor.exception.QueryExecutionException;
 +import org.carbondata.scan.executor.infos.KeyStructureInfo;
 +import org.carbondata.scan.expression.ColumnExpression;
 +import org.carbondata.scan.expression.Expression;
 +import org.carbondata.scan.expression.logical.BinaryLogicalExpression;
 +import org.carbondata.scan.filter.GenericQueryType;
 +import org.carbondata.scan.filter.resolver.FilterResolverIntf;
 +import org.carbondata.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
 +import org.carbondata.scan.model.QueryDimension;
 +import org.carbondata.scan.model.QueryMeasure;
 +import org.carbondata.scan.model.QueryModel;
 +
 +import org.apache.commons.lang3.ArrayUtils;
 +
 +/**
 + * Utility class for query execution
 + */
 +public class QueryUtil {
 +
 +  /**
 +   * Below method will be used to get the masked byte range based on the query
 +   * dimension. It will give the range in the mdkey. This will be used to get
 +   * the actual key array from masked mdkey
 +   *
 +   * @param queryDimensions query dimension selected in query
 +   * @param keyGenerator    key generator
 +   * @return masked key
 +   */
 +  public static int[] getMaskedByteRange(List<QueryDimension> queryDimensions,
 +      KeyGenerator keyGenerator) {
 +    Set<Integer> byteRangeSet = new TreeSet<Integer>();
 +    int[] byteRange = null;
 +    for (int i = 0; i < queryDimensions.size(); i++) {
 +
 +      // no dictionary columns and complex type columns
 +      // are not part of the mdkey,
 +      // so we will not include those dimensions when calculating the
 +      // range
 +      if (queryDimensions.get(i).getDimension().getKeyOrdinal() == -1) {
 +        continue;
 +      }
 +      // get the offset of the dimension in the mdkey
 +      byteRange =
 +          keyGenerator.getKeyByteOffsets(queryDimensions.get(i).getDimension().getKeyOrdinal());
 +      for (int j = byteRange[0]; j <= byteRange[1]; j++) {
 +        byteRangeSet.add(j);
 +      }
 +    }
 +    int[] maskedByteRange = new int[byteRangeSet.size()];
 +    int index = 0;
 +    Iterator<Integer> iterator = byteRangeSet.iterator();
 +    // add the masked byte range
 +    while (iterator.hasNext()) {
 +      maskedByteRange[index++] = iterator.next();
 +    }
 +    return maskedByteRange;
 +  }
 +
 +  public static int[] getMaskedByteRangeBasedOrdinal(List<Integer> ordinals,
 +      KeyGenerator keyGenerator) {
 +    Set<Integer> byteRangeSet = new TreeSet<Integer>();
 +    int[] byteRange = null;
 +    for (int i = 0; i < ordinals.size(); i++) {
 +
 +      // get the offset of the dimension in the mdkey
 +      byteRange = keyGenerator.getKeyByteOffsets(ordinals.get(i));
 +      for (int j = byteRange[0]; j <= byteRange[1]; j++) {
 +        byteRangeSet.add(j);
 +      }
 +    }
 +    int[] maskedByteRange = new int[byteRangeSet.size()];
 +    int index = 0;
 +    Iterator<Integer> iterator = byteRangeSet.iterator();
 +    // add the masked byte range
 +    while (iterator.hasNext()) {
 +      maskedByteRange[index++] = iterator.next();
 +    }
 +    return maskedByteRange;
 +  }
 +
 +  /**
 +   * Below method will return the max key based on the dimension ordinal
 +   *
 +   * @param keyOrdinalList ordinals of the dimensions for which the max key is required
 +   * @param generator      key generator
 +   * @return max key based on the given ordinals
 +   * @throws KeyGenException
 +   */
 +  public static byte[] getMaxKeyBasedOnOrinal(List<Integer> keyOrdinalList, KeyGenerator generator)
 +      throws KeyGenException {
 +    long[] max = new long[generator.getDimCount()];
 +    Arrays.fill(max, 0L);
 +
 +    for (int i = 0; i < keyOrdinalList.size(); i++) {
 +      // adding for dimension which is selected in query
 +      max[keyOrdinalList.get(i)] = Long.MAX_VALUE;
 +    }
 +    return generator.generateKey(max);
 +  }
 +
 +  /**
 +   * To get the max key based on dimensions: all other dimensions will be
 +   * set to 0 bits and the required query dimensions will be masked with
 +   * Long.MAX_VALUE so that we can mask the key and then compare while
 +   * aggregating. This can be useful during a filter query when only a few
 +   * dimensions were selected out of a row group.
 +   *
 +   * @param queryDimensions dimension selected in query
 +   * @param generator       key generator
 +   * @return max key for dimension
 +   * @throws KeyGenException if any problem while generating the key
 +   */
 +  public static byte[] getMaxKeyBasedOnDimensions(List<QueryDimension> queryDimensions,
 +      KeyGenerator generator) throws KeyGenException {
 +    long[] max = new long[generator.getDimCount()];
 +    Arrays.fill(max, 0L);
 +
 +    for (int i = 0; i < queryDimensions.size(); i++) {
 +      // no dictionary columns and complex type columns
 +      // are not part of the mdkey,
 +      // so we will not include those dimensions when calculating the
 +      // range
 +      if (queryDimensions.get(i).getDimension().getKeyOrdinal() == -1) {
 +        continue;
 +      }
 +      // adding for dimension which is selected in query
 +      max[queryDimensions.get(i).getDimension().getKeyOrdinal()] = Long.MAX_VALUE;
 +    }
 +
 +    return generator.generateKey(max);
 +  }
 +
 +  /**
 +   * Below method will be used to get the masked key for query
 +   *
 +   * @param keySize         size of the masked key
 +   * @param maskedKeyRanges masked byte range
 +   * @return masked bytes
 +   */
 +  public static int[] getMaskedByte(int keySize, int[] maskedKeyRanges) {
 +    int[] maskedKey = new int[keySize];
 +    // all the non-selected dimensions will be filled with -1
 +    Arrays.fill(maskedKey, -1);
 +    for (int i = 0; i < maskedKeyRanges.length; i++) {
 +      maskedKey[maskedKeyRanges[i]] = i;
 +    }
 +    return maskedKey;
 +  }
 +
 +  /**
 +   * Below method will be used to get the dimension block index in file based
 +   * on query dimension
 +   *
 +   * @param queryDimensions                query dimension
 +   * @param dimensionOrdinalToBlockMapping mapping of dimension block in file to query dimension
 +   * @return block index of file
 +   */
 +  public static int[] getDimensionsBlockIndexes(List<QueryDimension> queryDimensions,
 +      Map<Integer, Integer> dimensionOrdinalToBlockMapping,
 +      List<CarbonDimension> customAggregationDimension) {
 +    // using a set since columns in a row group point to the same block
 +    Set<Integer> dimensionBlockIndex = new HashSet<Integer>();
 +    int blockIndex = 0;
 +    for (int i = 0; i < queryDimensions.size(); i++) {
 +      blockIndex =
 +          dimensionOrdinalToBlockMapping.get(queryDimensions.get(i).getDimension().getOrdinal());
 +      dimensionBlockIndex.add(blockIndex);
-       addChildrenBlockIndex(blockIndex, dimensionBlockIndex, queryDimensions.get(i).getDimension());
++      if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
++        addChildrenBlockIndex(dimensionBlockIndex, queryDimensions.get(i).getDimension());
++      }
 +    }
 +    for (int i = 0; i < customAggregationDimension.size(); i++) {
 +      blockIndex =
 +          dimensionOrdinalToBlockMapping.get(customAggregationDimension.get(i).getOrdinal());
++      // not adding the children dimensions as dimension aggregation
++      // is not pushed down in case of complex dimensions
 +      dimensionBlockIndex.add(blockIndex);
-       addChildrenBlockIndex(blockIndex, dimensionBlockIndex, customAggregationDimension.get(i));
 +    }
 +    return ArrayUtils
 +        .toPrimitive(dimensionBlockIndex.toArray(new Integer[dimensionBlockIndex.size()]));
 +  }
 +
 +  /**
 +   * Below method will be used to add the children block index
 +   * this is basically for complex dimensions, which have children
 +   *
-    * @param startBlockIndex start block index
-    * @param blockIndexList  block index list
-    * @param dimension       parent dimension
++   * @param blockIndexes block indexes
++   * @param dimension    parent dimension
 +   */
-   private static void addChildrenBlockIndex(int startBlockIndex, Set<Integer> blockIndexList,
-       CarbonDimension dimension) {
++  private static void addChildrenBlockIndex(Set<Integer> blockIndexes, CarbonDimension dimension) {
 +    for (int i = 0; i < dimension.numberOfChild(); i++) {
-       blockIndexList.add(++startBlockIndex);
-       addChildrenBlockIndex(startBlockIndex, blockIndexList,
-           dimension.getListOfChildDimensions().get(i));
++      addChildrenBlockIndex(blockIndexes, dimension.getListOfChildDimensions().get(i));
++      blockIndexes.add(dimension.getListOfChildDimensions().get(i).getOrdinal());
 +    }
 +  }
 +
 +  /**
 +   * Below method will be used to get the dictionary mapping for all the
 +   * dictionary encoded dimension present in the query
 +   *
 +   * @param queryDimensions            query dimensions present in the query; used to
 +   *                                   convert the result from surrogate key to actual data
 +   * @param absoluteTableIdentifier    absolute table identifier
 +   * @return dimension unique id to its dictionary map
 +   * @throws QueryExecutionException
 +   */
 +  public static Map<String, Dictionary> getDimensionDictionaryDetail(
 +      List<QueryDimension> queryDimensions,
 +      Set<CarbonDimension> filterComplexDimensions,
 +      AbsoluteTableIdentifier absoluteTableIdentifier) throws QueryExecutionException {
 +    // to store the unique column ids of the dictionary dimensions; this is
 +    // required because a dimension can be present as a
 +    // query dimension and also have an aggregation function applied
 +    // to the same dimension,
 +    // so we need to get only one instance of the dictionary.
 +    // direct dictionary columns are skipped only for the dictionary lookup
 +    Set<String> dictionaryDimensionFromQuery = new HashSet<String>();
 +    for (int i = 0; i < queryDimensions.size(); i++) {
 +      List<Encoding> encodingList = queryDimensions.get(i).getDimension().getEncoder();
 +      // TODO remove the data type check for the parent column of a complex type; no need to
 +      // write the dictionary encoding for it
 +      if (CarbonUtil.hasEncoding(encodingList, Encoding.DICTIONARY) && !CarbonUtil
 +          .hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {
 +
 +        if (queryDimensions.get(i).getDimension().numberOfChild() == 0) {
 +          dictionaryDimensionFromQuery.add(queryDimensions.get(i).getDimension().getColumnId());
 +        }
 +        if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
 +          getChildDimensionDictionaryDetail(queryDimensions.get(i).getDimension(),
 +              dictionaryDimensionFromQuery);
 +        }
 +      }
 +    }
 +    Iterator<CarbonDimension> iterator = filterComplexDimensions.iterator();
 +    while (iterator.hasNext()) {
 +      getChildDimensionDictionaryDetail(iterator.next(), dictionaryDimensionFromQuery);
 +    }
 +    // converting to a list because the exposed api expects a list,
 +    // though arguably it should accept a set
 +    List<String> dictionaryColumnIdList =
 +        new ArrayList<String>(dictionaryDimensionFromQuery.size());
 +    dictionaryColumnIdList.addAll(dictionaryDimensionFromQuery);
 +    return getDictionaryMap(dictionaryColumnIdList, absoluteTableIdentifier);
 +  }
 +
 +  /**
 +   * Below method will be used to fill the children dimension column id
 +   *
 +   * @param queryDimensions              query dimension
 +   * @param dictionaryDimensionFromQuery dictionary dimension for query
 +   */
 +  private static void getChildDimensionDictionaryDetail(CarbonDimension queryDimensions,
 +      Set<String> dictionaryDimensionFromQuery) {
 +    for (int j = 0; j < queryDimensions.numberOfChild(); j++) {
 +      List<Encoding> encodingList = queryDimensions.getListOfChildDimensions().get(j).getEncoder();
 +      if (queryDimensions.getListOfChildDimensions().get(j).numberOfChild() > 0) {
 +        getChildDimensionDictionaryDetail(queryDimensions.getListOfChildDimensions().get(j),
 +            dictionaryDimensionFromQuery);
-       } else if(!CarbonUtil.hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {
++      } else if (!CarbonUtil.hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {
 +        dictionaryDimensionFromQuery
 +            .add(queryDimensions.getListOfChildDimensions().get(j).getColumnId());
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Below method will be used to get the column id to its dictionary mapping
 +   *
 +   * @param dictionaryColumnIdList  dictionary column list
 +   * @param absoluteTableIdentifier absolute table identifier
 +   * @return dictionary mapping
 +   * @throws QueryExecutionException
 +   */
 +  private static Map<String, Dictionary> getDictionaryMap(List<String> dictionaryColumnIdList,
 +      AbsoluteTableIdentifier absoluteTableIdentifier) throws QueryExecutionException {
 +    // build the dictionary column unique identifiers
 +    List<DictionaryColumnUniqueIdentifier> dictionaryColumnUniqueIdentifiers =
 +        getDictionaryColumnUniqueIdentifierList(dictionaryColumnIdList,
 +            absoluteTableIdentifier.getCarbonTableIdentifier());
 +    CacheProvider cacheProvider = CacheProvider.getInstance();
 +    Cache forwardDictionaryCache = cacheProvider
 +        .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
 +    List<Dictionary> columnDictionaryList = null;
 +    try {
 +      columnDictionaryList = forwardDictionaryCache.getAll(dictionaryColumnUniqueIdentifiers);
 +    } catch (CarbonUtilException e) {
 +      throw new QueryExecutionException(e);
 +    }
 +    Map<String, Dictionary> columnDictionaryMap = new HashMap<>(columnDictionaryList.size());
 +    for (int i = 0; i < dictionaryColumnUniqueIdentifiers.size(); i++) {
 +      // TODO: null check for column dictionary, if cache size is less it
 +      // might return null here, in that case throw exception
 +      columnDictionaryMap.put(dictionaryColumnIdList.get(i), columnDictionaryList.get(i));
 +    }
 +    return columnDictionaryMap;
 +  }
 +
 +  /**
 +   * Below method will be used to get the dictionary column unique identifier
 +   *
 +   * @param dictionaryColumnIdList dictionary column id list
 +   * @param carbonTableIdentifier  carbon table identifier
 +   * @return list of dictionary column unique identifiers
 +   */
 +  private static List<DictionaryColumnUniqueIdentifier> getDictionaryColumnUniqueIdentifierList(
 +      List<String> dictionaryColumnIdList, CarbonTableIdentifier carbonTableIdentifier)
 +      throws QueryExecutionException {
 +    CarbonTable carbonTable =
 +        CarbonMetadata.getInstance().getCarbonTable(carbonTableIdentifier.getTableUniqueName());
 +    List<DictionaryColumnUniqueIdentifier> dictionaryColumnUniqueIdentifiers =
 +        new ArrayList<>(dictionaryColumnIdList.size());
 +    for (String columnId : dictionaryColumnIdList) {
 +      CarbonDimension dimension = CarbonMetadata.getInstance()
 +          .getCarbonDimensionBasedOnColIdentifier(carbonTable, columnId);
 +      if (null == dimension) {
 +        throw new QueryExecutionException("The column id " + columnId + " could not be resolved.");
 +      }
 +      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
 +          new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
 +              dimension.getColumnIdentifier(), dimension.getDataType());
 +      dictionaryColumnUniqueIdentifiers.add(dictionaryColumnUniqueIdentifier);
 +    }
 +    return dictionaryColumnUniqueIdentifiers;
 +  }
 +
 +  /**
 +   * Below method will used to get the method will be used to get the measure
 +   * block indexes to be read from the file
 +   *
 +   * @param queryMeasures              query measure
 +   * @param expressionMeasure          measure present in the expression
 +   * @param ordinalToBlockIndexMapping measure ordinal to block mapping
 +   * @return block indexes
 +   */
 +  public static int[] getMeasureBlockIndexes(List<QueryMeasure> queryMeasures,
 +      List<CarbonMeasure> expressionMeasure, Map<Integer, Integer> ordinalToBlockIndexMapping) {
 +    Set<Integer> measureBlockIndex = new HashSet<Integer>();
 +    for (int i = 0; i < queryMeasures.size(); i++) {
 +      measureBlockIndex
 +          .add(ordinalToBlockIndexMapping.get(queryMeasures.get(i).getMeasure().getOrdinal()));
 +    }
 +    for (int i = 0; i < expressionMeasure.size(); i++) {
 +      measureBlockIndex.add(ordinalToBlockIndexMapping.get(expressionMeasure.get(i).getOrdinal()));
 +    }
 +    return ArrayUtils.toPrimitive(measureBlockIndex.toArray(new Integer[measureBlockIndex.size()]));
 +  }
 +
 +  /**
 +   * Below method will be used to get the masked byte range for dimension
 +   * which is present in order by
 +   *
 +   * @param orderByDimensions order by dimension
 +   * @param generator         key generator
 +   * @param maskedRanges      masked byte range for dimension
 +   * @return range of masked byte for order by dimension
 +   */
 +  public static int[][] getMaskedByteRangeForSorting(List<QueryDimension> orderByDimensions,
 +      KeyGenerator generator, int[] maskedRanges) {
 +    int[][] dimensionCompareIndex = new int[orderByDimensions.size()][];
 +    int index = 0;
 +    for (int i = 0; i < dimensionCompareIndex.length; i++) {
 +      Set<Integer> integers = new TreeSet<Integer>();
 +      if (!orderByDimensions.get(i).getDimension().getEncoder().contains(Encoding.DICTIONARY)
 +          || orderByDimensions.get(i).getDimension().numberOfChild() > 0) {
 +        continue;
 +      }
 +      int[] range =
 +          generator.getKeyByteOffsets(orderByDimensions.get(i).getDimension().getKeyOrdinal());
 +      for (int j = range[0]; j <= range[1]; j++) {
 +        integers.add(j);
 +      }
 +      dimensionCompareIndex[index] = new int[integers.size()];
 +      int j = 0;
 +      for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext(); ) {
 +        Integer integer = (Integer) iterator.next();
 +        dimensionCompareIndex[index][j++] = integer.intValue();
 +      }
 +      index++;
 +    }
 +    for (int i = 0; i < dimensionCompareIndex.length; i++) {
 +      if (null == dimensionCompareIndex[i]) {
 +        continue;
 +      }
 +      int[] range = dimensionCompareIndex[i];
 +      if (null != range) {
 +        for (int j = 0; j < range.length; j++) {
 +          for (int k = 0; k < maskedRanges.length; k++) {
 +            if (range[j] == maskedRanges[k]) {
 +              range[j] = k;
 +              break;
 +            }
 +          }
 +        }
 +      }
 +
 +    }
 +    return dimensionCompareIndex;
 +  }
 +
 +  /**
 +   * Below method will be used to get the masked key for sorting
 +   *
 +   * @param orderDimensions           query dimension
 +   * @param generator                 key generator
 +   * @param maskedByteRangeForSorting masked byte range for sorting
 +   * @param maskedRanges              masked range
 +   * @return masked byte range
 +   * @throws QueryExecutionException
 +   */
 +  public static byte[][] getMaksedKeyForSorting(List<QueryDimension> orderDimensions,
 +      KeyGenerator generator, int[][] maskedByteRangeForSorting, int[] maskedRanges)
 +      throws QueryExecutionException {
 +    byte[][] maskedKey = new byte[orderDimensions.size()][];
 +    byte[] mdKey = null;
 +    long[] key = null;
 +    byte[] maskedMdKey = null;
 +    try {
 +      if (null != maskedByteRangeForSorting) {
 +        for (int i = 0; i < maskedByteRangeForSorting.length; i++) {
 +          if (null == maskedByteRangeForSorting[i]) {
 +            continue;
 +          }
 +          key = new long[generator.getDimCount()];
 +          maskedKey[i] = new byte[maskedByteRangeForSorting[i].length];
 +          key[orderDimensions.get(i).getDimension().getKeyOrdinal()] = Long.MAX_VALUE;
 +          mdKey = generator.generateKey(key);
 +          maskedMdKey = new byte[maskedRanges.length];
 +          for (int k = 0; k < maskedMdKey.length; k++) { // CHECKSTYLE:OFF
 +            // Approval
 +            // No:Approval-V1R2C10_001
 +            maskedMdKey[k] = mdKey[maskedRanges[k]];
 +          }
 +          for (int j = 0; j < maskedByteRangeForSorting[i].length; j++) {
 +            maskedKey[i][j] = maskedMdKey[maskedByteRangeForSorting[i][j]];
 +          }// CHECKSTYLE:ON
 +
 +        }
 +      }
 +    } catch (KeyGenException e) {
 +      throw new QueryExecutionException(e);
 +    }
 +    return maskedKey;
 +  }
 +
 +  /**
 +   * Below method will be used to get a mapping of whether each dimension is
 +   * present in order by or not
 +   *
 +   * @param sortedDimensions sort dimension present in order by query
 +   * @param queryDimensions  query dimension
 +   * @return sort dimension indexes
 +   */
 +  public static byte[] getSortDimensionIndexes(List<QueryDimension> sortedDimensions,
 +      List<QueryDimension> queryDimensions) {
 +    byte[] sortedDims = new byte[queryDimensions.size()];
 +    int indexOf = 0;
 +    for (int i = 0; i < sortedDims.length; i++) {
 +      indexOf = sortedDimensions.indexOf(queryDimensions.get(i));
 +      if (indexOf > -1) {
 +        sortedDims[i] = 1;
 +      }
 +    }
 +    return sortedDims;
 +  }
 +
 +  /**
 +   * Below method will be used to get the mapping of block index and its
 +   * restructuring info
 +   *
 +   * @param queryDimensions   query dimension from query model
 +   * @param segmentProperties segment properties
 +   * @return map of block index to its restructuring info
 +   * @throws KeyGenException if problem while key generation
 +   */
 +  public static Map<Integer, KeyStructureInfo> getColumnGroupKeyStructureInfo(
 +      List<QueryDimension> queryDimensions, SegmentProperties segmentProperties)
 +      throws KeyGenException {
 +    Map<Integer, KeyStructureInfo> rowGroupToItsRSInfo = new HashMap<Integer, KeyStructureInfo>();
 +    // get column group id and its ordinal mapping of column group
 +    Map<Integer, List<Integer>> columnGroupAndItsOrdinalMappingForQuery =
 +        getColumnGroupAndItsOrdinalMapping(queryDimensions);
 +    Map<Integer, KeyGenerator> columnGroupAndItsKeygenartor =
 +        segmentProperties.getColumnGroupAndItsKeygenartor();
 +
 +    Iterator<Entry<Integer, List<Integer>>> iterator =
 +        columnGroupAndItsOrdinalMappingForQuery.entrySet().iterator();
 +    KeyStructureInfo restructureInfos = null;
 +    while (iterator.hasNext()) {
 +      Entry<Integer, List<Integer>> next = iterator.next();
 +      KeyGenerator keyGenerator = columnGroupAndItsKeygenartor.get(next.getKey());
 +      restructureInfos = new KeyStructureInfo();
 +      // sort the ordinal
 +      List<Integer> ordinal = next.getValue();
 +      List<Integer> mdKeyOrdinal = new ArrayList<Integer>();
 +      for (Integer ord : ordinal) {
 +        mdKeyOrdinal.add(segmentProperties.getColumnGroupMdKeyOrdinal(next.getKey(), ord));
 +      }
 +      Collections.sort(mdKeyOrdinal);
 +      // get the masked byte range for column group
 +      int[] maskByteRanges = getMaskedByteRangeBasedOrdinal(mdKeyOrdinal, keyGenerator);
 +      // max key for column group
 +      byte[] maxKey = getMaxKeyBasedOnOrinal(mdKeyOrdinal, keyGenerator);
 +      // get masked key for column group
 +      int[] maskedByte = getMaskedByte(keyGenerator.getKeySizeInBytes(), maskByteRanges);
 +      restructureInfos.setKeyGenerator(keyGenerator);
 +      restructureInfos.setMaskByteRanges(maskByteRanges);
 +      restructureInfos.setMaxKey(maxKey);
 +      restructureInfos.setMaskedBytes(maskedByte);
 +      rowGroupToItsRSInfo
 +          .put(segmentProperties.getDimensionOrdinalToBlockMapping().get(ordinal.get(0)),
 +              restructureInfos);
 +    }
 +    return rowGroupToItsRSInfo;
 +  }
 +
 +  /**
 +   * return true if given key is found in array
 +   *
 +   * @param data array to search
 +   * @param key  key to look for
 +   * @return true if the key is found in the array
 +   */
 +  public static boolean searchInArray(int[] data, int key) {
 +    for (int i = 0; i < data.length; i++) {
 +      if (key == data[i]) {
 +        return true;
 +      }
 +    }
 +    return false;
 +  }
 +
 +  /**
 +   * Below method will be used to create a mapping of column group columns.
 +   * This mapping will map a column group id to all the dimension ordinals
 +   * present in that column group. It will be used during query
 +   * execution to create a mask key for the column group dimensions, which
 +   * is needed for aggregation and filter queries because column group
 +   * dimensions are stored at bit level.
 +   */
 +  private static Map<Integer, List<Integer>> getColumnGroupAndItsOrdinalMapping(
 +      List<QueryDimension> origdimensions) {
 +
 +    List<QueryDimension> dimensions = new ArrayList<QueryDimension>(origdimensions.size());
 +    dimensions.addAll(origdimensions);
 +    /**
 +     * sort based on column group id
 +     */
 +    Collections.sort(dimensions, new Comparator<QueryDimension>() {
 +
 +      @Override public int compare(QueryDimension o1, QueryDimension o2) {
 +        return Integer
 +            .compare(o1.getDimension().columnGroupId(), o2.getDimension().columnGroupId());
 +      }
 +    });
 +    // list of row groups this will store all the row group column
 +    Map<Integer, List<Integer>> columnGroupAndItsOrdinalsMapping =
 +        new HashMap<Integer, List<Integer>>();
 +    // to store a column group
 +    List<Integer> currentColumnGroup = null;
 +    // current index
 +    int index = 0;
 +    // previous column group id, to check whether all the columns of a
 +    // row group have been selected
 +    int prvColumnGroupId = -1;
 +    while (index < dimensions.size()) {
 +      // if the dimension is not columnar and its column group id is the
 +      // same as the previous one,
 +      // then we need to add the ordinal of that column as it belongs to
 +      // the same column group
 +      if (!dimensions.get(index).getDimension().isColumnar()
 +          && dimensions.get(index).getDimension().columnGroupId() == prvColumnGroupId
 +          && null != currentColumnGroup) {
 +        currentColumnGroup.add(dimensions.get(index).getDimension().getOrdinal());
 +      }
 +
 +      // if dimension is not a columnar then it is column group column
 +      else if (!dimensions.get(index).getDimension().isColumnar()) {
 +        currentColumnGroup = new ArrayList<Integer>();
 +        columnGroupAndItsOrdinalsMapping
 +            .put(dimensions.get(index).getDimension().columnGroupId(), currentColumnGroup);
 +        currentColumnGroup.add(dimensions.get(index).getDimension().getOrdinal());
 +      }
 +      // update the column group id every time; this is required to group
 +      // the columns
 +      // of the same row group
 +      prvColumnGroupId = dimensions.get(index).getDimension().columnGroupId();
 +      index++;
 +    }
 +    return columnGroupAndItsOrdinalsMapping;
 +  }
 +
 +  /**
 +   * Below method will be used to get masked byte
 +   *
 +   * @param data           actual data
 +   * @param maxKey         max key
 +   * @param maskByteRanges mask byte range
 +   * @param byteCount      number of masked bytes
 +   * @return masked byte
 +   */
 +  public static byte[] getMaskedKey(byte[] data, byte[] maxKey, int[] maskByteRanges,
 +      int byteCount) {
 +    byte[] maskedKey = new byte[byteCount];
 +    int counter = 0;
 +    int byteRange = 0;
 +    for (int i = 0; i < byteCount; i++) {
 +      byteRange = maskByteRanges[i];
 +      if (byteRange != -1) {
 +        maskedKey[counter++] = (byte) (data[byteRange] & maxKey[byteRange]);
 +      }
 +    }
 +    return maskedKey;
 +  }
 +
 +  /**
 +   * Below method will be used to fill the block indexes of the query dimensions,
 +   * which will be used in creating an output row. Here we pass
 +   * two lists which store the indexes, one for dictionary columns and the other
 +   * for no dictionary columns. This is done so that in one
 +   * iteration we are able to fill both types of dimension block indexes
 +   *
 +   * @param queryDimensions                  dimension present in the query
 +   * @param columnOrdinalToBlockIndexMapping column ordinal to block index mapping
 +   * @param dictionaryDimensionBlockIndex    list to store dictionary column block indexes
 +   * @param noDictionaryDimensionBlockIndex  list to store no dictionary block indexes
 +   */
 +  public static void fillQueryDimensionsBlockIndexes(List<QueryDimension> queryDimensions,
 +      Map<Integer, Integer> columnOrdinalToBlockIndexMapping,
 +      Set<Integer> dictionaryDimensionBlockIndex, List<Integer> noDictionaryDimensionBlockIndex) {
 +    for (QueryDimension queryDimension : queryDimensions) {
 +      if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)
 +          && queryDimension.getDimension().numberOfChild() == 0) {
 +        dictionaryDimensionBlockIndex
 +            .add(columnOrdinalToBlockIndexMapping.get(queryDimension.getDimension().getOrdinal()));
-       } else if(queryDimension.getDimension().numberOfChild() == 0){
++      } else if (queryDimension.getDimension().numberOfChild() == 0) {
 +        noDictionaryDimensionBlockIndex
 +            .add(columnOrdinalToBlockIndexMapping.get(queryDimension.getDimension().getOrdinal()));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Below method will be used to resolve the query model.
 +   * Resolving sets the actual dimension and measure objects,
 +   * as only column names are passed from the driver to avoid heavy object
 +   * serialization.
 +   *
 +   * @param queryModel query model
 +   */
 +  public static void resolveQueryModel(QueryModel queryModel) {
 +    CarbonMetadata.getInstance().addCarbonTable(queryModel.getTable());
 +    // TODO need to load the table from table identifier
 +    CarbonTable carbonTable = queryModel.getTable();
 +    String tableName =
 +        queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableName();
 +    // resolve query dimension
 +    for (QueryDimension queryDimension : queryModel.getQueryDimension()) {
 +      queryDimension
 +          .setDimension(carbonTable.getDimensionByName(tableName, queryDimension.getColumnName()));
 +    }
 +    // resolve sort dimension
 +    for (QueryDimension sortDimension : queryModel.getSortDimension()) {
 +      sortDimension
 +          .setDimension(carbonTable.getDimensionByName(tableName, sortDimension.getColumnName()));
 +    }
 +    // resolve query measure
 +    for (QueryMeasure queryMeasure : queryModel.getQueryMeasures()) {
 +      // in case of count(*) the column name will be count(*), so
 +      // first check whether any measure is present;
 +      // if a measure is present and the first measure is not the default
 +      // invisible measure, then use that measure, otherwise
 +      // use the first dimension as a measure.
 +      // currently a default measure is always added when none is present,
 +      // so the first condition never fails, but the check is kept
 +      // in case that behavior changes in the future
 +      if (queryMeasure.getColumnName().equals("count(*)")) {
 +        if (carbonTable.getMeasureByTableName(tableName).size() > 0 && !carbonTable
 +            .getMeasureByTableName(tableName).get(0).getColName()
 +            .equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) {
 +          queryMeasure.setMeasure(carbonTable.getMeasureByTableName(tableName).get(0));
 +        } else {
 +          CarbonMeasure dummyMeasure = new CarbonMeasure(
 +              carbonTable.getDimensionByTableName(tableName).get(0).getColumnSchema(), 0);
 +          queryMeasure.setMeasure(dummyMeasure);
 +        }
 +      } else {
 +        queryMeasure
 +            .setMeasure(carbonTable.getMeasureByName(tableName, queryMeasure.getColumnName()));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Below method will be used to get the index of number type aggregator
 +   *
 +   * @param aggType list of aggregation types
 +   * @return index in aggregator
 +   */
 +  public static int[] getNumberTypeIndex(List<String> aggType) {
 +    List<Integer> indexList = new ArrayList<Integer>();
 +    for (int i = 0; i < aggType.size(); i++) {
 +      if (CarbonCommonConstants.SUM.equals(aggType.get(i)) || CarbonCommonConstants.AVERAGE
 +          .equals(aggType.get(i))) {
 +        indexList.add(i);
 +      }
 +    }
 +    return ArrayUtils.toPrimitive(indexList.toArray(new Integer[indexList.size()]));
 +  }
 +
 +  /**
 +   * below method will be used to get the actual type aggregator
 +   *
 +   * @param aggType list of aggregation types
 +   * @return index in aggregator
 +   */
 +  public static int[] getActualTypeIndex(List<String> aggType) {
 +    List<Integer> indexList = new ArrayList<Integer>();
 +    for (int i = 0; i < aggType.size(); i++) {
 +      if (!CarbonCommonConstants.SUM.equals(aggType.get(i)) && !CarbonCommonConstants.AVERAGE
 +          .equals(aggType.get(i))) {
 +        indexList.add(i);
 +      }
 +    }
 +    return ArrayUtils.toPrimitive(indexList.toArray(new Integer[indexList.size()]));
 +  }
 +
 +  /**
 +   * Below method will be used to get the key structure for the column group
 +   *
 +   * @param segmentProperties      segment properties
 +   * @param dimColumnEvaluatorInfo dimension evaluator info
 +   * @return key structure info for column group dimension
 +   * @throws KeyGenException
 +   */
 +  public static KeyStructureInfo getKeyStructureInfo(SegmentProperties segmentProperties,
 +      DimColumnResolvedFilterInfo dimColumnEvaluatorInfo) throws KeyGenException {
 +    int colGrpId = getColumnGroupId(segmentProperties, dimColumnEvaluatorInfo.getColumnIndex());
 +    KeyGenerator keyGenerator = segmentProperties.getColumnGroupAndItsKeygenartor().get(colGrpId);
 +    List<Integer> mdKeyOrdinal = new ArrayList<Integer>();
 +
 +    mdKeyOrdinal.add(segmentProperties
 +        .getColumnGroupMdKeyOrdinal(colGrpId, dimColumnEvaluatorInfo.getColumnIndex()));
 +    int[] maskByteRanges = QueryUtil.getMaskedByteRangeBasedOrdinal(mdKeyOrdinal, keyGenerator);
 +    byte[] maxKey = QueryUtil.getMaxKeyBasedOnOrinal(mdKeyOrdinal, keyGenerator);
 +    int[] maskedByte = QueryUtil.getMaskedByte(keyGenerator.getKeySizeInBytes(), maskByteRanges);
 +    KeyStructureInfo restructureInfos = new KeyStructureInfo();
 +    restructureInfos.setKeyGenerator(keyGenerator);
 +    restructureInfos.setMaskByteRanges(maskByteRanges);
 +    restructureInfos.setMaxKey(maxKey);
 +    restructureInfos.setMaskedBytes(maskedByte);
 +    return restructureInfos;
 +  }
 +
 +  /**
 +   * Below method will be used to get the column group id based on the ordinal
 +   *
 +   * @param segmentProperties segment properties
 +   * @param ordinal           ordinal to be searched
 +   * @return column group id
 +   */
 +  public static int getColumnGroupId(SegmentProperties segmentProperties, int ordinal) {
 +    int[][] columnGroups = segmentProperties.getColumnGroups();
 +    int colGrpId = -1;
 +    for (int i = 0; i < columnGroups.length; i++) {
 +      if (columnGroups[i].length > 1) {
 +        colGrpId++;
 +        if (QueryUtil.searchInArray(columnGroups[i], ordinal)) {
 +          break;
 +        }
 +      }
 +    }
 +    return colGrpId;
 +  }
 +
 +  /**
 +   * Below method will be used to get the map of complex dimensions and their query
 +   * types, which will be used during query execution
 +   *
 +   * @param queryDimensions          complex dimension in query
 +   * @param dimensionToBlockIndexMap dimension to block index in file map
 +   * @return complex dimension and query type
 +   */
 +  public static Map<Integer, GenericQueryType> getComplexDimensionsMap(
 +      List<QueryDimension> queryDimensions, Map<Integer, Integer> dimensionToBlockIndexMap,
 +      int[] eachComplexColumnValueSize, Map<String, Dictionary> columnIdToDictionaryMap,
 +      Set<CarbonDimension> filterDimensions) {
 +    Map<Integer, GenericQueryType> complexTypeMap = new HashMap<Integer, GenericQueryType>();
 +    for (QueryDimension dimension : queryDimensions) {
 +      CarbonDimension actualDimension = dimension.getDimension();
 +      if (actualDimension.getNumberOfChild() == 0) {
 +        continue;
 +      }
 +      fillParentDetails(dimensionToBlockIndexMap, actualDimension, complexTypeMap,
 +          eachComplexColumnValueSize, columnIdToDictionaryMap);
 +    }
 +    if (null != filterDimensions) {
 +      for (CarbonDimension filterDimension : filterDimensions) {
 +        fillParentDetails(dimensionToBlockIndexMap, filterDimension, complexTypeMap,
 +            eachComplexColumnValueSize, columnIdToDictionaryMap);
 +      }
 +    }
 +    return complexTypeMap;
 +  }
 +
 +  private static GenericQueryType fillParentDetails(Map<Integer, Integer> dimensionToBlockIndexMap,
 +      CarbonDimension dimension, Map<Integer, GenericQueryType> complexTypeMap,
 +      int[] eachComplexColumnValueSize, Map<String, Dictionary> columnIdToDictionaryMap) {
 +    int parentBlockIndex = dimensionToBlockIndexMap.get(dimension.getOrdinal());
 +    GenericQueryType parentQueryType = dimension.getDataType().equals(DataType.ARRAY) ?
 +        new ArrayQueryType(dimension.getColName(), dimension.getColName(), parentBlockIndex) :
 +        new StructQueryType(dimension.getColName(), dimension.getColName(),
 +            dimensionToBlockIndexMap.get(dimension.getOrdinal()));
 +    complexTypeMap.put(dimension.getOrdinal(), parentQueryType);
 +    parentBlockIndex =
 +        fillChildrenDetails(eachComplexColumnValueSize, columnIdToDictionaryMap, parentBlockIndex,
 +            dimension, parentQueryType);
 +    return parentQueryType;
 +  }
 +
 +  private static int fillChildrenDetails(int[] eachComplexColumnValueSize,
 +      Map<String, Dictionary> columnIdToDictionaryMap, int parentBlockIndex,
 +      CarbonDimension dimension, GenericQueryType parentQueryType) {
 +    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
 +      switch (dimension.getListOfChildDimensions().get(i).getDataType()) {
 +        case ARRAY:
 +          parentQueryType.addChildren(
 +              new ArrayQueryType(dimension.getListOfChildDimensions().get(i).getColName(),
 +                  dimension.getColName(), ++parentBlockIndex));
 +          break;
 +        case STRUCT:
 +          parentQueryType.addChildren(
 +              new StructQueryType(dimension.getListOfChildDimensions().get(i).getColName(),
 +                  dimension.getColName(), ++parentBlockIndex));
 +          break;
 +        default:
-           boolean isDirectDictionary = CarbonUtil.hasEncoding(
-               dimension.getListOfChildDimensions().get(i).getEncoder(),
-               Encoding.DIRECT_DICTIONARY);
++          boolean isDirectDictionary = CarbonUtil
++              .hasEncoding(dimension.getListOfChildDimensions().get(i).getEncoder(),
++                  Encoding.DIRECT_DICTIONARY);
 +          parentQueryType.addChildren(
 +              new PrimitiveQueryType(dimension.getListOfChildDimensions().get(i).getColName(),
 +                  dimension.getColName(), ++parentBlockIndex,
 +                  dimension.getListOfChildDimensions().get(i).getDataType(),
 +                  eachComplexColumnValueSize[dimension.getListOfChildDimensions().get(i)
 +                      .getComplexTypeOrdinal()], columnIdToDictionaryMap
 +                  .get(dimension.getListOfChildDimensions().get(i).getColumnId()),
 +                  isDirectDictionary));
 +      }
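 +      // recurse into nested complex children so their descendants also get consecutive block indexes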
 +      if (dimension.getListOfChildDimensions().get(i).getNumberOfChild() > 0) {
 +        parentBlockIndex = fillChildrenDetails(eachComplexColumnValueSize, columnIdToDictionaryMap,
 +            parentBlockIndex, dimension.getListOfChildDimensions().get(i), parentQueryType);
 +      }
 +    }
 +    return parentBlockIndex;
 +  }
 +
 +  public static Set<CarbonDimension> getAllFilterDimensions(FilterResolverIntf filterResolverTree) {
 +    Set<CarbonDimension> filterDimensions = new HashSet<CarbonDimension>();
 +    if (null == filterResolverTree) {
 +      return filterDimensions;
 +    }
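 +    // recursively collect every dimension column referenced in the filter expression tree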
 +    Expression filterExpression = filterResolverTree.getFilterExpression();
-     if (filterExpression instanceof BinaryLogicalExpression) {
-       BinaryLogicalExpression logicalExpression = (BinaryLogicalExpression) filterExpression;
-       dimensionResolvedInfos.addAll(logicalExpression.getColumnList());
-     }
++    addColumnDimensions(filterExpression, filterDimensions);
 +    return filterDimensions;
 +  }
 +
++  /**
++   * Recursively checks whether the given expression (or any of its children)
++   * is a column expression on a dimension, and adds that dimension instance
++   * to the set holding the dimensions of the filter expression.
++   *
++   * @param expression       expression node to inspect
++   * @param filterDimensions set collecting the resolved filter dimensions
++   */
++  private static void addColumnDimensions(Expression expression,
++      Set<CarbonDimension> filterDimensions) {
++    if (null != expression && expression instanceof ColumnExpression
++        && ((ColumnExpression) expression).isDimension()) {
++      filterDimensions.add(((ColumnExpression) expression).getDimension());
++      return;
++    }
++    for (Expression child : expression.getChildren()) {
++      addColumnDimensions(child, filterDimensions);
++    }
++  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/50dfdf6c/core/src/main/java/org/carbondata/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/carbondata/scan/filter/FilterExpressionProcessor.java
index c58331d,0000000..a2c6f28
mode 100644,000000..100644
--- a/core/src/main/java/org/carbondata/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/carbondata/scan/filter/FilterExpressionProcessor.java
@@@ -1,350 -1,0 +1,353 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.carbondata.scan.filter;
 +
 +import java.util.ArrayList;
 +import java.util.BitSet;
 +import java.util.List;
 +
 +import org.carbondata.common.logging.LogService;
 +import org.carbondata.common.logging.LogServiceFactory;
 +import org.carbondata.core.carbon.AbsoluteTableIdentifier;
 +import org.carbondata.core.carbon.datastore.DataRefNode;
 +import org.carbondata.core.carbon.datastore.DataRefNodeFinder;
 +import org.carbondata.core.carbon.datastore.IndexKey;
 +import org.carbondata.core.carbon.datastore.block.AbstractIndex;
 +import org.carbondata.core.carbon.datastore.block.SegmentProperties;
 +import org.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
 +import org.carbondata.core.carbon.metadata.datatype.DataType;
 +import org.carbondata.core.carbon.metadata.encoder.Encoding;
 +import org.carbondata.core.keygenerator.KeyGenException;
 +import org.carbondata.scan.executor.exception.QueryExecutionException;
 +import org.carbondata.scan.expression.BinaryExpression;
 +import org.carbondata.scan.expression.Expression;
 +import org.carbondata.scan.expression.conditional.BinaryConditionalExpression;
 +import org.carbondata.scan.expression.conditional.ConditionalExpression;
 +import org.carbondata.scan.expression.exception.FilterUnsupportedException;
 +import org.carbondata.scan.expression.logical.BinaryLogicalExpression;
 +import org.carbondata.scan.filter.executer.FilterExecuter;
 +import org.carbondata.scan.filter.intf.ExpressionType;
 +import org.carbondata.scan.filter.resolver.ConditionalFilterResolverImpl;
 +import org.carbondata.scan.filter.resolver.FilterResolverIntf;
 +import org.carbondata.scan.filter.resolver.LogicalFilterResolverImpl;
 +import org.carbondata.scan.filter.resolver.RowLevelFilterResolverImpl;
 +import org.carbondata.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
 +
 +public class FilterExpressionProcessor implements FilterProcessor {
 +
 +  private static final LogService LOGGER =
 +      LogServiceFactory.getLogService(FilterExpressionProcessor.class.getName());
 +
 +  /**
 +   * Provides the resolved form of the filters based on the filter
 +   * expression tree passed in as an Expression instance.
 +   *
 +   * @param expressionTree  filter expression tree
 +   * @param tableIdentifier contains the carbon store information
 +   * @return a filter resolver tree
 +   * @throws QueryExecutionException
 +   * @throws FilterUnsupportedException
 +   */
 +  public FilterResolverIntf getFilterResolver(Expression expressionTree,
 +      AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException {
 +    if (null != expressionTree && null != tableIdentifier) {
 +      return getFilterResolverTree(expressionTree, tableIdentifier);
 +    }
 +    return null;
 +  }
 +
 +  /**
 +   * Scans all the btrees at segment level and selects the required block
 +   * reference nodes to be pushed to the executer, which applies the filters
 +   * on the respective data reference nodes.
 +   * The following algorithm is used:
 +   * Step 1: Get the start and end key based on the filter tree resolver information.
 +   * Step 2: Prepare the IndexKeys to scan the tree and get the start and end reference
 +   * nodes (blocks).
 +   * Step 3: Once the data reference node range is retrieved, traverse the nodes within
 +   * the range and select each node based on the block min/max values and the filter value.
 +   * Step 4: The selected blocks are sent to the executers for applying the filters with
 +   * the help of the filter executers.
 +   *
 +   * @throws QueryExecutionException
 +   */
 +  public List<DataRefNode> getFilterredBlocks(DataRefNode btreeNode,
 +      FilterResolverIntf filterResolver, AbstractIndex tableSegment,
 +      AbsoluteTableIdentifier tableIdentifier) throws QueryExecutionException {
 +    // Need to get the current dimension tables
 +    List<DataRefNode> listOfDataBlocksToScan = new ArrayList<DataRefNode>();
 +    // getting the start and end index key based on filter for hitting the
 +    // selected block reference nodes based on filter resolver tree.
-     LOGGER.info("preparing the start and end key for finding"
++    LOGGER.debug("preparing the start and end key for finding"
 +        + "start and end block as per filter resolver");
 +    List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2);
 +    FilterUtil.traverseResolverTreeAndGetStartAndEndKey(tableSegment.getSegmentProperties(),
 +        tableIdentifier, filterResolver, listOfStartEndKeys);
 +    // reading the first value from list which has start key
 +    IndexKey searchStartKey = listOfStartEndKeys.get(0);
 +    // reading the last value from list which has end key
 +    IndexKey searchEndKey = listOfStartEndKeys.get(1);
 +    if (null == searchStartKey && null == searchEndKey) {
 +      try {
 +        // TODO need to handle for no dictionary dimensions
 +        searchStartKey =
 +            FilterUtil.prepareDefaultStartIndexKey(tableSegment.getSegmentProperties());
 +        // TODO need to handle for no dictionary dimensions
 +        searchEndKey = FilterUtil.prepareDefaultEndIndexKey(tableSegment.getSegmentProperties());
 +      } catch (KeyGenException e) {
 +        return listOfDataBlocksToScan;
 +      }
 +    }
 +
-     LOGGER.info("Successfully retrieved the start and end key");
++    LOGGER.debug("Successfully retrieved the start and end key. "
++        + "Dictionary Start Key: " + searchStartKey.getDictionaryKeys()
++        + ", No Dictionary Start Key: " + searchStartKey.getNoDictionaryKeys()
++        + ", Dictionary End Key: " + searchEndKey.getDictionaryKeys()
++        + ", No Dictionary End Key: " + searchEndKey.getNoDictionaryKeys());
 +    long startTimeInMillis = System.currentTimeMillis();
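 +    // btree lookup: locate the first and last data blocks whose key range can satisfy the filter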
 +    DataRefNodeFinder blockFinder = new BTreeDataRefNodeFinder(
 +        tableSegment.getSegmentProperties().getEachDimColumnValueSize());
 +    DataRefNode startBlock = blockFinder.findFirstDataBlock(btreeNode, searchStartKey);
 +    DataRefNode endBlock = blockFinder.findLastDataBlock(btreeNode, searchEndKey);
 +    FilterExecuter filterExecuter = FilterUtil
 +        .getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties(), null);
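 +    // walk the block chain from start to end, keeping only blocks whose min/max pass the filter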
 +    while (startBlock != endBlock) {
 +      addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock,
 +          tableSegment.getSegmentProperties());
 +      startBlock = startBlock.getNextDataRefNode();
 +    }
 +    addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock,
 +        tableSegment.getSegmentProperties());
 +    LOGGER.info("Total Time in retrieving the data reference node" + "after scanning the btree " + (
 +        System.currentTimeMillis() - startTimeInMillis)
 +        + " Total number of data reference node for executing filter(s) " + listOfDataBlocksToScan
 +        .size());
 +
 +    return listOfDataBlocksToScan;
 +  }
 +
 +  /**
 +   * Selects the blocks based on the column max and min values.
 +   *
 +   * @param filterExecuter
 +   * @param listOfDataBlocksToScan
 +   * @param dataRefNode
 +   * @param segmentProperties
 +   */
 +  private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter,
 +      List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode,
 +      SegmentProperties segmentProperties) {
 +
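 +    // ask the filter executer whether any filter value can fall within this block's min/max range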
 +    BitSet bitSet = filterExecuter
 +        .isScanRequired(dataRefNode.getColumnsMaxValue(), dataRefNode.getColumnsMinValue());
 +    if (!bitSet.isEmpty()) {
 +      listOfDataBlocksToScan.add(dataRefNode);
 +    }
 +  }
 +
 +  /**
 +   * Returns a filter resolver instance which will be used by the
 +   * executers to evaluate or execute the filters.
 +   *
 +   * @param expressionTree filter expression tree from which the resolver
 +   *                       tree will be built.
 +   * @return FilterResolverIntf type.
 +   * @throws QueryExecutionException
 +   * @throws FilterUnsupportedException
 +   */
 +  private FilterResolverIntf getFilterResolverTree(Expression expressionTree,
 +      AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException {
 +    FilterResolverIntf filterEvaluatorTree =
 +        createFilterResolverTree(expressionTree, tableIdentifier, null);
 +    traverseAndResolveTree(filterEvaluatorTree, tableIdentifier);
 +    return filterEvaluatorTree;
 +  }
 +
 +  /**
 +   * Traverses the filter resolver tree in order and resolves each node;
 +   * while visiting each node, the surrogates of the filter members
 +   * involved in the filter expression are prepared.
 +   *
 +   * @param filterResolverTree
 +   * @param tableIdentifier
 +   * @throws FilterUnsupportedException
 +   * @throws QueryExecutionException
 +   */
 +  private void traverseAndResolveTree(FilterResolverIntf filterResolverTree,
 +      AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException {
 +    if (null == filterResolverTree) {
 +      return;
 +    }
 +    traverseAndResolveTree(filterResolverTree.getLeft(), tableIdentifier);
 +
 +    filterResolverTree.resolve(tableIdentifier);
 +
 +    traverseAndResolveTree(filterResolverTree.getRight(), tableIdentifier);
 +  }
 +
 +  /**
 +   * Pattern used : Visitor Pattern
 +   * Creates the filter resolver tree based on the filter expression tree;
 +   * in this algorithm the resolver created for each node depends on the
 +   * expression instance.
 +   *
 +   * @param expressionTree
 +   * @param tableIdentifier
 +   * @return
 +   */
 +  private FilterResolverIntf createFilterResolverTree(Expression expressionTree,
 +      AbsoluteTableIdentifier tableIdentifier, Expression intermediateExpression) {
 +    ExpressionType filterExpressionType = expressionTree.getFilterExpressionType();
 +    BinaryExpression currentExpression = null;
-     BinaryLogicalExpression logicalExpression = null;
 +    switch (filterExpressionType) {
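 +      // AND/OR nodes recurse on both children; conditional leaves map to concrete resolvers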
 +      case OR:
 +        currentExpression = (BinaryExpression) expressionTree;
 +        return new LogicalFilterResolverImpl(
 +            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
 +                currentExpression),
 +            createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
-                 currentExpression), filterExpressionType);
++                currentExpression), currentExpression);
 +      case AND:
-         logicalExpression = (BinaryLogicalExpression) expressionTree;
++        currentExpression = (BinaryExpression) expressionTree;
 +        return new LogicalFilterResolverImpl(
-             createFilterResolverTree(logicalExpression.getLeft(), tableIdentifier,
++            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
 +                currentExpression),
-             createFilterResolverTree(logicalExpression.getRight(), tableIdentifier,
-                 currentExpression), filterExpressionType);
++            createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
++                currentExpression), currentExpression);
 +      case EQUALS:
 +      case IN:
 +        return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS, false, expressionTree,
 +            tableIdentifier, expressionTree);
 +      case GREATERTHAN:
 +      case GREATERTHAN_EQUALTO:
 +      case LESSTHAN:
 +      case LESSTHAN_EQUALTO:
 +        return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS, true, expressionTree,
 +            tableIdentifier, expressionTree);
 +
 +      case NOT_EQUALS:
 +      case NOT_IN:
 +        return getFilterResolverBasedOnExpressionType(ExpressionType.NOT_EQUALS, false,
 +            expressionTree, tableIdentifier, expressionTree);
 +
 +      default:
 +        return getFilterResolverBasedOnExpressionType(ExpressionType.UNKNOWN, false, expressionTree,
 +            tableIdentifier, expressionTree);
 +    }
 +  }
 +
 +  /**
 +   * Factory method which returns the resolver instance based on the filter
 +   * expression type.
 +   */
 +  private FilterResolverIntf getFilterResolverBasedOnExpressionType(
 +      ExpressionType filterExpressionType, boolean isExpressionResolve, Expression expression,
 +      AbsoluteTableIdentifier tableIdentifier, Expression expressionTree) {
 +    BinaryConditionalExpression currentCondExpression = null;
 +    ConditionalExpression condExpression = null;
 +    switch (filterExpressionType) {
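 +      // the resolver chosen depends on the column's dictionary encoding and on
 +      // whether the expression needs row-level evaluation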
 +      case EQUALS:
 +        currentCondExpression = (BinaryConditionalExpression) expression;
 +        if (currentCondExpression.isSingleDimension()
 +            && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
 +            != DataType.ARRAY
 +            && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
 +            != DataType.STRUCT) {
 +          // getting new dim index.
 +          if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
 +              .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
 +              .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
 +            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
 +                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
 +                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
 +                    || FilterUtil
 +                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
 +              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, true,
 +                  tableIdentifier);
 +            }
 +            if (currentCondExpression.getFilterExpressionType() == ExpressionType.GREATERTHAN
 +                || currentCondExpression.getFilterExpressionType() == ExpressionType.LESSTHAN
 +                || currentCondExpression.getFilterExpressionType()
 +                == ExpressionType.GREATERTHAN_EQUALTO
 +                || currentCondExpression.getFilterExpressionType()
 +                == ExpressionType.LESSTHAN_EQUALTO) {
 +              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, true,
 +                  tableIdentifier);
 +            }
 +          }
 +          return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true);
 +
 +        }
 +        break;
 +      case NOT_EQUALS:
 +        currentCondExpression = (BinaryConditionalExpression) expression;
 +        if (currentCondExpression.isSingleDimension()
 +            && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
 +            != DataType.ARRAY
 +            && currentCondExpression.getColumnList().get(0).getCarbonColumn().getDataType()
 +            != DataType.STRUCT) {
 +          if (!currentCondExpression.getColumnList().get(0).getCarbonColumn()
 +              .hasEncoding(Encoding.DICTIONARY) || currentCondExpression.getColumnList().get(0)
 +              .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
 +            if (FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getLeft())
 +                && FilterUtil.checkIfExpressionContainsColumn(currentCondExpression.getRight()) || (
 +                FilterUtil.checkIfRightExpressionRequireEvaluation(currentCondExpression.getRight())
 +                    || FilterUtil
 +                    .checkIfLeftExpressionRequireEvaluation(currentCondExpression.getLeft()))) {
 +              return new RowLevelFilterResolverImpl(expression, isExpressionResolve, false,
 +                  tableIdentifier);
 +            }
 +            if (expressionTree.getFilterExpressionType() == ExpressionType.GREATERTHAN
 +                || expressionTree.getFilterExpressionType() == ExpressionType.LESSTHAN
 +                || expressionTree.getFilterExpressionType() == ExpressionType.GREATERTHAN_EQUALTO
 +                || expressionTree.getFilterExpressionType() == ExpressionType.LESSTHAN_EQUALTO) {
 +
 +              return new RowLevelRangeFilterResolverImpl(expression, isExpressionResolve, false,
 +                  tableIdentifier);
 +            }
 +
 +            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false);
 +          }
 +          return new ConditionalFilterResolverImpl(expression, isExpressionResolve, false);
 +        }
 +        break;
 +      default:
 +        condExpression = (ConditionalExpression) expression;
 +        if (condExpression.isSingleDimension()
 +            && condExpression.getColumnList().get(0).getCarbonColumn().getDataType()
 +            != DataType.ARRAY
 +            && condExpression.getColumnList().get(0).getCarbonColumn().getDataType()
 +            != DataType.STRUCT) {
 +          condExpression = (ConditionalExpression) expression;
 +          if (condExpression.getColumnList().get(0).getCarbonColumn()
 +              .hasEncoding(Encoding.DICTIONARY) && !condExpression.getColumnList().get(0)
 +              .getCarbonColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
 +            return new ConditionalFilterResolverImpl(expression, true, true);
 +          } else {
 +            return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
 +          }
 +        } else {
 +          return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
 +        }
 +    }
 +    return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
 +  }
 +
 +}