Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/04/21 15:44:32 UTC

[1/2] carbondata git commit: [CARBONDATA-2323]Distributed search mode using RPC

Repository: carbondata
Updated Branches:
  refs/heads/master 6bef57b8b -> 3ff574d29


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 1d5a82d..c484a55 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -26,15 +26,22 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{Command, Union}
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.profiler.{Profiler, SQLStart}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.scan.expression.LiteralExpression
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.store.SparkCarbonStore
 import org.apache.carbondata.streaming.CarbonStreamingQueryListener
 
 /**
@@ -80,7 +87,44 @@ class CarbonSession(@transient val sc: SparkContext,
     new CarbonSession(sparkContext, Some(sharedState), useHiveMetaStore)
   }
 
+  /**
+   * Run search mode if enabled, otherwise run SparkSQL
+   */
   override def sql(sqlText: String): DataFrame = {
+    withProfiler(
+      sqlText,
+      (qe, sse) => {
+        if (isSearchModeEnabled) {
+          try {
+            trySearchMode(qe, sse)
+          } catch {
+            case e: Exception =>
+              logError(String.format(
+                "Exception when executing search mode: %s, fallback to SparkSQL", e.getMessage))
+              new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+          }
+        } else {
+          new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+        }
+      }
+    )
+  }
+
+  private def isSearchModeEnabled = carbonStore != null
+
+  /**
+   * Run SparkSQL directly
+   */
+  def sparkSql(sqlText: String): DataFrame = {
+    withProfiler(
+      sqlText,
+      (qe, sse) => new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+    )
+  }
+
+  private def withProfiler(
+      sqlText: String,
+      generateDF: (QueryExecution, SQLStart) => DataFrame): DataFrame = {
     val sse = SQLStart(sqlText, CarbonSession.statementId.getAndIncrement())
     CarbonSession.threadStatementId.set(sse.statementId)
     sse.startTime = System.currentTimeMillis()
@@ -92,16 +136,12 @@ class CarbonSession(@transient val sc: SparkContext,
       val qe = sessionState.executePlan(logicalPlan)
       qe.assertAnalyzed()
       sse.isCommand = qe.analyzed match {
-        case c: Command =>
-          true
-        case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
-          true
-        case _ =>
-          false
+        case c: Command => true
+        case u @ Union(children) if children.forall(_.isInstanceOf[Command]) => true
+        case _ => false
       }
       sse.analyzerEnd = System.currentTimeMillis()
-
-      new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+      generateDF(qe, sse)
     } finally {
       Profiler.invokeIfEnable {
         if (sse.isCommand) {
@@ -113,6 +153,75 @@ class CarbonSession(@transient val sc: SparkContext,
       }
     }
   }
+
+  /**
+   * If the query is a simple query with filter, we will try to use Search Mode,
+   * otherwise execute in SparkSQL
+   */
+  private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
+    val analyzed = qe.analyzed
+    analyzed match {
+      case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
+        if s.child.isInstanceOf[LogicalRelation] &&
+           s.child.asInstanceOf[LogicalRelation].relation
+             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        runSearch(analyzed, columns, expr, s.child.asInstanceOf[LogicalRelation])
+      case gl@GlobalLimit(_, ll@LocalLimit(_, p@Project(columns, _@Filter(expr, s: SubqueryAlias))))
+        if s.child.isInstanceOf[LogicalRelation] &&
+           s.child.asInstanceOf[LogicalRelation].relation
+             .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val logicalRelation = s.child.asInstanceOf[LogicalRelation]
+        runSearch(analyzed, columns, expr, logicalRelation, gl.maxRows, ll.maxRows)
+      case _ =>
+        new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+    }
+  }
+
+  private var carbonStore: SparkCarbonStore = _
+
+  def startSearchMode(): Unit = {
+    CarbonProperties.enableSearchMode(true)
+    if (carbonStore == null) {
+      carbonStore = new SparkCarbonStore(this)
+      carbonStore.startSearchMode()
+    }
+  }
+
+  def stopSearchMode(): Unit = {
+    CarbonProperties.enableSearchMode(false)
+    if (carbonStore != null) {
+      try {
+        carbonStore.stopSearchMode()
+        carbonStore = null
+      } catch {
+        case e: RuntimeException =>
+          LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+            .error(s"Stop search mode failed: ${e.getMessage}")
+      }
+    }
+  }
+
+  private def runSearch(
+      logicalPlan: LogicalPlan,
+      columns: Seq[NamedExpression],
+      expr: Expression,
+      relation: LogicalRelation,
+      maxRows: Option[Long] = None,
+      localMaxRows: Option[Long] = None): DataFrame = {
+    val rows = carbonStore.search(
+        relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable,
+        columns.map(_.name).toArray,
+        if (expr != null) CarbonFilters.transformExpression(expr) else null,
+        maxRows.getOrElse(Long.MaxValue),
+        localMaxRows.getOrElse(Long.MaxValue))
+    val output = new java.util.ArrayList[Row]()
+    while (rows.hasNext) {
+      val row = rows.next()
+      output.add(Row.fromSeq(row.getData))
+    }
+    createDataFrame(output, logicalPlan.schema)
+  }
+
 }
 
 object CarbonSession {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 2290941..b267ca2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.CarbonEndsWith
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
+import org.apache.spark.sql.sources.Filter
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 287e052..7d420ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
     <module>integration/spark-common-test</module>
     <module>datamap/examples</module>
     <module>store/sdk</module>
+    <module>store/search</module>
     <module>assembly</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/processing/pom.xml
----------------------------------------------------------------------
diff --git a/processing/pom.xml b/processing/pom.xml
index 648810d..ef42471 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -39,7 +39,7 @@
       <artifactId>carbondata-core</artifactId>
       <version>${project.version}</version>
     </dependency>
-	  <dependency>
+    <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
       <version>2.2.1</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java
deleted file mode 100644
index 609fec6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.dictionary;
-
-import java.util.Map;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.devapi.GeneratingBiDictionary;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
-public class InMemBiDictionary<K, V> extends GeneratingBiDictionary<K, V> {
-
-  private BiMap<K, V> biMap;
-
-  /**
-   * Constructor to create a new dictionary, dictionary key will be generated by specified generator
-   * @param generator
-   */
-  public InMemBiDictionary(DictionaryGenerator generator) {
-    super(generator);
-    biMap = HashBiMap.create();
-  }
-
-  /**
-   * Constructor to create a pre-created dictionary
-   * @param preCreatedDictionary
-   */
-  public InMemBiDictionary(Map<K, V> preCreatedDictionary) {
-    super(new DictionaryGenerator<K, V>() {
-      @Override
-      public K generateKey(V value) throws DictionaryGenerationException {
-        // Since dictionary is provided by preCreated, normally it should not come here
-        throw new DictionaryGenerationException(
-            "encounter new dictionary value in pre-created dictionary:" + value);
-      }
-    });
-    biMap = HashBiMap.create(preCreatedDictionary);
-  }
-
-  @Override
-  public K getKey(V value) {
-    return biMap.inverse().get(value);
-  }
-
-  @Override
-  public V getValue(K key) {
-    return biMap.get(key);
-  }
-
-  @Override
-  protected void put(K key, V value) {
-    // dictionary is immutable, it is append only
-    assert (!biMap.containsKey(key));
-    assert (!biMap.containsValue(value));
-    biMap.put(key, value);
-  }
-
-  @Override
-  public int size() {
-    return biMap.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 306019c..6a6d834 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -100,9 +101,14 @@ public class CarbonCompactionExecutor {
     List<RawResultIterator> resultList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<TableBlockInfo> list = null;
-    queryModel = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter);
-    queryModel.setReadPageByPage(enablePageLevelReaderForCompaction());
-    queryModel.setForcedDetailRawQuery(true);
+    QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
+        .projectAllColumns()
+        .dataConverter(dataTypeConverter)
+        .enableForcedDetailRawQuery();
+    if (enablePageLevelReaderForCompaction()) {
+      builder.enableReadPageByPage();
+    }
+    queryModel = builder.build();
     // iterate each seg ID
     for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
       String segmentId = taskMap.getKey();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
index 173a5c0..daabd24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
 import org.apache.carbondata.core.util.DataTypeConverter;
 
@@ -50,8 +51,11 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
       String segmentId, DataTypeConverter converter)
       throws QueryExecutionException, IOException {
     List<TableBlockInfo> list = null;
-    queryModel = carbonTable.createQueryModelWithProjectAllColumns(converter);
-    queryModel.setForcedDetailRawQuery(true);
+    queryModel = new QueryModelBuilder(carbonTable)
+        .projectAllColumns()
+        .dataConverter(converter)
+        .enableForcedDetailRawQuery()
+        .build();
     List<PartitionSpliterRawResultIterator> resultList
         = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java b/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
deleted file mode 100644
index 88ff377..0000000
--- a/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.dictionary;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class InMemBiDictionaryTest {
-
-  /**
-   * test pre-created dictionary
-   */
-  @Test public void testPreCreated() throws Exception {
-    Map<Integer, String> map = new HashMap<>();
-    map.put(1, "amy");
-    map.put(2, "bob");
-    BiDictionary<Integer, String> dict = new InMemBiDictionary<>(map);
-    Assert.assertEquals(1, dict.getKey("amy").intValue());
-    Assert.assertEquals(2, dict.getKey("bob").intValue());
-    Assert.assertEquals("amy", dict.getValue(1));
-    Assert.assertEquals("bob", dict.getValue(2));
-    Assert.assertEquals(2, dict.size());
-    try {
-      dict.getOrGenerateKey("cat");
-      Assert.fail("add dictionary successfully");
-    } catch (Exception e) {
-      // test pass
-    }
-  }
-
-  /**
-   * test generating dictionary on the fly
-   */
-  @Test public void testGenerateDict() throws Exception {
-    BiDictionary<Integer, String> dict = new InMemBiDictionary<>(
-        new DictionaryGenerator<Integer, String>() {
-          int sequence = 1;
-          @Override
-          public Integer generateKey(String value) throws DictionaryGenerationException {
-            return sequence++;
-          }
-        });
-    Assert.assertEquals(1, dict.getOrGenerateKey("amy").intValue());
-    Assert.assertEquals(2, dict.getOrGenerateKey("bob").intValue());
-    Assert.assertEquals(1, dict.getKey("amy").intValue());
-    Assert.assertEquals(2, dict.getKey("bob").intValue());
-    Assert.assertEquals("amy", dict.getValue(1));
-    Assert.assertEquals("bob", dict.getValue(2));
-    Assert.assertEquals(2, dict.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/pom.xml
----------------------------------------------------------------------
diff --git a/store/search/pom.xml b/store/search/pom.xml
new file mode 100644
index 0000000..00184ca
--- /dev/null
+++ b/store/search/pom.xml
@@ -0,0 +1,78 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-search</artifactId>
+  <name>Apache CarbonData :: Search </name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-hadoop</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
new file mode 100644
index 0000000..8296247
--- /dev/null
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.worker;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport;
+
+import org.apache.spark.search.SearchRequest;
+import org.apache.spark.search.SearchResult;
+import org.apache.spark.search.ShutdownRequest;
+import org.apache.spark.search.ShutdownResponse;
+
+/**
+ * Thread runnable for handling SearchRequest from master.
+ */
+@InterfaceAudience.Internal
+public class SearchRequestHandler {
+
+  private static final LogService LOG =
+      LogServiceFactory.getLogService(SearchRequestHandler.class.getName());
+
+  public SearchResult handleSearch(SearchRequest request) {
+    try {
+      List<CarbonRow> rows = handleRequest(request);
+      return createSuccessResponse(request, rows);
+    } catch (IOException | InterruptedException e) {
+      LOG.error(e);
+      return createFailureResponse(request, e);
+    }
+  }
+
+  public ShutdownResponse handleShutdown(ShutdownRequest request) {
+    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
+  }
+
+  /**
+   * Builds {@link QueryModel} and read data from files
+   */
+  private List<CarbonRow> handleRequest(SearchRequest request)
+      throws IOException, InterruptedException {
+    TableInfo tableInfo = request.tableInfo();
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    QueryModel queryModel = createQueryModel(table, request);
+
+    // in search mode, plain reader is better since it requires less memory
+    queryModel.setVectorReader(false);
+    CarbonMultiBlockSplit mbSplit = request.split().value();
+    long limit = request.limit();
+    long rowCount = 0;
+
+    // If there is FGDataMap, prune the split by applying FGDataMap
+    queryModel = tryPruneByFGDataMap(table, queryModel, mbSplit);
+
+    // In search mode, reader will read multiple blocks by using a thread pool
+    CarbonRecordReader<CarbonRow> reader =
+        new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
+    reader.initialize(mbSplit, null);
+
+    // read all rows by the reader
+    List<CarbonRow> rows = new LinkedList<>();
+    try {
+      // loop to read required number of rows.
+      // By default, if user does not specify the limit value, limit is Long.MaxValue
+      while (reader.nextKeyValue() && rowCount < limit) {
+        rows.add(reader.getCurrentValue());
+        rowCount++;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      reader.close();
+    }
+    return rows;
+  }
+
+  /**
+   * If there is FGDataMap defined for this table and filter condition in the query,
+   * prune the splits by the DataMap and set the pruned split into the QueryModel and return
+   */
+  private QueryModel tryPruneByFGDataMap(
+      CarbonTable table, QueryModel queryModel, CarbonMultiBlockSplit mbSplit) throws IOException {
+    DataMapExprWrapper wrapper =
+        DataMapChooser.get().choose(table, queryModel.getFilterExpressionResolverTree());
+
+    if (wrapper.getDataMapType() == DataMapLevel.FG) {
+      List<Segment> segments = new LinkedList<>();
+      for (CarbonInputSplit split : mbSplit.getAllSplits()) {
+        segments.add(Segment.toSegment(
+            split.getSegmentId(), new LatestFilesReadCommittedScope(table.getTablePath())));
+      }
+      List<ExtendedBlocklet> prunnedBlocklets = wrapper.prune(segments, null);
+
+      List<String> pathToRead = new LinkedList<>();
+      for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
+        pathToRead.add(prunnedBlocklet.getPath());
+      }
+
+      List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
+      List<TableBlockInfo> blockToRead = new LinkedList<>();
+      for (TableBlockInfo block : blocks) {
+        if (pathToRead.contains(block.getFilePath())) {
+          blockToRead.add(block);
+        }
+      }
+      queryModel.setTableBlockInfos(blockToRead);
+    }
+    return queryModel;
+  }
+
+  private QueryModel createQueryModel(CarbonTable table, SearchRequest request) {
+    String[] projectColumns = request.projectColumns();
+    Expression filter = null;
+    if (request.filterExpression() != null) {
+      filter = request.filterExpression();
+    }
+    return new QueryModelBuilder(table)
+        .projectColumns(projectColumns)
+        .filterExpression(filter)
+        .build();
+  }
+
+  /**
+   * create a failure response
+   */
+  private SearchResult createFailureResponse(SearchRequest request, Throwable throwable) {
+    return new SearchResult(request.queryId(), Status.FAILURE.ordinal(), throwable.getMessage(),
+        new Object[0][]);
+  }
+
+  /**
+   * create a success response with result rows
+   */
+  private SearchResult createSuccessResponse(SearchRequest request, List<CarbonRow> rows) {
+    Iterator<CarbonRow> itor = rows.iterator();
+    Object[][] output = new Object[rows.size()][];
+    int i = 0;
+    while (itor.hasNext()) {
+      output[i++] = itor.next().getData();
+    }
+    return new SearchResult(request.queryId(), Status.SUCCESS.ordinal(), "", output);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
new file mode 100644
index 0000000..71df3e0
--- /dev/null
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/Status.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.worker;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Status of RPC response
+ */
+@InterfaceAudience.Internal
+public enum Status {
+  SUCCESS, FAILURE
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
new file mode 100644
index 0000000..df08ac4
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.io.IOException
+import java.net.InetAddress
+import java.util.{List => JList, Map => JMap, Objects, Random, Set => JSet, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{SecurityManager, SerializableWritable, SparkConf}
+import org.apache.spark.rpc.netty.NettyRpcEnvFactory
+import org.apache.spark.search._
+import org.apache.spark.util.ThreadUtils
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.block.Distributable
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.store.worker.Status
+
+/**
+ * Master of CarbonSearch.
+ * It listens to [[Master.port]] to wait for worker to register.
+ * And it provides search API to fire RPC call to workers.
+ */
+@InterfaceAudience.Internal
+class Master(sparkConf: SparkConf, port: Int) {
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // worker host address map to EndpointRef
+  private val workers = mutable.Map[String, RpcEndpointRef]()
+
+  private val random = new Random
+
+  private var rpcEnv: RpcEnv = _
+
+  def this(sparkConf: SparkConf) = {
+    this(sparkConf, CarbonProperties.getSearchMasterPort)
+  }
+
+  /** start service and listen on port passed in constructor */
+  def startService(): Unit = {
+    if (rpcEnv == null) {
+      new Thread(new Runnable {
+        override def run(): Unit = {
+          val hostAddress = InetAddress.getLocalHost.getHostAddress
+          val config = RpcEnvConfig(
+            sparkConf, "registry-service", hostAddress, "", CarbonProperties.getSearchMasterPort,
+            new SecurityManager(sparkConf), clientMode = false)
+          rpcEnv = new NettyRpcEnvFactory().create(config)
+          val registryEndpoint: RpcEndpoint = new Registry(rpcEnv, Master.this)
+          rpcEnv.setupEndpoint("registry-service", registryEndpoint)
+          rpcEnv.awaitTermination()
+        }
+      }).start()
+    }
+  }
+
+  def stopService(): Unit = {
+    if (rpcEnv != null) {
+      rpcEnv.shutdown()
+      rpcEnv = null
+    }
+  }
+
+  def stopAllWorkers(): Unit = {
+    val futures = workers.mapValues { ref =>
+      ref.ask[ShutdownResponse](ShutdownRequest("user"))
+    }
+    futures.foreach { case (hostname, future) =>
+      ThreadUtils.awaitResult(future, Duration.apply("10s"))
+      future.value match {
+        case Some(result) =>
+          result match {
+            case Success(response) => workers.remove(hostname)
+            case Failure(throwable) => throw new IOException(throwable.getMessage)
+          }
+        case None => throw new ExecutionTimeoutException
+      }
+    }
+  }
+
+  /** A new searcher is trying to register, add it to the map and connect to this searcher */
+  def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = {
+    LOG.info(s"Receive Register request from worker ${request.hostAddress}:${request.port} " +
+             s"with ${request.cores} cores")
+    val workerId = UUID.randomUUID().toString
+    val workerHostAddress = request.hostAddress
+    val workerPort = request.port
+    LOG.info(s"connecting to worker ${request.hostAddress}:${request.port}, workerId $workerId")
+
+    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
+      RpcAddress(workerHostAddress, workerPort), "search-service")
+
+    workers.put(workerHostAddress, endPointRef)
+    LOG.info(s"worker ${request.hostAddress}:${request.port} added")
+    RegisterWorkerResponse(workerId)
+  }
+
+  private def getEndpoint(workerIP: String) = {
+    try {
+      workers(workerIP)
+    } catch {
+      case e: NoSuchElementException =>
+        // no local worker available, choose one worker randomly
+        val index = new Random().nextInt(workers.size)
+        workers.toSeq(index)._2
+    }
+  }
+
+  /**
+   * Execute search by firing RPC call to worker, return the result rows
+   * @param table table to search
+   * @param columns projection column names
+   * @param filter filter expression
+   * @param globalLimit max number of rows required in Master
+   * @param localLimit max number of rows required in Worker
+   * @return
+   */
+  def search(table: CarbonTable, columns: Array[String], filter: Expression,
+      globalLimit: Long, localLimit: Long): Array[CarbonRow] = {
+    Objects.requireNonNull(table)
+    Objects.requireNonNull(columns)
+    if (globalLimit < 0 || localLimit < 0) {
+      throw new IllegalArgumentException("limit should be positive")
+    }
+    if (workers.isEmpty) {
+      throw new IOException("No worker is available")
+    }
+
+    val queryId = random.nextInt
+    // prune data and get a mapping of worker hostname to list of blocks,
+    // then add these blocks to the SearchRequest and fire the RPC call
+    val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter)
+    val futures = nodeBlockMapping.asScala.map { case (hostname, blocks) =>
+      // Build a SearchRequest
+      val split = new SerializableWritable[CarbonMultiBlockSplit](
+        new CarbonMultiBlockSplit(blocks, hostname))
+      val request = SearchRequest(queryId, split, table.getTableInfo, columns, filter, localLimit)
+
+      // fire RPC to worker asynchronously
+      getEndpoint(hostname).ask[SearchResult](request)
+    }
+    // get all results from RPC response and return to caller
+    var rowCount = 0
+    val output = new ArrayBuffer[CarbonRow]
+
+    // Loop to get the result of each Worker
+    futures.foreach { future: Future[SearchResult] =>
+
+      // if we have enough data already, we do not need to collect more result
+      if (rowCount < globalLimit) {
+        // wait on worker for 10s
+        ThreadUtils.awaitResult(future, Duration.apply("10s"))
+        future.value match {
+          case Some(response: Try[SearchResult]) =>
+            response match {
+              case Success(result) =>
+                if (result.queryId != queryId) {
+                  throw new IOException(
+                    s"queryId in response does not match request: ${ result.queryId } != $queryId")
+                }
+                if (result.status != Status.SUCCESS.ordinal()) {
+                  throw new IOException(s"failure in worker: ${ result.message }")
+                }
+
+                val itor = result.rows.iterator
+                while (itor.hasNext && rowCount < globalLimit) {
+                  output += new CarbonRow(itor.next())
+                  rowCount = rowCount + 1
+                }
+
+              case Failure(e) =>
+                throw new IOException(s"exception in worker: ${ e.getMessage }")
+            }
+          case None =>
+            throw new ExecutionTimeoutException()
+        }
+      }
+    }
+    output.toArray
+  }
+
+  /**
+   * Prune data by using CarbonInputFormat.getSplit
+   * Return a mapping of hostname to list of block
+   */
+  private def pruneBlock(
+      table: CarbonTable,
+      columns: Array[String],
+      filter: Expression): JMap[String, JList[Distributable]] = {
+    val jobConf = new JobConf(new Configuration)
+    val job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+      job, table, columns, filter, null, null)
+    val splits = format.getSplits(job)
+    val distributables = splits.asScala.map { split =>
+      split.asInstanceOf[Distributable]
+    }
+    CarbonLoaderUtil.nodeBlockMapping(
+      distributables.asJava,
+      -1,
+      workers.keySet.toList.asJava,
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST)
+  }
+
+  /** return hostname of all workers */
+  def getWorkers: JSet[String] = workers.keySet.asJava
+}
+
+// Exception if execution timed out in search mode
+class ExecutionTimeoutException extends RuntimeException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
new file mode 100644
index 0000000..39be35f
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Worker.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import java.io.IOException
+import java.net.{BindException, InetAddress}
+
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.rpc.netty.NettyRpcEnvFactory
+import org.apache.spark.search.{RegisterWorkerRequest, RegisterWorkerResponse, Searcher}
+import org.apache.spark.util.ThreadUtils
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+@InterfaceAudience.Internal
+object Worker {
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private var hostAddress = InetAddress.getLocalHost.getHostAddress
+  private var port: Int = _
+
+  def init(masterHostAddress: String, masterPort: Int): Unit = {
+    LOG.info(s"initializing worker...")
+    startService()
+    LOG.info(s"registering to master $masterHostAddress:$masterPort")
+    val workerId = registerToMaster(masterHostAddress, masterPort)
+    LOG.info(s"worker registered to master, workerId: $workerId")
+  }
+
+  /**
+   * Start to listen on port [[CarbonProperties.getSearchWorkerPort]]
+   */
+  private def startService(): Unit = {
+    new Thread(new Runnable {
+      override def run(): Unit = {
+        port = CarbonProperties.getSearchWorkerPort
+        val conf = new SparkConf()
+        var rpcEnv: RpcEnv = null
+        var exception: BindException = null
+        var numTry = 100  // we will try to create the service, in the worst case 100 times
+        do {
+          try {
+            LOG.info(s"starting search-service on $hostAddress:$port")
+            val config = RpcEnvConfig(
+              conf, s"worker-$hostAddress", hostAddress, "", port,
+              new SecurityManager(conf), clientMode = false)
+            rpcEnv = new NettyRpcEnvFactory().create(config)
+            numTry = 0
+          } catch {
+            case e: BindException =>
+              // port is occupied, increase the port number and try again
+              exception = e
+              LOG.error(s"start search-service failed: ${e.getMessage}")
+              port = port + 1
+              numTry = numTry - 1
+          }
+        } while (numTry > 0)
+        if (rpcEnv == null) {
+          // we have tried many times, but still failed to find an available port
+          throw exception
+        }
+        val searchEndpoint: RpcEndpoint = new Searcher(rpcEnv)
+        rpcEnv.setupEndpoint("search-service", searchEndpoint)
+        LOG.info("search-service started")
+        rpcEnv.awaitTermination()
+      }
+    }).start()
+  }
+
+  private def registerToMaster(masterHostAddress: String, masterPort: Int): String = {
+    LOG.info(s"trying to register to master $masterHostAddress:$masterPort")
+    val conf = new SparkConf()
+    val config = RpcEnvConfig(conf, "registry-client", masterHostAddress, "", masterPort,
+      new SecurityManager(conf), clientMode = true)
+    val rpcEnv: RpcEnv = new NettyRpcEnvFactory().create(config)
+
+    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
+      RpcAddress(masterHostAddress, masterPort), "registry-service")
+    val cores = Runtime.getRuntime.availableProcessors()
+
+    val request = RegisterWorkerRequest(hostAddress, port, cores)
+    val future = endPointRef.ask[RegisterWorkerResponse](request)
+    ThreadUtils.awaitResult(future, Duration.apply("10s"))
+    future.value match {
+      case Some(result) =>
+        result match {
+          case Success(response) =>
+            LOG.info("worker registered")
+            response.workerId
+          case Failure(throwable) =>
+            LOG.error(s"worker failed to register: $throwable")
+            throw new IOException(throwable.getMessage)
+        }
+      case None =>
+        LOG.error("worker register timeout")
+        throw new ExecutionTimeoutException
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/main/scala/org/apache/spark/search/Registry.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Registry.scala b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
new file mode 100644
index 0000000..22e766d
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/search/Registry.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.search
+
+import org.apache.spark.rpc.{Master, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * Registry service implementation. It adds worker to master.
+ */
+class Registry(override val rpcEnv: RpcEnv, master: Master) extends RpcEndpoint {
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  override def onStart(): Unit = {
+    LOG.info("Registry Endpoint started")
+  }
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case req@RegisterWorkerRequest(_, _, _) =>
+      val response = master.addWorker(req)
+      context.reply(response)
+  }
+
+  override def onStop(): Unit = {
+    LOG.info("Registry Endpoint stopped")
+  }
+
+}
+
+case class RegisterWorkerRequest(
+    hostAddress: String,
+    port: Int,
+    cores: Int)
+
+case class RegisterWorkerResponse(
+    workerId: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/search/Searcher.scala b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
new file mode 100644
index 0000000..4ed796e
--- /dev/null
+++ b/store/search/src/main/scala/org/apache/spark/search/Searcher.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.search
+
+import org.apache.spark.SerializableWritable
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
+import org.apache.carbondata.store.worker.SearchRequestHandler
+
+/**
+ * Search service implementation
+ */
+class Searcher(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+  private val LOG = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def onStart(): Unit = {
+    LOG.info("Searcher Endpoint started")
+  }
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case req@SearchRequest(_, _, _, _, _, _) =>
+      val response = new SearchRequestHandler().handleSearch(req)
+      context.reply(response)
+
+    case req@ShutdownRequest(_) =>
+      val response = new SearchRequestHandler().handleShutdown(req)
+      context.reply(response)
+
+  }
+
+  override def onStop(): Unit = {
+    LOG.info("Searcher Endpoint stopped")
+  }
+}
+
+// Search request sent from master to worker
+case class SearchRequest(
+    queryId: Int,
+    split: SerializableWritable[CarbonMultiBlockSplit],
+    tableInfo: TableInfo,
+    projectColumns: Array[String],
+    filterExpression: Expression,
+    limit: Long)
+
+// Search result sent from worker to master
+case class SearchResult(
+    queryId: Int,
+    status: Int,
+    message: String,
+    rows: Array[Array[Object]])
+
+// Shutdown request sent from master to worker
+case class ShutdownRequest(
+    reason: String)
+
+// Shutdown response sent from worker to master
+case class ShutdownResponse(
+    status: Int,
+    message: String)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
----------------------------------------------------------------------
diff --git a/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
new file mode 100644
index 0000000..88d925f
--- /dev/null
+++ b/store/search/src/test/java/org/apache/carbondata/store/SearchServiceTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store;
+
+public class SearchServiceTest {
+//  @Test
+//  public void testStartStopService() throws IOException, ExecutionException, InterruptedException {
+//    Master master = new Master(9999);
+//    master.startService();
+//
+//    Worker worker = Worker.getInstance();
+//    worker.init(InetAddress.getLocalHost().getHostName(), 9999);
+//
+//    Set<String> workers = master.getWorkers();
+//    Assert.assertEquals(1, workers.size());
+//    Assert.assertEquals(InetAddress.getLocalHost().getHostName(), workers.toArray()[0]);
+//
+//    master.stopAllWorkers();
+//    master.stopService();
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
index 69d2a3b..d5f77f4 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -208,8 +208,8 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
 
     isProjectionRequired = new boolean[storageColumns.length];
     projectionMap = new int[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      for (int j = 0; j < projection.length; j++) {
+    for (int j = 0; j < projection.length; j++) {
+      for (int i = 0; i < storageColumns.length; i++) {
         if (storageColumns[i].getColName().equals(projection[j].getColName())) {
           isRequired[i] = true;
           isProjectionRequired[i] = true;


[2/2] carbondata git commit: [CARBONDATA-2323]Distributed search mode using RPC

Posted by ch...@apache.org.
[CARBONDATA-2323]Distributed search mode using RPC

When the user issues a SQL statement that contains only projection and filter, we can use RPC calls to do a distributed scan on the carbon files directly instead of using an RDD to execute the query. In this mode, RDD overhead such as RDD construction and DAG scheduling is avoided.

This closes #2148
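
For reference, a minimal usage sketch of the API this patch adds to CarbonSession (startSearchMode, sql, sparkSql, stopSearchMode); the session variable, table name and query below are illustrative assumptions, not part of this commit:

    // `carbon` is assumed to be an already-created CarbonSession
    carbon.startSearchMode()   // start the RPC master and enable search mode

    // A pure projection + filter query is served by the search workers;
    // any other plan shape, or any exception, falls back to SparkSQL
    carbon.sql("SELECT c1, c2 FROM t1 WHERE c3 > 100").show()

    // Bypass search mode and run directly through SparkSQL
    carbon.sparkSql("SELECT count(*) FROM t1").show()

    carbon.stopSearchMode()    // shut down workers and disable search mode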


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3ff574d2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3ff574d2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3ff574d2

Branch: refs/heads/master
Commit: 3ff574d29155edd71da5273bad0d0236ea50c2bd
Parents: 6bef57b
Author: Jacky Li <ja...@qq.com>
Authored: Thu Apr 19 10:55:48 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Sat Apr 21 23:44:12 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  31 ++-
 .../carbondata/core/indexstore/Blocklet.java    |   2 +
 .../core/metadata/schema/table/CarbonTable.java |  61 -----
 .../scan/executor/QueryExecutorFactory.java     |   5 +-
 .../impl/SearchModeDetailQueryExecutor.java     |  13 +-
 .../SearchModeVectorDetailQueryExecutor.java    |  26 +-
 .../core/scan/model/QueryModelBuilder.java      |  97 ++++++--
 .../carbondata/core/util/CarbonProperties.java  |  41 ++++
 dev/findbugs-exclude.xml                        |   4 +
 .../benchmark/ConcurrentQueryBenchmark.scala    |  11 +-
 .../carbondata/benchmark/DataGenerator.scala    |  83 -------
 .../benchmark/SimpleQueryBenchmark.scala        |   1 +
 .../carbondata/examples/SearchModeExample.scala | 165 +++++++++++++
 .../carbondata/examples/util/ExampleUtils.scala |   1 +
 .../carbondata/hadoop/CarbonInputSplit.java     |   5 +
 .../hadoop/CarbonMultiBlockSplit.java           |  12 +-
 .../hadoop/api/CarbonInputFormat.java           |  47 ++--
 .../readsupport/impl/CarbonRowReadSupport.java  |  51 ++++
 .../hadoop/util/CarbonInputFormatUtil.java      |  83 +++++++
 .../hive/MapredCarbonInputFormat.java           |  23 +-
 .../TestTimeseriesTableSelection.scala          |   3 +-
 .../detailquery/SearchModeTestCase.scala        |  88 +++++--
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   1 -
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  20 --
 integration/spark2/pom.xml                      |  83 ++++---
 .../carbondata/spark/util/DataGenerator.scala   |  85 +++++++
 .../carbondata/store/SparkCarbonStore.scala     |  55 ++++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  17 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 127 +++++++++-
 .../spark/sql/optimizer/CarbonFilters.scala     |   1 +
 pom.xml                                         |   1 +
 processing/pom.xml                              |   2 +-
 .../loading/dictionary/InMemBiDictionary.java   |  80 ------
 .../merger/CarbonCompactionExecutor.java        |  12 +-
 .../partition/spliter/CarbonSplitExecutor.java  |   8 +-
 .../dictionary/InMemBiDictionaryTest.java       |  72 ------
 store/search/pom.xml                            |  78 ++++++
 .../store/worker/SearchRequestHandler.java      | 181 ++++++++++++++
 .../apache/carbondata/store/worker/Status.java  |  28 +++
 .../scala/org/apache/spark/rpc/Master.scala     | 242 +++++++++++++++++++
 .../scala/org/apache/spark/rpc/Worker.scala     | 118 +++++++++
 .../org/apache/spark/search/Registry.scala      |  51 ++++
 .../org/apache/spark/search/Searcher.scala      |  78 ++++++
 .../carbondata/store/SearchServiceTest.java     |  37 +++
 .../streaming/CarbonStreamRecordReader.java     |   4 +-
 46 files changed, 1750 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6ab1ce5..4e324fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.constants;
 
 import java.nio.charset.Charset;
 
+import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.util.CarbonProperty;
 
 public final class CarbonCommonConstants {
@@ -1629,8 +1630,14 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_SYSTEM_FOLDER_LOCATION = "carbon.system.folder.location";
 
+  /**
+   * If set to true, CarbonReader is used to do a distributed scan on the carbon files directly
+   * instead of going through a compute framework like Spark, thus avoiding limitations of the
+   * compute framework such as the SQL optimizer and task scheduling overhead.
+   */
   @CarbonProperty
-  public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.mode.enable";
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_ENABLE = "carbon.search.enabled";
 
   public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
 
@@ -1641,10 +1648,30 @@ public final class CarbonCommonConstants {
    * will call Executors.newFixedThreadPool(int nThreads) instead
    */
   @CarbonProperty
-  public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.mode.scan.thread";
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_SCAN_THREAD = "carbon.search.scan.thread";
 
   public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1";
 
+  /**
+   * In search mode, Master will listen on this port for worker registration
+   */
+  public static final String CARBON_SEARCH_MODE_MASTER_PORT = "carbon.search.master.port";
+
+  public static final String CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT = "10020";
+
+  /**
+   * In search mode, the Worker will listen on this port for master requests such as search.
+   * If the Worker fails to start the service on this port, it will increment the port number
+   * and try to bind again until it succeeds.
+   */
+  @CarbonProperty
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_MODE_WORKER_PORT = "carbon.search.worker.port";
+
+  public static final String CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT = "10021";
+
+
   /*
    * whether to enable prefetch for rowbatch to enhance row reconstruction during compaction
    */
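
The renamed properties above (carbon.search.enabled, carbon.search.scan.thread) and the new master and worker port properties can be set through CarbonProperties before the session is created. A minimal sketch with illustrative values (the defaults are the *_DEFAULT constants above):

    import org.apache.carbondata.core.util.CarbonProperties

    // illustrative values; the defaults are given by the *_DEFAULT constants above
    CarbonProperties.getInstance()
      .addProperty("carbon.search.enabled", "true")
      .addProperty("carbon.search.scan.thread", "4")
      .addProperty("carbon.search.master.port", "10020")
      .addProperty("carbon.search.worker.port", "10021")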

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
index 052d269..c3eda6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java
@@ -28,8 +28,10 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
  */
 public class Blocklet implements Writable,Serializable {
 
+  /** file path of this blocklet */
   private String blockId;
 
+  /** id to identify the blocklet inside the block (it is a sequential number) */
   private String blockletId;
 
   public Blocklet(String blockId, String blockletId) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index d3eab6c..88e00f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -50,9 +50,7 @@ import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryProjection;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.FileHeader;
@@ -907,65 +905,6 @@ public class CarbonTable implements Serializable {
     return dataSize + indexSize;
   }
 
-  /**
-   * Create a new QueryModel with projection all columns in the table.
-   */
-  public QueryModel createQueryModelWithProjectAllColumns(DataTypeConverter converter) {
-    QueryProjection projection = new QueryProjection();
-
-    List<CarbonDimension> dimensions = getDimensionByTableName(getTableName());
-    for (int i = 0; i < dimensions.size(); i++) {
-      projection.addDimension(dimensions.get(i), i);
-    }
-    List<CarbonMeasure> measures = getMeasureByTableName(getTableName());
-    for (int i = 0; i < measures.size(); i++) {
-      projection.addMeasure(measures.get(i), i);
-    }
-    QueryModel model = QueryModel.newInstance(this);
-    model.setProjection(projection);
-    model.setConverter(converter);
-    return model;
-  }
-
-  /**
-   * Create a new QueryModel with specified projection
-   */
-  public QueryModel createQueryWithProjection(String[] projectionColumnNames,
-      DataTypeConverter converter) {
-    QueryProjection projection = createProjection(projectionColumnNames);
-    QueryModel queryModel = QueryModel.newInstance(this);
-    queryModel.setProjection(projection);
-    queryModel.setConverter(converter);
-    return queryModel;
-  }
-
-  public QueryProjection createProjection(String[] projectionColumnNames) {
-    String factTableName = getTableName();
-    QueryProjection projection = new QueryProjection();
-    // fill dimensions
-    // If columns are null, set all dimensions and measures
-    int i = 0;
-    if (projectionColumnNames != null) {
-      for (String projectionColumnName : projectionColumnNames) {
-        CarbonDimension dimension = getDimensionByName(factTableName, projectionColumnName);
-        if (dimension != null) {
-          projection.addDimension(dimension, i);
-          i++;
-        } else {
-          CarbonMeasure measure = getMeasureByName(factTableName, projectionColumnName);
-          if (measure == null) {
-            throw new RuntimeException(projectionColumnName +
-                " column not found in the table " + factTableName);
-          }
-          projection.addMeasure(measure, i);
-          i++;
-        }
-      }
-    }
-
-    return projection;
-  }
-
   public void processFilterExpression(Expression filterExpression,
       boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
     QueryModel.processFilterExpression(this, filterExpression, isFilterDimensions,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
index 06fe4db..b790f1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.scan.executor;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.executor.impl.DetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
@@ -31,9 +30,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 public class QueryExecutorFactory {
 
   public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
-    if (CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
-            CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) {
+    if (CarbonProperties.isSearchModeEnabled()) {
       if (queryModel.isVectorReader()) {
         return new SearchModeVectorDetailQueryExecutor();
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
index c64755e..484cafd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java
@@ -35,14 +35,18 @@ import org.apache.carbondata.core.util.CarbonProperties;
 public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;
 
   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
-                      CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
+          .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
       LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
@@ -58,6 +62,9 @@ public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object>
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+    if (executorService == null) {
+      initThreadPool();
+    }
     this.queryIterator = new SearchModeResultIterator(
         blockExecutionInfoList,
         queryModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
index 075d94a..02e8dc1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java
@@ -31,21 +31,30 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.SearchModeVectorResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD;
 
+/**
+ * This class executes the detail query and returns columnar vectors.
+ */
 public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName());
-  private static ExecutorService executorService;
+  private static ExecutorService executorService = null;
 
   static {
+    initThreadPool();
+  }
+
+  private static synchronized void initThreadPool() {
     int nThread;
     try {
       nThread = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD,
+              .getProperty(CARBON_SEARCH_MODE_SCAN_THREAD,
                       CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT));
     } catch (NumberFormatException e) {
       nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT);
-      LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread);
+      LOGGER.warn("The " + CARBON_SEARCH_MODE_SCAN_THREAD + " is invalid. "
+          + "Using the default value " + nThread);
     }
     if (nThread > 0) {
       executorService = Executors.newFixedThreadPool(nThread);
@@ -54,10 +63,21 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O
     }
   }
 
+  public static synchronized void shutdownThreadPool() {
+    // shutdown all threads immediately
+    if (executorService != null) {
+      executorService.shutdownNow();
+      executorService = null;
+    }
+  }
+
   @Override
   public CarbonIterator<Object> execute(QueryModel queryModel)
       throws QueryExecutionException, IOException {
     List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+    if (executorService == null) {
+      initThreadPool();
+    }
     this.queryIterator = new SearchModeVectorResultIterator(
         blockExecutionInfoList,
         queryModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index f40bd8b..f7b828e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -18,55 +18,104 @@
 package org.apache.carbondata.core.scan.model;
 
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.DataTypeConverter;
 
 public class QueryModelBuilder {
 
-  private CarbonTable carbonTable;
+  private CarbonTable table;
+  private QueryProjection projection;
+  private Expression filterExpression;
+  private DataTypeConverter dataTypeConverter;
+  private boolean forcedDetailRawQuery;
+  private boolean readPageByPage;
 
-  public QueryModelBuilder(CarbonTable carbonTable) {
-    this.carbonTable = carbonTable;
+  public QueryModelBuilder(CarbonTable table) {
+    this.table = table;
   }
 
-  public QueryModel build(String[] projectionColumnNames, Expression filterExpression) {
-    QueryModel queryModel = QueryModel.newInstance(carbonTable);
-    QueryProjection projection = carbonTable.createProjection(projectionColumnNames);
-    queryModel.setProjection(projection);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
-    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
-    return queryModel;
+  public QueryModelBuilder projectColumns(String[] projectionColumns) {
+    Objects.requireNonNull(projectionColumns);
+    String factTableName = table.getTableName();
+    QueryProjection projection = new QueryProjection();
+
+    int i = 0;
+    for (String projectionColumnName : projectionColumns) {
+      CarbonDimension dimension = table.getDimensionByName(factTableName, projectionColumnName);
+      if (dimension != null) {
+        projection.addDimension(dimension, i);
+        i++;
+      } else {
+        CarbonMeasure measure = table.getMeasureByName(factTableName, projectionColumnName);
+        if (measure == null) {
+          throw new RuntimeException(projectionColumnName +
+              " column not found in the table " + factTableName);
+        }
+        projection.addMeasure(measure, i);
+        i++;
+      }
+    }
+
+    this.projection = projection;
+    return this;
   }
 
-  public QueryModel build(Expression filterExpression) {
+  public QueryModelBuilder projectAllColumns() {
     QueryProjection projection = new QueryProjection();
-
-    List<CarbonDimension> dimensions = carbonTable.getDimensions();
+    List<CarbonDimension> dimensions = table.getDimensions();
     for (int i = 0; i < dimensions.size(); i++) {
       projection.addDimension(dimensions.get(i), i);
     }
-    List<CarbonMeasure> measures = carbonTable.getMeasures();
+    List<CarbonMeasure> measures = table.getMeasures();
     for (int i = 0; i < measures.size(); i++) {
       projection.addMeasure(measures.get(i), i);
     }
+    this.projection = projection;
+    return this;
+  }
+
+  public QueryModelBuilder filterExpression(Expression filterExpression) {
+    this.filterExpression = filterExpression;
+    return this;
+  }
+
+  public QueryModelBuilder dataConverter(DataTypeConverter dataTypeConverter) {
+    this.dataTypeConverter = dataTypeConverter;
+    return this;
+  }
 
-    QueryModel queryModel = QueryModel.newInstance(carbonTable);
+  public QueryModelBuilder enableForcedDetailRawQuery() {
+    this.forcedDetailRawQuery = true;
+    return this;
+  }
+
+  public QueryModelBuilder enableReadPageByPage() {
+    this.readPageByPage = true;
+    return this;
+  }
+
+  public QueryModel build() {
+    QueryModel queryModel = QueryModel.newInstance(table);
+    queryModel.setConverter(dataTypeConverter);
+    queryModel.setForcedDetailRawQuery(forcedDetailRawQuery);
+    queryModel.setReadPageByPage(readPageByPage);
     queryModel.setProjection(projection);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
-    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
+
+    // set the filter to the query model in order to filter blocklets before scan
+    boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
+    boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
+    table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
     queryModel.setIsFilterDimensions(isFilterDimensions);
     queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null);
+    FilterResolverIntf filterIntf =
+        table.resolveFilter(filterExpression, new SingleTableProvider(table));
     queryModel.setFilterExpressionResolverTree(filterIntf);
     return queryModel;
   }
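
With the builder converted to a fluent API, callers assemble a QueryModel as sketched below (carbonTable and filterExpression are assumed to be provided by the caller); the CarbonInputFormat change later in this diff uses the same pattern:

    import org.apache.carbondata.core.scan.model.{QueryModel, QueryModelBuilder}
    import org.apache.carbondata.core.util.DataTypeConverterImpl

    // carbonTable and filterExpression are assumed to come from the calling context
    val queryModel: QueryModel = new QueryModelBuilder(carbonTable)
      .projectColumns(Array("city", "m1"))   // or projectAllColumns() for a full projection
      .filterExpression(filterExpression)
      .dataConverter(new DataTypeConverterImpl())
      .build()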

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 38f7513..82080dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1032,6 +1032,11 @@ public final class CarbonProperties {
     return enableAutoHandoffStr.equalsIgnoreCase("true");
   }
 
+  public boolean isEnableVectorReader() {
+    return getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).equalsIgnoreCase("true");
+  }
+
   /**
    * Validate the restrictions
    *
@@ -1460,4 +1465,40 @@ public final class CarbonProperties {
     return systemLocation + CarbonCommonConstants.FILE_SEPARATOR + "_system";
   }
 
+  /**
+   * Return true if search mode is enabled
+   */
+  public static boolean isSearchModeEnabled() {
+    String value = getInstance().getProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
+        CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT);
+    return Boolean.valueOf(value);
+  }
+
+  public static void enableSearchMode(boolean enable) {
+    getInstance().addProperty(
+        CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, String.valueOf(enable));
+  }
+
+  public static int getSearchMasterPort() {
+    try {
+      return Integer.parseInt(
+          getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT));
+    } catch (NumberFormatException e) {
+      return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_MASTER_PORT_DEFAULT);
+    }
+  }
+
+  public static int getSearchWorkerPort() {
+    try {
+      return Integer.parseInt(
+          getInstance().getProperty(
+              CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT,
+              CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT));
+    } catch (NumberFormatException e) {
+      return Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_PORT_DEFAULT);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/dev/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index b19db85..63b6bd5 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -24,6 +24,10 @@
     <Source name="~.*\.scala" />
   </Match>
 
+  <Match>
+    <Source name="~.*Test\.java" />
+  </Match>
+
   <!-- This method creates stream but the caller methods are responsible for closing the stream -->
   <Match>
     <Class name="org.apache.carbondata.core.datastore.impl.FileFactory"/>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
index 7da8c29..a1a1428 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -25,11 +25,11 @@ import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
 
 import scala.util.Random
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SaveMode, SparkSession}
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.spark.util.DataGenerator
 
 // scalastyle:off println
 /**
@@ -60,7 +60,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 object ConcurrentQueryBenchmark {
 
   // generate number of data
-  var totalNum = 1 * 10 * 1000
+  var totalNum = 10 * 1000 * 1000
   // the number of thread pool
   var threadNum = 16
   // task number of spark sql query
@@ -505,7 +505,8 @@ object ConcurrentQueryBenchmark {
       .addProperty("carbon.enable.vector.reader", "true")
       .addProperty("enable.unsafe.sort", "true")
       .addProperty("carbon.blockletgroup.size.in.mb", "32")
-      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "false")
     import org.apache.spark.sql.CarbonSession._
 
     // 1. initParameters
@@ -559,8 +560,10 @@ object ConcurrentQueryBenchmark {
     // 2. prepareTable
     prepareTable(spark, table1, table2)
 
+    spark.asInstanceOf[CarbonSession].startSearchMode()
     // 3. runTest
     runTest(spark, table1, table2)
+    spark.asInstanceOf[CarbonSession].stopSearchMode()
 
     if (deleteFile) {
       CarbonUtil.deleteFoldersAndFiles(new File(table1))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
deleted file mode 100644
index e3e67b1..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.benchmark
-
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.types._
-
-// scalastyle:off println
-object DataGenerator {
-  // Table schema:
-  // +-------------+-----------+-------------+-------------+------------+
-  // | Column name | Data type | Cardinality | Column type | Dictionary |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | id          | string    | 100,000,000 | dimension   | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | city        | string    | 6           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | country     | string    | 6           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | planet      | string    | 10,007      | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m1          | short     | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m2          | int       | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m3          | big int   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m4          | double    | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m5          | decimal   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  /**
-   * generate DataFrame with above table schema
-   *
-   * @param spark SparkSession
-   * @return Dataframe of test data
-   */
-  def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = {
-    val rdd = spark.sparkContext
-      .parallelize(1 to totalNum, 4)
-      .map { x =>
-        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007,
-          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
-          BigDecimal.valueOf(x.toDouble / 11))
-      }.map { x =>
-      Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
-    }
-
-    val schema = StructType(
-      Seq(
-        StructField("id", StringType, nullable = false),
-        StructField("city", StringType, nullable = false),
-        StructField("country", StringType, nullable = false),
-        StructField("planet", StringType, nullable = false),
-        StructField("m1", ShortType, nullable = false),
-        StructField("m2", IntegerType, nullable = false),
-        StructField("m3", LongType, nullable = false),
-        StructField("m4", DoubleType, nullable = false),
-        StructField("m5", DecimalType(30, 10), nullable = false)
-      )
-    )
-
-    val df = spark.createDataFrame(rdd, schema)
-    println(s"Start generate ${df.count} records, schema: ${df.schema}")
-    df
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
index 880f476..e9c880b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.spark.util.DataGenerator
 
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
new file mode 100644
index 0000000..03e724f
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SearchModeExample.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.util.concurrent.{Executors, ExecutorService}
+
+import org.apache.spark.sql.{CarbonSession, SparkSession}
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * An example that demonstrates how to run queries in search mode
+ * and compares the performance between search mode and SparkSQL.
+ */
+// scalastyle:off
+object SearchModeExample {
+
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("SearchModeExample")
+    spark.sparkContext.setLogLevel("ERROR")
+    exampleBody(spark)
+    spark.close()
+  }
+
+  def exampleBody(spark : SparkSession): Unit = {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbonsession_table(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbonsession_table
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+       """.stripMargin)
+
+    val pool = Executors.newCachedThreadPool()
+
+    // start search mode
+    spark.asInstanceOf[CarbonSession].startSearchMode()
+    runAsynchrousSQL(spark, pool, 1)
+
+    println("search mode asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("search mode synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    // stop search mode
+    spark.asInstanceOf[CarbonSession].stopSearchMode()
+
+    println("sparksql asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("sparksql synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    // start search mode again
+    spark.asInstanceOf[CarbonSession].startSearchMode()
+
+    println("search mode asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("search mode synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    // stop search mode
+    spark.asInstanceOf[CarbonSession].stopSearchMode()
+
+    println("sparksql asynchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runAsynchrousSQL(spark, pool, 100)
+    }
+
+    println("sparksql synchronous query")
+    org.apache.spark.sql.catalyst.util.benchmark {
+      runSynchrousSQL(spark, 100)
+    }
+
+    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
+    pool.shutdownNow()
+  }
+
+  private def runAsynchrousSQL(spark: SparkSession, pool: ExecutorService, round: Int): Unit = {
+    val futures = (1 to round).map { i =>
+      pool.submit(new Runnable {
+        override def run(): Unit = {
+          spark.sql(
+            s"""
+             SELECT charField, stringField, intField, dateField
+             FROM carbonsession_table
+             WHERE stringfield = 'spark' AND decimalField > $i % 37
+              """.stripMargin
+          ).collect()
+        }
+      })
+    }
+
+    futures.foreach(_.get())
+  }
+
+  private def runSynchrousSQL(spark: SparkSession, round: Int): Unit = {
+    (1 to round).map { i =>
+      spark.sql(
+        s"""
+             SELECT charField, stringField, intField, dateField
+             FROM carbonsession_table
+             WHERE stringfield = 'spark' AND decimalField > $i % 37
+              """.stripMargin
+      ).collect()
+    }
+  }
+}
+// scalastyle:on
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
index 1cdaafe..e12c2f9 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala
@@ -52,6 +52,7 @@ object ExampleUtils {
       "local[" + workThreadNum.toString() + "]"
     }
     import org.apache.spark.sql.CarbonSession._
+
     val spark = SparkSession
       .builder()
       .master(masterUrl)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index c586f3c..02d272e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -430,4 +431,8 @@ public class CarbonInputSplit extends FileSplit
   public void setFormat(FileFormat fileFormat) {
     this.fileFormat = fileFormat;
   }
+
+  public Blocklet makeBlocklet() {
+    return new Blocklet(getPath().getName(), blockletId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 448cf28..0b991cb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -20,11 +20,13 @@ package org.apache.carbondata.hadoop;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 
 import org.apache.hadoop.io.Writable;
@@ -34,7 +36,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
  * This class wraps multiple blocks belong to a same node to one split.
  * So the scanning task will scan multiple blocks. This is an optimization for concurrent query.
  */
-public class CarbonMultiBlockSplit extends InputSplit implements Writable {
+public class CarbonMultiBlockSplit extends InputSplit implements Serializable, Writable {
 
   /*
    * Splits (HDFS Blocks) for task to scan.
@@ -56,6 +58,14 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable {
     length = 0;
   }
 
+  public CarbonMultiBlockSplit(List<Distributable> blocks, String hostname) {
+    this.splitList = new ArrayList<>(blocks.size());
+    for (Distributable block : blocks) {
+      this.splitList.add((CarbonInputSplit)block);
+    }
+    this.locations = new String[]{hostname};
+  }
+
   public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList,
       String[] locations) {
     this.splitList = splitList;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8016d90..a72a6bf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -43,10 +43,9 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.SingleTableProvider;
-import org.apache.carbondata.core.scan.filter.TableProvider;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -108,6 +107,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   private static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
+  private static final String FGDATAMAP_PRUNING = "mapreduce.input.carboninputformat.fgdatamap";
 
   // record segment number and hit blocks
   protected int numSegments = 0;
@@ -221,6 +221,17 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return configuration.get(COLUMN_PROJECTION);
   }
 
+  public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
+    configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
+  }
+
+  public static boolean isFgDataMapPruningEnable(Configuration configuration) {
+    String enable = configuration.get(FGDATAMAP_PRUNING);
+
+    // if FGDATAMAP_PRUNING is not set, the FG DataMap is used by default
+    return (enable == null) || enable.equalsIgnoreCase("true");
+  }
+
   /**
    * Set list of segments to access
    */
@@ -352,7 +363,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
-    if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+    DataMapLevel dataMapLevel = dataMapExprWrapper.getDataMapType();
+    if (dataMapJob != null &&
+        (distributedCG ||
+        (dataMapLevel == DataMapLevel.FG && isFgDataMapPruningEnable(job.getConfiguration())))) {
       DistributableDataMapFormat datamapDstr =
           new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
               partitionsToPrune, BlockletDataMapFactory.class.getName());
@@ -426,27 +440,20 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
 
-    // query plan includes projection column
+    // set projection column in the query model
     String projectionString = getColumnProjection(configuration);
-    String[] projectionColumnNames = null;
+    String[] projectColumns;
     if (projectionString != null) {
-      projectionColumnNames = projectionString.split(",");
+      projectColumns = projectionString.split(",");
+    } else {
+      projectColumns = new String[]{};
     }
-    QueryModel queryModel = carbonTable
-        .createQueryWithProjection(projectionColumnNames, getDataTypeConverter(configuration));
-
-    // set the filter to the query model in order to filter blocklet before scan
-    Expression filter = getFilterPredicates(configuration);
-    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
-    // getAllMeasures returns list of visible and invisible columns
-    boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
-    carbonTable.processFilterExpression(filter, isFilterDimensions, isFilterMeasures);
-    queryModel.setIsFilterDimensions(isFilterDimensions);
-    queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, tableProvider);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
+    QueryModel queryModel = new QueryModelBuilder(carbonTable)
+        .projectColumns(projectColumns)
+        .filterExpression(getFilterPredicates(configuration))
+        .dataConverter(getDataTypeConverter(configuration))
+        .build();
 
     // update the file level index store if there are invalid segment
     if (inputSplit instanceof CarbonMultiBlockSplit) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
new file mode 100644
index 0000000..e2b5e60
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/CarbonRowReadSupport.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.readsupport.impl;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+
+/**
+ * A read support implementation to return CarbonRow after handling
+ * global dictionary and direct dictionary (date/timestamp) conversion
+ */
+public class CarbonRowReadSupport extends DictionaryDecodeReadSupport<CarbonRow> {
+
+  @Override
+  public CarbonRow readRow(Object[] data) {
+    assert (data.length == dictionaries.length);
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+      }
+      if (dataTypes[i] == DataTypes.DATE) {
+        Calendar c = Calendar.getInstance();
+        c.setTime(new Date(0));
+        c.add(Calendar.DAY_OF_YEAR, (Integer) data[i]);
+        data[i] = new Date(c.getTime().getTime());
+      } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
+        data[i] = new Timestamp((long) data[i] / 1000);
+      }
+    }
+    return new CarbonRow(data);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index d6d6603..8ac2905 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -23,9 +23,20 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.api.DataMapJob;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -63,6 +74,78 @@ public class CarbonInputFormatUtil {
     return carbonTableInputFormat;
   }
 
+  public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
+      Job job,
+      CarbonTable carbonTable,
+      String[] projectionColumns,
+      Expression filterExpression,
+      List<PartitionSpec> partitionNames,
+      DataMapJob dataMapJob) throws IOException, InvalidConfigurationException {
+    Configuration conf = job.getConfiguration();
+    CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
+    CarbonInputFormat.setDatabaseName(conf, carbonTable.getTableInfo().getDatabaseName());
+    CarbonInputFormat.setTableName(conf, carbonTable.getTableInfo().getFactTable().getTableName());
+    if (partitionNames != null) {
+      CarbonInputFormat.setPartitionsToPrune(conf, partitionNames);
+    }
+    CarbonInputFormat.setUnmanagedTable(conf, carbonTable.getTableInfo().isUnManagedTable());
+    CarbonProjection columnProjection = new CarbonProjection(projectionColumns);
+    return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
+        filterExpression, columnProjection, dataMapJob);
+  }
+
+  private static <V> CarbonTableInputFormat<V> createInputFormat(
+      Configuration conf,
+      AbsoluteTableIdentifier identifier,
+      Expression filterExpression,
+      CarbonProjection columnProjection,
+      DataMapJob dataMapJob) throws InvalidConfigurationException, IOException {
+    CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>();
+    CarbonInputFormat.setTablePath(
+        conf,
+        identifier.appendWithLocalPrefix(identifier.getTablePath()));
+    CarbonInputFormat.setQuerySegment(conf, identifier);
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonInputFormat.setColumnProjection(conf, columnProjection);
+    if (dataMapJob != null &&
+        Boolean.valueOf(CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT))) {
+      CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+    }
+    // when validate segments is disabled in thread local update it to CarbonTableInputFormat
+    CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
+    if (carbonSessionInfo != null) {
+      String tableUniqueKey = identifier.getDatabaseName() + "." + identifier.getTableName();
+      String validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+          tableUniqueKey;
+      CarbonInputFormat.setValidateSegmentsToAccess(
+          conf,
+          Boolean.valueOf(carbonSessionInfo.getThreadParams().getProperty(
+              validateInputSegmentsKey, "true")));
+      String queryOnPreAggStreamingKey = CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING +
+          tableUniqueKey;
+      boolean queryOnPreAggStreaming = Boolean.valueOf(carbonSessionInfo.getThreadParams()
+          .getProperty(queryOnPreAggStreamingKey, "false"));
+      String inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueKey;
+      CarbonInputFormat.setValidateSegmentsToAccess(conf,
+          Boolean.valueOf(carbonSessionInfo.getThreadParams()
+              .getProperty(validateInputSegmentsKey, "true")));
+      CarbonInputFormat.setQuerySegment(
+          conf,
+          carbonSessionInfo.getThreadParams().getProperty(
+              inputSegmentsKey,
+              CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")));
+      if (queryOnPreAggStreaming) {
+        CarbonInputFormat.setAccessStreamingSegments(conf, true);
+        carbonSessionInfo.getThreadParams().removeProperty(queryOnPreAggStreamingKey);
+        carbonSessionInfo.getThreadParams().removeProperty(inputSegmentsKey);
+        carbonSessionInfo.getThreadParams().removeProperty(validateInputSegmentsKey);
+      }
+    }
+    return format;
+  }
+
   public static String createJobTrackerID(java.util.Date date) {
     return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
   }
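
The factory above bundles table info, projection, optional filter, partition pruning and the
distributed datamap job into a single call. A minimal driver-side sketch of how it might be invoked
(the CarbonTable instance, column names and null arguments are illustrative, not part of this patch):

    import org.apache.hadoop.mapreduce.Job
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

    // carbonTable is assumed to be a CarbonTable already resolved by the caller
    val job = Job.getInstance()
    val format = CarbonInputFormatUtil.createCarbonTableInputFormat[Object](
      job,
      carbonTable,
      Array("id", "city"),   // projection columns (illustrative)
      null,                  // filter Expression; null means full scan
      null,                  // no partition pruning
      null)                  // no distributed DataMapJob
    // split computation uses the configuration populated by the factory
    val splits = format.getSplits(job)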

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 4c9e417..1cf2369 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -27,11 +27,8 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.SingleTableProvider;
-import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
@@ -132,22 +129,16 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
   private QueryModel getQueryModel(Configuration configuration, String path)
       throws IOException, InvalidConfigurationException {
     CarbonTable carbonTable = getCarbonTable(configuration, path);
-    TableProvider tableProvider = new SingleTableProvider(carbonTable);
-    // getting the table absoluteTableIdentifier from the carbonTable
-    // to avoid unnecessary deserialization
-
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-
     String projectionString = getProjection(configuration, carbonTable,
         identifier.getCarbonTableIdentifier().getTableName());
     String[] projectionColumns = projectionString.split(",");
-    QueryModel queryModel = carbonTable.createQueryWithProjection(
-        projectionColumns, new DataTypeConverterImpl());
-    // set the filter to the query model in order to filter blocklet before scan
-    Expression filter = getFilterPredicates(configuration);
-    carbonTable.processFilterExpression(filter, null, null);
-    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, tableProvider);
-    queryModel.setFilterExpressionResolverTree(filterIntf);
+    QueryModel queryModel =
+        new QueryModelBuilder(carbonTable)
+            .projectColumns(projectionColumns)
+            .filterExpression(getFilterPredicates(configuration))
+            .dataConverter(new DataTypeConverterImpl())
+            .build();
 
     return queryModel;
   }
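
The QueryModelBuilder chain above replaces the old manual sequence of processFilterExpression and
resolveFilter. A hedged sketch of the same builder used outside the Hive integration, with the
projection names purely illustrative and carbonTable/filter assumed to be in scope:

    import org.apache.carbondata.core.scan.model.QueryModelBuilder
    import org.apache.carbondata.core.util.DataTypeConverterImpl

    val queryModel = new QueryModelBuilder(carbonTable)
      .projectColumns(Array("empno", "empname", "utilization"))
      .filterExpression(filter)                  // may be null when no predicate is pushed down
      .dataConverter(new DataTypeConverterImpl())
      .build()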

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index ddacdb6..7083c54 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -119,7 +119,8 @@ class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test timeseries table selection 2") {
-    val df = sql("SELECT TIMESERIES(mytime,'hour') FROM mainTable GROUP BY TIMESERIES(mytime,'hour')")
+    val df = sql("SELECT TIMESERIES(mytime,'hour') FROM mainTable " +
+                 "GROUP BY TIMESERIES(mytime,'hour')")
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0_hour")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 7dc7493..b55fa75 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -17,48 +17,86 @@
 
 package org.apache.carbondata.spark.testsuite.detailquery
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonSession, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.DataGenerator
+
 /**
- * Test Class for detailed query on multiple datatypes
+ * Test Suite for search mode
  */
-
 class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
 
-  override def beforeAll {
-    sql("CREATE TABLE alldatatypestable (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
-    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE alldatatypestable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""")
+  val numRows = 500 * 1000
+  override def beforeAll = {
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
+    sql("DROP TABLE IF EXISTS main")
 
-    sql("CREATE TABLE alldatatypestable_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','")
-    sql(s"""LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO TABLE alldatatypestable_hive""")
+    val df = DataGenerator.generateDataFrame(sqlContext.sparkSession, numRows)
+    df.write
+      .format("carbondata")
+      .option("tableName", "main")
+      .option("table_blocksize", "5")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
 
+  override def afterAll = {
+    sql("DROP TABLE IF EXISTS main")
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
+  }
+
+  private def sparkSql(sql: String): Seq[Row] = {
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].sparkSql(sql).collect()
+  }
+
+  private def checkSearchAnswer(query: String) = {
+    checkAnswer(sql(query), sparkSql(query))
   }
 
   test("SearchMode Query: row result") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
-        checkAnswer(
-      sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"),
-      sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'"))
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
-      CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT)
+    checkSearchAnswer("select * from main where city = 'city3'")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
-          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
   }
+
   test("SearchMode Query: vector result") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true")
-    checkAnswer(
-      sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"),
-      sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'"))
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE,
-      CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT)
+    checkSearchAnswer("select * from main where city = 'city3'")
   }
 
-  override def afterAll {
-    sql("drop table alldatatypestable")
-    sql("drop table alldatatypestable_hive")
+  test("equal filter") {
+    checkSearchAnswer("select id from main where id = '100'")
+    checkSearchAnswer("select id from main where planet = 'planet100'")
   }
+
+  test("greater and less than filter") {
+    checkSearchAnswer("select id from main where m2 < 4")
+  }
+
+  test("IN filter") {
+    checkSearchAnswer("select id from main where id IN ('40', '50', '60')")
+  }
+
+  test("expression filter") {
+    checkSearchAnswer("select id from main where length(id) < 2")
+  }
+
+  test("filter with limit") {
+    checkSearchAnswer("select id from main where id = '3' limit 10")
+    checkSearchAnswer("select id from main where length(id) < 2 limit 10")
+  }
+
+  test("aggregate query") {
+    checkSearchAnswer("select city, sum(m1) from main where m2 < 10 group by city")
+  }
+
+  test("aggregate query with datamap and fallback to SparkSQL") {
+    sql("create datamap preagg on table main using 'preaggregate' as select city, count(*) from main group by city ")
+    checkSearchAnswer("select city, count(*) from main group by city")
+  }
+
 }
\ No newline at end of file
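
The suite above also documents the intended usage pattern: start search mode once on the
CarbonSession, issue ordinary sql() calls, and compare against sparkSql(), which always goes through
SparkSQL. A condensed sketch, assuming the active SparkSession was created as a CarbonSession and
the table main from the suite exists:

    val carbon = spark.asInstanceOf[CarbonSession]
    carbon.startSearchMode()
    // simple filter queries are served in search mode; plans it cannot handle
    // (e.g. the pre-aggregate datamap case above) fall back to SparkSQL
    carbon.sql("select id, city from main where id = '100'").show()
    carbon.stopSearchMode()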

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 6f248d2..b985459 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -76,7 +76,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     internalCompute(split, context)
   }
 
-  private def getConf: Configuration = {
+  def getConf: Configuration = {
     val configuration = new Configuration(false)
     val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
       .unCompressByte(confBytes))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 31d3715..6d67daf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
-import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index a2542ab..4bfdd3b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -198,16 +198,6 @@ class NewCarbonDataLoadRDD[K, V](
     CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
   }
 
-  private def getConf = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
-  }
-
   override def getPartitions: Array[Partition] = {
     blocksGroupBy.zipWithIndex.map { b =>
       new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
@@ -359,16 +349,6 @@ class NewDataFrameLoaderRDD[K, V](
     CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
   }
 
-  private def getConf = {
-    val configuration = new Configuration(false)
-    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
-      .unCompressByte(confBytes))
-    val ois = new ObjectInputStream(bai)
-    configuration.readFields(ois)
-    ois.close()
-    configuration
-  }
-
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val hadoopConf = getConf

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 46e1be0..aca2d3c 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -45,6 +45,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-search</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
     </dependency>
@@ -262,47 +267,47 @@
       </build>
     </profile>
     <profile>
-    <id>spark-2.2</id>
-    <activation>
-      <activeByDefault>true</activeByDefault>
-    </activation>
-    <properties>
-      <spark.version>2.2.1</spark.version>
-      <scala.binary.version>2.11</scala.binary.version>
-      <scala.version>2.11.8</scala.version>
-    </properties>
-    <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <exclude>src/main/spark2.1</exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>3.0.0</version>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
+      <id>spark-2.2</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
-              <sources>
-                <source>src/main/spark2.2</source>
-              </sources>
+              <excludes>
+                <exclude>src/main/spark2.1</exclude>
+              </excludes>
             </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-    </build>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala
new file mode 100644
index 0000000..64c4e14
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/DataGenerator.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+object DataGenerator {
+  // Table schema:
+  // +-------------+-----------+-------------+-------------+------------+
+  // | Column name | Data type | Cardinality | Column type | Dictionary |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | id          | string    | 100,000,000 | dimension   | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | city        | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | country     | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | planet      | string    | 100,000     | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m1          | short     | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m2          | int       | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m3          | big int   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m4          | double    | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m5          | decimal   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  /**
+   * Generate a DataFrame of totalNum rows with the table schema described above
+   *
+   * @param spark SparkSession
+   * @return DataFrame of test data
+   */
+  def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = {
+    val rdd = spark.sparkContext
+      .parallelize(1 to totalNum, 4)
+      .map { x =>
+        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 100000,
+          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+          BigDecimal.valueOf(x.toDouble / 11))
+      }.map { x =>
+        Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+      }
+
+    val schema = StructType(
+      Seq(
+        StructField("id", StringType, nullable = false),
+        StructField("city", StringType, nullable = false),
+        StructField("country", StringType, nullable = false),
+        StructField("planet", StringType, nullable = false),
+        StructField("m1", ShortType, nullable = false),
+        StructField("m2", IntegerType, nullable = false),
+        StructField("m3", LongType, nullable = false),
+        StructField("m4", DoubleType, nullable = false),
+        StructField("m5", DecimalType(30, 10), nullable = false)
+      )
+    )
+
+    val df = spark.createDataFrame(rdd, schema)
+
+    // scalastyle:off println
+    println(s"Start generate ${df.count} records, schema: ${df.schema}")
+    // scalastyle:on println
+
+    df
+  }
+}
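
DataGenerator is a test-only utility; besides the carbondata write shown in SearchModeTestCase, the
generated frame can also be inspected directly. A small sketch (row count and view name are arbitrary):

    val df = DataGenerator.generateDataFrame(spark, 1000)
    df.createOrReplaceTempView("generated")
    spark.sql("select city, count(*) from generated group by city").show()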

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index e29ee46..279e7b0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -18,16 +18,20 @@
 package org.apache.carbondata.store
 
 import java.io.IOException
+import java.net.InetAddress
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{CarbonInputMetrics, SparkConf}
+import org.apache.spark.rpc.{Master, Worker}
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
@@ -36,8 +40,9 @@ import org.apache.carbondata.spark.rdd.CarbonScanRDD
  * with CarbonData query optimization capability
  */
 @InterfaceAudience.Internal
-private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
+class SparkCarbonStore extends MetaCachedCarbonStore {
   private var session: SparkSession = _
+  private var master: Master = _
 
   /**
    * Initialize SparkCarbonStore
@@ -54,10 +59,17 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
       .getOrCreateCarbonSession()
   }
 
+  def this(sparkSession: SparkSession) = {
+    this()
+    session = sparkSession
+  }
+
   @throws[IOException]
   override def scan(
       path: String,
       projectColumns: Array[String]): java.util.Iterator[CarbonRow] = {
+    require(path != null)
+    require(projectColumns != null)
     scan(path, projectColumns, null)
   }
 
@@ -95,4 +107,45 @@ private[store] class SparkCarbonStore extends MetaCachedCarbonStore {
       .asJava
   }
 
+  def startSearchMode(): Unit = {
+    master = new Master(session.sparkContext.getConf)
+    master.startService()
+    startAllWorkers()
+  }
+
+  def stopSearchMode(): Unit = {
+    master.stopAllWorkers()
+    master.stopService()
+    master = null
+  }
+
+  /** Execute a query in search mode: dispatch it through the RPC Master and return matching rows */
+  def search(
+      table: CarbonTable,
+      projectColumns: Array[String],
+      filter: Expression,
+      globalLimit: Long,
+      localLimit: Long): java.util.Iterator[CarbonRow] = {
+    if (master == null) {
+      throw new IllegalStateException("search mode is not started")
+    }
+    master.search(table, projectColumns, filter, globalLimit, localLimit)
+      .iterator
+      .asJava
+  }
+
+  private def startAllWorkers(): Array[Int] = {
+    // TODO: how to ensure task is sent to every executor?
+    val numExecutors = session.sparkContext.getExecutorMemoryStatus.keySet.size
+    val masterIp = InetAddress.getLocalHost.getHostAddress
+    session.sparkContext.parallelize(1 to numExecutors * 10, numExecutors).mapPartitions { f =>
+      // start worker
+      Worker.init(masterIp, CarbonProperties.getSearchMasterPort)
+      new Iterator[Int] {
+        override def hasNext: Boolean = false
+        override def next(): Int = 1
+      }
+    }.collect()
+  }
+
 }
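
Taken together, the new members define the search-mode lifecycle: startSearchMode() starts the RPC
Master and launches Workers on the executors, search() routes a query through the Master, and
stopSearchMode() tears both down. A hedged usage sketch, where spark is a SparkSession and the
table, projection, filter and limits are all illustrative:

    import org.apache.carbondata.store.SparkCarbonStore

    val store = new SparkCarbonStore(spark)     // secondary constructor added above
    store.startSearchMode()
    val rows = store.search(
      carbonTable,                              // CarbonTable resolved by the caller
      Array("id", "city"),                      // projection columns
      null,                                     // filter Expression; null scans everything
      100L,                                     // global row limit
      100L)                                     // local row limit
    while (rows.hasNext) println(rows.next())
    store.stopSearchMode()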

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff574d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 00e0aed..ecf2088 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -42,7 +42,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataE
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
-
 /**
  * Carbon Environment for unified context
  */
@@ -62,6 +61,14 @@ class CarbonEnv {
   var initialized = false
 
   def init(sparkSession: SparkSession): Unit = {
+    val properties = CarbonProperties.getInstance()
+    var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+    if (storePath == null) {
+      storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+      properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+    }
+    LOGGER.info(s"Initializing CarbonEnv, store location: $storePath")
+
     sparkSession.udf.register("getTupleId", () => "")
     // added for handling preaggregate table creation. when user will fire create ddl for
     // create table we are adding a udf so no need to apply PreAggregate rules.
@@ -94,13 +101,6 @@ class CarbonEnv {
         // add session params after adding DefaultCarbonParams
         config.addDefaultCarbonSessionParams()
         carbonMetastore = {
-          val properties = CarbonProperties.getInstance()
-          var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
-          if (storePath == null) {
-            storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
-            properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
-          }
-          LOGGER.info(s"carbon env initial: $storePath")
           // trigger event for CarbonEnv create
           val operationContext = new OperationContext
           val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
@@ -113,6 +113,7 @@ class CarbonEnv {
         initialized = true
       }
     }
+    LOGGER.info("Initialize CarbonEnv completed...")
   }
 }