Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:41 UTC

[16/50] [abbrv] carbondata git commit: [CARBONDATA-2496] Changed to hadoop bloom implementation and added compress option to compress bloom on disk

[CARBONDATA-2496] Changed to hadoop bloom implementation and added compress option to compress bloom on disk

This PR removes the Guava bloom filter implementation and switches to the Hadoop one. It also adds a compress option to compress the bloom filter both on disk and in memory.
The user can enable or disable compression with the bloom_compress property; it is enabled by default (see the sketch below).

Performance check for bloom: loaded 100 million rows with a bloom datamap on a column with a cardinality of 5 million, using 'BLOOM_SIZE'='5000000', 'bloom_fpp'='0.001'.
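
For reference, a hedged sketch of how such a datamap might be created from Java via SparkSession (the table/column names and the `spark` handle are hypothetical; the property names follow this PR and the commit message):

    spark.sql(
        "CREATE DATAMAP dm_id ON TABLE sales "
            + "USING 'bloomfilter' "
            + "DMPROPERTIES('INDEX_COLUMNS'='id', 'BLOOM_SIZE'='5000000', "
            + "'BLOOM_FPP'='0.001', 'BLOOM_COMPRESS'='true')");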

This closes #2324


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/77a11107
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/77a11107
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/77a11107

Branch: refs/heads/spark-2.3
Commit: 77a11107c57beebda74925dbb328f7bad6c72136
Parents: d9534c2
Author: ravipesala <ra...@gmail.com>
Authored: Sun May 20 21:52:57 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue May 22 14:24:58 2018 +0530

----------------------------------------------------------------------
 .../blockletindex/BlockletDataMapFactory.java   |   5 +-
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |   8 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  42 ++++++--
 .../carbondata/datamap/bloom/BloomDMModel.java  |  35 ++++--
 .../datamap/bloom/BloomDataMapBuilder.java      |  12 ++-
 .../datamap/bloom/BloomDataMapCache.java        |  12 +--
 .../datamap/bloom/BloomDataMapWriter.java       |  60 ++++++-----
 .../hadoop/util/bloom/CarbonBloomFilter.java    | 108 +++++++++++++++++++
 8 files changed, 225 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 0188281..318fc6e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
@@ -78,7 +79,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
-  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>();
+  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new ConcurrentHashMap<>();
 
   private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
 
@@ -279,7 +280,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public void clear() {
+  public synchronized void clear() {
     if (segmentMap.size() > 0) {
       for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
         clear(new Segment(segmentId, null, null));

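The switch from HashMap to ConcurrentHashMap, together with the synchronized clear(), guards against concurrent datamap lookups and clears racing on segmentMap. A minimal standalone sketch of the pattern (class and method names hypothetical):

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class SegmentRegistry {
      // ConcurrentHashMap keeps concurrent put/get/remove safe,
      // unlike a plain HashMap
      private final Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();

      void register(String segmentId, Set<String> indexFiles) {
        segmentMap.put(segmentId, indexFiles);
      }

      // synchronized so two concurrent clears cannot interleave
      synchronized void clear() {
        for (String segmentId : segmentMap.keySet().toArray(new String[0])) {
          segmentMap.remove(segmentId);
        }
      }
    }
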
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 09de25e..a5a141c 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.bloom.Key;
 
 /**
  * BloomDataCoarseGrainMap is constructed in blocklet level. For each indexed column,
@@ -83,7 +84,7 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
-      List<PartitionSpec> partitions) throws IOException {
+      List<PartitionSpec> partitions) {
     List<Blocklet> hitBlocklets = new ArrayList<Blocklet>();
     if (filterExp == null) {
       // null is different from empty here. Empty means after pruning, no blocklet need to scan.
@@ -97,8 +98,8 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
           this.indexPath.toString(), bloomQueryModel.columnName);
       List<BloomDMModel> bloomDMModels = this.bloomDataMapCache.getBloomDMModelByKey(cacheKey);
       for (BloomDMModel bloomDMModel : bloomDMModels) {
-        boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
-            convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue));
+        boolean scanRequired = bloomDMModel.getBloomFilter().membershipTest(new Key(
+            convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue)));
         if (scanRequired) {
           LOGGER.debug(String.format("BloomCoarseGrainDataMap: Need to scan -> blocklet#%s",
               String.valueOf(bloomDMModel.getBlockletNo())));
@@ -110,7 +111,6 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
         }
       }
     }
-
     return hitBlocklets;
   }
 

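On the query side, the Guava mightContain(byte[]) call becomes Hadoop's membershipTest(Key). A self-contained sketch using the plain Hadoop filter (sizes and values are illustrative only):

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    // 1024-bit vector, 3 hash functions, murmur hash
    BloomFilter filter = new BloomFilter(1024, 3, Hash.MURMUR_HASH);
    filter.add(new Key("value1".getBytes()));

    // a bloom filter can return false positives but never false negatives,
    // so 'true' only means the blocklet cannot be skipped
    boolean scanRequired = filter.membershipTest(new Key("value1".getBytes()));
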
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 16b49f2..3231551 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
@@ -66,22 +67,33 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
    */
   private static final String BLOOM_SIZE = "bloom_size";
   /**
-   * default size for bloom filter: suppose one blocklet contains 20 pages
-   * and all the indexed value is distinct.
+   * default size for the bloom filter: the expected cardinality of the indexed column.
    */
-  private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20;
+  private static final int DEFAULT_BLOOM_FILTER_SIZE =
+      CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
   /**
    * property for fpp(false-positive-probability) of bloom filter
    */
   private static final String BLOOM_FPP = "bloom_fpp";
   /**
-   * default value for fpp of bloom filter
+   * default value for fpp of bloom filter is 1%
    */
-  private static final double DEFAULT_BLOOM_FILTER_FPP = 0.00001d;
+  private static final double DEFAULT_BLOOM_FILTER_FPP = 0.01d;
+
+  /**
+   * property for compressing bloom while saving to disk.
+   */
+  private static final String COMPRESS_BLOOM = "bloom_compress";
+  /**
+   * Default value for compressing the bloom filter when saving to disk.
+   */
+  private static final boolean DEFAULT_BLOOM_COMPRESS = true;
+
   private DataMapMeta dataMapMeta;
   private String dataMapName;
   private int bloomFilterSize;
   private double bloomFilterFpp;
+  private boolean bloomCompress;
 
   public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws MalformedDataMapCommandException {
@@ -94,6 +106,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema);
     this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
     this.bloomFilterFpp = validateAndGetBloomFilterFpp(dataMapSchema);
+    this.bloomCompress = validateAndGetBloomCompress(dataMapSchema);
     List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
     // todo: support more optimize operations
     optimizedOperations.add(ExpressionType.EQUALS);
@@ -163,6 +176,21 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     return bloomFilterFpp;
   }
 
+  /**
+   * validate bloom DataMap COMPRESS_BLOOM
+   * Default value is true
+   */
+  private boolean validateAndGetBloomCompress(DataMapSchema dmSchema) {
+    String bloomCompress = dmSchema.getProperties().get(COMPRESS_BLOOM);
+    if (StringUtils.isBlank(bloomCompress)) {
+      LOGGER.warn(
+          String.format("Bloom compress is not configured for datamap %s, use default value %b",
+              dataMapName, DEFAULT_BLOOM_COMPRESS));
+      return DEFAULT_BLOOM_COMPRESS;
+    }
+    return Boolean.parseBoolean(bloomCompress);
+  }
+
   @Override
   public DataMapWriter createWriter(Segment segment, String shardName) throws IOException {
     LOGGER.info(
@@ -170,14 +198,14 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
             this.dataMapName, getCarbonTable().getTableName() , shardName));
     return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName,
         this.dataMapMeta.getIndexedColumns(), segment, shardName,
-        this.bloomFilterSize, this.bloomFilterFpp);
+        this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
   }
 
   @Override
   public DataMapBuilder createBuilder(Segment segment, String shardName) throws IOException {
     return new BloomDataMapBuilder(getCarbonTable().getTablePath(), this.dataMapName,
         this.dataMapMeta.getIndexedColumns(), segment, shardName,
-        this.bloomFilterSize, this.bloomFilterFpp);
+        this.bloomFilterSize, this.bloomFilterFpp, bloomCompress);
   }
 
   @Override

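One thing to note about validateAndGetBloomCompress(): Boolean.parseBoolean() returns false for anything other than a case-insensitive "true", so a mistyped property value silently disables compression instead of failing. A quick illustration:

    // Boolean.parseBoolean is lenient rather than strict:
    Boolean.parseBoolean("true");   // true
    Boolean.parseBoolean("TRUE");   // true (case-insensitive)
    Boolean.parseBoolean("ture");   // false -- a typo silently disables compression
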
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
index 3cf2f3b..7317c70 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
@@ -16,22 +16,27 @@
  */
 package org.apache.carbondata.datamap.bloom;
 
-import java.io.Serializable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 
-import com.google.common.hash.BloomFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
 
 /**
  * This class holds a bloom filter for one blocklet
  */
 @InterfaceAudience.Internal
-public class BloomDMModel implements Serializable {
-  private static final long serialVersionUID = 7281578747306832771L;
+public class BloomDMModel implements Writable {
   private int blockletNo;
-  private BloomFilter<byte[]> bloomFilter;
+  private CarbonBloomFilter bloomFilter;
 
-  public BloomDMModel(int blockletNo, BloomFilter<byte[]> bloomFilter) {
+  public BloomDMModel() {
+  }
+
+  public BloomDMModel(int blockletNo, CarbonBloomFilter bloomFilter) {
     this.blockletNo = blockletNo;
     this.bloomFilter = bloomFilter;
   }
@@ -40,15 +45,29 @@ public class BloomDMModel implements Serializable {
     return blockletNo;
   }
 
-  public BloomFilter<byte[]> getBloomFilter() {
+  public CarbonBloomFilter getBloomFilter() {
     return bloomFilter;
   }
 
-  @Override public String toString() {
+  @Override
+  public String toString() {
     final StringBuilder sb = new StringBuilder("BloomDMModel{");
     sb.append(", blockletNo=").append(blockletNo);
     sb.append(", bloomFilter=").append(bloomFilter);
     sb.append('}');
     return sb.toString();
   }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(blockletNo);
+    bloomFilter.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    blockletNo = in.readInt();
+    bloomFilter = new CarbonBloomFilter();
+    bloomFilter.readFields(in);
+  }
 }

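With BloomDMModel now implementing Hadoop's Writable instead of java.io.Serializable, persistence becomes an explicit write/readFields pair. A minimal in-memory round trip (filter construction values are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.util.bloom.CarbonBloomFilter;
    import org.apache.hadoop.util.hash.Hash;

    CarbonBloomFilter filter = new CarbonBloomFilter(1024, 3, Hash.MURMUR_HASH, false);
    BloomDMModel model = new BloomDMModel(0, filter);

    // serialize: writes blockletNo followed by the filter's own fields
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    model.write(new DataOutputStream(bos));

    // deserialize into a fresh instance via the new no-arg constructor
    BloomDMModel copy = new BloomDMModel();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
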
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index fa1aef7..e9929e3 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import org.apache.hadoop.util.bloom.Key;
+
 /**
  * Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
  */
@@ -36,10 +38,10 @@ import org.apache.carbondata.core.util.CarbonUtil;
 public class BloomDataMapBuilder extends BloomDataMapWriter implements DataMapBuilder {
 
   BloomDataMapBuilder(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp)
-      throws IOException {
-    super(tablePath, dataMapName, indexColumns, segment, shardName,
-        bloomFilterSize, bloomFilterFpp);
+      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp,
+      boolean bloomCompress) throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize, bloomFilterFpp,
+        bloomCompress);
   }
 
   @Override
@@ -70,7 +72,7 @@ public class BloomDataMapBuilder extends BloomDataMapWriter implements DataMapBu
       } else {
         indexValue = CarbonUtil.getValueAsBytes(dataType, data);
       }
-      indexBloomFilters.get(i).put(indexValue);
+      indexBloomFilters.get(i).add(new Key(indexValue));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
index 2411cf4..3de77ad 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapCache.java
@@ -19,7 +19,6 @@ package org.apache.carbondata.datamap.bloom;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -133,15 +132,14 @@ public class BloomDataMapCache implements Serializable {
    */
   private List<BloomDMModel> loadBloomDataMapModel(CacheKey cacheKey) {
     DataInputStream dataInStream = null;
-    ObjectInputStream objectInStream = null;
     List<BloomDMModel> bloomDMModels = new ArrayList<BloomDMModel>();
     try {
       String indexFile = getIndexFileFromCacheKey(cacheKey);
       dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
-      objectInStream = new ObjectInputStream(dataInStream);
       try {
-        BloomDMModel model = null;
-        while ((model = (BloomDMModel) objectInStream.readObject()) != null) {
+        while (dataInStream.available() > 0) {
+          BloomDMModel model = new BloomDMModel();
+          model.readFields(dataInStream);
           bloomDMModels.add(model);
         }
       } catch (EOFException e) {
@@ -150,12 +148,12 @@ public class BloomDataMapCache implements Serializable {
       }
       this.bloomDMCache.put(cacheKey, bloomDMModels);
       return bloomDMModels;
-    } catch (ClassNotFoundException | IOException e) {
+    } catch (IOException e) {
       clear(cacheKey);
       LOGGER.error(e, "Error occurs while reading bloom index");
       throw new RuntimeException("Error occurs while reading bloom index", e);
     } finally {
-      CarbonUtil.closeStreams(objectInStream, dataInStream);
+      CarbonUtil.closeStreams(dataInStream);
     }
   }
 

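The cache now deserializes a sequence of Writable models directly from the index file. Note that InputStream.available() is only an estimate on some streams, which is presumably why the EOFException catch is kept as a backstop. A condensed sketch of the read loop (indexFile is hypothetical):

    List<BloomDMModel> models = new ArrayList<>();
    DataInputStream in = FileFactory.getDataInputStream(
        indexFile, FileFactory.getFileType(indexFile));
    try {
      while (in.available() > 0) {
        BloomDMModel model = new BloomDMModel();
        model.readFields(in);   // mirrors BloomDMModel.write()
        models.add(model);
      }
    } catch (EOFException e) {
      // available() can under-report; EOF here just means all models were read
    } finally {
      CarbonUtil.closeStreams(in);
    }
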
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index f6eb331..b3e69f4 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.datamap.bloom;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -34,8 +33,9 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnels;
+import org.apache.hadoop.util.bloom.CarbonBloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.hadoop.util.hash.Hash;
 
 /**
  * BloomDataMap is constructed in CG level (blocklet level).
@@ -50,23 +50,23 @@ public class BloomDataMapWriter extends DataMapWriter {
       BloomDataMapWriter.class.getCanonicalName());
   private int bloomFilterSize;
   private double bloomFilterFpp;
+  private boolean compressBloom;
   protected int currentBlockletId;
   private List<String> currentDMFiles;
   private List<DataOutputStream> currentDataOutStreams;
-  private List<ObjectOutputStream> currentObjectOutStreams;
-  protected List<BloomFilter<byte[]>> indexBloomFilters;
+  protected List<CarbonBloomFilter> indexBloomFilters;
 
   BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp)
+      Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp,
+      boolean compressBloom)
       throws IOException {
     super(tablePath, dataMapName, indexColumns, segment, shardName);
     this.bloomFilterSize = bloomFilterSize;
     this.bloomFilterFpp = bloomFilterFpp;
-
-    currentDMFiles = new ArrayList<String>(indexColumns.size());
-    currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size());
-    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexColumns.size());
-    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexColumns.size());
+    this.compressBloom = compressBloom;
+    currentDMFiles = new ArrayList<>(indexColumns.size());
+    currentDataOutStreams = new ArrayList<>(indexColumns.size());
+    indexBloomFilters = new ArrayList<>(indexColumns.size());
     initDataMapFile();
     resetBloomFilters();
   }
@@ -86,12 +86,31 @@ public class BloomDataMapWriter extends DataMapWriter {
   protected void resetBloomFilters() {
     indexBloomFilters.clear();
     List<CarbonColumn> indexColumns = getIndexColumns();
+    int[] stats = calculateBloomStats();
     for (int i = 0; i < indexColumns.size(); i++) {
-      indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
-          bloomFilterSize, bloomFilterFpp));
+      indexBloomFilters
+          .add(new CarbonBloomFilter(stats[0], stats[1], Hash.MURMUR_HASH, compressBloom));
     }
   }
 
+  /**
+   * Calculates the bit vector size and the number of hash functions for the bloom filter.
+   */
+  private int[] calculateBloomStats() {
+    /*
+     * n: how many items you expect to have in your filter
+     * p: your acceptable false positive rate
+     * Number of bits (m) = -n*ln(p) / (ln(2)^2)
+     * Number of hashes(k) = m/n * ln(2)
+     */
+    double sizeinBits = -bloomFilterSize * Math.log(bloomFilterFpp) / (Math.pow(Math.log(2), 2));
+    double numberOfHashes = sizeinBits / bloomFilterSize * Math.log(2);
+    int[] stats = new int[2];
+    stats[0] = (int) Math.ceil(sizeinBits);
+    stats[1] = (int) Math.ceil(numberOfHashes);
+    return stats;
+  }
+
   @Override
   public void onBlockletEnd(int blockletId) {
     writeBloomDataMapFile();
@@ -117,7 +136,7 @@ public class BloomDataMapWriter extends DataMapWriter {
         } else {
           indexValue = CarbonUtil.getValueAsBytes(dataType, data);
         }
-        indexBloomFilters.get(i).put(indexValue);
+        indexBloomFilters.get(i).add(new Key(indexValue));
       }
     }
   }
@@ -140,20 +159,17 @@ public class BloomDataMapWriter extends DataMapWriter {
       String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath,
           indexColumns.get(indexColId).getColName());
       DataOutputStream dataOutStream = null;
-      ObjectOutputStream objectOutStream = null;
       try {
         FileFactory.createNewFile(dmFile, FileFactory.getFileType(dmFile));
         dataOutStream = FileFactory.getDataOutputStream(dmFile,
             FileFactory.getFileType(dmFile));
-        objectOutStream = new ObjectOutputStream(dataOutStream);
       } catch (IOException e) {
-        CarbonUtil.closeStreams(objectOutStream, dataOutStream);
+        CarbonUtil.closeStreams(dataOutStream);
         throw new IOException(e);
       }
 
       this.currentDMFiles.add(dmFile);
       this.currentDataOutStreams.add(dataOutStream);
-      this.currentObjectOutStreams.add(objectOutStream);
     }
   }
 
@@ -165,14 +181,10 @@ public class BloomDataMapWriter extends DataMapWriter {
             new BloomDMModel(this.currentBlockletId, indexBloomFilters.get(indexColId));
         // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
         // In lower version, we use default java serializer to write bloomfilter.
-        this.currentObjectOutStreams.get(indexColId).writeObject(model);
-        this.currentObjectOutStreams.get(indexColId).flush();
+        model.write(this.currentDataOutStreams.get(indexColId));
         this.currentDataOutStreams.get(indexColId).flush();
       }
     } catch (Exception e) {
-      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
-        CarbonUtil.closeStreams(objectOutputStream);
-      }
       for (DataOutputStream dataOutputStream : currentDataOutStreams) {
         CarbonUtil.closeStreams(dataOutputStream);
       }
@@ -194,7 +206,7 @@ public class BloomDataMapWriter extends DataMapWriter {
     List<CarbonColumn> indexColumns = getIndexColumns();
     for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) {
       CarbonUtil.closeStreams(
-          currentDataOutStreams.get(indexColId), currentObjectOutStreams.get(indexColId));
+          currentDataOutStreams.get(indexColId));
     }
   }
 

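Plugging the commit message's test configuration into the formulas used by calculateBloomStats() (n = 5,000,000 expected items, p = 0.001) gives a feel for the sizes involved:

    double n = 5_000_000d;
    double p = 0.001d;
    // m = -n*ln(p) / (ln(2)^2)  ~= 71.9 million bits (~8.6 MiB uncompressed)
    double m = -n * Math.log(p) / Math.pow(Math.log(2), 2);
    // k = m/n * ln(2)  ~= 9.97, rounded up to 10 hash functions
    double k = m / n * Math.log(2);
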
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77a11107/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
new file mode 100644
index 0000000..7c39cad
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/hadoop/util/bloom/CarbonBloomFilter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.bloom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * An extension of the Hadoop BloomFilter that supports optional compression
+ * and fast serialization and deserialization of the bloom filter.
+ */
+public class CarbonBloomFilter extends BloomFilter {
+
+  private RoaringBitmap bitmap;
+
+  private boolean compress;
+
+  public CarbonBloomFilter() {
+  }
+
+  public CarbonBloomFilter(int vectorSize, int nbHash, int hashType, boolean compress) {
+    super(vectorSize, nbHash, hashType);
+    this.compress = compress;
+  }
+
+  @Override
+  public boolean membershipTest(Key key) {
+    if (key == null) {
+      throw new NullPointerException("key cannot be null");
+    }
+
+    int[] h = hash.hash(key);
+    hash.clear();
+    if (compress) {
+      // If it is compressed check in roaring bitmap
+      for (int i = 0; i < nbHash; i++) {
+        if (!bitmap.contains(h[i])) {
+          return false;
+        }
+      }
+    } else {
+      for (int i = 0; i < nbHash; i++) {
+        if (!bits.get(h[i])) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.nbHash);
+    out.writeByte(this.hashType);
+    out.writeInt(this.vectorSize);
+    out.writeBoolean(compress);
+    if (!compress) {
+      byte[] bytes = bits.toByteArray();
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    } else {
+      RoaringBitmap bitmap = new RoaringBitmap();
+      int length = bits.cardinality();
+      int nextSetBit = bits.nextSetBit(0);
+      for (int i = 0; i < length; ++i) {
+        bitmap.add(nextSetBit);
+        nextSetBit = bits.nextSetBit(nextSetBit + 1);
+      }
+      bitmap.serialize(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.nbHash = in.readInt();
+    this.hashType = in.readByte();
+    this.vectorSize = in.readInt();
+    this.compress = in.readBoolean();
+    if (!compress) {
+      int len = in.readInt();
+      byte[] bytes = new byte[len];
+      in.readFully(bytes);
+      this.bits = BitSet.valueOf(bytes);
+    } else {
+      this.bitmap = new RoaringBitmap();
+      bitmap.deserialize(in);
+    }
+    this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
+  }
+}
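
End-to-end, the compressed form only materializes the RoaringBitmap inside write()/readFields(), so membership tests against the compressed bitmap are valid after deserialization. A round-trip sketch (sizes illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.util.bloom.CarbonBloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;

    CarbonBloomFilter filter = new CarbonBloomFilter(1 << 20, 10, Hash.MURMUR_HASH, true);
    filter.add(new Key("some-value".getBytes()));

    // write() converts the sparse BitSet into a RoaringBitmap on the way out
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    filter.write(new DataOutputStream(bos));

    // readFields() restores the RoaringBitmap, which membershipTest() then uses
    CarbonBloomFilter copy = new CarbonBloomFilter();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    boolean mightContain = copy.membershipTest(new Key("some-value".getBytes()));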