You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 15:48:48 UTC

[2/2] carbondata git commit: [CARBONDATA-2206] support lucene index datamap

[CARBONDATA-2206] support lucene index datamap

This PR is an initial effort to integrate Lucene into CarbonData as an index datamap.
A new module called carbondata-lucene is added to support the Lucene datamap:

1. Add LuceneFineGrainDataMap, an implementation of FineGrainDataMap.
2. Add LuceneCoarseGrainDataMap, an implementation of CoarseGrainDataMap.
3. Support writing the Lucene index via LuceneDataMapWriter.
4. Implement the Lucene datamap factories (LuceneDataMapFactoryBase plus the coarse-grain and fine-grain factories).
5. Add a UDF called TEXT_MATCH, which filters on string columns using the Lucene index.

This closes #2003


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/24ebfbbd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/24ebfbbd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/24ebfbbd

Branch: refs/heads/datamap
Commit: 24ebfbbdadbb056da7b7f4ef83d4c87777007220
Parents: 06c572d
Author: Jacky Li <ja...@qq.com>
Authored: Mon Feb 26 16:30:38 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 23:48:27 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapChooser.java |   4 +
 .../core/datamap/DataMapStoreManager.java       |   5 +-
 .../carbondata/core/datamap/dev/DataMap.java    |   2 +-
 .../core/datamap/dev/DataMapFactory.java        |   2 +-
 .../core/datamap/dev/DataMapWriter.java         |   7 +-
 .../cgdatamap/CoarseGrainDataMapFactory.java    |   1 +
 .../core/scan/filter/intf/ExpressionType.java   |   3 +-
 .../carbondata/core/util/DataTypeUtil.java      |   2 +
 datamap/lucene/pom.xml                          | 149 +++++++++
 .../lucene/LuceneCoarseGrainDataMap.java        | 232 +++++++++++++
 .../lucene/LuceneCoarseGrainDataMapFactory.java |  72 ++++
 .../lucene/LuceneDataMapDistributable.java      |  36 ++
 .../lucene/LuceneDataMapFactoryBase.java        | 180 ++++++++++
 .../datamap/lucene/LuceneDataMapWriter.java     | 328 +++++++++++++++++++
 .../datamap/lucene/LuceneFineGrainDataMap.java  | 280 ++++++++++++++++
 .../lucene/LuceneFineGrainDataMapFactory.java   |  68 ++++
 .../lucene/LuceneCoarseGrainDataMapSuite.scala  |  73 +++++
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  98 ++++++
 integration/spark-common-test/pom.xml           |   6 +
 .../testsuite/datamap/FGDataMapTestCase.scala   |   2 +-
 .../carbondata/datamap/DataMapProvider.java     |   4 +-
 .../datamap/IndexDataMapProvider.java           |   4 +-
 .../datamap/expression/MatchExpression.java     |  56 ++++
 .../carbondata/datamap/TextMatchUDF.scala       |  34 ++
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   5 +
 .../strategy/CarbonLateDecodeStrategy.scala     |   9 +
 .../spark/sql/optimizer/CarbonFilters.scala     |   4 +
 pom.xml                                         |   3 +
 .../datamap/DataMapWriterListener.java          |   6 +-
 .../store/writer/AbstractFactDataWriter.java    |  12 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   4 +-
 31 files changed, 1673 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 94b48c6..c8c971d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -228,6 +228,10 @@ public class DataMapChooser {
 
   private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions,
       Set<ExpressionType> expressionTypes) {
+    if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH)) {
+      // TODO: fix it with right logic
+      return true;
+    }
     if (mapMeta.getIndexedColumns().size() == 0 || columnExpressions.size() == 0) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index e38f4f9..0223ae2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -144,7 +145,7 @@ public final class DataMapStoreManager {
    * The datamap is created using datamap name, datamap factory class and table identifier.
    */
   private TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
-      DataMapSchema dataMapSchema) throws MalformedDataMapCommandException {
+      DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
     DataMapFactory dataMapFactory;
     try {
       // try to create datamap by reflection to test whether it is a valid DataMapFactory class
@@ -162,7 +163,7 @@ public final class DataMapStoreManager {
   }
 
   public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier,
-      DataMapSchema dataMapSchema,  DataMapFactory dataMapFactory) {
+      DataMapSchema dataMapSchema,  DataMapFactory dataMapFactory) throws IOException {
     String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
     getTableSegmentRefresher(identifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index cdd7387..82e20fa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -44,7 +44,7 @@ public interface DataMap<T extends Blocklet> {
    * @return
    */
   List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<String> partitions);
+      List<String> partitions) throws IOException;
 
   // TODO Move this method to Abstract class
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 77f2249..7bf04c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -34,7 +34,7 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema);
+  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) throws IOException;
 
   /**
    * Return a new write for this datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 18252b6..8092efa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -53,12 +53,12 @@ public abstract class DataMapWriter {
    *
    * @param blockId file name of the carbondata file
    */
-  public abstract void onBlockStart(String blockId);
+  public abstract void onBlockStart(String blockId) throws IOException;
 
   /**
    * End of block notification
    */
-  public abstract void onBlockEnd(String blockId);
+  public abstract void onBlockEnd(String blockId) throws IOException;
 
   /**
    * Start of new blocklet notification.
@@ -80,7 +80,8 @@ public abstract class DataMapWriter {
    * Implementation should copy the content of `pages` as needed, because `pages` memory
    * may be freed after this method returns, if using unsafe column page.
    */
