Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/04/21 15:44:33 UTC

[2/2] carbondata git commit: [CARBONDATA-2323]Distributed search mode using RPC

[CARBONDATA-2323]Distributed search mode using RPC

When a user issues a SQL statement that includes only projection and filter, we can use RPC calls to do a distributed scan on the carbon files directly, instead of using an RDD to run the query. In this mode, RDD overhead such as RDD construction and DAG scheduling is avoided.

This closes #2148
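
A minimal usage sketch of the session API added in this commit (the table name and query are illustrative; the CarbonSession setup is assumed):

    import org.apache.spark.sql.{CarbonSession, SparkSession}

    // `spark` is assumed to be a SparkSession created through the CarbonSession builder
    spark.asInstanceOf[CarbonSession].startSearchMode()
    // while search mode is on, projection + filter queries bypass the RDD path
    spark.sql("SELECT city FROM t WHERE id = '100'").collect()
    spark.asInstanceOf[CarbonSession].stopSearchMode()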


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3ff574d2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3ff574d2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3ff574d2

Branch: refs/heads/master
Commit: 3ff574d29155edd71da5273bad0d0236ea50c2bd
Parents: 6bef57b
Author: Jacky Li <ja...@qq.com>
Authored: Thu Apr 19 10:55:48 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Sat Apr 21 23:44:12 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  31 ++-
 .../carbondata/core/indexstore/Blocklet.java    |   2 +
 .../core/metadata/schema/table/CarbonTable.java |  61 -----
 .../scan/executor/QueryExecutorFactory.java     |   5 +-
 .../impl/SearchModeDetailQueryExecutor.java     |  13 +-
 .../SearchModeVectorDetailQueryExecutor.java    |  26 +-
 .../core/scan/model/QueryModelBuilder.java      |  97 ++++++--
 .../carbondata/core/util/CarbonProperties.java  |  41 ++++
 dev/findbugs-exclude.xml                        |   4 +
 .../benchmark/ConcurrentQueryBenchmark.scala    |  11 +-
 .../carbondata/benchmark/DataGenerator.scala    |  83 -------
 .../benchmark/SimpleQueryBenchmark.scala        |   1 +
 .../carbondata/examples/SearchModeExample.scala | 165 +++++++++++++
 .../carbondata/examples/util/ExampleUtils.scala |   1 +
 .../carbondata/hadoop/CarbonInputSplit.java     |   5 +
 .../hadoop/CarbonMultiBlockSplit.java           |  12 +-
 .../hadoop/api/CarbonInputFormat.java           |  47 ++--
 .../readsupport/impl/CarbonRowReadSupport.java  |  51 ++++
 .../hadoop/util/CarbonInputFormatUtil.java      |  83 +++++++
 .../hive/MapredCarbonInputFormat.java           |  23 +-
 .../TestTimeseriesTableSelection.scala          |   3 +-
 .../detailquery/SearchModeTestCase.scala        |  88 +++++--
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   1 -
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  20 --
 integration/spark2/pom.xml                      |  83 ++++---
 .../carbondata/spark/util/DataGenerator.scala   |  85 +++++++
 .../carbondata/store/SparkCarbonStore.scala     |  55 ++++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  17 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 127 +++++++++-
 .../spark/sql/optimizer/CarbonFilters.scala     |   1 +
 pom.xml                                         |   1 +
 processing/pom.xml                              |   2 +-
 .../loading/dictionary/InMemBiDictionary.java   |  80 ------
 .../merger/CarbonCompactionExecutor.java        |  12 +-
 .../partition/spliter/CarbonSplitExecutor.java  |   8 +-
 .../dictionary/InMemBiDictionaryTest.java       |  72 ------
 store/search/pom.xml                            |  78 ++++++
 .../store/worker/SearchRequestHandler.java      | 181 ++++++++++++++
 .../apache/carbondata/store/worker/Status.java  |  28 +++
 .../scala/org/apache/spark/rpc/Master.scala     | 242 +++++++++++++++++++
 .../scala/org/apache/spark/rpc/Worker.scala     | 118 +++++++++
 .../org/apache/spark/search/Registry.scala      |  51 ++++
 .../org/apache/spark/search/Searcher.scala      |  78 ++++++
 .../carbondata/store/SearchServiceTest.java     |  37 +++
 .../streaming/CarbonStreamRecordReader.java     |   4 +-
 46 files changed, 1750 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6ab1ce5..4e324fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.constants;
 
 import java.nio.charset.Charset;
 
+import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.util.CarbonProperty;
 
 public final class CarbonCommonConstants {
@@ -1629,8 +1630,14 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_SYSTEM_FOLDER_LOCATION = "carbon.system.folder.location";
 
+  /**
+   * If set to true, CarbonReader will be used to do the distributed scan directly instead of
+   * going through a compute framework like Spark, thus avoiding framework limitations such as
+   * SQL optimizer and task scheduling overhead.
+   */
   @CarbonProperty
-  public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.mode.enable";
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.enabled";
 
   public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
 
@@ -1641,10 +1648,30 @@ public final class CarbonCommonConstants {
    * will call Executors.newFixedThreadPool(int nThreads) instead
    */
   @CarbonProperty
-  public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.mode.scan.thread";
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread";
 
   public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1";
 
+  /**
+   * In search mode, Master will listen on this port for worker registration
+   */
+  public static final String CARBON_SEARCH_MODE_MASTER_PORT = "carbon.search.master.port";
+
+  public static final String CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT = "10020";
+
+  /**
+   * In search mode, the Worker will listen on this port for master requests such as search.
+   * If the Worker fails to start the service on this port, it will increment the port number
+   * and try to bind again, until it succeeds.
+   */
+  @CarbonProperty
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_WORKER_PORT = "carbon.search.worker.port";
+
+  public static final String CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT = "10021";
+
+
   /*
    * whether to enable prefetch for rowbatch to enhance row reconstruction during compaction
    */
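
These properties can be set programmatically before search mode is started; a sketch using the chained addProperty API (the values are illustrative):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true")    // carbon.search.enabled
      .addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD, "4")  // carbon.search.scan.thread
      .addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT, "10020")
      .addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT, "10021")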

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 052d269..c3eda6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -28,8 +28,10 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
  */
 public class Blocklet implements Writable,Serializable {
 
+  /** file path of this blocklet */
   private String blockId;
 
+  /** id to identify the blocklet inside the block (it is a sequential number) */
   private String blockletId;
 
   public Blocklet(String blockId, String blockletId) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index d3eab6c..88e00f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -50,9 +50,7 @@ import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryProjection;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.FileHeader;
@@ -907,65 +905,6 @@ public class CarbonTable implements Serializable {
     return dataSize + indexSize;
   }
 
-  /**
-   * Create a new QueryModel with projection all columns in the table.
-   */
-  public QueryModel createQueryModelWithProjectAllColumns(DataTypeConverter converter) {
-    QueryProjection projection = new QueryProjection();
-
-    List<CarbonDimension> dimensions = getDimensionByTableName(getTableName());
-    for (int i = 0; i < dimensions.size(); i++) {
-      projection.addDimension(dimensions.get(i), i);
-    }
-    List<CarbonMeasure> measures = getMeasureByTableName(getTableName());
-    for (int i = 0; i < measures.size(); i++) {
-      projection.addMeasure(measures.get(i), i);
-    }
-    QueryModel model = QueryModel.newInstance(this);
-    model.setProjection(projection);
-    model.setConverter(converter);
-    return model;
-  }
-
-  /**
-   * Create a new QueryModel with specified projection
-   */
-  public QueryModel createQueryWithProjection(String[] projectionColumnNames,
-      DataTypeConverter converter) {
-    QueryProjection projection = createProjection(projectionColumnNames);
-    QueryModel queryModel = QueryModel.newInstance(this);
-    queryModel.setProjection(projection);
-    queryModel.setConverter(converter);
-    return queryModel;
-  }
-
-  public QueryProjection createProjection(String[] projectionColumnNames) {
-    String factTableName = getTableName();
-    QueryProjection projection = new QueryProjection();
-    // fill dimensions
-    // If columns are null, set all dimensions and measures
-    int i = 0;
-    if (projectionColumnNames != null) {
-      for (String projectionColumnName : projectionColumnNames) {
-        CarbonDimension dimension = getDimensionByName(factTableName, projectionColumnName);
-        if (dimension != null) {
-          projection.addDimension(dimension, i);
-          i++;
-        } else {
-          CarbonMeasure measure = getMeasureByName(factTableName, projectionColumnName);
-          if (measure == null) {
-            throw new RuntimeException(projectionColumnName +
-                " column not found in the table " + factTableName);
-          }
-          projection.addMeasure(measure, i);
-          i++;
-        }
-      }
-    }
-
-    return projection;
-  }
-
   public void processFilterExpression(Expression filterExpression,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     QueryModel.processFilterExpression(this, filterExpression, isFilterDimensions,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index 06fe4db..b790f1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.scan.executor;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.impl.DetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
@@ -31,9 +30,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 public class QueryExecutorFactory {
 
   public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
-    if (CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
-            CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) {
+    if (CarbonProperties.isSearchModeEnabled()) {
       if (queryModel.isVectorReader()) {
         return new SearchModeVectorDetailQueryExecutor();
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index c64755e..484cafd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -35,14 +35,18 @@ import org.apache.carbondata.core.util.CarbonProperties;
 public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;
 
   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
-                      CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+          .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
       LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
@@ -58,6 +62,9 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+    if (executorService == null) {
+      initThreadPool();
+    }
     this.queryIterator = new SearchModeResultIterator(
         blockExecutionInfoList,
         queryModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 075d94a..02e8dc1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -31,21 +31,30 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.SearchModeVectorResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
 
+/**
+ * This class executes the detail query and returns the result as columnar vectors.
+ */
 public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;
 
   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                       CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
-      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
+      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
+          + "Using the default value " + nThread);
     }
     if (nThread > 0) {
       executorService = Executors.newFixedThreadPool(nThread);
@@ -54,10 +63,21 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
     }
   }
 
+  public static synchronized void shutdownThreadPool() {
+    // shutdown all threads immediately
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;
+    }
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+    if (executorService == null) {
+      initThreadPool();
+    }
     this.queryIterator = new SearchModeVectorResultIterator(
         blockExecutionInfoList,
         queryModel,
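
Both search-mode executors now share the same lifecycle pattern: the pool is built in a static initializer, shutdownThreadPool() tears it down and nulls the reference, and execute() lazily rebuilds it when search mode is restarted. A minimal standalone sketch of that pattern (the names here are illustrative, not CarbonData APIs):

    import java.util.concurrent.{ExecutorService, Executors}

    object LazyPool {
      private var pool: ExecutorService = _

      private def init(): Unit = synchronized {
        if (pool == null) pool = Executors.newCachedThreadPool()
      }

      // called when search mode is stopped: release all threads immediately
      def shutdown(): Unit = synchronized {
        if (pool != null) {
          pool.shutdownNow()
          pool = null
        }
      }

      // called on each query: rebuild the pool if it was shut down
      def get: ExecutorService = synchronized {
        if (pool == null) init()
        pool
      }
    }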

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index f40bd8b..f7b828e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -18,55 +18,104 @@
 package org.apache.carbondata.core.scan.model;
 
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.DataTypeConverter;
 
 public class QueryModelBuilder {
 
-  private CarbonTable carbonTable;
+  private CarbonTable table;
+  private QueryProjection projection;
+  private Expression filterExpression;
+  private DataTypeConverter dataTypeConverter;
+  private boolean forcedDetailRawQuery;
+  private boolean readPageByPage;
 
-  public QueryModelBuilder(CarbonTable carbonTable) {
-    this.carbonTable = carbonTable;
+  public QueryModelBuilder(CarbonTable table) {
+    this.table = table;
   }
 
-  public QueryModel build(String[] projectionColumnNames, Expression filterExpression) {
-    QueryModel queryModel = QueryModel.newInstance(carbonTable);
-    QueryProjection projection = carbonTable.createProjection(projectionColumnNames);
-    queryModel.setProjection(projection);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
-    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
-    return queryModel;
+  public QueryModelBuilder projectColumns(String[] projectionColumns) {
+    Objects.requireNonNull(projectionColumns);
+    String factTableName = table.getTableName();
+    QueryProjection projection = new QueryProjection();
+
+    int i = 0;
+    for (String projectionColumnName : projectionColumns) {
+      CarbonDimension dimension = table.getDimensionByName(factTableName, projectionColumnName);
+      if (dimension != null) {
+        projection.addDimension(dimension, i);
+        i++;
+      } else {
+        CarbonMeasure measure = table.getMeasureByName(factTableName, projectionColumnName);
+        if (measure == null) {
+          throw new RuntimeException(projectionColumnName +
+              " column not found in the table " + factTableName);
+        }
+        projection.addMeasure(measure, i);
+        i++;
+      }
+    }
+
+    this.projection = projection;
+    return this;
   }
 
-  public QueryModel build(Expression filterExpression) {
+  public QueryModelBuilder projectAllColumns() {
     QueryProjection projection = new QueryProjection();
-
-    List<CarbonDimension> dimensions = carbonTable.getDimensions();
+    List<CarbonDimension> dimensions = table.getDimensions();
     for (int i = 0; i < dimensions.size(); i++) {
       projection.addDimension(dimensions.get(i), i);
     }
-    List<CarbonMeasure> measures = carbonTable.getMeasures();
+    List<CarbonMeasure> measures = table.getMeasures();
     for (int i = 0; i < measures.size(); i++) {
       projection.addMeasure(measures.get(i), i);
     }
+    this.projection = projection;
+    return this;
+  }
+
+  public QueryModelBuilder filterExpression(Expression filterExpression) {
+    this.filterExpression = filterExpression;
+    return this;
+  }
+
+  public QueryModelBuilder dataConverter(DataTypeConverter dataTypeConverter) {
+    this.dataTypeConverter = dataTypeConverter;
+    return this;
+  }
 
-    QueryModel queryModel = QueryModel.newInstance(carbonTable);
+  public QueryModelBuilder enableForcedDetailRawQuery() {
+    this.forcedDetailRawQuery = true;
+    return this;
+  }
+
+  public QueryModelBuilder enableReadPageByPage() {
+    this.readPageByPage = true;
+    return this;
+  }
+
+  public QueryModel build() {
+    QueryModel queryModel = QueryModel.newInstance(table);
+    queryModel.setConverter(dataTypeConverter);
+    queryModel.setForcedDetailRawQuery(forcedDetailRawQuery);
+    queryModel.setReadPageByPage(readPageByPage);
     queryModel.setProjection(projection);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
-    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
+
+    // set the filter to the query model in order to filter blocklet before scan
+    boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
+    boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
+    table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
     queryModel.setIsFilterDimensions(isFilterDimensions);
     queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null);
+    FilterResolverIntf filterIntf =
+        table.resolveFilter(filterExpression, new SingleTableProvider(table));
     queryModel.setFilterExpressionResolverTree(filterIntf);
     return queryModel;
   }
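
With this change, callers build a QueryModel fluently instead of using the helpers removed from CarbonTable above; a usage sketch (carbonTable, filter and converter are assumed to exist; converter is any DataTypeConverter implementation):

    import org.apache.carbondata.core.scan.model.{QueryModel, QueryModelBuilder}

    val model: QueryModel = new QueryModelBuilder(carbonTable)
      .projectColumns(Array("city", "population"))  // or .projectAllColumns()
      .filterExpression(filter)
      .dataConverter(converter)
      .build()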

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 38f7513..82080dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1032,6 +1032,11 @@ public final class CarbonProperties {
     return enableAutoHandoffStr.equalsIgnoreCase("true");
   }
 
+  public boolean isEnableVectorReader() {
+    return getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).equalsIgnoreCase("true");
+  }
+
   /**
    * Validate the restrictions
    *
@@ -1460,4 +1465,40 @@ public final class CarbonProperties {
     return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
   }
 
+  /**
+   * Return true if search mode is enabled
+   */
+  public static boolean isSearchModeEnabled() {
+    String value = getInstance().getProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
+        CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT);
+    return Boolean.valueOf(value);
+  }
+
+  public static void enableSearchMode(boolean enable) {
+    getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, String.valueOf(enable));
+  }
+
+  public static int getSearchMasterPort() {
+    try {
+      return Integer.parseInt(
+          getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT));
+    } catch (NumberFormatException e) {
+      return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT);
+    }
+  }
+
+  public static int getSearchWorkerPort() {
+    try {
+      return Integer.parseInt(
+          getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT));
+    } catch (NumberFormatException e) {
+      return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT);
+    }
+  }
 }
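
The new static helpers give a single, validated entry point to the search-mode settings; a usage sketch:

    import org.apache.carbondata.core.util.CarbonProperties

    CarbonProperties.enableSearchMode(true)
    if (CarbonProperties.isSearchModeEnabled) {
      // both getters fall back to the defaults (10020/10021) on unparsable values
      val masterPort = CarbonProperties.getSearchMasterPort
      val workerPort = CarbonProperties.getSearchWorkerPort
    }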

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/dev/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index b19db85..63b6bd5 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -24,6 +24,10 @@
     <Source name="~.*\.scala" />
   </Match>
 
+  <Match>
+    <Source name="~.*Test\.java" />
+  </Match>
+
   <!-- This method creates stream but the caller methods are responsible for closing the stream -->
   <Match>
     <Class name="org.apache.carbondata.core.datastore.impl.FileFactory"/>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
index 7da8c29..a1a1428 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -25,11 +25,11 @@ import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
 
 import scala.util.Random
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SaveMode, SparkSession}
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.spark.util.DataGenerator
 
 // scalastyle:off println
 /**
@@ -60,7 +60,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 object ConcurrentQueryBenchmark {
 
   // generate number of data
-  var totalNum = 1 * 10 * 1000
+  var totalNum = 10 * 1000 * 1000
   // the number of thread pool
   var threadNum = 16
   // task number of spark sql query
@@ -505,7 +505,8 @@ object ConcurrentQueryBenchmark {
       .addProperty("carbon.enable.vector.reader", "true")
       .addProperty("enable.unsafe.sort", "true")
       .addProperty("carbon.blockletgroup.size.in.mb", "32")
-      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "false")
     import org.apache.spark.sql.CarbonSession._
 
     // 1. initParameters
@@ -559,8 +560,10 @@ object ConcurrentQueryBenchmark {
     // 2. prepareTable
     prepareTable(spark, table1, table2)
 
+    spark.asInstanceOf[CarbonSession].startSearchMode()
     // 3. runTest
     runTest(spark, table1, table2)
+    spark.asInstanceOf[CarbonSession].stopSearchMode()
 
     if (deleteFile) {
       CarbonUtil.deleteFoldersAndFiles(new File(table1))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
deleted file mode 100644
index e3e67b1..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.benchmark
-
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.types._
-
-// scalastyle:off println
-object DataGenerator {
-  // Table schema:
-  // +-------------+-----------+-------------+-------------+------------+
-  // | Column name | Data type | Cardinality | Column type | Dictionary |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | id          | string    | 100,000,000 | dimension   | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | city        | string    | 6           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | country     | string    | 6           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | planet      | string    | 10,007      | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m1          | short     | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m2          | int       | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m3          | big int   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m4          | double    | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m5          | decimal   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  /**
-   * generate DataFrame with above table schema
-   *
-   * @param spark SparkSession
-   * @return Dataframe of test data
-   */
-  def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = {
-    val rdd = spark.sparkContext
-      .parallelize(1 to totalNum, 4)
-      .map { x =>
-        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007,
-          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
-          BigDecimal.valueOf(x.toDouble / 11))
-      }.map { x =>
-      Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
-    }
-
-    val schema = StructType(
-      Seq(
-        StructField("id", StringType, nullable = false),
-        StructField("city", StringType, nullable = false),
-        StructField("country", StringType, nullable = false),
-        StructField("planet", StringType, nullable = false),
-        StructField("m1", ShortType, nullable = false),
-        StructField("m2", IntegerType, nullable = false),
-        StructField("m3", LongType, nullable = false),
-        StructField("m4", DoubleType, nullable = false),
-        StructField("m5", DecimalType(30, 10), nullable = false)
-      )
-    )
-
-    val df = spark.createDataFrame(rdd, schema)
-    println(s"Start generate ${df.count} records, schema: ${df.schema}")
-    df
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
index 880f476..e9c880b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.spark.util.DataGenerator
 
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
new file mode 100644
index 0000000..03e724f
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.util.concurrent.{Executors, ExecutorService}
+
+import org.apache.spark.sql.{CarbonSession, SparkSession}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * An example that demonstrates how to run queries in search mode
+ * and compares the performance of search mode and SparkSQL.
+ */
+// scalastyle:off
+object SearchModeExample {
+
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("SearchModeExample")
+    spark.sparkContext.setLogLevel("ERROR")
+    exampleBody(spark)
+    spark.close()
+  }
+
+  def exampleBody(spark : SparkSession): Unit = {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbonsession_table(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbonsession_table
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    val pool = Executors.newCachedThreadPool()
+
+    // start search mode
+    spark.asInstanceOf[CarbonSession].startSearchMode()
+    runAsynchrousSQL(spark, pool, 1)
+
+    println("search mode asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("search mode synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    // stop search mode
+    spark.asInstanceOf[CarbonSession].stopSearchMode()
+
+    println("sparksql asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("sparksql synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    // start search mode again
+    spark.asInstanceOf[CarbonSession].startSearchMode()
+
+    println("search mode asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("search mode synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    // stop search mode
+    spark.asInstanceOf[CarbonSession].stopSearchMode()
+
+    println("sparksql asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("sparksql synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
+    pool.shutdownNow()
+  }
+
+  private def runAsynchrousSQL(spark: SparkSession, pool: ExecutorService, round: Int): Unit = {
+    val futures = (1 to round).map { i =>
+      pool.submit(new Runnable {
+        override def run(): Unit = {
+          spark.sql(
+            s"""
+             SELECT charField, stringField, intField, dateField
+             FROM carbonsession_table
+             WHERE stringfield = 'spark' AND decimalField > $i % 37
+              """.stripMargin
+          ).collect()
+        }
+      })
+    }
+
+    futures.foreach(_.get())
+  }
+
+  private def runSynchrousSQL(spark: SparkSession, round: Int): Unit = {
+    (1 to round).map { i =>
+      spark.sql(
+        s"""
+             SELECT charField, stringField, intField, dateField
+             FROM carbonsession_table
+             WHERE stringfield = 'spark' AND decimalField > $i % 37
+              """.stripMargin
+      ).collect()
+    }
+  }
+}
+// scalastyle:on
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
index 1cdaafe..e12c2f9 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
@@ -52,6 +52,7 @@ object ExampleUtils {
       "local[" + workThreadNum.toString() + "]"
     }
     import org.apache.spark.sql.CarbonSession._
+
     val spark = SparkSession
       .builder()
       .master(masterUrl)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index c586f3c..02d272e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -430,4 +431,8 @@ public class CarbonInputSplit extends FileSplit
   public void setFormat(FileFormat fileFormat) {
     this.fileFormat = fileFormat;
   }
+
+  public Blocklet makeBlocklet() {
+    return new Blocklet(getPath().getName(), blockletId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 448cf28..0b991cb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -20,11 +20,13 @@ package org.apache.carbondata.hadoop;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 
 import org.apache.hadoop.io.Writable;
@@ -34,7 +36,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
  * This class wraps multiple blocks belong to a same node to one split.
  * So the scanning task will scan multiple blocks. This is an optimization for concurrent query.
  */
-public class CarbonMultiBlockSplit extends InputSplit implements Writable {
+public class CarbonMultiBlockSplit extends InputSplit implements Serializable, Writable {
 
   /*
    * Splits (HDFS Blocks) for task to scan.
@@ -56,6 +58,14 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     length = 0;
   }
 
+  public CarbonMultiBlockSplit(List<Distributable> blocks, String hostname) {
+    this.splitList = new ArrayList<>(blocks.size());
+    for (Distributable block : blocks) {
+      this.splitList.add((CarbonInputSplit)block);
+    }
+    this.locations = new String[]{hostname};
+  }
+
   public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList,
       String[] locations) {
     this.splitList = splitList;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8016d90..a72a6bf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -43,10 +43,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.SingleTableProvider;
-import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -108,6 +107,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   private static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
+  private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
 
   // record segment number and hit blocks
   protected int numSegments = 0;
@@ -221,6 +221,17 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return configuration.get(COLUMN_PROJECTION);
   }
 
+  public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
+    configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
+  }
+
+  public static boolean isFgDataMapPruningEnable(Configuration configuration) {
+    String enable = configuration.get(FGDATAMAP_PRUNING);
+
+    // if FGDATAMAP_PRUNING is not set, FG datamap pruning is enabled by default
+    return (enable == null) || enable.equalsIgnoreCase("true");
+  }
+
   /**
    * Set list of segments to access
    */
@@ -352,7 +363,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
+    if (dataMapJob != null &&
+        (distributedCG ||
+        (dataMapLevel == DataMapLevel.FG && isFgDataMapPruningEnable(job.getConfiguration())))) {
       DistributableDataMapFormat datamapDstr =
           new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
               partitionsToPrune, BlockletDataMapFactory.class.getName());
@@ -426,27 +440,20 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
 
-    // query plan includes projection column
+    // set projection column in the query model
     String projectionString = getColumnProjection(configuration);
-    String[] projectionColumnNames = null;
+    String[] projectColumns;
     if (projectionString != null) {
-      projectionColumnNames = projectionString.split(",");
+      projectColumns = projectionString.split(",");
+    } else {
+      projectColumns = new String[]{};
     }
-    QueryModel queryModel = carbonTable
-        .createQueryWithProjection(projectionColumnNames, getDataTypeConverter(configuration));
-
-    // set the filter to the query model in order to filter blocklet before scan
-    Expression filter = getFilterPredicates(configuration);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    // getAllMeasures returns list of visible and invisible columns
-    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
-    carbonTable.processFilterExpression(filter, isFilterDimensions, isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, tableProvider);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
+    QueryModel queryModel = new QueryModelBuilder(carbonTable)
+        .projectColumns(projectColumns)
+        .filterExpression(getFilterPredicates(configuration))
+        .dataConverter(getDataTypeConverter(configuration))
+        .build();
 
     // update the file level index store if there are invalid segment
     if (inputSplit instanceof CarbonMultiBlockSplit) {
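
The FGDATAMAP_PRUNING flag lets a caller (for example a search-mode master pruning without a DataMapJob) disable distributed fine-grained datamap pruning; a sketch of toggling it (the job setup is illustrative):

    import org.apache.hadoop.mapreduce.Job
    import org.apache.carbondata.hadoop.api.CarbonInputFormat

    val job = Job.getInstance()
    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false)
    // unset or "true" keeps FG datamap pruning enabled
    assert(!CarbonInputFormat.isFgDataMapPruningEnable(job.getConfiguration))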

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
new file mode 100644
index 0000000..e2b5e60
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.readsupport.impl;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+/**
+ * A read support implementation to return CarbonRow after handling
+ * global dictionary and direct dictionary (date/timestamp) conversion
+ */
+public class CarbonRowReadSupport extends DictionaryDecodeReadSupport<CarbonRow> {
+
+  @Override
+  public CarbonRow readRow(Object[] data) {
+    assert (data.length == dictionaries.length);
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+      }
+      if (dataTypes[i] == DataTypes.DATE) {
+        Calendar c = Calendar.getInstance();
+        c.setTime(new Date(0));
+        c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]);
+        data[i] = new Date(c.getTime().getTime());
+      } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
+        data[i] = new Timestamp((long) data[i] / 1000);
+      }
+    }
+    return new CarbonRow(data);
+  }
+}
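
readRow above decodes direct-dictionary values by treating a DATE as a day offset from the epoch and a TIMESTAMP as microseconds; a standalone sketch of the same arithmetic (assuming those storage encodings):

    import java.sql.{Date, Timestamp}
    import java.util.Calendar

    def toDate(daysSinceEpoch: Int): Date = {
      val c = Calendar.getInstance()
      c.setTime(new Date(0))                 // 1970-01-01 in the default time zone
      c.add(Calendar.DAY_OF_YEAR, daysSinceEpoch)
      new Date(c.getTime.getTime)
    }

    def toTimestamp(micros: Long): Timestamp =
      new Timestamp(micros / 1000)           // microseconds -> milliseconds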

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index d6d6603..8ac2905 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -23,9 +23,20 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.api.DataMapJob;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -63,6 +74,78 @@ public class CarbonInputFormatUtil {
     return carbonTableInputFormat;
   }
 
+  public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
+      Job job,
+      CarbonTable carbonTable,
+      String[] projectionColumns,
+      Expression filterExpression,
+      List<PartitionSpec> partitionNames,
+      DataMapJob dataMapJob) throws IOException, InvalidConfigurationException {
+    Configuration conf = job.getConfiguration();
+    CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
+    CarbonInputFormat.setDatabaseName(conf, carbonTable.getTableInfo().getDatabaseName());
+    CarbonInputFormat.setTableName(conf, carbonTable.getTableInfo().getFactTable().getTableName());
+    if (partitionNames != null) {
+      CarbonInputFormat.setPartitionsToPrune(conf, partitionNames);
+    }
+    CarbonInputFormat.setUnmanagedTable(conf, carbonTable.getTableInfo().isUnManagedTable());
+    CarbonProjection columnProjection = new CarbonProjection(projectionColumns);
+    return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
+        filterExpression, columnProjection, dataMapJob);
+  }
+
+  private static <V> CarbonTableInputFormat<V> createInputFormat(
+      Configuration conf,
+      AbsoluteTableIdentifier identifier,
+      Expression filterExpression,
+      CarbonProjection columnProjection,
+      DataMapJob dataMapJob) throws InvalidConfigurationException, IOException {
+    CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>();
+    CarbonInputFormat.setTablePath(
+        conf,
+        identifier.appendWithLocalPrefix(identifier.getTablePath()));
+    CarbonInputFormat.setQuerySegment(conf, identifier);
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonInputFormat.setColumnProjection(conf, columnProjection);
+    if (dataMapJob != null &&
+        Boolean.valueOf(CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT))) {
+      CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+    }
+    // if segment validation is disabled in the thread-local session, propagate it to CarbonTableInputFormat
+    CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
+    if (carbonSessionInfo != null) {
+      String tableUniqueKey = identifier.getDatabaseName() + "." + identifier.getTableName();
+      String validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+          tableUniqueKey;
+      CarbonInputFormat.setValidateSegmentsToAccess(
+          conf,
+          Boolean.valueOf(carbonSessionInfo.getThreadParams().getProperty(
+              validateInputSegmentsKey, "true")));
+      String queryOnPreAggStreamingKey = CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING +
+          tableUniqueKey;
+      boolean queryOnPreAggStreaming = Boolean.valueOf(carbonSessionInfo.getThreadParams()
+          .getProperty(queryOnPreAggStreamingKey, "false"));
+      String inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey;
+      CarbonInputFormat.setQuerySegment(
+          conf,
+          carbonSessionInfo.getThreadParams().getProperty(
+              inputSegmentsKey,
+              CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")));
+      if (queryOnPreAggStreaming) {
+        CarbonInputFormat.setAccessStreamingSegments(conf, true);
+        carbonSessionInfo.getThreadParams().removeProperty(queryOnPreAggStreamingKey);
+        carbonSessionInfo.getThreadParams().removeProperty(inputSegmentsKey);
+        carbonSessionInfo.getThreadParams().removeProperty(validateInputSegmentsKey);
+      }
+    }
+    return format;
+  }
+
   public static String createJobTrackerID(java.util.Date date) {
     return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
   }

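The new helper packs projection, filter, partition pruning, and segment validation into the job's Configuration in a single call. A sketch of how a caller might use it, assuming carbonTable and filterExpr are already resolved (the nulls simply disable the corresponding features):

    import org.apache.hadoop.mapreduce.Job
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

    val job = Job.getInstance()
    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
      job,
      carbonTable,            // CarbonTable from the metastore
      Array("id", "city"),    // projection columns
      filterExpr,             // filter Expression, may be null
      null,                   // no partition pruning
      null)                   // no distributed DataMap job
    val splits = format.getSplits(job)
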
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 4c9e417..1cf2369 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -27,11 +27,8 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.SingleTableProvider;
-import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
@@ -132,22 +129,16 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
   private QueryModel getQueryModel(Configuration configuration, String path)
       throws IOException, InvalidConfigurationException {
     CarbonTable carbonTable = getCarbonTable(configuration, path);
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
-    // getting the table absoluteTableIdentifier from the carbonTable
-    // to avoid unnecessary deserialization
-
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-
     String projectionString = getProjection(configuration, carbonTable,
         identifier.getCarbonTableIdentifier().getTableName());
     String[] projectionColumns = projectionString.split(",");
-    QueryModel queryModel = carbonTable.createQueryWithProjection(
-        projectionColumns, new DataTypeConverterImpl());
-    // set the filter to the query model in order to filter blocklet before scan
-    Expression filter = getFilterPredicates(configuration);
-    carbonTable.processFilterExpression(filter, null, null);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, tableProvider);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
+    QueryModel queryModel =
+        new QueryModelBuilder(carbonTable)
+            .projectColumns(projectionColumns)
+            .filterExpression(getFilterPredicates(configuration))
+            .dataConverter(new DataTypeConverterImpl())
+            .build();
 
     return queryModel;
   }

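The builder replaces the old four-step wiring (create the model, process the filter expression, resolve it, set the resolver tree on the model). A sketch of the same chained call from Scala, assuming carbonTable and filterExpr are in scope:

    import org.apache.carbondata.core.scan.model.QueryModelBuilder
    import org.apache.carbondata.core.util.DataTypeConverterImpl

    val queryModel = new QueryModelBuilder(carbonTable)
      .projectColumns(Array("empno", "empname"))
      .filterExpression(filterExpr)
      .dataConverter(new DataTypeConverterImpl())
      .build()
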
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index ddacdb6..7083c54 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -119,7 +119,8 @@ class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test timeseries table selection 2") {
-    val df = sql("SELECT TIMESERIES(mytime,'hour') FROM mainTable GROUP BY TIMESERIES(mytime,'hour')")
+    val df = sql("SELECT TIMESERIES(mytime,'hour') FROM mainTable " +
+                 "GROUP BY TIMESERIES(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 7dc7493..b55fa75 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -17,48 +17,86 @@
 
 package org.apache.carbondata.spark.testsuite.detailquery
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonSession, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.DataGenerator
+
 /**
- * Test Class for detailed query on multiple datatypes
+ * Test Suite for search mode
  */
-
 class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
 
-  override def beforeAll {
-    sql("CREATE TABLE alldatatypestable (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
-    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE alldatatypestable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+  val numRows = 500 * 1000
+  override def beforeAll = {
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
+    sql("DROP TABLE IF EXISTS main")
 
-    sql("CREATE TABLE alldatatypestable_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','")
-    sql(s"""LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO TABLE alldatatypestable_hive""")
+    val df = DataGenerator.generateDataFrame(sqlContext.sparkSession, numRows)
+    df.write
+      .format("carbondata")
+      .option("tableName", "main")
+      .option("table_blocksize", "5")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
 
+  override def afterAll = {
+    sql("DROP TABLE IF EXISTS main")
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
+  }
+
+  private def sparkSql(query: String): Seq[Row] = {
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].sparkSql(query).collect()
+  }
+
+  private def checkSearchAnswer(query: String) = {
+    checkAnswer(sql(query), sparkSql(query))
   }
 
   test("SearchMode Query: row result") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
-        checkAnswer(
-      sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"),
-      sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'"))
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
-      CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT)
+    checkSearchAnswer("select * from main where city = 'city3'")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
-          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
   }
+
   test("SearchMode Query: vector result") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true")
-    checkAnswer(
-      sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"),
-      sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'"))
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
-      CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT)
+    checkSearchAnswer("select * from main where city = 'city3'")
   }
 
-  override def afterAll {
-    sql("drop table alldatatypestable")
-    sql("drop table alldatatypestable_hive")
+  test("equal filter") {
+    checkSearchAnswer("select id from main where id = '100'")
+    checkSearchAnswer("select id from main where planet = 'planet100'")
   }
+
+  test("greater and less than filter") {
+    checkSearchAnswer("select id from main where m2 < 4")
+  }
+
+  test("IN filter") {
+    checkSearchAnswer("select id from main where id IN ('40', '50', '60')")
+  }
+
+  test("expression filter") {
+    checkSearchAnswer("select id from main where length(id) < 2")
+  }
+
+  test("filter with limit") {
+    checkSearchAnswer("select id from main where id = '3' limit 10")
+    checkSearchAnswer("select id from main where length(id) < 2 limit 10")
+  }
+
+  test("aggregate query") {
+    checkSearchAnswer("select city, sum(m1) from main where m2 < 10 group by city")
+  }
+
+  test("aggregate query with datamap and fallback to SparkSQL") {
+    sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ")
+    checkSearchAnswer("select city, count(*) from main group by city")
+  }
+
 }
\ No newline at end of file

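Outside the test harness, the same start/query/stop cycle looks roughly like this; a sketch assuming a CarbonSession has already been created as spark:

    import org.apache.spark.sql.CarbonSession

    val carbon = spark.asInstanceOf[CarbonSession]
    carbon.startSearchMode()
    try {
      // projection + filter queries are served over RPC, bypassing RDD scheduling
      carbon.sql("select id from main where city = 'city3'").show()
    } finally {
      carbon.stopSearchMode()
    }
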
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 6f248d2..b985459 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -76,7 +76,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     internalCompute(split, context)
   }
 
-  private def getConf: Configuration = {
+  def getConf: Configuration = {
     val configuration = new Configuration(false)
     val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
       .unCompressByte(confBytes))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 31d3715..6d67daf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
-import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index a2542ab..4bfdd3b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -198,16 +198,6 @@ class NewCarbonDataLoadRDD[K, V](
     CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
   }
 
-  private def getConf = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
-  }
-
   override def getPartitions: Array[Partition] = {
     blocksGroupBy.zipWithIndex.map { b =>
       new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
@@ -359,16 +349,6 @@ class NewDataFrameLoaderRDD[K, V](
     CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
   }
 
-  private def getConf = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
-  }
-
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val hadoopConf = getConf

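Both loader RDDs now inherit getConf from CarbonRDD, which rebuilds a Hadoop Configuration from byte payloads shipped with the RDD (compressed via CompressorFactory). A sketch of the underlying round trip with the compression step omitted; Configuration is a Writable, so it serializes through DataOutput/DataInput:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.conf.Configuration

    def confToBytes(conf: Configuration): Array[Byte] = {
      val bao = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bao)
      conf.write(oos)          // Writable.write(DataOutput)
      oos.close()
      bao.toByteArray
    }

    def bytesToConf(bytes: Array[Byte]): Configuration = {
      val conf = new Configuration(false)
      val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
      conf.readFields(ois)     // Writable.readFields(DataInput)
      ois.close()
      conf
    }
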
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 46e1be0..aca2d3c 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -45,6 +45,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-search</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
     </dependency>
@@ -262,47 +267,47 @@
       </build>
     </profile>
     <profile>
-    <id>spark-2.2</id>
-    <activation>
-      <activeByDefault>true</activeByDefault>
-    </activation>
-    <properties>
-      <spark.version>2.2.1</spark.version>
-      <scala.binary.version>2.11</scala.binary.version>
-      <scala.version>2.11.8</scala.version>
-    </properties>
-    <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <exclude>src/main/spark2.1</exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>3.0.0</version>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
+      <id>spark-2.2</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
-              <sources>
-                <source>src/main/spark2.2</source>
-              </sources>
+              <excludes>
+                <exclude>src/main/spark2.1</exclude>
+              </excludes>
             </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-    </build>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala
new file mode 100644
index 0000000..64c4e14
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+object DataGenerator {
+  // Table schema:
+  // +-------------+-----------+-------------+-------------+------------+
+  // | Column name | Data type | Cardinality | Column type | Dictionary |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | id          | string    | 100,000,000 | dimension   | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | city        | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | country     | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | planet      | string    | 100,000     | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m1          | short     | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m2          | int       | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m3          | big int   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m4          | double    | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m5          | decimal   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  /**
+   * Generate a DataFrame with the table schema above
+   *
+   * @param spark SparkSession
+   * @param totalNum total number of rows to generate
+   * @return DataFrame of test data
+   */
+  def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = {
+    val rdd = spark.sparkContext
+      .parallelize(1 to totalNum, 4)
+      .map { x =>
+        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 100000,
+          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+          BigDecimal.valueOf(x.toDouble / 11))
+      }.map { x =>
+        Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+      }
+
+    val schema = StructType(
+      Seq(
+        StructField("id", StringType, nullable = false),
+        StructField("city", StringType, nullable = false),
+        StructField("country", StringType, nullable = false),
+        StructField("planet", StringType, nullable = false),
+        StructField("m1", ShortType, nullable = false),
+        StructField("m2", IntegerType, nullable = false),
+        StructField("m3", LongType, nullable = false),
+        StructField("m4", DoubleType, nullable = false),
+        StructField("m5", DecimalType(30, 10), nullable = false)
+      )
+    )
+
+    val df = spark.createDataFrame(rdd, schema)
+
+    // scalastyle:off println
+    println(s"Start generate ${df.count} records, schema: ${df.schema}")
+    // scalastyle:on println
+
+    df
+  }
+}

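A quick usage sketch, mirroring the test setup in SearchModeTestCase (assumes a SparkSession named spark):

    import org.apache.spark.sql.SaveMode
    import org.apache.carbondata.spark.util.DataGenerator

    val df = DataGenerator.generateDataFrame(spark, totalNum = 100 * 1000)
    df.write
      .format("carbondata")
      .option("tableName", "main")
      .mode(SaveMode.Overwrite)
      .save()
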
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index e29ee46..279e7b0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -18,16 +18,20 @@
 package org.apache.carbondata.store
 
 import java.io.IOException
+import java.net.InetAddress
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{CarbonInputMetrics, SparkConf}
+import org.apache.spark.rpc.{Master, Worker}
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
@@ -36,8 +40,9 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD
  * with CarbonData query optimization capability
  */
 @InterfaceAudience.Internal
-private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
+class SparkCarbonStore extends MetaCachedCarbonStore {
   private var session: SparkSession = _
+  private var master: Master = _
 
   /**
    * Initialize SparkCarbonStore
@@ -54,10 +59,17 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
       .getOrCreateCarbonSession()
   }
 
+  def this(sparkSession: SparkSession) = {
+    this()
+    session = sparkSession
+  }
+
   @throws[IOException]
   override def scan(
       path: String,
       projectColumns: Array[String]): java.util.Iterator[CarbonRow] = {
+    require(path != null)
+    require(projectColumns != null)
     scan(path, projectColumns, null)
   }
 
@@ -95,4 +107,45 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
       .asJava
   }
 
+  def startSearchMode(): Unit = {
+    master = new Master(session.sparkContext.getConf)
+    master.startService()
+    startAllWorkers()
+  }
+
+  def stopSearchMode(): Unit = {
+    master.stopAllWorkers()
+    master.stopService()
+    master = null
+  }
+
+  /** Search-mode scan: push the projection and filter to all workers over RPC */
+  def search(
+      table: CarbonTable,
+      projectColumns: Array[String],
+      filter: Expression,
+      globalLimit: Long,
+      localLimit: Long): java.util.Iterator[CarbonRow] = {
+    if (master == null) {
+      throw new IllegalStateException("search mode is not started")
+    }
+    master.search(table, projectColumns, filter, globalLimit, localLimit)
+      .iterator
+      .asJava
+  }
+
+  private def startAllWorkers(): Array[Int] = {
+    // TODO: how to ensure task is sent to every executor?
+    val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size
+    val masterIp = InetAddress.getLocalHost.getHostAddress
+    session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors).mapPartitions { _ =>
+      // start a search-mode worker on this executor; emit no rows
+      Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
+      Iterator[Int]()
+    }.collect()
+  }
+
 }

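Driving the store directly follows the same lifecycle as the session API. A sketch assuming a SparkSession spark, a resolved CarbonTable table, and a filter Expression expr; the limit values are illustrative:

    import org.apache.carbondata.store.SparkCarbonStore

    val store = new SparkCarbonStore(spark)
    store.startSearchMode()   // starts the Master and one Worker per executor
    val rows = store.search(table, Array("id", "city"), expr,
      globalLimit = 100, localLimit = 100)
    while (rows.hasNext) println(rows.next())
    store.stopSearchMode()
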
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 00e0aed..ecf2088 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -42,7 +42,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataE
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
-
 /**
  * Carbon Environment for unified context
  */
@@ -62,6 +61,14 @@ class CarbonEnv {
   var initialized = false
 
   def init(sparkSession: SparkSession): Unit = {
+    val properties = CarbonProperties.getInstance()
+    var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+    if (storePath == null) {
+      storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+      properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+    }
+    LOGGER.info(s"Initializing CarbonEnv, store location: $storePath")
+
     sparkSession.udf.register("getTupleId", () => "")
     // added for handling preaggregate table creation. when user will fire create ddl for
     // create table we are adding a udf so no need to apply PreAggregate rules.
@@ -94,13 +101,6 @@ class CarbonEnv {
         // add session params after adding DefaultCarbonParams
         config.addDefaultCarbonSessionParams()
         carbonMetastore = {
-          val properties = CarbonProperties.getInstance()
-          var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
-          if (storePath == null) {
-            storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
-            properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
-          }
-          LOGGER.info(s"carbon env initial: $storePath")
           // trigger event for CarbonEnv create
           val operationContext = new OperationContext
           val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
@@ -113,6 +113,7 @@ class CarbonEnv {
         initialized = true
       }
     }
+    LOGGER.info("Initialize CarbonEnv completed...")
   }
 }