You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/21 17:14:32 UTC

carbondata git commit: [CARBONDATA-2373][DataMap] Add bloom datamap to support precise equal query

Repository: carbondata
Updated Branches:
  refs/heads/master 3ff574d29 -> b86ff926d


[CARBONDATA-2373][DataMap] Add bloom datamap to support precise equal query

For each indexed column, a bloom filter is added for each blocklet to
indicate whether a given value belongs to that blocklet.
Currently bloom filter is using guava version.

This closes #2200


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b86ff926
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b86ff926
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b86ff926

Branch: refs/heads/master
Commit: b86ff926d4f60a4ab62c0adfb659816b45cae767
Parents: 3ff574d
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Sat Apr 21 10:59:04 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Apr 22 01:13:45 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapMeta.java    |  17 ++
 datamap/bloom/pom.xml                           |  81 ++++++
 .../datamap/bloom/BloomCoarseGrainDataMap.java  | 252 +++++++++++++++++++
 .../bloom/BloomCoarseGrainDataMapFactory.java   | 238 ++++++++++++++++++
 .../carbondata/datamap/bloom/BloomDMModel.java  |  58 +++++
 .../datamap/bloom/BloomDataMapWriter.java       | 223 ++++++++++++++++
 .../bloom/BloomCoarseGrainDataMapSuite.scala    | 127 ++++++++++
 integration/spark-common-test/pom.xml           |   6 +
 pom.xml                                         |   2 +
 9 files changed, 1004 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index 396c5db..cf51b11 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 @InterfaceAudience.Developer("DataMap")
 @InterfaceStability.Evolving
 public class DataMapMeta {
+  private String dataMapName;
 
   private List<String> indexedColumns;
 
@@ -39,6 +40,16 @@ public class DataMapMeta {
     this.optimizedOperation = optimizedOperation;
   }
 
+  public DataMapMeta(String dataMapName, List<String> indexedColumns,
+      List<ExpressionType> optimizedOperation) {
+    this(indexedColumns, optimizedOperation);
+    this.dataMapName = dataMapName;
+  }
+
+  public String getDataMapName() {
+    return dataMapName;
+  }
+
   public List<String> getIndexedColumns() {
     return indexedColumns;
   }
@@ -46,4 +57,10 @@ public class DataMapMeta {
   public List<ExpressionType> getOptimizedOperation() {
     return optimizedOperation;
   }
+
+  @Override
+  public String toString() {
+    return "DataMapMeta{" + "dataMapName='" + dataMapName + '\'' + ", indexedColumns="
+        + indexedColumns + ", optimizedOperation=" + optimizedOperation + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/bloom/pom.xml b/datamap/bloom/pom.xml
new file mode 100644
index 0000000..472df36
--- /dev/null
+++ b/datamap/bloom/pom.xml
@@ -0,0 +1,81 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-bloom</artifactId>
+  <name>Apache CarbonData :: Bloom Index DataMap</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/resources</directory>
+      </resource>
+      <resource>
+        <directory>.</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
new file mode 100644
index 0000000..6e1a2eb
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * BloomCoarseGrainDataMap is constructed at blocklet level. For each indexed column,
+ * a bloom filter is constructed to indicate whether a value belongs to this blocklet.
+ * More information about the index file can be found in the corresponding datamap writer.
+ */
+@InterfaceAudience.Internal
+public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
+  private String[] indexFilePath;
+  private Set<String> indexedColumn;
+  private List<BloomDMModel> bloomIndexList;
+  private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
+  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
+
+  @Override
+  public void init(DataMapModel dataMapModel) throws MemoryException, IOException {
+    Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+    if (!fs.exists(indexPath)) {
+      throw new IOException(
+          String.format("Path %s for Bloom index dataMap does not exist", indexPath));
+    }
+    if (!fs.isDirectory(indexPath)) {
+      throw new IOException(
+          String.format("Path %s for Bloom index dataMap must be a directory", indexPath));
+    }
+
+    FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() {
+      @Override public boolean accept(Path path) {
+        return path.getName().endsWith(BLOOM_INDEX_SUFFIX);
+      }
+    });
+    indexFilePath = new String[indexFileStatus.length];
+    indexedColumn = new HashSet<String>();
+    bloomIndexList = new ArrayList<BloomDMModel>();
+    indexCol2BloomDMList = ArrayListMultimap.create();
+    for (int i = 0; i < indexFileStatus.length; i++) {
+      indexFilePath[i] = indexFileStatus[i].getPath().toString();
+      String indexCol = StringUtils.substringBetween(indexFilePath[i], ".carbondata.",
+          BLOOM_INDEX_SUFFIX);
+      indexedColumn.add(indexCol);
+      bloomIndexList.addAll(readBloomIndex(indexFilePath[i]));
+      indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i]));
+    }
+    LOGGER.info("find bloom index datamap for column: "
+        + StringUtils.join(indexedColumn, ", "));
+  }
+
+  private List<BloomDMModel> readBloomIndex(String indexFile) throws IOException {
+    LOGGER.info("read bloom index from file: " + indexFile);
+    List<BloomDMModel> bloomDMModelList = new ArrayList<BloomDMModel>();
+    DataInputStream dataInStream = null;
+    ObjectInputStream objectInStream = null;
+    try {
+      dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
+      objectInStream = new ObjectInputStream(dataInStream);
+      try {
+        BloomDMModel model = null;
+        while ((model = (BloomDMModel) objectInStream.readObject()) != null) {
+          LOGGER.info("read bloom index: " + model);
+          bloomDMModelList.add(model);
+        }
+      } catch (EOFException e) {
+        LOGGER.info("read " + bloomDMModelList.size() + " bloom indices from " + indexFile);
+      }
+      return bloomDMModelList;
+    } catch (ClassNotFoundException e) {
+      LOGGER.error("Error occrus while reading bloom index");
+      throw new RuntimeException("Error occrus while reading bloom index", e);
+    } finally {
+      CarbonUtil.closeStreams(objectInStream, dataInStream);
+    }
+  }
+
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+      List<PartitionSpec> partitions) throws IOException {
+    List<Blocklet> hitBlocklets = new ArrayList<Blocklet>();
+    if (filterExp == null) {
+      // null is different from empty here. Empty means after pruning, no blocklet need to scan.
+      return null;
+    }
+
+    List<BloomQueryModel> bloomQueryModels = getQueryValue(filterExp.getFilterExpression());
+
+    for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
+      LOGGER.info("prune blocklet for query: " + bloomQueryModel);
+      for (List<BloomDMModel> bloomDMModels : indexCol2BloomDMList.get(
+          bloomQueryModel.columnName)) {
+        for (BloomDMModel bloomDMModel : bloomDMModels) {
+          boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
+              convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue));
+          if (scanRequired) {
+            LOGGER.info(String.format(
+                "BloomCoarseGrainDataMap: Need to scan block#%s -> blocklet#%s",
+                bloomDMModel.getBlockId(), String.valueOf(bloomDMModel.getBlockletNo())));
+            Blocklet blocklet = new Blocklet(bloomDMModel.getBlockId(),
+                String.valueOf(bloomDMModel.getBlockletNo()));
+            hitBlocklets.add(blocklet);
+          } else {
+            LOGGER.info(String.format(
+                "BloomCoarseGrainDataMap: Skip scan block#%s -> blocklet#%s",
+                bloomDMModel.getBlockId(), String.valueOf(bloomDMModel.getBlockletNo())));
+          }
+        }
+      }
+    }
+
+    return hitBlocklets;
+  }
+
+  private byte[] convertValueToBytes(DataType dataType, Object value) {
+    try {
+      if (dataType == DataTypes.STRING) {
+        if (value instanceof byte[]) {
+          return (byte[]) value;
+        } else {
+          return String.valueOf(value).getBytes("utf-8");
+        }
+      } else {
+        return CarbonUtil.getValueAsBytes(dataType, value);
+      }
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("Error occurs while converting " + value + " to " + dataType, e);
+    }
+  }
+
+  private List<BloomQueryModel> getQueryValue(Expression expression) {
+    List<BloomQueryModel> queryModels = new ArrayList<BloomQueryModel>();
+    if (expression instanceof EqualToExpression) {
+      Expression left = ((EqualToExpression) expression).getLeft();
+      Expression right = ((EqualToExpression) expression).getRight();
+      String column;
+      DataType dataType;
+      Object value;
+      if (left instanceof ColumnExpression && right instanceof LiteralExpression) {
+        column = ((ColumnExpression) left).getColumnName();
+        if (indexedColumn.contains(column)) {
+          dataType = ((ColumnExpression) left).getDataType();
+          value = ((LiteralExpression) right).getLiteralExpValue();
+          BloomQueryModel bloomQueryModel = new BloomQueryModel(column, dataType, value);
+          queryModels.add(bloomQueryModel);
+        }
+        return queryModels;
+      } else if (left instanceof LiteralExpression && right instanceof ColumnExpression) {
+        column = ((ColumnExpression) right).getColumnName();
+        if (indexedColumn.contains(column)) {
+          dataType = ((ColumnExpression) right).getDataType();
+          value = ((LiteralExpression) left).getLiteralExpValue();
+          BloomQueryModel bloomQueryModel = new BloomQueryModel(column, dataType, value);
+          queryModels.add(bloomQueryModel);
+        }
+        return queryModels;
+      }
+    }
+
+    for (Expression child : expression.getChildren()) {
+      queryModels.addAll(getQueryValue(child));
+    }
+    return queryModels;
+  }
+
+  @Override
+  public boolean isScanRequired(FilterResolverIntf filterExp) {
+    return true;
+  }
+
+  @Override
+  public void clear() {
+    bloomIndexList.clear();
+    bloomIndexList = null;
+  }
+
+  static class BloomQueryModel {
+    private String columnName;
+    private DataType dataType;
+    private Object filterValue;
+
+    public BloomQueryModel(String columnName, DataType dataType, Object filterValue) {
+      this.columnName = columnName;
+      this.dataType = dataType;
+      this.filterValue = filterValue;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("BloomQueryModel{");
+      sb.append("columnName='").append(columnName).append('\'');
+      sb.append(", dataType=").append(dataType);
+      sb.append(", filterValue=").append(filterValue);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..3430a65
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.commons.lang3.StringUtils;
+
+@InterfaceAudience.Internal
+public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrainDataMap> {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      BloomCoarseGrainDataMapFactory.class.getName());
+  /**
+   * property for indexed column
+   */
+  private static final String BLOOM_COLUMNS = "bloom_columns";
+  /**
+   * property for size of bloom filter
+   */
+  private static final String BLOOM_SIZE = "bloom_size";
+  /**
+   * default size for bloom filter: suppose one blocklet contains 20 pages
+   * and all the indexed values are distinct.
+   */
+  private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20;
+  private CarbonTable carbonTable;
+  private DataMapMeta dataMapMeta;
+  private String dataMapName;
+  private int bloomFilterSize;
+
+  @Override
+  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws IOException, MalformedDataMapCommandException {
+    Objects.requireNonNull(carbonTable);
+    Objects.requireNonNull(dataMapSchema);
+
+    this.carbonTable = carbonTable;
+    this.dataMapName = dataMapSchema.getDataMapName();
+
+    List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable);
+    this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
+    List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
+    // todo: support more optimize operations
+    optimizedOperations.add(ExpressionType.EQUALS);
+    this.dataMapMeta = new DataMapMeta(this.dataMapName, indexedColumns, optimizedOperations);
+    LOGGER.info(String.format("DataMap %s works for %s with bloom size %d",
+        this.dataMapName, this.dataMapMeta, this.bloomFilterSize));
+  }
+
+  /**
+   * validate Bloom DataMap BLOOM_COLUMNS
+   * 1. BLOOM_COLUMNS property is required
+   * 2. BLOOM_COLUMNS can't contain illegal arguments (empty or blank column names)
+   * 3. BLOOM_COLUMNS can't contain duplicate columns
+   * 4. each column in BLOOM_COLUMNS must exist in the table
+   */
+  private List<String> validateAndGetIndexedColumns(DataMapSchema dmSchema,
+      CarbonTable carbonTable) throws MalformedDataMapCommandException {
+    String bloomColumnsStr = dmSchema.getProperties().get(BLOOM_COLUMNS);
+    if (StringUtils.isBlank(bloomColumnsStr)) {
+      throw new MalformedDataMapCommandException(
+          String.format("Bloom coarse datamap require proper %s property", BLOOM_COLUMNS));
+    }
+    String[] bloomColumns = StringUtils.split(bloomColumnsStr, ",", -1);
+    List<String> bloomColumnList = new ArrayList<String>(bloomColumns.length);
+    Set<String> bloomColumnSet = new HashSet<String>(bloomColumns.length);
+    for (String bloomCol : bloomColumns) {
+      CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(),
+          bloomCol.trim().toLowerCase());
+      if (null == column) {
+        throw new MalformedDataMapCommandException(
+            String.format("%s: %s does not exist in table. Please check create datamap statement",
+                BLOOM_COLUMNS, bloomCol));
+      }
+      if (!bloomColumnSet.add(column.getColName())) {
+        throw new MalformedDataMapCommandException(String.format("%s has duplicate column: %s",
+            BLOOM_COLUMNS, bloomCol));
+      }
+      bloomColumnList.add(column.getColName());
+    }
+    return bloomColumnList;
+  }
+
+  /**
+   * validate Bloom DataMap BLOOM_SIZE
+   * 1. BLOOM_SIZE property is optional; 32000 * 20 will be the default size.
+   * 2. BLOOM_SIZE should be an integer greater than 0
+   */
+  private int validateAndGetBloomFilterSize(DataMapSchema dmSchema)
+      throws MalformedDataMapCommandException {
+    String bloomFilterSizeStr = dmSchema.getProperties().get(BLOOM_SIZE);
+    if (StringUtils.isBlank(bloomFilterSizeStr)) {
+      LOGGER.warn(
+          String.format("Bloom filter size is not configured for datamap %s, use default value %d",
+              dataMapName, DEFAULT_BLOOM_FILTER_SIZE));
+      return DEFAULT_BLOOM_FILTER_SIZE;
+    }
+    int bloomFilterSize;
+    try {
+      bloomFilterSize = Integer.parseInt(bloomFilterSizeStr);
+    } catch (NumberFormatException e) {
+      throw new MalformedDataMapCommandException(
+          String.format("Invalid value of bloom filter size '%s', it should be an integer",
+              bloomFilterSizeStr));
+    }
+    // todo: reconsider the boundaries of bloom filter size
+    if (bloomFilterSize <= 0) {
+      throw new MalformedDataMapCommandException(
+          String.format("Invalid value of bloom filter size '%s', it should be greater than 0",
+              bloomFilterSizeStr));
+    }
+    return bloomFilterSize;
+  }
+
+  @Override
+  public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+    LOGGER.info(
+        String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s",
+            this.dataMapName, this.carbonTable.getTableName() , writeDirectoryPath));
+    return new BloomDataMapWriter(this.carbonTable.getAbsoluteTableIdentifier(),
+        this.dataMapMeta, this.bloomFilterSize, segment, writeDirectoryPath);
+  }
+
+  @Override
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
+    BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
+    try {
+      bloomDM.init(new DataMapModel(BloomDataMapWriter.genDataMapStorePath(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segment.getSegmentNo()),
+          dataMapName)));
+    } catch (MemoryException e) {
+      throw new IOException("Error occurs while init Bloom DataMap", e);
+    }
+    dataMaps.add(bloomDM);
+    return dataMaps;
+  }
+
+  @Override
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public List<DataMapDistributable> toDistributable(Segment segment) {
+    return null;
+  }
+
+  @Override
+  public void fireEvent(Event event) {
+
+  }
+
+  @Override
+  public void clear(Segment segment) {
+
+  }
+
+  @Override
+  public void clear() {
+
+  }
+
+  @Override
+  public void deleteDatamapData() {
+    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+    try {
+      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      for (Segment segment : validSegments) {
+        String segmentId = segment.getSegmentNo();
+        String datamapPath = CarbonTablePath.getSegmentPath(
+            carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId)
+            + File.separator + dataMapName;
+        if (FileFactory.isFileExist(datamapPath)) {
+          CarbonFile file = FileFactory.getCarbonFile(datamapPath,
+              FileFactory.getFileType(datamapPath));
+          CarbonUtil.deleteFoldersAndFilesSilent(file);
+        }
+      }
+    } catch (IOException | InterruptedException ex) {
+      LOGGER.error("drop datamap failed, failed to delete datamap directory");
+    }
+  }
+  @Override
+  public DataMapMeta getMeta() {
+    return this.dataMapMeta;
+  }
+
+  @Override
+  public DataMapLevel getDataMapType() {
+    return DataMapLevel.CG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
new file mode 100644
index 0000000..b72f08f
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import com.google.common.hash.BloomFilter;
+
+@InterfaceAudience.Internal
+public class BloomDMModel implements Serializable {
+  private static final long serialVersionUID = 7281578747306832771L;
+  private String blockId;
+  private int blockletNo;
+  private BloomFilter<byte[]> bloomFilter;
+
+  public BloomDMModel(String blockId, int blockletNo, BloomFilter<byte[]> bloomFilter) {
+    this.blockId = blockId;
+    this.blockletNo = blockletNo;
+    this.bloomFilter = bloomFilter;
+  }
+
+  public String getBlockId() {
+    return blockId;
+  }
+
+  public int getBlockletNo() {
+    return blockletNo;
+  }
+
+  public BloomFilter<byte[]> getBloomFilter() {
+    return bloomFilter;
+  }
+
+  @Override public String toString() {
+    final StringBuilder sb = new StringBuilder("BloomDMModel{");
+    sb.append("blockId='").append(blockId).append('\'');
+    sb.append(", blockletNo=").append(blockletNo);
+    sb.append(", bloomFilter=").append(bloomFilter);
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
new file mode 100644
index 0000000..4065523
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * BloomDataMap is constructed at blocklet level. For each indexed column, a bloom filter is
+ * constructed per blocklet to indicate whether a value belongs to that blocklet. Bloom filters
+ * of blocklets that belong to the same block are written to one index file suffixed with
+ * .bloomindex, so the number of bloom index files equals the number of blocks.
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapWriter extends DataMapWriter {
+  private String dataMapName;
+  private List<String> indexedColumns;
+  // expected number of insertions used to size each guava bloom filter
+  private int bloomFilterSize;
+  // map column name to ordinal in pages
+  // NOTE(review): field name has a typo, should be col2Ordinal
+  private Map<String, Integer> col2Ordianl;
+  private Map<String, DataType> col2DataType;
+  // state of the block / blocklet currently being written
+  private String currentBlockId;
+  private int currentBlockletId;
+  // per indexed column: index file path and its open streams for the current block
+  private List<String> currentDMFiles;
+  private List<DataOutputStream> currentDataOutStreams;
+  private List<ObjectOutputStream> currentObjectOutStreams;
+  // per indexed column: bloom filter being filled for the current blocklet
+  private List<BloomFilter<byte[]>> indexBloomFilters;
+
+  @InterfaceAudience.Internal
+  public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta,
+      int bloomFilterSize, Segment segment, String writeDirectoryPath) {
+    super(identifier, segment, writeDirectoryPath);
+    dataMapName = dataMapMeta.getDataMapName();
+    indexedColumns = dataMapMeta.getIndexedColumns();
+    this.bloomFilterSize = bloomFilterSize;
+    col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
+    col2DataType = new HashMap<String, DataType>(indexedColumns.size());
+
+    currentDMFiles = new ArrayList<String>(indexedColumns.size());
+    currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size());
+    currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size());
+
+    indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
+  }
+
+  // start of a new block: reset the blocklet counter and open one index file per indexed column
+  @Override
+  public void onBlockStart(String blockId, long taskId) throws IOException {
+    this.currentBlockId = blockId;
+    this.currentBlockletId = 0;
+    currentDMFiles.clear();
+    currentDataOutStreams.clear();
+    currentObjectOutStreams.clear();
+    initDataMapFile();
+  }
+
+  // end of a block: close and commit the per-column index files opened in onBlockStart
+  @Override
+  public void onBlockEnd(String blockId) throws IOException {
+    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+      CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId),
+          this.currentObjectOutStreams.get(indexColId));
+      commitFile(this.currentDMFiles.get(indexColId));
+    }
+  }
+
+  // start of a blocklet: create a fresh bloom filter per indexed column
+  // (expected insertions = bloomFilterSize, false-positive probability fixed at 0.00001)
+  @Override
+  public void onBlockletStart(int blockletId) {
+    this.currentBlockletId = blockletId;
+    indexBloomFilters.clear();
+    for (int i = 0; i < indexedColumns.size(); i++) {
+      indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
+          bloomFilterSize, 0.00001d));
+    }
+  }
+
+  // end of a blocklet: persist its bloom filters; on failure close all streams before rethrowing
+  @Override
+  public void onBlockletEnd(int blockletId) {
+    try {
+      writeBloomDataMapFile();
+    } catch (Exception e) {
+      for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
+        CarbonUtil.closeStreams(objectOutputStream);
+      }
+      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
+        CarbonUtil.closeStreams(dataOutputStream);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  // notice that the input pages only contain the indexed columns
+  @Override
+  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
+      throws IOException {
+    // rebuild the column-name -> (ordinal, data type) maps for this page batch
+    col2Ordianl.clear();
+    col2DataType.clear();
+    for (int colId = 0; colId < pages.length; colId++) {
+      String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase();
+      col2Ordianl.put(columnName, colId);
+      DataType columnType = pages[colId].getColumnSpec().getSchemaDataType();
+      col2DataType.put(columnName, columnType);
+    }
+
+    // for each row
+    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
+      // for each indexed column
+      for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+        String indexedCol = indexedColumns.get(indexColId);
+        byte[] indexValue;
+        if (DataTypes.STRING == col2DataType.get(indexedCol)
+            || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
+          // NOTE(review): the first 2 bytes of the stored value are stripped here —
+          // presumably a length header of the encoded value; confirm against the page encoding
+          byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId);
+          indexValue = new byte[originValue.length - 2];
+          System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
+        } else {
+          Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId);
+          indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
+        }
+
+        indexBloomFilters.get(indexColId).put(indexValue);
+      }
+    }
+  }
+
+  // open one <blockId>.<column>.bloomindex file (data stream wrapped by an object stream)
+  // per indexed column for the current block.
+  // NOTE(review): if opening a later column's file throws, streams already opened for earlier
+  // columns in this loop are left open — they are only closed in onBlockEnd on the success path
+  private void initDataMapFile() throws IOException {
+    String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
+    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+      String dmFile = dataMapDir + File.separator + this.currentBlockId
+          + '.' + indexedColumns.get(indexColId) + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
+      DataOutputStream dataOutStream = null;
+      ObjectOutputStream objectOutStream = null;
+      try {
+        FileFactory.createNewFile(dmFile, FileFactory.getFileType(dmFile));
+        dataOutStream = FileFactory.getDataOutputStream(dmFile,
+            FileFactory.getFileType(dmFile));
+        objectOutStream = new ObjectOutputStream(dataOutStream);
+      } catch (IOException e) {
+        CarbonUtil.closeStreams(objectOutStream, dataOutStream);
+        throw new IOException(e);
+      }
+
+      this.currentDMFiles.add(dmFile);
+      this.currentDataOutStreams.add(dataOutStream);
+      this.currentObjectOutStreams.add(objectOutStream);
+    }
+  }
+
+  // append one BloomDMModel (current blocklet's filter) per indexed column to its index file
+  private void writeBloomDataMapFile() throws IOException {
+    for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+      BloomDMModel model = new BloomDMModel(this.currentBlockId, this.currentBlockletId,
+          indexBloomFilters.get(indexColId));
+      // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
+      // In lower version, we use default java serializer to write bloomfilter.
+      this.currentObjectOutStreams.get(indexColId).writeObject(model);
+      this.currentObjectOutStreams.get(indexColId).flush();
+      this.currentDataOutStreams.get(indexColId).flush();
+    }
+  }
+
+  // nothing to do here: index files are closed and committed per block in onBlockEnd
+  @Override
+  public void finish() throws IOException {
+
+  }
+
+  @Override
+  protected void commitFile(String dataMapFile) throws IOException {
+    super.commitFile(dataMapFile);
+  }
+
+  /**
+   * create and return path that will store the datamap
+   *
+   * @param dataPath path where the carbondata fact data is stored
+   * @param dataMapName datamap name
+   * @return path to store the datamap
+   * @throws IOException
+   */
+  public static String genDataMapStorePath(String dataPath, String dataMapName)
+      throws IOException {
+    String dmDir = dataPath + File.separator + dataMapName;
+    Path dmPath = FileFactory.getPath(dmDir);
+    FileSystem fs = FileFactory.getFileSystem(dmPath);
+    if (!fs.exists(dmPath)) {
+      fs.mkdirs(dmPath);
+    }
+    return dmDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
new file mode 100644
index 0000000..e7bab95
--- /dev/null
+++ b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom
+
+import java.io.{File, PrintWriter}
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test suite for the bloom coarse-grain datamap: the same generated CSV is loaded into a
+ * plain carbon table and into a table carrying a bloom datamap, and equal/aggregate queries
+ * against both tables are checked to return identical results.
+ */
+class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+  val inputFile = s"$resourcesPath/bloom_datamap_input.csv"
+  val normalTable = "carbon_normal"
+  val bloomDMSampleTable = "carbon_bloom"
+  val dataMapName = "bloom_dm"
+  // number of rows generated into the input csv
+  val lineNum = 500000
+
+  override protected def beforeAll(): Unit = {
+    createFile(inputFile, line = lineNum, start = 0)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  test("test bloom datamap") {
+    // identical schemas; only the second table gets the bloom datamap
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    // bloom datamap indexes columns 'city' and 'id' with an expected size of 640000 entries
+    sql(
+      s"""
+         | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+         | USING '${classOf[BloomCoarseGrainDataMapFactory].getName}'
+         | DMProperties('BLOOM_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+
+    // smoke queries printed for manual inspection of datamap registration and results
+    sql(s"show datamap on table $bloomDMSampleTable").show(false)
+    sql(s"select * from $bloomDMSampleTable where city = 'city_5'").show(false)
+    sql(s"select * from $bloomDMSampleTable limit 5").show(false)
+
+    // the datamap must be listed, and every query on the datamap table must match the
+    // result of the same query on the table without the datamap
+    checkExistence(sql(s"show datamap on table $bloomDMSampleTable"), true, dataMapName)
+    checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"),
+      Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, "(NA)"))
+    checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 1"),
+      sql(s"select * from $normalTable where id = 1"))
+    checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 999"),
+      sql(s"select * from $normalTable where id = 999"))
+    checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_1'"),
+      sql(s"select * from $normalTable where city = 'city_1'"))
+    checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_999'"),
+      sql(s"select * from $normalTable where city = 'city_999'"))
+    checkAnswer(sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+                    s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"),
+      sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+          s" count(distinct s1), count(distinct s2) from $normalTable"))
+    checkAnswer(sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+                    s" from $bloomDMSampleTable"),
+      sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+          s" from $normalTable"))
+  }
+
+  // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler
+
+  override protected def afterAll(): Unit = {
+    deleteFile(inputFile)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+  }
+
+  // generate the csv input (id,name,city_<i>,age + 8 random uuid columns) only when it
+  // does not already exist on disk
+  private def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+    if (!new File(fileName).exists()) {
+      val write = new PrintWriter(new File(fileName))
+      for (i <- start until (start + line)) {
+        write.println(
+          s"$i,n$i,city_$i,${ Random.nextInt(80) }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }")
+      }
+      write.close()
+    }
+  }
+
+  // best-effort removal of the generated csv after the suite finishes
+  private def deleteFile(fileName: String): Unit = {
+    val file = new File(fileName)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index 1c6cee9..d70fa2e 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -106,6 +106,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-bloom</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-store-sdk</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d420ac..9e629f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -483,6 +483,7 @@
         <module>streaming</module>
         <module>examples/spark2</module>
         <module>datamap/lucene</module>
+        <module>datamap/bloom</module>
       </modules>
       <build>
         <plugins>
@@ -538,6 +539,7 @@
         <module>streaming</module>
         <module>examples/spark2</module>
         <module>datamap/lucene</module>
+        <module>datamap/bloom</module>
       </modules>
       <build>
         <plugins>