-  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages);
+  public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
+      throws IOException;
 
   /**
    * This is called during closing of writer.So after this call no more data will be sent to this

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
index 4d20cdb..f9fdafb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.core.datamap.dev.cgdatamap;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
index 831acc8..d66c5b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -42,5 +42,6 @@ public enum ExpressionType {
   TRUE,
   STARTSWITH,
   ENDSWITH,
-  CONTAINSWITH
+  CONTAINSWITH,
+  TEXT_MATCH
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 6d224cf..f4b06e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -726,6 +726,8 @@ public final class DataTypeUtil {
    */
   public static void setDataTypeConverter(DataTypeConverter converterLocal) {
     converter = converterLocal;
+    timeStampformatter.remove();
+    dateformatter.remove();
   }
 
   public static DataTypeConverter getDataTypeConverter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/lucene/pom.xml b/datamap/lucene/pom.xml
new file mode 100644
index 0000000..ee504c6
--- /dev/null
+++ b/datamap/lucene/pom.xml
@@ -0,0 +1,149 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-lucene</artifactId>
+  <name>Apache CarbonData :: Lucene Index DataMap</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+    <lucene.version>6.3.0</lucene.version>
+    <solr.version>6.3.0</solr.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>${lucene.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+      <version>${lucene.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-queryparser</artifactId>
+      <version>${lucene.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-sandbox</artifactId>
+      <version>${lucene.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-core</artifactId>
+      <version>${solr.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-solrj</artifactId>
+      <version>${solr.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/resources</directory>
+      </resource>
+      <resource>
+        <directory>.</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
new file mode 100644
index 0000000..0b7df86
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+@InterfaceAudience.Internal
+public class LuceneCoarseGrainDataMap extends CoarseGrainDataMap {
+
+  /**
+   * log information
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneCoarseGrainDataMap.class.getName());
+
+  public static final int BLOCKID_ID = 0;
+
+  public static final int BLOCKLETID_ID = 1;
+
+  public static final int PAGEID_ID = 2;
+
+  public static final int ROWID_ID = 3;
+  /**
+   * searcher object for this datamap
+   */
+  private IndexSearcher indexSearcher = null;
+
+  /**
+   * default max values to return
+   */
+  private static int MAX_RESULT_NUMBER = 100;
+
+  /**
+   * analyzer for lucene index
+   */
+  private Analyzer analyzer;
+
+  LuceneCoarseGrainDataMap(Analyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   */
+  @Override
+  public void init(DataMapModel dataMapModel) throws MemoryException, IOException {
+    // get this path from file path
+    Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+
+    LOGGER.info("Lucene index read path " + indexPath.toString());
+
+    // get file system , use hdfs file system , realized in solr project
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // check this path valid
+    if (!fs.exists(indexPath)) {
+      String errorMessage = String.format("index directory %s not exists.", indexPath);
+      LOGGER.error(errorMessage);
+      throw new IOException(errorMessage);
+    }
+
+    if (!fs.isDirectory(indexPath)) {
+      String errorMessage = String.format("error index path %s, must be directory", indexPath);
+      LOGGER.error(errorMessage);
+      throw new IOException(errorMessage);
+    }
+
+    // open this index path , use HDFS default configuration
+    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    IndexReader indexReader = DirectoryReader.open(indexDir);
+    if (indexReader == null) {
+      throw new RuntimeException("failed to create index reader object");
+    }
+
+    // create a index searcher object
+    indexSearcher = new IndexSearcher(indexReader);
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   */
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<String> partitions) throws IOException {
+
+    // convert filter expr into lucene list query
+    List<String> fields = new ArrayList<String>();
+
+    // only for test , query all data
+    String strQuery = "*:*";
+
+    String[] sFields = new String[fields.size()];
+    fields.toArray(sFields);
+
+    // get analyzer
+    if (analyzer == null) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // use MultiFieldQueryParser to parser query
+    QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
+    Query query;
+    try {
+      query = queryParser.parse(strQuery);
+    } catch (ParseException e) {
+      String errorMessage = String
+          .format("failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
+      LOGGER.error(errorMessage);
+      return null;
+    }
+
+    // execute index search
+    TopDocs result;
+    try {
+      result = indexSearcher.search(query, MAX_RESULT_NUMBER);
+    } catch (IOException e) {
+      String errorMessage =
+          String.format("failed to search lucene data, detail is %s", e.getMessage());
+      LOGGER.error(errorMessage);
+      throw new IOException(errorMessage);
+    }
+
+    // temporary data, used to drop duplicated blocklet ids
+    // Map<BlockId, Set<BlockletId>>
+    Map<String, Set<Number>> mapBlocks = new HashMap<String, Set<Number>>();
+
+    for (ScoreDoc scoreDoc : result.scoreDocs) {
+      // get a document
+      Document doc = indexSearcher.doc(scoreDoc.doc);
+
+      // get all fields
+      List<IndexableField> fieldsInDoc = doc.getFields();
+
+      // get the block id, which is the key of Map<BlockId, Set<BlockletId>>
+      String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue();
+      Set<Number> setBlocklets = mapBlocks.get(blockId);
+      if (setBlocklets == null) {
+        setBlocklets = new HashSet<Number>();
+        mapBlocks.put(blockId, setBlocklets);
+      }
+
+      // get the blocklet id Set<BlockletId>
+      Number blockletId = fieldsInDoc.get(BLOCKLETID_ID).numericValue();
+      if (!setBlocklets.contains(blockletId.intValue())) {
+        setBlocklets.add(blockletId.intValue());
+      }
+    }
+
+    // result blocklets
+    List<Blocklet> blocklets = new ArrayList<Blocklet>();
+
+    // transform all blocks into result type blocklets Map<BlockId, Set<BlockletId>>
+    for (Map.Entry<String, Set<Number>> mapBlock : mapBlocks.entrySet()) {
+      String blockId = mapBlock.getKey();
+      Set<Number> setBlocklets = mapBlock.getValue();
+
+      // for blocklets in this block Set<BlockletId>
+      for (Number blockletId : setBlocklets) {
+
+        // add a CoarseGrainBlocklet
+        blocklets.add(new Blocklet(blockId, blockletId.toString()));
+      }
+    }
+
+    return blocklets;
+  }
+
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    return true;
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   */
+  @Override
+  public void clear() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..7f9cc1c
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * CG level of lucene DataMap
+ */
+@InterfaceAudience.Internal
+public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<CoarseGrainDataMap> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneCoarseGrainDataMapFactory.class.getName());
+
+  /**
+   * Get the datamap for segmentid
+   */
+  public List<CoarseGrainDataMap> getDataMaps(String segmentId) throws IOException {
+    List<CoarseGrainDataMap> lstDataMap = new ArrayList<>();
+    CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
+    try {
+      dataMap.init(new DataMapModel(
+          LuceneDataMapWriter.genDataMapStorePath(
+              tableIdentifier.getTablePath(), segmentId, dataMapName)));
+    } catch (MemoryException e) {
+      LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
+      return lstDataMap;
+    }
+    lstDataMap.add(dataMap);
+    return lstDataMap;
+  }
+
+  /**
+   * Get datamaps for distributable object.
+   */
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
+    return getDataMaps(distributable.getSegmentId());
+  }
+
+  @Override
+  public DataMapLevel getDataMapType() {
+    return DataMapLevel.CG;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
new file mode 100644
index 0000000..19e4035
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+
+@InterfaceAudience.Internal
+class LuceneDataMapDistributable extends DataMapDistributable {
+
+  // TODO: seems no one use this?
+  private String dataPath;
+
+  LuceneDataMapDistributable(String dataPath) {
+    this.dataPath = dataPath;
+  }
+
+  public String getDataPath() {
+    return dataPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
new file mode 100644
index 0000000..5eb7054
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * Base implementation for CG and FG lucene DataMapFactory.
+ */
+@InterfaceAudience.Internal
+abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFactory<T> {
+
+  /**
+   * Logger
+   */
+  final LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
+
+  /**
+   * table's index columns
+   */
+  DataMapMeta dataMapMeta = null;
+
+  /**
+   * analyzer for lucene
+   */
+  Analyzer analyzer = null;
+
+  /**
+   * index name
+   */
+  String dataMapName = null;
+
+  /**
+   * table identifier
+   */
+  AbsoluteTableIdentifier tableIdentifier = null;
+
+  @Override
+  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+      throws IOException {
+    Objects.requireNonNull(identifier);
+    Objects.requireNonNull(dataMapSchema);
+
+    this.tableIdentifier = identifier;
+    this.dataMapName = dataMapSchema.getDataMapName();
+
+    // get carbonmetadata from carbonmetadata instance
+    CarbonMetadata carbonMetadata = CarbonMetadata.getInstance();
+
+    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
+
+    // get carbon table
+    CarbonTable carbonTable = carbonMetadata.getCarbonTable(tableUniqueName);
+    if (carbonTable == null) {
+      String errorMessage =
+          String.format("failed to get carbon table with name %s", tableUniqueName);
+      LOGGER.error(errorMessage);
+      throw new IOException(errorMessage);
+    }
+
+    TableInfo tableInfo = carbonTable.getTableInfo();
+    List<ColumnSchema> lstCoumnSchemas = tableInfo.getFactTable().getListOfColumns();
+
+    // currently add all columns into lucene indexer
+    // TODO:only add index columns
+    List<String> indexedColumns = new ArrayList<String>();
+    for (ColumnSchema columnSchema : lstCoumnSchemas) {
+      if (!columnSchema.isInvisible()) {
+        indexedColumns.add(columnSchema.getColumnName());
+      }
+    }
+
+    // get indexed columns
+    //    Map<String, String> properties = dataMapSchema.getProperties();
+    //    String columns = properties.get("text_column");
+    //    if (columns != null) {
+    //      String[] columnArray = columns.split(CarbonCommonConstants.COMMA, -1);
+    //      Collections.addAll(indexedColumns, columnArray);
+    //    }
+
+    // add optimizedOperations
+    List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
+    //    optimizedOperations.add(ExpressionType.EQUALS);
+    //    optimizedOperations.add(ExpressionType.GREATERTHAN);
+    //    optimizedOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
+    //    optimizedOperations.add(ExpressionType.LESSTHAN);
+    //    optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
+    //    optimizedOperations.add(ExpressionType.NOT);
+    optimizedOperations.add(ExpressionType.TEXT_MATCH);
+    this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations);
+
+    // get analyzer
+    // TODO: how to get analyzer ?
+    analyzer = new StandardAnalyzer();
+  }
+
+  /**
+   * Creates the writer that builds the lucene index for one segment.
+   * The writer is always created with the fine grain flag set.
+   *
+   * @param segmentId id of the segment to write
+   * @param writeDirectoryPath directory handed to the writer; it is also logged
+   * @return a new LuceneDataMapWriter
+   */
+  public DataMapWriter createWriter(String segmentId, String writeDirectoryPath) {
+    LOGGER.info("lucene data write to " + writeDirectoryPath);
+    final boolean fineGrain = true;
+    return new LuceneDataMapWriter(
+        tableIdentifier, dataMapName, segmentId, writeDirectoryPath, fineGrain);
+  }
+
+  /**
+   * Wraps a segment into distributable objects. Exactly one distributable is produced, built
+   * on the segment path of the table.
+   *
+   * @param segmentId id of the segment
+   * @return a list holding a single LuceneDataMapDistributable
+   */
+  public List<DataMapDistributable> toDistributable(String segmentId) {
+    String segmentPath =
+        CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId);
+    List<DataMapDistributable> distributables = new ArrayList<DataMapDistributable>();
+    distributables.add(new LuceneDataMapDistributable(segmentPath));
+    return distributables;
+  }
+
+  /**
+   * Event callback. Currently a no-op, the event is ignored.
+   */
+  public void fireEvent(Event event) {
+
+  }
+
+  /**
+   * Clears datamap of the segment.
+   * Currently a no-op, nothing is done for the given segment id.
+   */
+  public void clear(String segmentId) {
+
+  }
+
+  /**
+   * Clear all datamaps from memory.
+   * Currently a no-op.
+   */
+  public void clear() {
+
+  }
+
+  /**
+   * Return metadata of this datamap.
+   * The value is null until init() has assigned it.
+   */
+  public DataMapMeta getMeta() {
+    return dataMapMeta;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
new file mode 100644
index 0000000..849fc2e
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+/**
+ * Implementation to write lucene index while loading
+ */
+@InterfaceAudience.Internal
+public class LuceneDataMapWriter extends DataMapWriter {
+  /**
+   * logger
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
+
+  /**
+   * index writer
+   */
+  private IndexWriter indexWriter = null;
+
+  private Analyzer analyzer = null;
+
+  private String blockId = null;
+
+  private String dataMapName = null;
+
+  private boolean isFineGrain = true;
+
+  private static final String BLOCKID_NAME = "blockId";
+
+  private static final String BLOCKLETID_NAME = "blockletId";
+
+  private static final String PAGEID_NAME = "pageId";
+
+  private static final String ROWID_NAME = "rowId";
+
+  /**
+   * Creates a writer for one segment of a table.
+   *
+   * @param identifier identifier of the table, passed on to the DataMapWriter constructor
+   * @param dataMapName name of the datamap, used as last element of the index path
+   * @param segmentId id of the segment, passed on to the DataMapWriter constructor
+   * @param writeDirectoryPath passed on to the DataMapWriter constructor
+   * @param isFineGrain whether page id and row id are added to every indexed document
+   */
+  LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, String segmentId,
+      String writeDirectoryPath, boolean isFineGrain) {
+    super(identifier, segmentId, writeDirectoryPath);
+    this.dataMapName = dataMapName;
+    this.isFineGrain = isFineGrain;
+  }
+
+  /**
+   * Return the directory the lucene index of the current segment is written to.
+   */
+  private String getIndexPath() {
+    // TODO: where write data in coarse grain data map
+    // until that is decided, fine grain and coarse grain use the very same location
+    return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+  }
+
+  /**
+   * Start of new block notification.
+   * Remembers the block id and opens an index writer on the datamap directory of the segment,
+   * creating that directory first when it is missing.
+   *
+   * @param blockId id of the block that starts, it is stored in the documents added afterwards
+   * @throws IOException if the directory cannot be checked or created, or the index writer
+   *                     cannot be opened
+   */
+  public void onBlockStart(String blockId) throws IOException {
+    // save this block id for lucene index , used in onPageAdd function
+    this.blockId = blockId;
+
+    // get index path, put index data into segment's path
+    String strIndexPath = getIndexPath();
+    Path indexPath = FileFactory.getPath(strIndexPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path not exists, create it
+    if (!fs.exists(indexPath)) {
+      fs.mkdirs(indexPath);
+    }
+
+    // the analyzer is created on first use and then shared by all blocks of this writer
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // create a index writer
+    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
+  }
+
+  /**
+   * End of block notification.
+   * Forgets the current block id and closes the index writer, if one was opened.
+   *
+   * @param blockId id of the block that ends, not used by this implementation
+   * @throws IOException if closing the index writer fails
+   */
+  public void onBlockEnd(String blockId) throws IOException {
+    // the block is done, its id must not be used any longer
+    this.blockId = null;
+
+    if (indexWriter == null) {
+      return;
+    }
+
+    // finished a file , close this index writer
+    indexWriter.close();
+  }
+
+  /**
+   * Start of new blocklet notification.
+   * Currently a no-op.
+   */
+  public void onBlockletStart(int blockletId) {
+
+  }
+
+  /**
+   * End of blocklet notification.
+   * Currently a no-op.
+   */
+  public void onBlockletEnd(int blockletId) {
+
+  }
+
+  /**
+   * Add the column pages row to the datamap, order of pages is same as `indexColumns` in
+   * DataMapMeta returned in DataMapFactory.
+   * Implementation should copy the content of `pages` as needed, because `pages` memory
+   * may be freed after this method returns, if using unsafe column page.
+   * One document per row is built in a RAM index first, the RAM index is then added to the
+   * index writer opened in onBlockStart.
+   *
+   * @param blockletId id of the blocklet the page belongs to
+   * @param pageId id of the page inside the blocklet
+   * @param pages column pages of this page; a null or empty array is logged and ignored
+   * @throws IOException if writing to the RAM index or to the index writer fails
+   */
+  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) throws IOException {
+    // nothing to index, return before any resource is allocated
+    if (pages == null || pages.length == 0) {
+      LOGGER.warn("empty data");
+      return;
+    }
+    int pageSize = pages[0].getPageSize();
+
+    // save index data into ram, write into disk after one page finished.
+    // try-with-resources closes the ram writer and the ram directory on every exit path
+    try (RAMDirectory ramDir = new RAMDirectory()) {
+      try (IndexWriter ramIndexWriter =
+          new IndexWriter(ramDir, new IndexWriterConfig(analyzer))) {
+        for (int rowId = 0; rowId < pageSize; rowId++) {
+          ramIndexWriter.addDocument(createDocument(blockletId, pageId, rowId, pages));
+        }
+      }
+
+      // the ram writer is closed at this point, add ram index data into disk
+      indexWriter.addIndexes(new Directory[] { ramDir });
+    }
+  }
+
+  /**
+   * Build the lucene document of one row: block id, blocklet id, for fine grain also page id
+   * and row id, followed by one field per non-null column value.
+   */
+  private Document createDocument(int blockletId, int pageId, int rowId, ColumnPage[] pages) {
+    Document doc = new Document();
+
+    // the stored fields must be added in the order block id, blocklet id, page id, row id:
+    // LuceneFineGrainDataMap reads them back by their position in the document
+    doc.add(new StringField(BLOCKID_NAME, blockId, Field.Store.YES));
+
+    // add blocklet Id
+    doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId }));
+    doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
+
+    // add page id and row id in Fine Grain data map
+    if (isFineGrain) {
+      doc.add(new IntPoint(PAGEID_NAME, new int[] { pageId }));
+      doc.add(new StoredField(PAGEID_NAME, pageId));
+
+      doc.add(new IntPoint(ROWID_NAME, new int[] { rowId }));
+      doc.add(new StoredField(ROWID_NAME, rowId));
+    }
+
+    // add other fields, null values are not indexed
+    for (ColumnPage page : pages) {
+      if (!page.getNullBits().get(rowId)) {
+        addField(doc, page, rowId, Field.Store.NO);
+      }
+    }
+    return doc;
+  }
+
+  /**
+   * Adds the value of one column at one row to the document, choosing the lucene field type
+   * from the data type of the page.
+   * DATE and TIMESTAMP values are currently skipped without being indexed.
+   *
+   * @param doc document the field is added to
+   * @param page column page the value is read from
+   * @param rowId row of the page to read
+   * @param store Field.Store.YES to additionally store the value in the document
+   * @return always true
+   * @throws RuntimeException if the data type of the page is not handled here
+   */
+  private boolean addField(Document doc, ColumnPage page, int rowId, Field.Store store) {
+    //get field name
+    String fieldName = page.getColumnSpec().getFieldName();
+
+    //get field type
+    DataType type = page.getDataType();
+
+    if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      // NOTE(review): setIntValue on a range field looks like it is rejected by lucene's Field
+      // (value type mismatch) - confirm, an IntPoint may be what is wanted here
+      byte value = page.getByte(rowId);
+      IntRangeField field =
+          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
+      field.setIntValue(value);
+      doc.add(field);
+
+      // if need store it , add StoredField
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, (int) value));
+      }
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no short type
+      // NOTE(review): same concern as for BYTE applies to setShortValue - confirm
+      short value = page.getShort(rowId);
+      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
+          new int[] { Short.MAX_VALUE });
+      field.setShortValue(value);
+      doc.add(field);
+
+      // if need store it , add StoredField
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, (int) value));
+      }
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      int value = page.getInt(rowId);
+      doc.add(new IntPoint(fieldName, new int[] { value }));
+
+      // if need store it , add StoredField
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value));
+      }
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      long value = page.getLong(rowId);
+      doc.add(new LongPoint(fieldName, new long[] { value }));
+
+      // if need store it , add StoredField
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value));
+      }
+    } else if (type == DataTypes.FLOAT) {
+      // float type , use float point to deal with float type
+      float value = page.getFloat(rowId);
+      doc.add(new FloatPoint(fieldName, new float[] { value }));
+
+      // a point field is only indexed, the stored copy has to be a StoredField
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value));
+      }
+    } else if (type == DataTypes.DOUBLE) {
+      // double type , use double point to deal with double type
+      double value = page.getDouble(rowId);
+      doc.add(new DoublePoint(fieldName, new double[] { value }));
+
+      // a point field is only indexed, the stored copy has to be a StoredField
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value));
+      }
+    } else if (type == DataTypes.STRING) {
+      byte[] value = page.getBytes(rowId);
+      // TODO: how to get string value
+      // the first two bytes are skipped - presumably a length prefix, TODO confirm
+      String strValue = null;
+      try {
+        strValue = new String(value, 2, value.length - 2, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+      doc.add(new TextField(fieldName, strValue, store));
+    } else if (type == DataTypes.DATE) {
+      // TODO: how to get data value
+    } else if (type == DataTypes.TIMESTAMP) {
+      // TODO: how to get
+    } else if (type == DataTypes.BOOLEAN) {
+      // boolean type , indexed as 1 for true and 0 for false
+      // NOTE(review): same concern as for BYTE applies to setIntValue here - confirm
+      boolean value = page.getBoolean(rowId);
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+      field.setIntValue(value ? 1 : 0);
+      doc.add(field);
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value ? 1 : 0));
+      }
+    } else {
+      LOGGER.error("unsupport data type " + type);
+      throw new RuntimeException("unsupported data type " + type);
+    }
+    return true;
+  }
+
+  /**
+   * This is called during closing of writer. So after this call no more data will be sent to
+   * this class.
+   * Currently a no-op, the index writer is closed in onBlockEnd.
+   */
+  public void finish() throws IOException {
+
+  }
+
+  /**
+   * Return store path for datamap: the segment path of the table, followed by File.separator
+   * and the datamap name.
+   *
+   * @param tablePath path of the table
+   * @param segmentId id of the segment
+   * @param dataMapName name of the datamap, becomes the last path element
+   */
+  static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
+    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+    return segmentPath + File.separator + dataMapName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
new file mode 100644
index 0000000..c649545
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
+import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.*;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+@InterfaceAudience.Internal
+public class LuceneFineGrainDataMap extends FineGrainDataMap {
+
+  private static final int BLOCKID_ID = 0;
+
+  private static final int BLOCKLETID_ID = 1;
+
+  private static final int PAGEID_ID = 2;
+
+  private static final int ROWID_ID = 3;
+
+  /**
+   * log information
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName());
+
+  /**
+   * searcher object for this datamap
+   */
+  private IndexSearcher indexSearcher = null;
+
+  /**
+   * default max values to return
+   */
+  private static int MAX_RESULT_NUMBER = 100;
+
+  /**
+   * analyzer for lucene index
+   */
+  private Analyzer analyzer;
+
+  /**
+   * @param analyzer analyzer used to parse queries; when it is null, prune() falls back to a
+   *                 StandardAnalyzer
+   */
+  LuceneFineGrainDataMap(Analyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  /**
+   * It is called to load the data map to memory or to initialize it.
+   * Opens the lucene index stored in the directory named by the model and creates the
+   * searcher used by prune().
+   *
+   * @param dataMapModel model whose file path is the index directory
+   * @throws IOException if the path does not exist, is not a directory, or the index cannot
+   *                     be opened
+   */
+  public void init(DataMapModel dataMapModel) throws MemoryException, IOException {
+    // get this path from file path
+    Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+
+    LOGGER.info("Lucene index read path " + indexPath.toString());
+
+    // get the file system the index path lives on
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // check this path valid
+    if (!fs.exists(indexPath)) {
+      String errorMessage = String.format("index directory %s not exists.", indexPath);
+      LOGGER.error(errorMessage);
+      throw new IOException(errorMessage);
+    }
+
+    if (!fs.isDirectory(indexPath)) {
+      String errorMessage = String.format("error index path %s, must be directory", indexPath);
+      LOGGER.error(errorMessage);
+      throw new IOException(errorMessage);
+    }
+
+    // open this index path , use HDFS default configuration.
+    // the hdfs directory implementation comes from the solr project
+    Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    // DirectoryReader.open either returns a reader or throws, a null check is not needed
+    IndexReader indexReader = DirectoryReader.open(indexDir);
+
+    // create a index searcher object
+    indexSearcher = new IndexSearcher(indexReader);
+  }
+
+  /**
+   * Searches the expression tree depth first and returns the string of the first TEXT_MATCH
+   * expression that is found, or null if the tree holds none.
+   */
+  private String getQueryString(Expression expression) {
+    boolean isTextMatch = expression.getFilterExpressionType() == ExpressionType.TEXT_MATCH;
+    if (isTextMatch) {
+      return expression.getString();
+    }
+
+    // descend into the children one by one, stop at the first one that yields a query
+    String found = null;
+    Iterator<Expression> children = expression.getChildren().iterator();
+    while (found == null && children.hasNext()) {
+      found = getQueryString(children.next());
+    }
+    return found;
+  }
+
+  /**
+   * Prune the datamap with filter expression. It returns the list of
+   * blocklets where these filters can exist.
+   * The query string of the first TEXT_MATCH expression is parsed and searched, and the
+   * stored ids of every hit are grouped by block, blocklet and page.
+   * At most MAX_RESULT_NUMBER hits are read from the index.
+   *
+   * @param filterExp filter whose expression tree carries the TEXT_MATCH expression
+   * @param segmentProperties not used by this implementation
+   * @param partitions not used by this implementation
+   * @return one FineGrainBlocklet per matched blocklet, or null if the query cannot be parsed
+   * @throws IOException if searching or reading the lucene index fails
+   */
+  @Override
+  public List<FineGrainBlocklet> prune(FilterResolverIntf filterExp,
+      SegmentProperties segmentProperties, List<String> partitions) throws IOException {
+
+    // convert filter expr into lucene list query
+    List<String> fields = new ArrayList<String>();
+
+    // only for test , query all data
+    // NOTE(review): getQueryString returns null when the tree has no TEXT_MATCH expression and
+    // the parser is then handed null - confirm callers always pass a TEXT_MATCH filter
+    String strQuery = getQueryString(filterExp.getFilterExpression());
+
+    String[] sFields = new String[fields.size()];
+    fields.toArray(sFields);
+
+    // get analyzer
+    if (analyzer == null) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // use MultiFieldQueryParser to parser query
+    QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
+    Query query;
+    try {
+      query = queryParser.parse(strQuery);
+    } catch (ParseException e) {
+      String errorMessage = String.format(
+          "failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
+      LOGGER.error(errorMessage);
+      return null;
+    }
+
+    // execute index search
+    TopDocs result;
+    try {
+      result = indexSearcher.search(query, MAX_RESULT_NUMBER);
+    } catch (IOException e) {
+      String errorMessage =
+          String.format("failed to search lucene data, detail is %s", e.getMessage());
+      LOGGER.error(errorMessage);
+      // keep the original exception as cause so that its stack trace is not lost
+      throw new IOException(errorMessage, e);
+    }
+
+    // temporary data, delete duplicated data
+    // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+    Map<String, Map<String, Map<Integer, Set<Integer>>>> mapBlocks = new HashMap<>();
+
+    for (ScoreDoc scoreDoc : result.scoreDocs) {
+      // get a document
+      Document doc = indexSearcher.doc(scoreDoc.doc);
+
+      // get all fields, the ids are read by their position in this list
+      List<IndexableField> fieldsInDoc = doc.getFields();
+
+      // get this block id Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+      String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue();
+      Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlocks.get(blockId);
+      if (mapBlocklets == null) {
+        mapBlocklets = new HashMap<>();
+        mapBlocks.put(blockId, mapBlocklets);
+      }
+
+      // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
+      String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
+      Map<Integer, Set<Integer>> mapPageIds = mapBlocklets.get(blockletId);
+      if (mapPageIds == null) {
+        mapPageIds = new HashMap<>();
+        mapBlocklets.put(blockletId, mapPageIds);
+      }
+
+      // get the page id Map<PageId, Set<RowId>>
+      Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue();
+      Set<Integer> setRowId = mapPageIds.get(pageId.intValue());
+      if (setRowId == null) {
+        setRowId = new HashSet<>();
+        mapPageIds.put(pageId.intValue(), setRowId);
+      }
+
+      // get the row id Set<RowId>
+      Number rowId = fieldsInDoc.get(ROWID_ID).numericValue();
+      setRowId.add(rowId.intValue());
+    }
+
+    // result blocklets
+    List<FineGrainBlocklet> blocklets = new ArrayList<>();
+
+    // transform all blocks into result type blocklets
+    // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
+    for (Map.Entry<String, Map<String, Map<Integer, Set<Integer>>>> mapBlock :
+        mapBlocks.entrySet()) {
+      String blockId = mapBlock.getKey();
+      Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlock.getValue();
+      // for blocklets in this block Map<BlockletId, Map<PageId, Set<RowId>>>
+      for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet : mapBlocklets.entrySet()) {
+        String blockletId = mapBlocklet.getKey();
+        Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue();
+        List<FineGrainBlocklet.Page> pages = new ArrayList<FineGrainBlocklet.Page>();
+
+        // for pages in this blocklet Map<PageId, Set<RowId>>>
+        for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) {
+          // construct array rowid
+          int[] rowIds = new int[mapPageId.getValue().size()];
+          int i = 0;
+          // for rowids in this page Set<RowId>
+          for (Integer rowid : mapPageId.getValue()) {
+            rowIds[i++] = rowid;
+          }
+          // construct one page
+          FineGrainBlocklet.Page page = new FineGrainBlocklet.Page();
+          page.setPageId(mapPageId.getKey());
+          page.setRowId(rowIds);
+
+          // add this page into list pages
+          pages.add(page);
+        }
+
+        // add a FineGrainBlocklet
+        blocklets.add(new FineGrainBlocklet(blockId, blockletId, pages));
+      }
+    }
+
+    return blocklets;
+  }
+
+  /**
+   * Always reports that a scan is required, whatever the filter is.
+   */
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    return true;
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   * NOTE(review): currently a no-op - the index reader opened in init() is not closed here.
+   */
+  @Override
+  public void clear() {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
new file mode 100644
index 0000000..e35d5bf
--- /dev/null
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.memory.MemoryException;
+
+/**
+ * FG level of lucene DataMap
+ */
+@InterfaceAudience.Internal
+public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<FineGrainDataMap> {
+
+  /**
+   * Get the datamap for segmentid.
+   * A LuceneFineGrainDataMap is created and initialized on the datamap store path of the
+   * segment.
+   *
+   * @param segmentId id of the segment
+   * @return a list with the single initialized datamap, or an empty list if the
+   *         initialization failed with a MemoryException
+   * @throws IOException if the datamap cannot be initialized on the index directory
+   */
+  public List<FineGrainDataMap> getDataMaps(String segmentId) throws IOException {
+    List<FineGrainDataMap> lstDataMap = new ArrayList<>();
+    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
+    try {
+      dataMap.init(new DataMapModel(
+          LuceneDataMapWriter.genDataMapStorePath(
+              tableIdentifier.getTablePath(), segmentId, dataMapName)));
+    } catch (MemoryException e) {
+      // the message is built by concatenation, so it must not carry a "{}" placeholder
+      LOGGER.error("failed to get lucene datamap , detail is " + e.getMessage());
+      return lstDataMap;
+    }
+    lstDataMap.add(dataMap);
+    return lstDataMap;
+  }
+
+  /**
+   * Get datamaps for distributable object.
+   * Only the segment id of the distributable is used.
+   */
+  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
+    return getDataMaps(distributable.getSegmentId());
+  }
+
+  /**
+   * This factory creates fine grain datamaps.
+   */
+  @Override
+  public DataMapLevel getDataMapType() {
+    return DataMapLevel.FG;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
new file mode 100644
index 0000000..a461f04
--- /dev/null
+++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    // NOTE(review): n = 15000 writes names n15000..n74999, so 'n502670' below matches no row
+    val n = 15000
+    LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test lucene coarse grain data map") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING '${classOf[LuceneCoarseGrainDataMapFactory].getName}'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+
+    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+      sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
new file mode 100644
index 0000000..4766281
--- /dev/null
+++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.metadata.CarbonMetadata
+
+class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    // NOTE(review): n is unused; createFile(file2) uses its defaults (10000 rows, ids from 0)
+    val n = 15000
+    LuceneFineGrainDataMapSuite.createFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
+  }
+
+  test("test lucene fine grain data map") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+
+    sql("SELECT * FROM datamap_test ORDER BY id").show
+
+    //    sql("select * from normal_test where name='n34000'").show
+    sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')").show
+    sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')").show
+    sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')").show
+
+    //    checkAnswer(
+    //      sql("select * from datamap_test where match('name:n34000')"),
+    //      sql("select * from normal_test where name='n34000'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+}
+
+object LuceneFineGrainDataMapSuite {
+  // Writes `line` rows "id,n<id>,c0<id>,<random int below 80>" for ids starting at `start`.
+  def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+    val out = new PrintWriter(new File(fileName))
+    (start until (start + line)).foreach { id =>
+      out.println(s"$id,n$id,c0$id,${Random.nextInt(80)}")
+    }
+    out.close()
+  }
+
+  // Removes the file when it exists; does nothing otherwise.
+  def deleteFile(fileName: String): Unit = {
+    val target = new File(fileName)
+    if (target.exists()) target.delete()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index d1c04ae..b7f19fd 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -99,6 +99,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-lucene</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 84384b7..dd526f4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -246,7 +246,7 @@ class FGDataMap extends FineGrainDataMap {
    * Clear complete index table and release memory.
    */
   override def clear():Unit = {
-    ???
+
   }
 
   override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
index 10955a3..ea571d7 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.datamap;
 
+import java.io.IOException;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -62,7 +64,7 @@ public interface DataMapProvider {
    * Implementation should initialize metadata for datamap, like creating table
    */
   void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
-      SparkSession sparkSession) throws MalformedDataMapCommandException;
+      SparkSession sparkSession) throws MalformedDataMapCommandException, IOException;
 
   /**
    * Initialize a datamap's data.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index 2a6a70a..c7651bb 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.datamap;
 
+import java.io.IOException;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.MetadataProcessException;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -37,7 +39,7 @@ public class IndexDataMapProvider implements DataMapProvider {
 
   @Override
   public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
-      SparkSession sparkSession) throws MalformedDataMapCommandException {
+      SparkSession sparkSession) throws MalformedDataMapCommandException, IOException {
     DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
     DataMapStoreManager.getInstance().registerDataMap(
         mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java
new file mode 100644
index 0000000..fceb729
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.expression;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+@InterfaceAudience.Internal
+public class MatchExpression extends Expression {
+  private final String queryString;
+
+  public MatchExpression(String queryString) {
+    this.queryString = queryString;
+  }
+
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
+      throws FilterUnsupportedException, FilterIllegalMemberException {
+    return null; // NOTE(review): no row-level evaluation here; callers must tolerate null
+  }
+
+  @Override
+  public ExpressionType getFilterExpressionType() {
+    return ExpressionType.TEXT_MATCH;
+  }
+
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+    // nothing to replace: this expression only carries the query string
+  }
+
+  @Override
+  public String getString() {
+    return queryString;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala
new file mode 100644
index 0000000..093e479
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap
+
+import org.apache.spark.sql.sources.Filter
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+
+@InterfaceAudience.Internal
+class TextMatchUDF extends ((String) => Boolean) with Serializable {
+  override def apply(v1: String): Boolean = {
+    v1 != null && v1.length > 0 // null-safe: a NULL argument yields false instead of an NPE
+  }
+}
+
+@InterfaceAudience.Internal
+case class TextMatch(queryString: String) extends Filter {
+  override def references: Array[String] = Array.empty[String] // no columns; never null
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5fb3d56..9fcd28b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
+import org.apache.carbondata.datamap.TextMatchUDF
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
@@ -66,6 +67,10 @@ class CarbonEnv {
     // column to sum and count.
     sparkSession.udf.register("preAggLoad", () => "")
 
+    // register for lucene datamap
+    // TODO: move it to proper place, it should be registered by datamap implementation
+    sparkSession.udf.register("text_match", new TextMatchUDF)
+
     // added for handling timeseries function like hour, minute, day , month , year
     sparkSession.udf.register("timeseries", new TimeSeriesFunction)
     synchronized {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b2bfa9..2f640cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -35,11 +35,13 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.datamap.{TextMatch, TextMatchUDF}
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -523,6 +525,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
    */
   protected[sql] def translateFilter(predicate: Expression, or: Boolean = false): Option[Filter] = {
     predicate match {
+      case u: ScalaUDF if u.function.isInstanceOf[TextMatchUDF] =>
+        if (u.children.size != 1) { // exactly one argument; also guards .head on an empty list
+          throw new MalformedCarbonCommandException(
+            "TEXT_MATCH UDF syntax: TEXT_MATCH('luceneQuerySyntax')")
+        }
+        Some(TextMatch(u.children.head.toString()))
+
       case or@Or(left, right) =>
 
         val leftFilter = translateFilter(left, true)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index c062cfb..936f521 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.datamap.TextMatch
+import org.apache.carbondata.datamap.expression.MatchExpression
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -133,6 +135,8 @@ object CarbonFilters {
           }, ExpressionType.CONTAINSWITH))
         case CastExpr(expr: Expression) =>
           Some(transformExpression(expr))
+        case TextMatch(queryString) =>
+          Some(new MatchExpression(queryString))
         case _ => None
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0891972..fdc9210 100644
--- a/pom.xml
+++ b/pom.xml
@@ -342,6 +342,7 @@
         <artifactId>findbugs-maven-plugin</artifactId>
         <version>3.0.4</version>
         <configuration>
+          <skip>true</skip>
           <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile>
           <failOnError>true</failOnError>
           <findbugsXmlOutput>true</findbugsXmlOutput>
@@ -481,6 +482,7 @@
         <module>streaming</module>
         <module>examples/spark2</module>
         <module>datamap/examples</module>
+        <module>datamap/lucene</module>
       </modules>
       <build>
         <plugins>
@@ -535,6 +537,7 @@
         <module>integration/presto</module>
         <module>streaming</module>
         <module>examples/spark2</module>
+        <module>datamap/lucene</module>
       </modules>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/24ebfbbd/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index d9ffd03..7a070a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -84,7 +84,7 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId, String blockPath) {
+  public void onBlockStart(String blockId, String blockPath) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
         writer.onBlockStart(blockId);
@@ -92,7 +92,7 @@ public class DataMapWriterListener {
     }
   }
 
-  public void onBlockEnd(String blockId) {
+  public void onBlockEnd(String blockId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
         writer.onBlockEnd(blockId);
@@ -122,7 +122,7 @@ public class DataMapWriterListener {
    * @param pageId     sequence number of page, start from 0
    * @param tablePage  page data
    */
-  public void onPageAdded(int blockletId, int pageId, TablePage tablePage) {
+  public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException {
     Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet();
     for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) {
       List<String> indexedColumns = entry.getKey();