Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:27 UTC

[02/50] [abbrv] carbondata git commit: [CARBONDATA-2494] Fix lucene datasize and performance

[CARBONDATA-2494] Fix lucene datasize and performance

Improved Lucene datamap size and performance by introducing the following new DataMap (DM) properties:
1. flush_cache: size of the cache to maintain in the Lucene writer. If specified, unique data is aggregated until the cache limit is reached and then flushed to Lucene. It is best suited for low-cardinality dimensions.
2. split_blocklet: when set to true, data is stored blocklet-wise in Lucene, i.e. a new folder is created for each blocklet. This eliminates storing the blocklet id in Lucene and also keeps the Lucene data in small chunks. An illustrative usage example follows below.
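
For illustration only, a minimal sketch of how these properties might be set when creating a Lucene datamap; the table name, datamap name, and property values below are hypothetical and are not part of this commit:

  CREATE TABLE sample_table(id INT, name STRING, city STRING, age INT)
  STORED BY 'carbondata';

  CREATE DATAMAP dm_lucene ON TABLE sample_table
  USING 'lucene'
  DMPROPERTIES('INDEX_COLUMNS'='name,city', 'flush_cache'='1024', 'split_blocklet'='true');

Here 'flush_cache'='1024' lets the writer aggregate up to 1024 unique combinations of the indexed column values before flushing them to Lucene, and 'split_blocklet'='true' writes a separate Lucene folder per blocklet. A flush_cache value that does not parse as an integer falls back to the default of -1 (no caching).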

This closes #2275


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f184de88
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f184de88
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f184de88

Branch: refs/heads/spark-2.3
Commit: f184de885a4656c654812c1244891732af788a39
Parents: cf55028
Author: ravipesala <ra...@gmail.com>
Authored: Sun May 6 23:42:09 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Mon May 21 09:10:59 2018 +0800

----------------------------------------------------------------------
 .../datamap/lucene/LuceneDataMapBuilder.java    | 159 +++------
 .../lucene/LuceneDataMapFactoryBase.java        |  68 +++-
 .../datamap/lucene/LuceneDataMapWriter.java     | 345 ++++++++++++++-----
 .../datamap/lucene/LuceneFineGrainDataMap.java  | 206 +++++++----
 .../lucene/LuceneFineGrainDataMapFactory.java   |   4 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  44 +++
 6 files changed, 559 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f184de88/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
index 35c07f0..eb70220 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.datamap.lucene;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -26,32 +30,25 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import static org.apache.carbondata.datamap.lucene.LuceneDataMapWriter.addData;
+import static org.apache.carbondata.datamap.lucene.LuceneDataMapWriter.addToCache;
+import static org.apache.carbondata.datamap.lucene.LuceneDataMapWriter.flushCache;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
 import org.apache.lucene.codecs.lucene62.Lucene62Codec;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoublePoint;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FloatPoint;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.IntRangeField;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
 import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.roaringbitmap.RoaringBitmap;
 
 public class LuceneDataMapBuilder implements DataMapBuilder {
 
@@ -66,21 +63,38 @@ public class LuceneDataMapBuilder implements DataMapBuilder {
 
   private IndexWriter indexWriter = null;
 
-  private IndexWriter pageIndexWriter = null;
-
   private Analyzer analyzer = null;
 
-  LuceneDataMapBuilder(String tablePath, String dataMapName,
-      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
-    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
-        tablePath, segment.getSegmentNo(), dataMapName, shardName);
+  private int writeCacheSize;
+
+  private Map<LuceneDataMapWriter.LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache =
+      new HashMap<>();
+
+  private ByteBuffer intBuffer = ByteBuffer.allocate(4);
+
+  private boolean storeBlockletWise;
+
+  private int currentBlockletId = -1;
+
+  LuceneDataMapBuilder(String tablePath, String dataMapName, Segment segment, String shardName,
+      List<CarbonColumn> indexColumns, int writeCacheSize, boolean storeBlockletWise) {
+    this.dataMapPath = CarbonTablePath
+        .getDataMapStorePathOnShardName(tablePath, segment.getSegmentNo(), dataMapName, shardName);
     this.indexColumns = indexColumns;
     this.columnsCount = indexColumns.size();
+    this.writeCacheSize = writeCacheSize;
+    this.storeBlockletWise = storeBlockletWise;
   }
 
   @Override
   public void initialize() throws IOException {
-    // get index path, put index data into segment's path
+    if (!storeBlockletWise) {
+      // get index path, put index data into segment's path
+      indexWriter = createIndexWriter(dataMapPath);
+    }
+  }
+
+  private IndexWriter createIndexWriter(String dataMapPath) throws IOException {
     Path indexPath = FileFactory.getPath(dataMapPath);
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
@@ -111,107 +125,44 @@ public class LuceneDataMapBuilder implements DataMapBuilder {
           .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
     }
 
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-  }
-
-  private IndexWriter createPageIndexWriter() throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-  }
-
-  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
-
-    Directory directory = pageIndexWriter.getDirectory();
-
-    // close ram writer
-    pageIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(directory);
-
-    // delete this ram data
-    directory.close();
+    return new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
   }
 
   @Override
-  public void addRow(int blockletId, int pageId, int rowId, Object[] values) throws IOException {
-    if (rowId == 0) {
-      if (pageIndexWriter != null) {
-        addPageIndex(pageIndexWriter);
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values)
+      throws IOException {
+    if (storeBlockletWise) {
+      if (currentBlockletId != blockletId) {
+        close();
+        indexWriter = createIndexWriter(dataMapPath + File.separator + blockletId);
+        currentBlockletId = blockletId;
       }
-      pageIndexWriter = createPageIndexWriter();
     }
-
-    // create a new document
-    Document doc = new Document();
-
-    // add blocklet Id
-    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) values[columnsCount]));
-
-    // add page id
-    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) values[columnsCount + 1]));
-
-    // add row id
-    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
-    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
-
     // add other fields
+    LuceneDataMapWriter.LuceneColumnKeys columns =
+        new LuceneDataMapWriter.LuceneColumnKeys(columnsCount);
     for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-      CarbonColumn column = indexColumns.get(colIdx);
-      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
+      columns.getColValues()[colIdx] = values[colIdx];
+    }
+    if (writeCacheSize > 0) {
+      addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
+      flushCacheIfPossible();
+    } else {
+      addData(columns, rowId, pageId, blockletId, intBuffer, indexWriter, indexColumns,
+          storeBlockletWise);
     }
 
-    pageIndexWriter.addDocument(doc);
   }
 
-  private boolean addField(Document doc, String fieldName, DataType type, Object value) {
-    if (type == DataTypes.STRING) {
-      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
-    } else if (type == DataTypes.BYTE) {
-      // byte type , use int range to deal with byte, lucene has no byte type
-      IntRangeField field =
-          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
-      field.setIntValue((int) value);
-      doc.add(field);
-    } else if (type == DataTypes.SHORT) {
-      // short type , use int range to deal with short type, lucene has no short type
-      IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
-          new int[] { Short.MAX_VALUE });
-      field.setShortValue((short) value);
-      doc.add(field);
-    } else if (type == DataTypes.INT) {
-      // int type , use int point to deal with int type
-      doc.add(new IntPoint(fieldName, (int) value));
-    } else if (type == DataTypes.LONG) {
-      // long type , use long point to deal with long type
-      doc.add(new LongPoint(fieldName, (long) value));
-    } else if (type == DataTypes.FLOAT) {
-      doc.add(new FloatPoint(fieldName, (float) value));
-    } else if (type == DataTypes.DOUBLE) {
-      doc.add(new DoublePoint(fieldName, (double) value));
-    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
-    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
-    } else if (type == DataTypes.BOOLEAN) {
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
-      field.setIntValue((boolean) value ? 1 : 0);
-      doc.add(field);
-    } else {
-      LOGGER.error("unsupport data type " + type);
-      throw new RuntimeException("unsupported data type " + type);
+  private void flushCacheIfPossible() throws IOException {
+    if (cache.size() >= writeCacheSize) {
+      flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
     }
-    return true;
   }
 
   @Override
   public void finish() throws IOException {
-    if (indexWriter != null && pageIndexWriter != null) {
-      addPageIndex(pageIndexWriter);
-    }
+    flushCache(cache, indexColumns, indexWriter, storeBlockletWise);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f184de88/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 4bcdebb..fab0565 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -62,6 +62,29 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer;
 abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactory<T> {
 
   /**
+   * Size of the cache to maintain in the Lucene writer. If specified, it aggregates the
+   * unique data till the cache limit is reached and then flushes to Lucene.
+   * It is best suited for low cardinality dimensions.
+   */
+  static final String FLUSH_CACHE = "flush_cache";
+
+  /**
+   * By default it does not use any cache.
+   */
+  static final String FLUSH_CACHE_DEFAULT_SIZE = "-1";
+
+  /**
+   * When set to true, data is stored blocklet-wise in Lucene, i.e. a new folder is created
+   * for each blocklet. This eliminates storing the blocklet id in Lucene
+   * and also keeps the Lucene data in small chunks.
+   */
+  static final String SPLIT_BLOCKLET = "split_blocklet";
+
+  /**
+   * By default it is true.
+   */
+  static final String SPLIT_BLOCKLET_DEFAULT = "true";
+  /**
    * Logger
    */
   final LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
@@ -86,6 +109,12 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
    */
   AbsoluteTableIdentifier tableIdentifier = null;
 
+  List<CarbonColumn> indexedCarbonColumns = null;
+
+  int flushCacheSize;
+
+  boolean storeBlockletWise;
+
   public LuceneDataMapFactoryBase(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws MalformedDataMapCommandException {
     super(carbonTable, dataMapSchema);
@@ -96,7 +125,9 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
     this.dataMapName = dataMapSchema.getDataMapName();
 
     // validate DataMapSchema and get index columns
-    List<CarbonColumn> indexedColumns =  carbonTable.getIndexedColumns(dataMapSchema);
+    indexedCarbonColumns =  carbonTable.getIndexedColumns(dataMapSchema);;
+    flushCacheSize = validateAndGetWriteCacheSize(dataMapSchema);
+    storeBlockletWise = validateAndGetStoreBlockletWise(dataMapSchema);
 
     // add optimizedOperations
     List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
@@ -107,13 +138,39 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
     // optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO);
     // optimizedOperations.add(ExpressionType.NOT);
     optimizedOperations.add(ExpressionType.TEXT_MATCH);
-    this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations);
-
+    this.dataMapMeta = new DataMapMeta(indexedCarbonColumns, optimizedOperations);
     // get analyzer
     // TODO: how to get analyzer ?
     analyzer = new StandardAnalyzer();
   }
 
+  public static int validateAndGetWriteCacheSize(DataMapSchema schema) {
+    String cacheStr = schema.getProperties().get(FLUSH_CACHE);
+    if (cacheStr == null) {
+      cacheStr = FLUSH_CACHE_DEFAULT_SIZE;
+    }
+    int cacheSize;
+    try {
+      cacheSize = Integer.parseInt(cacheStr);
+    } catch (NumberFormatException e) {
+      cacheSize = -1;
+    }
+    return cacheSize;
+  }
+
+  public static boolean validateAndGetStoreBlockletWise(DataMapSchema schema) {
+    String splitBlockletStr = schema.getProperties().get(SPLIT_BLOCKLET);
+    if (splitBlockletStr == null) {
+      splitBlockletStr = SPLIT_BLOCKLET_DEFAULT;
+    }
+    boolean splitBlockletWise;
+    try {
+      splitBlockletWise = Boolean.parseBoolean(splitBlockletStr);
+    } catch (NumberFormatException e) {
+      splitBlockletWise = true;
+    }
+    return splitBlockletWise;
+  }
   /**
    * this method will delete the datamap folders during drop datamap
    * @throws MalformedDataMapCommandException
@@ -149,13 +206,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
   public DataMapWriter createWriter(Segment segment, String shardName) {
     LOGGER.info("lucene data write to " + shardName);
     return new LuceneDataMapWriter(getCarbonTable().getTablePath(), dataMapName,
-        dataMapMeta.getIndexedColumns(), segment, shardName, true);
+        dataMapMeta.getIndexedColumns(), segment, shardName, flushCacheSize,
+        storeBlockletWise);
   }
 
   @Override
   public DataMapBuilder createBuilder(Segment segment, String shardName) {
     return new LuceneDataMapBuilder(getCarbonTable().getTablePath(), dataMapName,
-        segment, shardName, dataMapMeta.getIndexedColumns());
+        segment, shardName, dataMapMeta.getIndexedColumns(), flushCacheSize, storeBlockletWise);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f184de88/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 3615936..759b607 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -17,9 +17,14 @@
 
 package org.apache.carbondata.datamap.lucene;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
@@ -55,6 +60,8 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.RAMDirectory;
 import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.RoaringBitmap;
 
 /**
  * Implementation to write lucene index while loading
@@ -74,30 +81,64 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private Analyzer analyzer = null;
 
-  private boolean isFineGrain = true;
+  public static final String PAGEID_NAME = "pageId";
 
-  public static final String BLOCKLETID_NAME = "blockletId";
+  public static final String ROWID_NAME = "rowId";
 
-  private String indexShardName = null;
+  private Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache = new HashMap<>();
 
-  public static final String PAGEID_NAME = "pageId";
+  private int cacheSize;
 
-  public static final String ROWID_NAME = "rowId";
+  private ByteBuffer intBuffer = ByteBuffer.allocate(4);
+
+  private boolean storeBlockletWise;
 
   LuceneDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, boolean isFineGrain) {
+      Segment segment, String shardName, int flushSize,
+      boolean storeBlockletWise) {
     super(tablePath, dataMapName, indexColumns, segment, shardName);
-    this.isFineGrain = isFineGrain;
+    this.cacheSize = flushSize;
+    this.storeBlockletWise = storeBlockletWise;
   }
 
   /**
    * Start of new block notification.
    */
   public void onBlockStart(String blockId) throws IOException {
+
+  }
+
+  /**
+   * End of block notification
+   */
+  public void onBlockEnd(String blockId) throws IOException {
+
+  }
+
+  private RAMDirectory ramDir;
+  private IndexWriter ramIndexWriter;
+
+  /**
+   * Start of new blocklet notification.
+   */
+  public void onBlockletStart(int blockletId) throws IOException {
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+    // save index data into ram, write into disk after one page finished
+    ramDir = new RAMDirectory();
+    ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+
     if (indexWriter != null) {
       return;
     }
     // get index path, put index data into segment's path
+    String dataMapPath;
+    if (storeBlockletWise) {
+      dataMapPath = this.dataMapPath + File.separator + blockletId;
+    } else {
+      dataMapPath = this.dataMapPath;
+    }
     Path indexPath = FileFactory.getPath(dataMapPath);
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
@@ -108,10 +149,6 @@ public class LuceneDataMapWriter extends DataMapWriter {
       }
     }
 
-    if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
-    }
-
     // the indexWriter closes the FileSystem on closing the writer, so for a new configuration
     // and disable the cache for the index writer, it will be closed on closing the writer
     Configuration conf = new Configuration();
@@ -131,26 +168,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
           .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
     }
 
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-  }
-
-  /**
-   * End of block notification
-   */
-  public void onBlockEnd(String blockId) throws IOException {
-
-  }
-
-  private RAMDirectory ramDir;
-  private IndexWriter ramIndexWriter;
-
-  /**
-   * Start of new blocklet notification.
-   */
-  public void onBlockletStart(int blockletId) throws IOException {
-    // save index data into ram, write into disk after one page finished
-    ramDir = new RAMDirectory();
-    ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+    indexWriter = new IndexWriter(indexDir, indexWriterConfig);
   }
 
   /**
@@ -165,6 +183,12 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
     // delete this ram data
     ramDir.close();
+
+    if (storeBlockletWise) {
+      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+      indexWriter.close();
+      indexWriter = null;
+    }
   }
 
   /**
@@ -175,52 +199,39 @@ public class LuceneDataMapWriter extends DataMapWriter {
    */
   public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages)
       throws IOException {
+    // save index data into ram, write into disk after one page finished
+    int columnsCount = pages.length;
+    if (columnsCount <= 0) {
+      LOGGER.warn("No data in the page " + pageId + "with blockletid " + blockletId
+          + " to write lucene datamap");
+      return;
+    }
     for (int rowId = 0; rowId < pageSize; rowId++) {
-      // create a new document
-      Document doc = new Document();
-      // add blocklet Id
-      doc.add(new IntPoint(BLOCKLETID_NAME, blockletId));
-      doc.add(new StoredField(BLOCKLETID_NAME, blockletId));
-      //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId));
-
-      // add page id and row id in Fine Grain data map
-      if (isFineGrain) {
-        // add page Id
-        doc.add(new IntPoint(PAGEID_NAME, pageId));
-        doc.add(new StoredField(PAGEID_NAME, pageId));
-        //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId));
-
-        // add row id
-        doc.add(new IntPoint(ROWID_NAME, rowId));
-        doc.add(new StoredField(ROWID_NAME, rowId));
-        //doc.add(new NumericDocValuesField(ROWID_NAME,rowId));
-      }
-
       // add indexed columns value into the document
-      List<CarbonColumn> indexColumns = getIndexColumns();
-      for (int i = 0; i < pages.length; i++) {
-        // add to lucene only if value is not null
-        if (!pages[i].getNullBits().get(rowId)) {
-          addField(doc, pages[i].getData(rowId), indexColumns.get(i), Field.Store.NO);
+      LuceneColumnKeys columns = new LuceneColumnKeys(getIndexColumns().size());
+      int i = 0;
+      for (ColumnPage page : pages) {
+        if (!page.getNullBits().get(rowId)) {
+          columns.colValues[i++] = getValue(page, rowId);
         }
       }
-
-      // add this document
-      ramIndexWriter.addDocument(doc);
+      if (cacheSize > 0) {
+        addToCache(columns, rowId, pageId, blockletId, cache, intBuffer, storeBlockletWise);
+      } else {
+        addData(columns, rowId, pageId, blockletId, intBuffer, ramIndexWriter, getIndexColumns(),
+            storeBlockletWise);
+      }
+    }
+    if (cacheSize > 0) {
+      flushCacheIfPossible();
     }
-
   }
 
-  private boolean addField(Document doc, Object data, CarbonColumn column, Field.Store store) {
+  private static void addField(Document doc, Object key, String fieldName, Field.Store store) {
     //get field name
-    String fieldName = column.getColName();
-
-    //get field type
-    DataType type = column.getDataType();
-
-    if (type == DataTypes.BYTE) {
+    if (key instanceof Byte) {
       // byte type , use int range to deal with byte, lucene has no byte type
-      byte value = (byte) data;
+      byte value = (Byte) key;
       IntRangeField field =
           new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE });
       field.setIntValue(value);
@@ -230,9 +241,9 @@ public class LuceneDataMapWriter extends DataMapWriter {
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, (int) value));
       }
-    } else if (type == DataTypes.SHORT) {
+    } else if (key instanceof Short) {
       // short type , use int range to deal with short type, lucene has no short type
-      short value = (short) data;
+      short value = (Short) key;
       IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE },
           new int[] { Short.MAX_VALUE });
       field.setShortValue(value);
@@ -242,62 +253,179 @@ public class LuceneDataMapWriter extends DataMapWriter {
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, (int) value));
       }
-    } else if (type == DataTypes.INT) {
+    } else if (key instanceof Integer) {
       // int type , use int point to deal with int type
-      int value = (int) data;
-      doc.add(new IntPoint(fieldName, value));
+      int value = (Integer) key;
+      doc.add(new IntPoint(fieldName, new int[] { value }));
 
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, value));
       }
-    } else if (type == DataTypes.LONG) {
+    } else if (key instanceof Long) {
       // long type , use long point to deal with long type
-      long value = (long) data;
-      doc.add(new LongPoint(fieldName, value));
+      long value = (Long) key;
+      doc.add(new LongPoint(fieldName, new long[] { value }));
 
       // if need store it , add StoredField
       if (store == Field.Store.YES) {
         doc.add(new StoredField(fieldName, value));
       }
-    } else if (type == DataTypes.FLOAT) {
-      float value = (float) data;
-      doc.add(new FloatPoint(fieldName, value));
+    } else if (key instanceof Float) {
+      float value = (Float) key;
+      doc.add(new FloatPoint(fieldName, new float[] { value }));
       if (store == Field.Store.YES) {
         doc.add(new FloatPoint(fieldName, value));
       }
-    } else if (type == DataTypes.DOUBLE) {
-      double value = (double) data;
-      doc.add(new DoublePoint(fieldName, value));
+    } else if (key instanceof Double) {
+      double value = (Double) key;
+      doc.add(new DoublePoint(fieldName, new double[] { value }));
       if (store == Field.Store.YES) {
         doc.add(new DoublePoint(fieldName, value));
       }
+    } else if (key instanceof String) {
+      String strValue = (String) key;
+      doc.add(new TextField(fieldName, strValue, store));
+    } else if (key instanceof Boolean) {
+      boolean value = (Boolean) key;
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
+      field.setIntValue(value ? 1 : 0);
+      doc.add(field);
+      if (store == Field.Store.YES) {
+        doc.add(new StoredField(fieldName, value ? 1 : 0));
+      }
+    }
+  }
+
+  private Object getValue(ColumnPage page, int rowId) {
+
+    //get field type
+    DataType type = page.getColumnSpec().getSchemaDataType();
+    Object value = null;
+    if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      value = page.getByte(rowId);
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no short type
+      value = page.getShort(rowId);
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      value = page.getInt(rowId);
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      value = page.getLong(rowId);
+    } else if (type == DataTypes.FLOAT) {
+      value = page.getFloat(rowId);
+    } else if (type == DataTypes.DOUBLE) {
+      value = page.getDouble(rowId);
     } else if (type == DataTypes.STRING) {
-      byte[] value = (byte[]) data;
-      String strValue = null;
+      byte[] bytes = page.getBytes(rowId);
       try {
-        strValue = new String(value, 2, value.length - 2, "UTF-8");
+        value = new String(bytes, 2, bytes.length - 2, "UTF-8");
       } catch (UnsupportedEncodingException e) {
         throw new RuntimeException(e);
       }
-      doc.add(new TextField(fieldName, strValue, store));
     } else if (type == DataTypes.DATE) {
       throw new RuntimeException("unsupported data type " + type);
     } else if (type == DataTypes.TIMESTAMP) {
       throw new RuntimeException("unsupported data type " + type);
     } else if (type == DataTypes.BOOLEAN) {
-      boolean value = (boolean) data;
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 });
-      field.setIntValue(value ? 1 : 0);
-      doc.add(field);
-      if (store == Field.Store.YES) {
-        doc.add(new StoredField(fieldName, value ? 1 : 0));
-      }
+      value = page.getBoolean(rowId);
     } else {
       LOGGER.error("unsupport data type " + type);
       throw new RuntimeException("unsupported data type " + type);
     }
-    return true;
+    return value;
+  }
+
+  public static void addToCache(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
+      Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer intBuffer,
+      boolean storeBlockletWise) {
+    Map<Integer, RoaringBitmap> setMap = cache.get(key);
+    if (setMap == null) {
+      setMap = new HashMap<>();
+      cache.put(key, setMap);
+    }
+    int combinKey;
+    if (!storeBlockletWise) {
+      intBuffer.clear();
+      intBuffer.putShort((short) blockletId);
+      intBuffer.putShort((short) pageId);
+      intBuffer.rewind();
+      combinKey = intBuffer.getInt();
+    } else {
+      combinKey = pageId;
+    }
+    RoaringBitmap bitSet = setMap.get(combinKey);
+    if (bitSet == null) {
+      bitSet = new RoaringBitmap();
+      setMap.put(combinKey, bitSet);
+    }
+    bitSet.add(rowId);
+  }
+
+  public static void addData(LuceneColumnKeys key, int rowId, int pageId, int blockletId,
+      ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn> indexCols,
+      boolean storeBlockletWise) throws IOException {
+
+    Document document = new Document();
+    for (int i = 0; i < key.getColValues().length; i++) {
+      addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
+    }
+    intBuffer.clear();
+    if (storeBlockletWise) {
+      // No need to store blocklet id to it.
+      intBuffer.putShort((short) pageId);
+      intBuffer.putShort((short) rowId);
+      intBuffer.rewind();
+      document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
+    } else {
+      intBuffer.putShort((short) blockletId);
+      intBuffer.putShort((short) pageId);
+      intBuffer.rewind();
+      document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
+      document.add(new StoredField(ROWID_NAME, (short) rowId));
+    }
+    indexWriter.addDocument(document);
+  }
+
+  private void flushCacheIfPossible() throws IOException {
+    if (cache.size() > cacheSize) {
+      flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+    }
+  }
+
+  public static void flushCache(Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache,
+      List<CarbonColumn> indexCols, IndexWriter indexWriter, boolean storeBlockletWise)
+      throws IOException {
+    for (Map.Entry<LuceneColumnKeys, Map<Integer, RoaringBitmap>> entry : cache.entrySet()) {
+      Document document = new Document();
+      LuceneColumnKeys key = entry.getKey();
+      for (int i = 0; i < key.getColValues().length; i++) {
+        addField(document, key.getColValues()[i], indexCols.get(i).getColName(), Field.Store.NO);
+      }
+      Map<Integer, RoaringBitmap> value = entry.getValue();
+      int count = 0;
+      for (Map.Entry<Integer, RoaringBitmap> pageData : value.entrySet()) {
+        RoaringBitmap bitMap = pageData.getValue();
+        int cardinality = bitMap.getCardinality();
+        // Each row is short and pageid is stored in int
+        ByteBuffer byteBuffer = ByteBuffer.allocate(cardinality * 2 + 4);
+        if (!storeBlockletWise) {
+          byteBuffer.putInt(pageData.getKey());
+        } else {
+          byteBuffer.putShort(pageData.getKey().shortValue());
+        }
+        IntIterator intIterator = bitMap.getIntIterator();
+        while (intIterator.hasNext()) {
+          byteBuffer.putShort((short) intIterator.next());
+        }
+        document.add(new StoredField(PAGEID_NAME + count, byteBuffer.array()));
+        count++;
+      }
+      indexWriter.addDocument(document);
+    }
+    cache.clear();
   }
 
   /**
@@ -305,10 +433,39 @@ public class LuceneDataMapWriter extends DataMapWriter {
    * class.
    */
   public void finish() throws IOException {
+    flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
     // finished a file , close this index writer
     if (indexWriter != null) {
       indexWriter.close();
     }
   }
 
+  /**
+   * Keeps column values of a single row.
+   */
+  public static class LuceneColumnKeys {
+
+    private Object[] colValues;
+
+    public LuceneColumnKeys(int size) {
+      colValues = new Object[size];
+    }
+
+    public Object[] getColValues() {
+      return colValues;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      LuceneColumnKeys that = (LuceneColumnKeys) o;
+      return Arrays.equals(colValues, that.colValues);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(colValues);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f184de88/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index f8d1b12..3645bb6 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -18,12 +18,11 @@
 package org.apache.carbondata.datamap.lucene;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
@@ -32,14 +31,15 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.MatchExpression;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
@@ -55,17 +55,12 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.store.hdfs.HdfsDirectory;
 
 @InterfaceAudience.Internal
 public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
-  private static final int BLOCKLETID_ID = 0;
-
-  private static final int PAGEID_ID = 1;
-
-  private static final int ROWID_ID = 2;
-
   /**
    * log information
    */
@@ -73,14 +68,9 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName());
 
   /**
-   * index Reader object to create searcher object
-   */
-  private IndexReader indexReader = null;
-
-  /**
    * searcher object for this datamap
    */
-  private IndexSearcher indexSearcher = null;
+  private Map<String, IndexSearcher> indexSearcherMap = null;
 
   /**
    * analyzer for lucene index
@@ -89,14 +79,21 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
   private String filePath;
 
-  LuceneFineGrainDataMap(Analyzer analyzer) {
+  private int writeCacheSize;
+
+  private boolean storeBlockletWise;
+
+  LuceneFineGrainDataMap(Analyzer analyzer, DataMapSchema schema) {
     this.analyzer = analyzer;
+    writeCacheSize = LuceneDataMapFactoryBase.validateAndGetWriteCacheSize(schema);
+    storeBlockletWise = LuceneDataMapFactoryBase.validateAndGetStoreBlockletWise(schema);
   }
 
   /**
    * It is called to load the data map to memory or to initialize it.
    */
   public void init(DataMapModel dataMapModel) throws IOException {
+    long startTime = System.currentTimeMillis();
     // get this path from file path
     Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
 
@@ -104,32 +101,51 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
     this.filePath = indexPath.getName();
 
+    this.indexSearcherMap = new HashMap<>();
+
     // get file system , use hdfs file system , realized in solr project
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
+    CarbonFile indexFilePath = FileFactory.getCarbonFile(indexPath.toString());
 
     // check this path valid
-    if (!fs.exists(indexPath)) {
+    if (!indexFilePath.exists()) {
       String errorMessage = String.format("index directory %s not exists.", indexPath);
       LOGGER.error(errorMessage);
       throw new IOException(errorMessage);
     }
 
-    if (!fs.isDirectory(indexPath)) {
+    if (!indexFilePath.isDirectory()) {
       String errorMessage = String.format("error index path %s, must be directory", indexPath);
       LOGGER.error(errorMessage);
       throw new IOException(errorMessage);
     }
 
+    if (storeBlockletWise) {
+      CarbonFile[] blockletDirs = indexFilePath.listFiles();
+      for (CarbonFile blockletDir : blockletDirs) {
+        IndexSearcher indexSearcher = createIndexSearcher(new Path(blockletDir.getAbsolutePath()));
+        indexSearcherMap.put(blockletDir.getName(), indexSearcher);
+      }
+
+    } else {
+      IndexSearcher indexSearcher = createIndexSearcher(indexPath);
+      indexSearcherMap.put("-1", indexSearcher);
+
+    }
+    LOGGER.info(
+        "Time taken to intialize lucene searcher: " + (System.currentTimeMillis() - startTime));
+  }
+
+  private IndexSearcher createIndexSearcher(Path indexPath) throws IOException {
     // open this index path , use HDFS default configuration
     Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
 
-    indexReader = DirectoryReader.open(indexDir);
+    IndexReader indexReader = DirectoryReader.open(indexDir);
     if (indexReader == null) {
       throw new RuntimeException("failed to create index reader object");
     }
 
     // create a index searcher object
-    indexSearcher = new IndexSearcher(indexReader);
+    return new IndexSearcher(indexReader);
   }
 
   /**
@@ -212,49 +228,40 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       LOGGER.error(errorMessage);
       return null;
     }
-
-    // execute index search
-    // initialize to null, else ScoreDoc objects will get accumulated in memory
-    TopDocs result = null;
-    try {
-      result = indexSearcher.search(query, maxDocs);
-    } catch (IOException e) {
-      String errorMessage =
-          String.format("failed to search lucene data, detail is %s", e.getMessage());
-      LOGGER.error(errorMessage);
-      throw new IOException(errorMessage);
-    }
-
     // temporary data, delete duplicated data
     // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
-    Map<String, Map<Integer, Set<Integer>>> mapBlocks = new HashMap<>();
-
-    for (ScoreDoc scoreDoc : result.scoreDocs) {
-      // get a document
-      Document doc = indexSearcher.doc(scoreDoc.doc);
+    Map<String, Map<Integer, List<Short>>> mapBlocks = new HashMap<>();
+
+    for (Map.Entry<String, IndexSearcher> searcherEntry : indexSearcherMap.entrySet()) {
+      IndexSearcher indexSearcher = searcherEntry.getValue();
+      // execute index search
+      // initialize to null, else ScoreDoc objects will get accumulated in memory
+      TopDocs result = null;
+      try {
+        result = indexSearcher.search(query, maxDocs);
+      } catch (IOException e) {
+        String errorMessage =
+            String.format("failed to search lucene data, detail is %s", e.getMessage());
+        LOGGER.error(errorMessage);
+        throw new IOException(errorMessage);
+      }
 
-      // get all fields
-      List<IndexableField> fieldsInDoc = doc.getFields();
+      ByteBuffer intBuffer = ByteBuffer.allocate(4);
 
-      // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>>
-      String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue();
-      Map<Integer, Set<Integer>> mapPageIds = mapBlocks.get(blockletId);
-      if (mapPageIds == null) {
-        mapPageIds = new HashMap<>();
-        mapBlocks.put(blockletId, mapPageIds);
-      }
+      for (ScoreDoc scoreDoc : result.scoreDocs) {
+        // get a document
+        Document doc = indexSearcher.doc(scoreDoc.doc);
 
-      // get the page id Map<PageId, Set<RowId>>
-      Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue();
-      Set<Integer> setRowId = mapPageIds.get(pageId.intValue());
-      if (setRowId == null) {
-        setRowId = new HashSet<>();
-        mapPageIds.put(pageId.intValue(), setRowId);
+        // get all fields
+        List<IndexableField> fieldsInDoc = doc.getFields();
+        if (writeCacheSize > 0) {
+          // Fill row ids into the map; each value is combined from multiple rows.
+          fillMapForCombineRows(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
+        } else {
+          // Fill rowids to the map
+          fillMap(intBuffer, mapBlocks, fieldsInDoc, searcherEntry.getKey());
+        }
       }
-
-      // get the row id Set<RowId>
-      Number rowId = fieldsInDoc.get(ROWID_ID).numericValue();
-      setRowId.add(rowId.intValue());
     }
 
     // result blocklets
@@ -262,19 +269,19 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
     // transform all blocks into result type blocklets
     // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>>
-    for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet :
+    for (Map.Entry<String, Map<Integer, List<Short>>> mapBlocklet :
         mapBlocks.entrySet()) {
       String blockletId = mapBlocklet.getKey();
-      Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue();
+      Map<Integer, List<Short>> mapPageIds = mapBlocklet.getValue();
       List<FineGrainBlocklet.Page> pages = new ArrayList<FineGrainBlocklet.Page>();
 
       // for pages in this blocklet Map<PageId, Set<RowId>>>
-      for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) {
+      for (Map.Entry<Integer, List<Short>> mapPageId : mapPageIds.entrySet()) {
         // construct array rowid
         int[] rowIds = new int[mapPageId.getValue().size()];
         int i = 0;
         // for rowids in this page Set<RowId>
-        for (Integer rowid : mapPageId.getValue()) {
+        for (Short rowid : mapPageId.getValue()) {
           rowIds[i++] = rowid;
         }
         // construct one page
@@ -293,6 +300,81 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     return blocklets;
   }
 
+  /**
+   * Fills the row ids into the map; each value combines multiple row ids because rows are
+   * grouped and combined according to their uniqueness.
+   */
+  private void fillMapForCombineRows(ByteBuffer intBuffer,
+      Map<String, Map<Integer, List<Short>>> mapBlocks, List<IndexableField> fieldsInDoc,
+      String blockletId) {
+    for (int i = 0; i < fieldsInDoc.size(); i++) {
+      BytesRef bytesRef = fieldsInDoc.get(i).binaryValue();
+      ByteBuffer buffer = ByteBuffer.wrap(bytesRef.bytes);
+
+      int pageId;
+      if (storeBlockletWise) {
+        // If stored blocklet-wise, read only the page id; the blocklet id is not stored
+        pageId = buffer.getShort();
+      } else {
+        int combineKey = buffer.getInt();
+        intBuffer.clear();
+        intBuffer.putInt(combineKey);
+        intBuffer.rewind();
+        blockletId = String.valueOf(intBuffer.getShort());
+        pageId = intBuffer.getShort();
+      }
+
+      Map<Integer, List<Short>> mapPageIds = mapBlocks.get(blockletId);
+      if (mapPageIds == null) {
+        mapPageIds = new HashMap<>();
+        mapBlocks.put(blockletId, mapPageIds);
+      }
+      List<Short> setRowId = mapPageIds.get(pageId);
+      if (setRowId == null) {
+        setRowId = new ArrayList<>();
+        mapPageIds.put(pageId, setRowId);
+      }
+
+      while (buffer.hasRemaining()) {
+        setRowId.add(buffer.getShort());
+      }
+    }
+  }
+
+  /**
+   * Fill the map with rowids from documents
+   */
+  private void fillMap(ByteBuffer intBuffer, Map<String, Map<Integer, List<Short>>> mapBlocks,
+      List<IndexableField> fieldsInDoc, String blockletId) {
+    int combineKey = fieldsInDoc.get(0).numericValue().intValue();
+    intBuffer.clear();
+    intBuffer.putInt(combineKey);
+    intBuffer.rewind();
+    short rowId;
+    int pageId;
+    if (storeBlockletWise) {
+      // If stored blocklet-wise, read only the page id and row id;
+      // the blocklet id is not stored
+      pageId = intBuffer.getShort();
+      rowId = intBuffer.getShort();
+    } else {
+      blockletId = String.valueOf(intBuffer.getShort());
+      pageId = intBuffer.getShort();
+      rowId = fieldsInDoc.get(1).numericValue().shortValue();
+    }
+    Map<Integer, List<Short>> mapPageIds = mapBlocks.get(blockletId);
+    if (mapPageIds == null) {
+      mapPageIds = new HashMap<>();
+      mapBlocks.put(blockletId, mapPageIds);
+    }
+    List<Short> setRowId = mapPageIds.get(pageId);
+    if (setRowId == null) {
+      setRowId = new ArrayList<>();
+      mapPageIds.put(pageId, setRowId);
+    }
+    setRowId.add(rowId);
+  }
+
   @Override
   public boolean isScanRequired(FilterResolverIntf filterExp) {
     return true;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f184de88/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 2d9618c..8c7539f 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -50,7 +50,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
    */
   @Override public List<FineGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<FineGrainDataMap> lstDataMap = new ArrayList<>();
-    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
+    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     try {
       dataMap.init(new DataMapModel(
           DataMapWriter.getDefaultDataMapPath(
@@ -70,7 +70,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
     List<FineGrainDataMap> lstDataMap = new ArrayList<>();
-    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
+    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
     String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
     try {
       dataMap.init(new DataMapModel(indexPath));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f184de88/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 89623cf..b90d190 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -731,6 +731,50 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE table1")
   }
 
+  test("test lucene with flush_cache as true") {
+    sql("DROP TABLE IF EXISTS datamap_test_table")
+    sql(
+      """
+        | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm_flush ON TABLE datamap_test_table
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='name , city', 'flush_cache'='true')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n99*')"),
+      sql("select * from datamap_test_table where name like 'n99%'"))
+    checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n*9')"),
+      sql(s"select * from datamap_test_table where name like 'n%9'"))
+    sql("drop datamap if exists dm_flush on table datamap_test_table")
+  }
+
+  test("test lucene with split_blocklet as false ") {
+    sql("DROP TABLE IF EXISTS datamap_test_table")
+    sql(
+      """
+        | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm_split_false ON TABLE datamap_test_table
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='name , city', 'split_blocklet'='false')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')")
+    checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n99*')"),
+      sql("select * from datamap_test_table where name like 'n99%'"))
+    checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n*9')"),
+      sql(s"select * from datamap_test_table where name like 'n%9'"))
+    sql("drop datamap if exists dm_split_false on table datamap_test_table")
+  }
+
   override protected def afterAll(): Unit = {
     LuceneFineGrainDataMapSuite.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")