Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/09/12 08:13:13 UTC

carbondata git commit: [CARBONDATA-2845][BloomDataMap] Merge bloom index files of multi-shards for each index column

Repository: carbondata
Updated Branches:
  refs/heads/master 22958d941 -> 7b31b9168


[CARBONDATA-2845][BloomDataMap] Merge bloom index files of multi-shards for each index column

Currently a bloom index file is generated per task per load, so query performance degrades when a table has many segments. The main datamap faced the same problem before.
Here we merge the bloom index files at segment scope, just as the main datamap does.

In this PR, we add an event listener that triggers merging of the bloom index files when the datamap is built or rebuilt.
The merging is performed by a newly added RDD, which writes out the merged index files.
Queries work on the index files both before and after merging: a flag file named mergeShard.inprogress is created while the merged files are not yet ready, so queries are not disturbed during merging.
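
Below is a minimal, self-contained sketch of the merged-file record layout used by the new BloomIndexFileStore (see the diff below): each per-shard .bloomindex file is appended to the column's .bloomindexmerge file under the mergeShard directory as [shard name length][shard name bytes][index data length][index data bytes]. The sketch uses plain java.io instead of CarbonData's FileFactory and CarbonBloomFilter, and the class, method, and shard names are illustrative only.

    import java.io.*;
    import java.nio.charset.StandardCharsets;
    import java.util.*;

    public class MergedBloomIndexLayoutSketch {

      // Append one shard's raw .bloomindex bytes as a single record:
      // [shard name length][shard name bytes][index data length][index data bytes]
      static void writeRecord(DataOutputStream out, String shardName, byte[] indexBytes)
          throws IOException {
        byte[] name = shardName.getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);
        out.write(name);
        out.writeInt(indexBytes.length);
        out.write(indexBytes);
      }

      // Read all records back; returns shard name -> raw bloom index bytes.
      static Map<String, byte[]> readRecords(DataInputStream in) throws IOException {
        Map<String, byte[]> records = new LinkedHashMap<>();
        while (in.available() > 0) {
          byte[] name = new byte[in.readInt()];
          in.readFully(name);
          byte[] data = new byte[in.readInt()];
          in.readFully(data);
          records.put(new String(name, StandardCharsets.UTF_8), data);
        }
        return records;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
          writeRecord(out, "shard_task_0", new byte[]{1, 2, 3});
          writeRecord(out, "shard_task_1", new byte[]{4, 5});
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        for (Map.Entry<String, byte[]> e : readRecords(in).entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue().length + " bytes");
        }
      }
    }

In the actual change, the read side additionally parses the index data bytes with CarbonBloomFilter.readFields and tags each filter with its shard name, so hit blocklets can still be resolved to the original shard.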

This closes #2624


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7b31b916
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7b31b916
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7b31b916

Branch: refs/heads/master
Commit: 7b31b91683acbc22ee1bb9d05c665e0149821f09
Parents: 22958d9
Author: Manhua <ke...@qq.com>
Authored: Thu Aug 9 19:17:02 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Sep 12 16:11:41 2018 +0800

----------------------------------------------------------------------
 .../bloom/AbstractBloomDataMapWriter.java       |   2 +-
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |  26 ++-
 .../bloom/BloomCoarseGrainDataMapFactory.java   | 138 +++++------
 .../datamap/bloom/BloomDataMapCache.java        |  41 +---
 .../bloom/BloomDataMapDistributable.java        |  16 +-
 .../datamap/bloom/BloomIndexFileStore.java      | 230 +++++++++++++++++++
 .../hadoop/util/bloom/CarbonBloomFilter.java    |  12 +
 .../carbondata/events/DataMapEvents.scala       |   2 +-
 .../datamap/CarbonMergeBloomIndexFilesRDD.scala |  86 +++++++
 .../datamap/IndexDataMapRebuildRDD.scala        |   2 +-
 .../spark/rdd/CarbonTableCompactor.scala        |   6 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   3 +-
 .../events/MergeBloomIndexEventListener.scala   |  76 ++++++
 .../sql/events/MergeIndexEventListener.scala    |  19 +-
 .../management/CarbonLoadDataCommand.scala      |   8 +-
 .../BloomCoarseGrainDataMapFunctionSuite.scala  |   4 +-
 16 files changed, 521 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
index c5508fe..3823460 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java
@@ -164,7 +164,7 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter {
       }
     }
     for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
-      String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath,
+      String dmFile = BloomIndexFileStore.getBloomIndexFile(dataMapPath,
           indexColumns.get(indexColId).getColName());
       DataOutputStream dataOutStream = null;
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 71b1c55..27911ca 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.datamap.bloom;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.text.DateFormat;
@@ -81,11 +80,12 @@ import org.apache.hadoop.util.bloom.Key;
 public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
-  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
   private Map<String, CarbonColumn> name2Col;
   private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
   private String shardName;
   private Path indexPath;
+  private Set<String> filteredShard;
+  private boolean needShardPrune;
   /**
    * This is used to convert literal filter value to internal carbon value
    */
@@ -102,6 +102,13 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
     }
   }
 
+  public void setFilteredShard(Set<String> filteredShard) {
+    this.filteredShard = filteredShard;
+    // only prune by shard during blocklet pruning if the bloom index files are merged
+    this.needShardPrune = filteredShard != null &&
+            shardName.equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME);
+  }
+
   /**
    * init field converters for index columns
    */
@@ -182,11 +189,16 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
       BloomCacheKeyValue.CacheValue cacheValue = cache.get(cacheKey);
       List<CarbonBloomFilter> bloomIndexList = cacheValue.getBloomFilters();
       for (CarbonBloomFilter bloomFilter : bloomIndexList) {
+        if (needShardPrune && !filteredShard.contains(bloomFilter.getShardName())) {
+          // skip shard which has been pruned in Main datamap
+          continue;
+        }
         boolean scanRequired = bloomFilter.membershipTest(new Key(bloomQueryModel.filterValue));
         if (scanRequired) {
           LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
               String.valueOf(bloomFilter.getBlockletNo())));
-          Blocklet blocklet = new Blocklet(shardName, String.valueOf(bloomFilter.getBlockletNo()));
+          Blocklet blocklet = new Blocklet(bloomFilter.getShardName(),
+                  String.valueOf(bloomFilter.getBlockletNo()));
           hitBlocklets.add(blocklet);
         } else {
           LOGGER.debug(String.format("BloomCoarseGrainDataMap: Skip scan -> blocklet#%s",
@@ -349,14 +361,6 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
   public void clear() {
   }
 
-  /**
-   * get bloom index file
-   * @param shardPath path for the shard
-   * @param colName index column name
-   */
-  public static String getBloomIndexFile(String shardPath, String colName) {
-    return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
-  }
   static class BloomQueryModel {
     private String columnName;
     private byte[] filterValue;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 1e5b79c..8c74c94 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.datamap.bloom;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,16 +32,13 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -220,26 +218,55 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
         this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
   }
 
+  /**
+   * returns all shard directories of bloom index files for a query
+   * if the bloom index files are merged, only the single merge shard path is returned
+   */
+  private Set<String> getAllShardPaths(String tablePath, String segmentId) {
+    String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
+        tablePath, segmentId, dataMapName);
+    CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
+    Set<String> shardPaths = new HashSet<>();
+    boolean mergeShardInprogress = false;
+    CarbonFile mergeShardFile = null;
+    for (CarbonFile carbonFile : carbonFiles) {
+      if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) {
+        mergeShardFile = carbonFile;
+      } else if (carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) {
+        mergeShardInprogress = true;
+      } else if (carbonFile.isDirectory()) {
+        shardPaths.add(carbonFile.getAbsolutePath());
+      }
+    }
+    if (mergeShardFile != null && !mergeShardInprogress) {
+      // should only get one shard path if mergeShard is generated successfully
+      shardPaths.clear();
+      shardPaths.add(mergeShardFile.getAbsolutePath());
+    }
+    return shardPaths;
+  }
+
   @Override
   public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
-    List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
     try {
       Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
       if (shardPaths == null) {
-        String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
-            getCarbonTable().getTablePath(), segment.getSegmentNo(), dataMapName);
-        CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles();
-        shardPaths = new HashSet<>();
-        for (CarbonFile carbonFile : carbonFiles) {
-          shardPaths.add(carbonFile.getAbsolutePath());
-        }
+        shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
         segmentMap.put(segment.getSegmentNo(), shardPaths);
       }
+      Set<String> filteredShards = segment.getFilteredIndexShardNames();
       for (String shard : shardPaths) {
-        BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
-        bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
-        bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
-        dataMaps.add(bloomDM);
+        if (shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
+            filteredShards.contains(new File(shard).getName())) {
+          // Filter out the tasks which have been filtered through the main datamap.
+          // For the merge shard, shard pruning is delayed until blocklet pruning.
+          BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
+          bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
+          bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
+          bloomDM.setFilteredShard(filteredShards);
+          dataMaps.add(bloomDM);
+        }
       }
     } catch (Exception e) {
       throw new IOException("Error occurs while init Bloom DataMap", e);
@@ -250,75 +277,36 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
   @Override
   public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
-    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
-    BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
     String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
-    bloomCoarseGrainDataMap
-        .init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
-    bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
-        dataMapMeta.getIndexedColumns());
-    coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
-    return coarseGrainDataMaps;
+    Set<String> filteredShards = ((BloomDataMapDistributable) distributable).getFilteredShards();
+    BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
+    bloomDM.init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
+    bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
+    bloomDM.setFilteredShard(filteredShards);
+    dataMaps.add(bloomDM);
+    return dataMaps;
   }
 
-  /**
-   * returns all the directories of lucene index files for query
-   * Note: copied from luceneDataMapFactory, will extract to a common interface
-   */
-  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) {
-    List<CarbonFile> indexDirs = new ArrayList<>();
-    List<TableDataMap> dataMaps;
-    try {
-      // there can be multiple bloom datamaps present on a table, so get all datamaps and form
-      // the path till the index file directories in all datamaps folders present in each segment
-      dataMaps = DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
-    } catch (IOException ex) {
-      LOGGER.error(ex, String.format("failed to get datamaps for tablePath %s, segmentId %s",
-          tablePath, segmentId));
-      throw new RuntimeException(ex);
-    }
-    if (dataMaps.size() > 0) {
-      for (TableDataMap dataMap : dataMaps) {
-        if (dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
-          List<CarbonFile> indexFiles;
-          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, segmentId,
-              dataMap.getDataMapSchema().getDataMapName());
-          FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
-          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
-          indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
-            @Override
-            public boolean accept(CarbonFile file) {
-              return file.isDirectory();
-            }
-          }));
-          indexDirs.addAll(indexFiles);
-        }
-      }
-    }
-    return indexDirs.toArray(new CarbonFile[0]);
-  }
 
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> dataMapDistributableList = new ArrayList<>();
-    CarbonFile[] indexDirs =
-        getAllIndexDirs(getCarbonTable().getTablePath(), segment.getSegmentNo());
-    if (segment.getFilteredIndexShardNames().size() == 0) {
-      for (CarbonFile indexDir : indexDirs) {
-        DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
-            indexDir.getAbsolutePath());
-        dataMapDistributableList.add(bloomDataMapDistributable);
-      }
-      return dataMapDistributableList;
+    Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
+    if (shardPaths == null) {
+      shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), segment.getSegmentNo());
+      segmentMap.put(segment.getSegmentNo(), shardPaths);
     }
-    for (CarbonFile indexDir : indexDirs) {
-      // Filter out the tasks which are filtered through CG datamap.
-      if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
-        continue;
+    Set<String> filteredShards = segment.getFilteredIndexShardNames();
+    for (String shardPath : shardPaths) {
+      // Filter out the tasks which have been filtered through the main datamap.
+      // For the merge shard, shard pruning is delayed until blocklet pruning.
+      if (shardPath.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
+          filteredShards.contains(new File(shardPath).getName())) {
+        DataMapDistributable bloomDataMapDistributable =
+            new BloomDataMapDistributable(shardPath, filteredShards);
+        dataMapDistributableList.add(bloomDataMapDistributable);
       }
-      DataMapDistributable bloomDataMapDistributable = new BloomDataMapDistributable(
-          indexDir.getAbsolutePath());
-      dataMapDistributableList.add(bloomDataMapDistributable);
     }
     return dataMapDistributableList;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
index c1e6251..4063c2e 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.datamap.bloom;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,9 +25,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 
@@ -58,7 +55,9 @@ public class BloomDataMapCache
       throws IOException {
     BloomCacheKeyValue.CacheValue cacheValue = getIfPresent(key);
     if (cacheValue == null) {
-      cacheValue = loadBloomDataMapModel(key);
+      List<CarbonBloomFilter> bloomFilters =
+              BloomIndexFileStore.loadBloomFilterFromFile(key.getShardPath(), key.getIndexColumn());
+      cacheValue = new BloomCacheKeyValue.CacheValue(bloomFilters);
       lruCache.put(key.toString(), cacheValue, cacheValue.getMemorySize());
     }
     return cacheValue;
@@ -91,40 +90,6 @@ public class BloomDataMapCache
     // No impl required.
   }
 
-  /**
-   * load datamap from bloomindex file
-   */
-  private BloomCacheKeyValue.CacheValue loadBloomDataMapModel(
-      BloomCacheKeyValue.CacheKey cacheKey) {
-    DataInputStream dataInStream = null;
-    List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
-    try {
-      String indexFile = getIndexFileFromCacheKey(cacheKey);
-      dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
-      while (dataInStream.available() > 0) {
-        CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
-        bloomFilter.readFields(dataInStream);
-        bloomFilters.add(bloomFilter);
-      }
-      LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile));
-
-      return new BloomCacheKeyValue.CacheValue(bloomFilters);
-    } catch (IOException e) {
-      LOGGER.error(e, "Error occurs while reading bloom index");
-      throw new RuntimeException("Error occurs while reading bloom index", e);
-    } finally {
-      CarbonUtil.closeStreams(dataInStream);
-    }
-  }
-
-  /**
-   * get bloom index file name from cachekey
-   */
-  private String getIndexFileFromCacheKey(BloomCacheKeyValue.CacheKey cacheKey) {
-    return BloomCoarseGrainDataMap
-        .getBloomIndexFile(cacheKey.getShardPath(), cacheKey.getIndexColumn());
-  }
-
   @Override
   public void clearAccessCount(List<BloomCacheKeyValue.CacheKey> keys) {
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
index 86d6932..b0a7ace 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapDistributable.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.datamap.bloom;
 
+import java.util.Set;
+
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 
@@ -27,11 +29,23 @@ class BloomDataMapDistributable extends DataMapDistributable {
    */
   private String indexPath;
 
-  BloomDataMapDistributable(String indexPath) {
+  /**
+   * Set of index shards that have already been filtered through the CG index operation.
+   * This is used for the merge shard, which cannot prune shards in the `toDistributable` function.
+   * In other cases it will be set to null.
+   */
+  private Set<String> filteredShards;
+
+  BloomDataMapDistributable(String indexPath, Set<String> filteredShards) {
     this.indexPath = indexPath;
+    this.filteredShards = filteredShards;
   }
 
   public String getIndexPath() {
     return indexPath;
   }
+
+  public Set<String> getFilteredShards() {
+    return filteredShards;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
new file mode 100644
index 0000000..2abdc3f
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
+
+/**
+ * This class works for merging and loading bloom index
+ */
+@InterfaceAudience.Internal
+public class BloomIndexFileStore {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(BloomIndexFileStore.class.getName());
+
+  // suffix of original generated file
+  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
+  // suffix of merged bloom index file
+  public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge";
+  // directory to store merged bloom index files
+  public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard";
+  /**
+   * flag file for merging
+   * if the flag file exists, queries won't use mergeShard
+   * if the flag file does not exist and mergeShard has been generated, queries will use mergeShard
+   */
+  public static final String MERGE_INPROGRESS_FILE = "mergeShard.inprogress";
+
+
+  public static void mergeBloomIndexFile(String dmSegmentPathString, List<String> indexCols) {
+    // get all shard paths of old store
+    CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
+            FileFactory.getFileType(dmSegmentPathString));
+    CarbonFile[] shardPaths = segmentPath.listFiles(new CarbonFileFilter() {
+      @Override
+      public boolean accept(CarbonFile file) {
+        return file.isDirectory() && !file.getName().equals(MERGE_BLOOM_INDEX_SHARD_NAME);
+      }
+    });
+
+    String mergeShardPath = dmSegmentPathString + File.separator + MERGE_BLOOM_INDEX_SHARD_NAME;
+    String mergeInprogressFile = dmSegmentPathString + File.separator + MERGE_INPROGRESS_FILE;
+    try {
+      // delete mergeShard folder if exists
+      if (FileFactory.isFileExist(mergeShardPath)) {
+        FileFactory.deleteFile(mergeShardPath, FileFactory.getFileType(mergeShardPath));
+      }
+      // create flag file before creating mergeShard folder
+      if (!FileFactory.isFileExist(mergeInprogressFile)) {
+        FileFactory.createNewFile(
+            mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile));
+      }
+      // create mergeShard output folder
+      if (!FileFactory.mkdirs(mergeShardPath, FileFactory.getFileType(mergeShardPath))) {
+        throw new RuntimeException("Failed to create directory " + mergeShardPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error(e, "Error occurs while create directory " + mergeShardPath);
+      throw new RuntimeException("Error occurs while create directory " + mergeShardPath);
+    }
+
+    // for each index column, merge the bloomindex files from all shards into one
+    for (String indexCol: indexCols) {
+      String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, indexCol);
+      DataInputStream dataInputStream = null;
+      DataOutputStream dataOutputStream = null;
+      try {
+        FileFactory.createNewFile(mergeIndexFile, FileFactory.getFileType(mergeIndexFile));
+        dataOutputStream = FileFactory.getDataOutputStream(
+            mergeIndexFile, FileFactory.getFileType(mergeIndexFile));
+        for (CarbonFile shardPath : shardPaths) {
+          String bloomIndexFile = getBloomIndexFile(shardPath.getCanonicalPath(), indexCol);
+          dataInputStream = FileFactory.getDataInputStream(
+              bloomIndexFile, FileFactory.getFileType(bloomIndexFile));
+          byte[] fileData = new byte[(int) FileFactory.getCarbonFile(bloomIndexFile).getSize()];
+          dataInputStream.readFully(fileData);
+          byte[] shardName = shardPath.getName().getBytes(Charset.forName("UTF-8"));
+          dataOutputStream.writeInt(shardName.length);
+          dataOutputStream.write(shardName);
+          dataOutputStream.writeInt(fileData.length);
+          dataOutputStream.write(fileData);
+          CarbonUtil.closeStream(dataInputStream);
+        }
+      } catch (IOException e) {
+        LOGGER.error(e, "Error occurs while merge bloom index file of column: " + indexCol);
+        // delete merge shard of bloom index for this segment when failed
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
+        throw new RuntimeException(
+            "Error occurs while merge bloom index file of column: " + indexCol);
+      } finally {
+        CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
+      }
+    }
+    // delete flag file and mergeShard can be used
+    try {
+      FileFactory.deleteFile(mergeInprogressFile, FileFactory.getFileType(mergeInprogressFile));
+    } catch (IOException e) {
+      LOGGER.error(e, "Error occurs while deleting file " + mergeInprogressFile);
+      throw new RuntimeException("Error occurs while deleting file " + mergeInprogressFile);
+    }
+    // remove old store
+    for (CarbonFile shardpath: shardPaths) {
+      FileFactory.deleteAllCarbonFilesOfDir(shardpath);
+    }
+  }
+
+  /**
+   * load bloom filter from bloom index file
+   */
+  public static List<CarbonBloomFilter> loadBloomFilterFromFile(
+          String shardPath, String colName) {
+    if (shardPath.endsWith(MERGE_BLOOM_INDEX_SHARD_NAME)) {
+      return loadMergeBloomIndex(shardPath, colName);
+    } else {
+      return loadBloomIndex(shardPath, colName);
+    }
+  }
+
+  /**
+   * load bloom filter of {@code colName} from {@code shardPath}
+   */
+  public static List<CarbonBloomFilter> loadBloomIndex(
+          String shardPath, String colName) {
+    DataInputStream dataInStream = null;
+    List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
+    try {
+      String indexFile = getBloomIndexFile(shardPath, colName);
+      dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
+      while (dataInStream.available() > 0) {
+        CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
+        bloomFilter.readFields(dataInStream);
+        bloomFilter.setShardName(new Path(shardPath).getName());
+        bloomFilters.add(bloomFilter);
+      }
+      LOGGER.info(String.format("Read %d bloom indices from %s", bloomFilters.size(), indexFile));
+
+      return bloomFilters;
+    } catch (IOException e) {
+      LOGGER.error(e, "Error occurs while reading bloom index");
+      throw new RuntimeException("Error occurs while reading bloom index", e);
+    } finally {
+      CarbonUtil.closeStreams(dataInStream);
+    }
+  }
+
+
+  /**
+   * load bloom filter of {@code colName} from {@code mergeShardPath}
+   */
+  public static List<CarbonBloomFilter> loadMergeBloomIndex(
+          String mergeShardPath, String colName) {
+    String mergeIndexFile = getMergeBloomIndexFile(mergeShardPath, colName);
+    DataInputStream mergeIndexInStream = null;
+    List<CarbonBloomFilter> bloomFilters = new ArrayList<>();
+    try {
+      mergeIndexInStream = FileFactory.getDataInputStream(
+          mergeIndexFile, FileFactory.getFileType(mergeIndexFile));
+      while (mergeIndexInStream.available() > 0) {
+        // read shard name
+        int shardNameByteLength = mergeIndexInStream.readInt();
+        byte[] shardNameBytes = new byte[shardNameByteLength];
+        mergeIndexInStream.readFully(shardNameBytes);
+        String shardName = new String(shardNameBytes, Charset.forName("UTF-8"));
+        // read bloom index file data
+        int indexFileByteLength = mergeIndexInStream.readInt();
+        byte[] indexFileBytes = new byte[indexFileByteLength];
+        mergeIndexInStream.readFully(indexFileBytes);
+        // wrap the byte array as an input stream to get bloom filters
+        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(indexFileBytes);
+        DataInputStream indexDataInStream = new DataInputStream(byteArrayInputStream);
+        while (indexDataInStream.available() > 0) {
+          CarbonBloomFilter bloomFilter = new CarbonBloomFilter();
+          bloomFilter.readFields(indexDataInStream);
+          bloomFilter.setShardName(shardName);
+          bloomFilters.add(bloomFilter);
+        }
+      }
+      LOGGER.info(
+          String.format("Read %d bloom indices from %s", bloomFilters.size(), mergeIndexFile));
+      return bloomFilters;
+    } catch (IOException e) {
+      LOGGER.error(e, "Error occurs while reading merge bloom index");
+      throw new RuntimeException("Error occurs while reading merge bloom index", e);
+    } finally {
+      CarbonUtil.closeStreams(mergeIndexInStream);
+    }
+  }
+
+  /**
+   * get bloom index file
+   */
+  public static String getBloomIndexFile(String shardPath, String colName) {
+    return shardPath.concat(File.separator).concat(colName).concat(BLOOM_INDEX_SUFFIX);
+  }
+
+  /**
+   * get merge bloom index file
+   */
+  public static String getMergeBloomIndexFile(String mergeShardPath, String colName) {
+    return mergeShardPath.concat(File.separator).concat(colName).concat(MERGE_BLOOM_INDEX_SUFFIX);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
index c6a62cc..4b111df 100644
--- a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
+++ b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
@@ -36,6 +36,9 @@ public class CarbonBloomFilter extends BloomFilter {
 
   private int blockletNo;
 
+  // used for building blocklets during query
+  private String shardName;
+
   public CarbonBloomFilter() {
   }
 
@@ -166,4 +169,13 @@ public class CarbonBloomFilter extends BloomFilter {
   public int getBlockletNo() {
     return blockletNo;
   }
+
+  public String getShardName() {
+    return shardName;
+  }
+
+  public void setShardName(String shardName) {
+    this.shardName = shardName;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
index e601633..503729a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
@@ -60,7 +60,7 @@ case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
  * example: bloom datamap, Lucene datamap
  */
 case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
-    identifier: AbsoluteTableIdentifier)
+    identifier: AbsoluteTableIdentifier, segmentIdList: Seq[String], isFromRebuild: Boolean)
   extends Event with TableEventInfo
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
new file mode 100644
index 0000000..1aa8b82
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/CarbonMergeBloomIndexFilesRDD.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.Partition
+import org.apache.spark.rdd.CarbonMergeFilePartition
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.TaskContext
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.datamap.bloom.BloomIndexFileStore
+import org.apache.carbondata.spark.rdd.CarbonRDD
+
+
+/**
+ * RDD to merge all bloomindex files of specified segment for bloom datamap
+ */
+class CarbonMergeBloomIndexFilesRDD(
+  @transient ss: SparkSession,
+  carbonTable: CarbonTable,
+  segmentIds: Seq[String],
+  bloomDatamapNames: Seq[String],
+  bloomIndexColumns: Seq[Seq[String]])
+  extends CarbonRDD[String](ss, Nil) {
+
+  override def internalGetPartitions: Array[Partition] = {
+    segmentIds.zipWithIndex.map {s =>
+      CarbonMergeFilePartition(id, s._2, s._1)
+    }.toArray
+  }
+
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+    val tablePath = carbonTable.getTablePath
+    val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
+    logInfo("Merging bloom index files of " +
+      s"segment ${split.segmentId} for ${carbonTable.getTableName}")
+
+    bloomDatamapNames.zipWithIndex.map( dm => {
+      val dmSegmentPath = CarbonTablePath.getDataMapStorePath(
+        tablePath, split.segmentId, dm._1)
+      BloomIndexFileStore.mergeBloomIndexFile(dmSegmentPath, bloomIndexColumns(dm._2).asJava)
+    })
+
+    val iter = new Iterator[String] {
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): String = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        ""
+      }
+    }
+    iter
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 7d6c4e8..5e07f1b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -133,7 +133,7 @@ object IndexDataMapRebuildRDD {
     }
 
     val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
-      tableIdentifier)
+      tableIdentifier, validSegments.asScala.map(_.getSegmentNo), true)
     OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index d9884e1..c505bbc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -285,9 +285,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       OperationListenerBus.getInstance()
         .fireEvent(compactionLoadStatusPostEvent, operationContext)
       if (null != tableDataMaps) {
-        val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
-          new BuildDataMapPostExecutionEvent(sqlContext.sparkSession,
-            carbonTable.getAbsoluteTableIdentifier)
+        val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(
+          sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
+          Seq(carbonLoadModel.getSegmentId), true)
         OperationListenerBus.getInstance()
           .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 8253c4d..90ba58c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.events.MergeIndexEventListener
+import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -183,6 +183,7 @@ object CarbonEnv {
       .addListener(classOf[LoadTablePostExecutionEvent], new MergeIndexEventListener)
       .addListener(classOf[AlterTableCompactionPostEvent], new MergeIndexEventListener)
       .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
+      .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
new file mode 100644
index 0000000..1a76ed7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.events
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
+import org.apache.carbondata.events._
+
+class MergeBloomIndexEventListener extends OperationEventListener with Logging {
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    event match {
+      case datamapPostEvent: BuildDataMapPostExecutionEvent =>
+        LOGGER.audit("Load post status event-listener called for merge bloom index")
+        val carbonTableIdentifier = datamapPostEvent.identifier
+        val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
+        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
+        val sparkSession = SparkSession.getActiveSession.get
+
+        // filter out bloom datamap
+        var bloomDatamaps = tableDataMaps.asScala.filter(
+          _.getDataMapSchema.getProviderName.equalsIgnoreCase(
+            DataMapClassProvider.BLOOMFILTER.getShortName))
+
+        // for load process, filter lazy datamap
+        if (!datamapPostEvent.isFromRebuild) {
+          bloomDatamaps = bloomDatamaps.filter(!_.getDataMapSchema.isLazy)
+        }
+
+        val segmentIds = datamapPostEvent.segmentIdList
+        if (bloomDatamaps.size > 0 && segmentIds.size > 0) {
+          // we extract bloom datamap name and index columns here
+          // because TableDataMap is not serializable
+          val bloomDMnames = ListBuffer.empty[String]
+          val bloomIndexColumns = ListBuffer.empty[Seq[String]]
+          bloomDatamaps.foreach( dm => {
+            bloomDMnames += dm.getDataMapSchema.getDataMapName
+            bloomIndexColumns += dm.getDataMapSchema.getIndexColumns.map(_.trim.toLowerCase)
+          })
+          new CarbonMergeBloomIndexFilesRDD(sparkSession, carbonTable,
+            segmentIds, bloomDMnames, bloomIndexColumns).collect()
+        }
+    }
+  }
+
+
+  private def clearBloomCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
+    DataMapStoreManager.getInstance.clearDataMaps(carbonTable.getTableUniqueName)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 24ef0db..a0c19e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -46,9 +46,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     event match {
       case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
         LOGGER.audit("Load post status event-listener called for merge index")
-        val loadTablePreStatusUpdateEvent = event.asInstanceOf[LoadTablePostExecutionEvent]
-        val carbonTableIdentifier = loadTablePreStatusUpdateEvent.getCarbonTableIdentifier
-        val loadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel
+        val loadModel = preStatusUpdateEvent.getCarbonLoadModel
         val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
         val compactedSegments = loadModel.getMergedSegmentIds
         val sparkSession = SparkSession.getActiveSession.get
@@ -74,7 +72,6 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
         }
       case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
         LOGGER.audit("Merge index for compaction called")
-        val alterTableCompactionPostEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
         val carbonTable = alterTableCompactionPostEvent.carbonTable
         val mergedLoads = alterTableCompactionPostEvent.compactedLoads
         val sparkSession = alterTableCompactionPostEvent.sparkSession
@@ -82,11 +79,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads)
         }
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
-        val exceptionEvent = event.asInstanceOf[AlterTableMergeIndexEvent]
-        val alterTableModel = exceptionEvent.alterTableModel
-        val carbonMainTable = exceptionEvent.carbonTable
-        val compactionType = alterTableModel.compactionType
-        val sparkSession = exceptionEvent.sparkSession
+        val alterTableModel = alterTableMergeIndexEvent.alterTableModel
+        val carbonMainTable = alterTableMergeIndexEvent.carbonTable
+        val sparkSession = alterTableMergeIndexEvent.sparkSession
         if (!carbonMainTable.isStreamingSink) {
           LOGGER.audit(s"Compaction request received for table " +
                        s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
@@ -132,8 +127,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                 readFileFooterFromCarbonDataFile = true)
               // clear Block dataMap Cache
               clearBlockDataMapCache(carbonMainTable, validSegmentIds)
-              val requestMessage = "Compaction request completed for table "
-              s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
+              val requestMessage = "Compaction request completed for table " +
+                s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
               LOGGER.audit(requestMessage)
               LOGGER.info(requestMessage)
             } else {
@@ -181,7 +176,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     // So, it is enough to do merge index only for 0.2 as it is the only valid segment in this list
     val validMergedSegIds = validSegments
       .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
-    if (null != validMergedSegIds && !validMergedSegIds.isEmpty) {
+    if (null != validMergedSegIds && validMergedSegIds.nonEmpty) {
       CommonUtil.mergeIndexFiles(sparkSession,
           validMergedSegIds,
           segmentFileNameMap,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 52801b1..63da404 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -245,7 +245,7 @@ case class CarbonLoadDataCommand(
         // Add pre event listener for index datamap
         val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
         val dataMapOperationContext = new OperationContext()
-        if (null != tableDataMaps) {
+        if (tableDataMaps.size() > 0) {
           val dataMapNames: mutable.Buffer[String] =
             tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
           val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
@@ -322,9 +322,9 @@ case class CarbonLoadDataCommand(
             table.getCarbonTableIdentifier,
             carbonLoadModel)
         OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
-        if (null != tableDataMaps) {
-          val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
-            BuildDataMapPostExecutionEvent(sparkSession, table.getAbsoluteTableIdentifier)
+        if (tableDataMaps.size() > 0) {
+          val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
+            table.getAbsoluteTableIdentifier, Seq(carbonLoadModel.getSegmentId), false)
           OperationListenerBus.getInstance()
             .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b31b916/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
index dd2ecb4..f6e5eab 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFunctionSuite.scala
@@ -780,7 +780,7 @@ class BloomCoarseGrainDataMapFunctionSuite  extends QueryTest with BeforeAndAfte
     import scala.collection.JavaConverters._
     (0 to 1).foreach { segId =>
       val datamapPath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segId.toString, dataMapName)
-      assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindex"), true).asScala.nonEmpty)
+      assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindexmerge"), true).asScala.nonEmpty)
     }
     // delete and clean the first segment, the corresponding datamap files should be cleaned too
     sql(s"DELETE FROM TABLE $bloomDMSampleTable WHERE SEGMENT.ID IN (0)")
@@ -788,7 +788,7 @@ class BloomCoarseGrainDataMapFunctionSuite  extends QueryTest with BeforeAndAfte
     var datamapPath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, "0", dataMapName)
     assert(!FileUtils.getFile(datamapPath).exists(), "index file of this segment has been deleted, should not exist")
     datamapPath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, "1", dataMapName)
-    assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindex"), true).asScala.nonEmpty)
+    assert(FileUtils.listFiles(FileUtils.getFile(datamapPath), Array("bloomindexmerge"), true).asScala.nonEmpty)
   }
 
   // two blocklets in one block are hit by bloom datamap while block cache level hit this block