Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/06/05 10:42:03 UTC

[18/26] carbondata git commit: [CARBONDATA-2389] Search mode support FG datamap

[CARBONDATA-2389] Search mode support FG datamap

Search mode now supports pruning with fine-grained (FG) datamaps such as the Lucene datamap.

This closes #2290


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cb71ffe1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cb71ffe1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cb71ffe1

Branch: refs/heads/branch-1.4
Commit: cb71ffe1ac1a39ef34df43457e704232a4c1444e
Parents: 67766ab
Author: xubo245 <60...@qq.com>
Authored: Wed May 9 21:20:59 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Jun 5 16:04:20 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  12 +
 .../core/datamap/DataMapStoreManager.java       |  20 +-
 .../apache/carbondata/core/datamap/Segment.java |   2 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java |  16 +
 .../datamap/dev/expr/DataMapExprWrapper.java    |  13 +
 .../dev/expr/DataMapExprWrapperImpl.java        |   8 +
 .../datamap/dev/expr/OrDataMapExprWrapper.java  |  13 +
 .../LatestFilesReadCommittedScope.java          |  43 ++-
 .../core/readcommitter/ReadCommittedScope.java  |   2 +-
 .../TableStatusReadCommittedScope.java          |   2 +-
 .../lucene/LuceneDataMapFactoryBase.java        |   4 +-
 .../examples/LuceneDataMapExample.scala         |   2 -
 .../carbondata/hadoop/CarbonRecordReader.java   |   8 +-
 .../hadoop/api/CarbonInputFormat.java           |   6 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |   1 +
 ...eneFineGrainDataMapWithSearchModeSuite.scala | 328 +++++++++++++++++++
 .../detailquery/SearchModeTestCase.scala        |  27 ++
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |   2 +
 .../store/worker/SearchRequestHandler.java      |  37 ++-
 .../scala/org/apache/spark/rpc/Master.scala     |  13 +-
 21 files changed, 521 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8ebce9e..08aa704 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1658,6 +1658,18 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false";
 
   /**
+   * Timeout threshold of a carbon search query
+   */
+  @CarbonProperty
+  @InterfaceStability.Unstable
+  public static final String CARBON_SEARCH_QUERY_TIMEOUT = "carbon.search.query.timeout";
+
+  /**
+   * Default value is 10 seconds
+   */
+  public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s";
+
+  /**
   * The size of the thread pool used for reading files in the Worker for search mode.
   * By default, it is the number of cores in the Worker.
    */
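
For context, the new timeout is read through CarbonProperties; Master.scala (further down in this commit) parses the value as a Spark Duration, so strings such as "30s" are valid. A minimal sketch of overriding it, assuming only that the two constants above are on the classpath (the class name is illustrative):

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.core.util.CarbonProperties;

    public class SearchTimeoutSketch {
      public static void main(String[] args) {
        // Override the default of "10s" before issuing search-mode queries.
        CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "30s");
        String timeout = CarbonProperties.getInstance()
            .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
                CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT);
        System.out.println("carbon.search.query.timeout = " + timeout); // 30s
      }
    }

The new test suite below sets the same property to "100s" in beforeAll().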

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 0fcf4cd..96d2b1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -96,13 +96,19 @@ public final class DataMapStoreManager {
       String dbName = carbonTable.getDatabaseName();
       String tableName = carbonTable.getTableName();
       String dmName = dataMap.getDataMapSchema().getDataMapName();
-      boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
-          String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
-              dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
-      if (!isDmVisible) {
-        LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
-            dmName, dbName, tableName));
-        dataMapIterator.remove();
+      // TODO: support getting the visible status of a datamap without sessionInfo in the future
+      if (sessionInfo != null) {
+        boolean isDmVisible = sessionInfo.getSessionParams().getProperty(
+            String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE,
+                dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true");
+        if (!isDmVisible) {
+          LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s",
+              dmName, dbName, tableName));
+          dataMapIterator.remove();
+        }
+      } else {
+        LOGGER.info("Carbon session info is null");
       }
     }
     return allDataMaps;
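
As a rough illustration of the key this null guard looks up, the property name is the visibility prefix followed by the database, table, and datamap names. A sketch with hypothetical names, assuming CARBON_DATAMAP_VISIBLE carries its usual "carbon.datamap.visible." prefix:

    import org.apache.carbondata.core.constants.CarbonCommonConstants;

    public class DataMapVisibilityKeySketch {
      public static void main(String[] args) {
        // "default", "datamap_test" and "dm" are made-up names for illustration.
        String key = String.format("%s%s.%s.%s",
            CarbonCommonConstants.CARBON_DATAMAP_VISIBLE, "default", "datamap_test", "dm");
        System.out.println(key);
        // => carbon.datamap.visible.default.datamap_test.dm
        // toggled from SQL with: SET carbon.datamap.visible.default.datamap_test.dm = false
      }
    }

The ignored visibility test in the new suite below issues exactly this kind of SET command.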

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 85c7176..7b63b84 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -115,7 +115,7 @@ public class Segment implements Serializable {
 
   public SegmentRefreshInfo getSegmentRefreshInfo(UpdateVO updateVo)
       throws IOException {
-    return readCommittedScope.getCommitedSegmentRefreshInfo(this, updateVo);
+    return readCommittedScope.getCommittedSegmentRefreshInfo(this, updateVo);
   }
 
   public String getSegmentNo() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 1de16bc..ec674de 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -59,6 +60,21 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
     return andBlocklets;
   }
 
+  @Override
+  public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+      List<PartitionSpec> partitionsToPrune)
+          throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune);
+    List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
+    for (ExtendedBlocklet blocklet : leftPrune) {
+      if (rightPrune.contains(blocklet)) {
+        andBlocklets.add(blocklet);
+      }
+    }
+    return andBlocklets;
+  }
+
   @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
       throws IOException {
     List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
index 5a04529..901cfc7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -41,6 +42,18 @@ public interface DataMapExprWrapper extends Serializable {
       throws IOException;
 
   /**
+   * Prune blocklets according to the given distributable.
+   *
+   * @param distributable     the datamap distributable
+   * @param partitionsToPrune partitions to prune
+   * @return the pruned ExtendedBlocklet list
+   * @throws IOException
+   */
+  List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+      List<PartitionSpec> partitionsToPrune)
+          throws IOException;
+
+  /**
   * It is used in the case of distributable datamaps. First, using a job, it gets all
   * blocklets from all related datamaps. These blocklets are passed to this method to
   * apply the expression.
    * @param blocklets

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index 38f2336..6537976 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
@@ -52,6 +53,13 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
     return dataMap.prune(segments, expression, partitionsToPrune);
   }
 
+  public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+      List<PartitionSpec> partitionsToPrune)
+      throws IOException {
+    List<DataMap> dataMaps = dataMap.getTableDataMaps(distributable);
+    return dataMap.prune(dataMaps, distributable, expression, partitionsToPrune);
+  }
+
   @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
       throws IOException {
     List<ExtendedBlocklet> blockletList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
index 4988903..bb98535 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -58,6 +59,18 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper {
     return new ArrayList<>(andBlocklets);
   }
 
+  @Override
+  public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
+      List<PartitionSpec> partitionsToPrune)
+          throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune);
+    Set<ExtendedBlocklet> orBlocklets = new HashSet<>();
+    orBlocklets.addAll(leftPrune);
+    orBlocklets.addAll(rightPrune);
+    return new ArrayList<>(orBlocklets);
+  }
+
   @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)
       throws IOException {
     List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets);
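
Taken together, the two wrappers give the new distributable prune simple set-algebra semantics: AND keeps the intersection of the child results, OR keeps the de-duplicated union. A toy sketch with strings standing in for blocklets (plain JDK collections, not the CarbonData API):

    import java.util.*;

    public class PruneAlgebraSketch {
      public static void main(String[] args) {
        List<String> left = Arrays.asList("blocklet1", "blocklet2");
        List<String> right = Arrays.asList("blocklet2", "blocklet3");

        // AndDataMapExprWrapper.prune: keep only blocklets returned by both children.
        List<String> and = new ArrayList<>(left);
        and.retainAll(right);                       // [blocklet2]

        // OrDataMapExprWrapper.prune: union of both children, duplicates removed.
        Set<String> or = new LinkedHashSet<>(left);
        or.addAll(right);                           // [blocklet1, blocklet2, blocklet3]

        System.out.println(and + " / " + or);
      }
    }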

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 14bba65..6a1234e 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -17,10 +17,7 @@
 package org.apache.carbondata.core.readcommitter;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -43,11 +40,20 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 public class LatestFilesReadCommittedScope implements ReadCommittedScope {
 
   private String carbonFilePath;
+  private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;
 
-  public LatestFilesReadCommittedScope(String path)  {
+  /**
+   * Constructor that restricts the index file snapshot to the given segment
+   * when segmentId is not null.
+   *
+   * @param path      carbon file path
+   * @param segmentId segment id
+   */
+  public LatestFilesReadCommittedScope(String path, String segmentId) {
+    Objects.requireNonNull(path);
     this.carbonFilePath = path;
+    this.segmentId = segmentId;
     try {
       takeCarbonIndexFileSnapShot();
     } catch (IOException ex) {
@@ -55,6 +61,15 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
     }
   }
 
+  /**
+   * Constructor taking only a path; the snapshot covers all segments.
+   *
+   * @param path carbon file path
+   */
+  public LatestFilesReadCommittedScope(String path) {
+    this(path, null);
+  }
+
   private void prepareLoadMetadata() {
     int loadCount = 0;
     Map<String, List<String>> snapshotMap =
@@ -101,13 +116,16 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
       segName = segment.getSegmentFileName();
     }
     List<String> index = snapShot.get(segName);
+    if (null == index) {
+      index = new LinkedList<>();
+    }
     for (String indexPath : index) {
       indexFileStore.put(indexPath, null);
     }
     return indexFileStore;
   }
 
-  @Override public SegmentRefreshInfo getCommitedSegmentRefreshInfo(
+  @Override public SegmentRefreshInfo getCommittedSegmentRefreshInfo(
       Segment segment, UpdateVO updateVo) throws IOException {
     Map<String, SegmentRefreshInfo> snapShot =
         readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap();
@@ -140,9 +158,10 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
     // Read the current file Path get the list of indexes from the path.
     CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
     CarbonFile[] files = file.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
+      @Override
+      public boolean accept(CarbonFile file) {
         return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.CARBON_DATA_EXT);
+            .endsWith(CarbonTablePath.CARBON_DATA_EXT) || file.getName().endsWith("Fact");
       }
     });
     if (files.length == 0) {
@@ -152,8 +171,14 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
     }
     Map<String, List<String>> indexFileStore = new HashMap<>();
     Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>();
+    CarbonFile[] carbonIndexFiles = null;
     if (file.isDirectory()) {
-      CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+      if (segmentId == null) {
+        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+      } else {
+        String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
+        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+      }
       for (int i = 0; i < carbonIndexFiles.length; i++) {
         // TODO. If Required to support merge index, then this code has to be modified.
         // TODO. Nested File Paths.
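
A short sketch of how the two constructors now differ; the table path is a made-up value, and note that the constructor eagerly takes the index file snapshot, so running this needs a real store path:

    import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
    import org.apache.carbondata.core.readcommitter.ReadCommittedScope;

    public class ScopeConstructorSketch {
      public static void main(String[] args) {
        // Two-argument form: the snapshot is restricted to segment "0".
        ReadCommittedScope perSegment =
            new LatestFilesReadCommittedScope("/store/lucene/datamap_test", "0");

        // One-argument form delegates with segmentId == null: all segments are snapshotted.
        ReadCommittedScope allSegments =
            new LatestFilesReadCommittedScope("/store/lucene/datamap_test");
      }
    }

SearchRequestHandler (below) uses the two-argument form so that each search split only scans the index files of its own segment.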

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index 6ff4b89..d177a00 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -45,7 +45,7 @@ public interface ReadCommittedScope extends Serializable {
    */
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException ;
 
-  public SegmentRefreshInfo getCommitedSegmentRefreshInfo(
+  public SegmentRefreshInfo getCommittedSegmentRefreshInfo(
       Segment segment, UpdateVO updateVo) throws IOException;
 
   public void takeCarbonIndexFileSnapShot() throws IOException;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 91ebd41..1f61aab 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -79,7 +79,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
     return indexFiles;
   }
 
-  public SegmentRefreshInfo getCommitedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
+  public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
       throws IOException {
     SegmentRefreshInfo segmentRefreshInfo;
     if (updateVo != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index fab0565..1da8edd 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandExcept
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
@@ -235,7 +236,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
     }
     for (CarbonFile indexDir : indexDirs) {
       // Filter out the tasks which are filtered through CG datamap.
-      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+      if (getDataMapLevel() != DataMapLevel.FG &&
+          !segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
         continue;
       }
       DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
index efe2a63..fe94f54 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala
@@ -61,8 +61,6 @@ object LuceneDataMapExample {
          | DMProperties('INDEX_COLUMNS'='id , name')
       """.stripMargin)
 
-    spark.sql("refresh datamap dm ON TABLE personTable")
-
     // 1. Compare the performance:
 
     def time(code: => Unit): Double = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index cad20fc..da84c00 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -78,8 +78,12 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
     } else {
       throw new RuntimeException("unsupported input split type: " + inputSplit);
     }
-    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
-    queryModel.setTableBlockInfos(tableBlockInfoList);
+    // Reuse the existing tableBlockInfos if the queryModel already has them set;
+    // otherwise any pruning done before this method would be discarded.
+    if (queryModel.getTableBlockInfos().isEmpty()) {
+      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+      queryModel.setTableBlockInfos(tableBlockInfoList);
+    }
     readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
     try {
       carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index cf51162..05c70f8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -372,7 +372,7 @@ m filterExpression
     List<ExtendedBlocklet> prunedBlocklets =
         getPrunedBlocklets(job, carbonTable, resolver, segmentIds);
 
-    List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+    List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
     int partitionIndex = 0;
     List<Integer> partitionIdList = new ArrayList<>();
     if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
@@ -401,7 +401,7 @@ m filterExpression
         if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
           CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
           if (inputSplit != null) {
-            resultFilterredBlocks.add(inputSplit);
+            resultFilteredBlocks.add(inputSplit);
           }
         }
       }
@@ -409,7 +409,7 @@ m filterExpression
     statistic
         .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
     recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
-    return resultFilterredBlocks;
+    return resultFilteredBlocks;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 638d24d..f64a349 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -438,6 +438,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       .contains("Unsupported alter operation on hive table"))
     sql("drop datamap if exists dm2 on table datamap_test_table")
   }
+
   test("test Clean Files and check Lucene DataMap") {
     sql("DROP TABLE IF EXISTS datamap_test_table")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
new file mode 100644
index 0000000..0ceead8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+
+/**
+  * Test lucene fine grain datamap with search mode
+  */
+class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    // n should be about 5000000 when the default size of 1024 is used
+    val n = 500000
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
+    CarbonProperties
+      .getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s")
+    LuceneFineGrainDataMapSuite.createFile(file2, n)
+    sql("create database if not exists lucene")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+        CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+    sql("use lucene")
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+  }
+
+  test("test lucene fine grain data map with search mode") {
+
+    sqlContext.sparkSession.sparkContext.setLogLevel("WARN")
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='Name')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"),
+      sql(s"select * from datamap_test where name='n10'"))
+
+    sql("drop datamap dm on table datamap_test")
+  }
+
+  // TODO: optimize performance
+  ignore("test lucene fine grain data map with TEXT_MATCH 'AND' Filter") {
+    sql("drop datamap if exists dm on table datamap_test")
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(
+      sql("SELECT count(*) FROM datamap_test WHERE TEXT_MATCH('name:n2*') " +
+        "AND age=28 and id=200149"),
+      sql("SELECT count(*) FROM datamap_test WHERE name like 'n2%' " +
+        "AND age=28 and id=200149"))
+    sql("drop datamap if exists dm on table datamap_test")
+  }
+
+  // TODO: optimize performance
+  ignore("test lucene fine grain data map with TEXT_MATCH 'AND' and 'OR' Filter ") {
+    sql("drop datamap if exists dm on table datamap_test")
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='name , city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n1*') OR TEXT_MATCH ('city:c01*') " +
+      "AND TEXT_MATCH('city:C02*')"),
+      sql("select * from datamap_test where name like 'n1%' OR city like 'c01%' and city like" +
+        " 'c02%'"))
+    sql("drop datamap if exists dm on table datamap_test")
+  }
+
+  test("test lucene fine grain data map with compaction-Major ") {
+    sql("DROP TABLE IF EXISTS datamap_test_table")
+    sql(
+      """
+        | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test_table
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='name , city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"),
+      sql("select * from datamap_test_table where name='n10'"))
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+    sql("alter table datamap_test_table compact 'major'")
+    checkAnswer(sql("SELECT COUNT(*) FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"),
+      sql("select COUNT(*) from datamap_test_table where name='n10'"))
+    sql("drop datamap if exists dm on table datamap_test_table")
+    sql("DROP TABLE IF EXISTS datamap_test_table")
+  }
+
+  test("test lucene fine grain datamap rebuild") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test5
+         | USING 'lucene'
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    val map = DataMapStatusManager.readDataMapStatusMap()
+    assert(!map.get("dm").isEnabled)
+    sql("REBUILD DATAMAP dm ON TABLE datamap_test5")
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
+  }
+
+  test("test lucene fine grain datamap rebuild with table block size") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1')
+      """.stripMargin)
+    sql(
+    s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test5
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c00')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c00'"))
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c0100085')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c0100085'"))
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c09560')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c09560'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
+  }
+
+  test("test lucene fine grain multiple data map on table") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm2 ON TABLE datamap_test5
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='city')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test5
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='Name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('name:n10')"),
+      sql(s"select * from datamap_test5 where name='n10'"))
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
+  }
+
+  // TODO: support it in the future
+  ignore("test lucene datamap and validate the visible and invisible status of datamap ") {
+    val tableName = "datamap_test2"
+    val dataMapName1 = "ggdatamap1"
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName(id INT, name STRING, city STRING, age INT)
+         | STORED BY 'org.apache.carbondata.format'
+         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    // register datamap writer
+    sql(
+      s"""
+         | CREATE DATAMAP ggdatamap1 ON TABLE $tableName
+         | USING 'lucene'
+         | DMPROPERTIES('index_columns'='name')
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE $tableName OPTIONS('header'='false')")
+
+    val df1 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").collect()
+    sql(s"SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").show()
+    println(df1(0).getString(0))
+    assertResult(
+      s"""== CarbonData Profiler ==
+         |Table Scan on datamap_test2
+         | - total blocklets: 1
+         | - filter: TEXT_MATCH('name:n502670')
+         | - pruned by Main DataMap
+         |    - skipped blocklets: 0
+         | - pruned by FG DataMap
+         |    - name: ggdatamap1
+         |    - provider: lucene
+         |    - skipped blocklets: 1
+         |""".stripMargin)(df1(0).getString(0))
+
+    sql(s"set ${CarbonCommonConstants.CARBON_DATAMAP_VISIBLE}default.$tableName.$dataMapName1 = false")
+
+    val df2 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670'").collect()
+    println(df2(0).getString(0))
+    assertResult(
+      s"""== CarbonData Profiler ==
+         |Table Scan on $tableName
+         | - total blocklets: 1
+         | - filter: (name <> null and name = n502670)
+         | - pruned by Main DataMap
+         |    - skipped blocklets: 0
+         |""".stripMargin)(df2(0).getString(0))
+
+    checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"),
+      sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"))
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  ignore("test lucene fine grain datamap rebuild with table block size, rebuild") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test5
+         | USING 'lucene'
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    sql("REBUILD DATAMAP dm ON TABLE datamap_test5")
+
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
+    sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show()
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode()
+    sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show()
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql("use default")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+        CarbonProperties.getStorePath)
+    sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode()
+  }
+
+  def createFile(fileName: String, line: Int = 10000, start: Int = 0): Unit = {
+    val write = new PrintWriter(new File(fileName))
+    for (i <- start until (start + line)) {
+      write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80))
+    }
+    write.close()
+  }
+
+  def deleteFile(fileName: String): Unit = {
+    val file = new File(fileName)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
index 2c94dab..d278fc5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala
@@ -109,4 +109,31 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll {
     sql("set carbon.search.enabled = false")
     assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled)
   }
+
+  test("test lucene datamap with search mode") {
+    sql("DROP DATAMAP IF EXISTS dm ON TABLE main")
+    sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ")
+    checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
+      sql(s"SELECT * FROM main WHERE id='100000'"))
+    sql("DROP DATAMAP if exists dm ON TABLE main")
+  }
+
+  test("test lucene datamap with search mode 2") {
+    sql("drop datamap if exists dm3 ON TABLE main")
+    sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ")
+    checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
+      sql("SELECT * FROM main WHERE city='city6'"))
+    sql("DROP DATAMAP if exists dm3 ON TABLE main")
+  }
+
+  test("test lucene datamap with search mode, two column") {
+    sql("drop datamap if exists dm3 ON TABLE main")
+    sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ")
+    checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"),
+      sql("SELECT * FROM main WHERE city='city6'"))
+    checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"),
+      sql(s"SELECT * FROM main WHERE id='100000'"))
+    sql("DROP DATAMAP if exists dm3 ON TABLE main")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 29dcec9..186e39e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -68,10 +68,10 @@ case class CarbonSetCommand(command: SetCommand)
   override val output: Seq[Attribute] = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
+    val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
     command.kv match {
       case Some((key, Some(value))) =>
-        CarbonSetCommand.validateAndSetValue(sessionParms, key, value)
+        CarbonSetCommand.validateAndSetValue(sessionParams, key, value)
 
         // handle search mode start/stop for ThriftServer usage
         if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 07a444f..c052cd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -382,6 +382,8 @@ object CarbonFilters {
             CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
         new AndExpression(l, r)
       case StringTrim(child) => transformExpression(child)
+      case s: ScalaUDF =>
+        new MatchExpression(s.children.head.toString())
       case _ =>
         new SparkUnknownExpression(expr.transform {
           case AttributeReference(name, dataType, _, _) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 9727352..f6406c7 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.store.worker;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -27,7 +28,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
@@ -112,6 +115,8 @@ public class SearchRequestHandler {
     queryModel.setVectorReader(false);
 
     CarbonMultiBlockSplit mbSplit = request.split().value();
+    List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits());
+    queryModel.setTableBlockInfos(list);
     long limit = request.limit();
     long rowCount = 0;
 
@@ -158,22 +163,38 @@ public class SearchRequestHandler {
       CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
     Objects.requireNonNull(datamap);
     List<Segment> segments = new LinkedList<>();
+    HashMap<String, Integer> uniqueSegments = new HashMap<>();
     for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      segments.add(
-          Segment.toSegment(split.getSegmentId(),
-              new LatestFilesReadCommittedScope(table.getTablePath())));
+      String segmentId = split.getSegmentId();
+      if (uniqueSegments.get(segmentId) == null) {
+        segments.add(Segment.toSegment(
+                segmentId,
+                new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+        uniqueSegments.put(segmentId, 1);
+      } else {
+        uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
+      }
+    }
+
+    List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments);
+    List<ExtendedBlocklet> prunedBlocklets = new LinkedList<>();
+    for (int i = 0; i < distributables.size(); i++) {
+      DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable();
+      prunedBlocklets.addAll(datamap.prune(dataMapDistributable, null));
     }
-    List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null);
 
-    List<String> pathToRead = new LinkedList<>();
-    for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) {
-      pathToRead.add(prunnedBlocklet.getPath());
+    HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>();
+    for (ExtendedBlocklet prunedBlocklet : prunedBlocklets) {
+      pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet);
     }
 
     List<TableBlockInfo> blocks = queryModel.getTableBlockInfos();
     List<TableBlockInfo> blockToRead = new LinkedList<>();
     for (TableBlockInfo block : blocks) {
-      if (pathToRead.contains(block.getFilePath())) {
+      if (pathToRead.containsKey(block.getFilePath())) {
+        // Without this, the FineGrainBlocklet object cannot be created in
+        // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData
+        block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath());
         blockToRead.add(block);
       }
     }
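
The reworked flow above can be condensed as follows; a sketch only (the method name is illustrative, error handling is omitted, and a Set stands in for the patch's HashMap occurrence counter, which additionally records how many splits share a segment):

    // One segment-scoped snapshot per distinct segment id, then prune per distributable.
    private List<ExtendedBlocklet> pruneByDistributable(CarbonTable table,
        CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException {
      Set<String> seen = new HashSet<>();
      List<Segment> segments = new LinkedList<>();
      for (CarbonInputSplit split : mbSplit.getAllSplits()) {
        if (seen.add(split.getSegmentId())) {
          segments.add(Segment.toSegment(split.getSegmentId(),
              new LatestFilesReadCommittedScope(table.getTablePath(), split.getSegmentId())));
        }
      }
      List<ExtendedBlocklet> pruned = new LinkedList<>();
      for (DataMapDistributableWrapper wrapper : datamap.toDistributable(segments)) {
        pruned.addAll(datamap.prune(wrapper.getDistributable(), null));
      }
      return pruned;
    }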

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
----------------------------------------------------------------------
diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
index 26de74c..f48f5e4 100644
--- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
+++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala
@@ -38,8 +38,7 @@ import org.apache.spark.util.ThreadUtils
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapChooser
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -232,10 +231,14 @@ class Master(sparkConf: SparkConf) {
 
       // if we have enough data already, we do not need to collect more result
       if (rowCount < globalLimit) {
-        // wait for worker for 10s
-        ThreadUtils.awaitResult(future, Duration.apply("10s"))
+        // wait for worker
+        val timeout = CarbonProperties
+          .getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT,
+            CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT)
+        ThreadUtils.awaitResult(future, Duration.apply(timeout))
         LOG.info(s"[SearchId:$queryId] receive search response from worker " +
-                 s"${worker.address}:${worker.port}")
+          s"${worker.address}:${worker.port}")
         try {
           future.value match {
             case Some(response: Try[SearchResult]) =>