You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/21 17:14:32 UTC
carbondata git commit: [CARBONDATA-2373][DataMap] Add bloom datamap
to support precise equal query
Repository: carbondata
Updated Branches:
refs/heads/master 3ff574d29 -> b86ff926d
[CARBONDATA-2373][DataMap] Add bloom datamap to support precise equal query
For each indexed column, a bloom filter is built per blocklet to
indicate whether a given value may belong to that blocklet.
The bloom filter implementation currently comes from the Guava library.
This closes #2200
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b86ff926
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b86ff926
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b86ff926
Branch: refs/heads/master
Commit: b86ff926d4f60a4ab62c0adfb659816b45cae767
Parents: 3ff574d
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Sat Apr 21 10:59:04 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sun Apr 22 01:13:45 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/datamap/DataMapMeta.java | 17 ++
datamap/bloom/pom.xml | 81 ++++++
.../datamap/bloom/BloomCoarseGrainDataMap.java | 252 +++++++++++++++++++
.../bloom/BloomCoarseGrainDataMapFactory.java | 238 ++++++++++++++++++
.../carbondata/datamap/bloom/BloomDMModel.java | 58 +++++
.../datamap/bloom/BloomDataMapWriter.java | 223 ++++++++++++++++
.../bloom/BloomCoarseGrainDataMapSuite.scala | 127 ++++++++++
integration/spark-common-test/pom.xml | 6 +
pom.xml | 2 +
9 files changed, 1004 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
index 396c5db..cf51b11 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
@InterfaceAudience.Developer("DataMap")
@InterfaceStability.Evolving
public class DataMapMeta {
+ private String dataMapName;
private List<String> indexedColumns;
@@ -39,6 +40,16 @@ public class DataMapMeta {
this.optimizedOperation = optimizedOperation;
}
+ public DataMapMeta(String dataMapName, List<String> indexedColumns,
+ List<ExpressionType> optimizedOperation) {
+ this(indexedColumns, optimizedOperation);
+ this.dataMapName = dataMapName;
+ }
+
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
public List<String> getIndexedColumns() {
return indexedColumns;
}
@@ -46,4 +57,10 @@ public class DataMapMeta {
public List<ExpressionType> getOptimizedOperation() {
return optimizedOperation;
}
+
+ @Override
+ public String toString() {
+ return "DataMapMeta{" + "dataMapName='" + dataMapName + '\'' + ", indexedColumns="
+ + indexedColumns + ", optimizedOperation=" + optimizedOperation + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/bloom/pom.xml b/datamap/bloom/pom.xml
new file mode 100644
index 0000000..472df36
--- /dev/null
+++ b/datamap/bloom/pom.xml
@@ -0,0 +1,81 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>carbondata-bloom</artifactId>
+ <name>Apache CarbonData :: Bloom Index DataMap</name>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ </properties>
+
+ <!-- NOTE(review): the bloom datamap classes use Guava's BloomFilter and Hadoop FS APIs,
+      neither of which is declared here; they are presumably pulled in transitively via
+      carbondata-spark2 - confirm the transitive versions are the intended ones. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/resources</directory>
+ </resource>
+ <!-- NOTE(review): declaring the module root '.' as a resource directory packages
+      pom.xml and other top-level files into the jar - confirm this is intended. -->
+ <resource>
+ <directory>.</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
new file mode 100644
index 0000000..6e1a2eb
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * BloomDataCoarseGrainMap is constructed in blocklet level. For each indexed column,
+ * a bloom filter is constructed to indicate whether a value belongs to this blocklet.
+ * More information of the index file can be found in the corresponding datamap writer.
+ */
+@InterfaceAudience.Internal
+public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
+ private String[] indexFilePath;
+ private Set<String> indexedColumn;
+ private List<BloomDMModel> bloomIndexList;
+ private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
+ public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
+
+ @Override
+ public void init(DataMapModel dataMapModel) throws MemoryException, IOException {
+ Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
+ FileSystem fs = FileFactory.getFileSystem(indexPath);
+ if (!fs.exists(indexPath)) {
+ throw new IOException(
+ String.format("Path %s for Bloom index dataMap does not exist", indexPath));
+ }
+ if (!fs.isDirectory(indexPath)) {
+ throw new IOException(
+ String.format("Path %s for Bloom index dataMap must be a directory", indexPath));
+ }
+
+ FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() {
+ @Override public boolean accept(Path path) {
+ return path.getName().endsWith(BLOOM_INDEX_SUFFIX);
+ }
+ });
+ indexFilePath = new String[indexFileStatus.length];
+ indexedColumn = new HashSet<String>();
+ bloomIndexList = new ArrayList<BloomDMModel>();
+ indexCol2BloomDMList = ArrayListMultimap.create();
+ for (int i = 0; i < indexFileStatus.length; i++) {
+ indexFilePath[i] = indexFileStatus[i].getPath().toString();
+ String indexCol = StringUtils.substringBetween(indexFilePath[i], ".carbondata.",
+ BLOOM_INDEX_SUFFIX);
+ indexedColumn.add(indexCol);
+ bloomIndexList.addAll(readBloomIndex(indexFilePath[i]));
+ indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i]));
+ }
+ LOGGER.info("find bloom index datamap for column: "
+ + StringUtils.join(indexedColumn, ", "));
+ }
+
+ private List<BloomDMModel> readBloomIndex(String indexFile) throws IOException {
+ LOGGER.info("read bloom index from file: " + indexFile);
+ List<BloomDMModel> bloomDMModelList = new ArrayList<BloomDMModel>();
+ DataInputStream dataInStream = null;
+ ObjectInputStream objectInStream = null;
+ try {
+ dataInStream = FileFactory.getDataInputStream(indexFile, FileFactory.getFileType(indexFile));
+ objectInStream = new ObjectInputStream(dataInStream);
+ try {
+ BloomDMModel model = null;
+ while ((model = (BloomDMModel) objectInStream.readObject()) != null) {
+ LOGGER.info("read bloom index: " + model);
+ bloomDMModelList.add(model);
+ }
+ } catch (EOFException e) {
+ LOGGER.info("read " + bloomDMModelList.size() + " bloom indices from " + indexFile);
+ }
+ return bloomDMModelList;
+ } catch (ClassNotFoundException e) {
+ LOGGER.error("Error occrus while reading bloom index");
+ throw new RuntimeException("Error occrus while reading bloom index", e);
+ } finally {
+ CarbonUtil.closeStreams(objectInStream, dataInStream);
+ }
+ }
+
+ @Override
+ public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties,
+ List<PartitionSpec> partitions) throws IOException {
+ List<Blocklet> hitBlocklets = new ArrayList<Blocklet>();
+ if (filterExp == null) {
+ // null is different from empty here. Empty means after pruning, no blocklet need to scan.
+ return null;
+ }
+
+ List<BloomQueryModel> bloomQueryModels = getQueryValue(filterExp.getFilterExpression());
+
+ for (BloomQueryModel bloomQueryModel : bloomQueryModels) {
+ LOGGER.info("prune blocklet for query: " + bloomQueryModel);
+ for (List<BloomDMModel> bloomDMModels : indexCol2BloomDMList.get(
+ bloomQueryModel.columnName)) {
+ for (BloomDMModel bloomDMModel : bloomDMModels) {
+ boolean scanRequired = bloomDMModel.getBloomFilter().mightContain(
+ convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue));
+ if (scanRequired) {
+ LOGGER.info(String.format(
+ "BloomCoarseGrainDataMap: Need to scan block#%s -> blocklet#%s",
+ bloomDMModel.getBlockId(), String.valueOf(bloomDMModel.getBlockletNo())));
+ Blocklet blocklet = new Blocklet(bloomDMModel.getBlockId(),
+ String.valueOf(bloomDMModel.getBlockletNo()));
+ hitBlocklets.add(blocklet);
+ } else {
+ LOGGER.info(String.format(
+ "BloomCoarseGrainDataMap: Skip scan block#%s -> blocklet#%s",
+ bloomDMModel.getBlockId(), String.valueOf(bloomDMModel.getBlockletNo())));
+ }
+ }
+ }
+ }
+
+ return hitBlocklets;
+ }
+
+ private byte[] convertValueToBytes(DataType dataType, Object value) {
+ try {
+ if (dataType == DataTypes.STRING) {
+ if (value instanceof byte[]) {
+ return (byte[]) value;
+ } else {
+ return String.valueOf(value).getBytes("utf-8");
+ }
+ } else {
+ return CarbonUtil.getValueAsBytes(dataType, value);
+ }
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Error occurs while converting " + value + " to " + dataType, e);
+ }
+ }
+
+ private List<BloomQueryModel> getQueryValue(Expression expression) {
+ List<BloomQueryModel> queryModels = new ArrayList<BloomQueryModel>();
+ if (expression instanceof EqualToExpression) {
+ Expression left = ((EqualToExpression) expression).getLeft();
+ Expression right = ((EqualToExpression) expression).getRight();
+ String column;
+ DataType dataType;
+ Object value;
+ if (left instanceof ColumnExpression && right instanceof LiteralExpression) {
+ column = ((ColumnExpression) left).getColumnName();
+ if (indexedColumn.contains(column)) {
+ dataType = ((ColumnExpression) left).getDataType();
+ value = ((LiteralExpression) right).getLiteralExpValue();
+ BloomQueryModel bloomQueryModel = new BloomQueryModel(column, dataType, value);
+ queryModels.add(bloomQueryModel);
+ }
+ return queryModels;
+ } else if (left instanceof LiteralExpression && right instanceof ColumnExpression) {
+ column = ((ColumnExpression) right).getColumnName();
+ if (indexedColumn.contains(column)) {
+ dataType = ((ColumnExpression) right).getDataType();
+ value = ((LiteralExpression) left).getLiteralExpValue();
+ BloomQueryModel bloomQueryModel = new BloomQueryModel(column, dataType, value);
+ queryModels.add(bloomQueryModel);
+ }
+ return queryModels;
+ }
+ }
+
+ for (Expression child : expression.getChildren()) {
+ queryModels.addAll(getQueryValue(child));
+ }
+ return queryModels;
+ }
+
+ @Override
+ public boolean isScanRequired(FilterResolverIntf filterExp) {
+ return true;
+ }
+
+ @Override
+ public void clear() {
+ bloomIndexList.clear();
+ bloomIndexList = null;
+ }
+
+ static class BloomQueryModel {
+ private String columnName;
+ private DataType dataType;
+ private Object filterValue;
+
+ public BloomQueryModel(String columnName, DataType dataType, Object filterValue) {
+ this.columnName = columnName;
+ this.dataType = dataType;
+ this.filterValue = filterValue;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("BloomQueryModel{");
+ sb.append("columnName='").append(columnName).append('\'');
+ sb.append(", dataType=").append(dataType);
+ sb.append(", filterValue=").append(filterValue);
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
new file mode 100644
index 0000000..3430a65
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.commons.lang3.StringUtils;
+
+@InterfaceAudience.Internal
+/**
+ * Factory that creates bloom coarse-grain datamaps and their writers.
+ * It validates the user-supplied properties (bloom_columns, bloom_size)
+ * and loads the per-segment bloom index at query time.
+ */
+public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrainDataMap> {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      BloomCoarseGrainDataMapFactory.class.getName());
+  /**
+   * property for indexed column
+   */
+  private static final String BLOOM_COLUMNS = "bloom_columns";
+  /**
+   * property for size of bloom filter
+   */
+  private static final String BLOOM_SIZE = "bloom_size";
+  /**
+   * default size for bloom filter: suppose one blocklet contains 20 pages
+   * and all the indexed value is distinct.
+   */
+  private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20;
+  private CarbonTable carbonTable;
+  private DataMapMeta dataMapMeta;
+  private String dataMapName;
+  private int bloomFilterSize;
+
+  @Override
+  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
+      throws IOException, MalformedDataMapCommandException {
+    Objects.requireNonNull(carbonTable);
+    Objects.requireNonNull(dataMapSchema);
+
+    this.carbonTable = carbonTable;
+    this.dataMapName = dataMapSchema.getDataMapName();
+
+    List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable);
+    this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema);
+    List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>();
+    // todo: support more optimize operations
+    optimizedOperations.add(ExpressionType.EQUALS);
+    this.dataMapMeta = new DataMapMeta(this.dataMapName, indexedColumns, optimizedOperations);
+    LOGGER.info(String.format("DataMap %s works for %s with bloom size %d",
+        this.dataMapName, this.dataMapMeta, this.bloomFilterSize));
+  }
+
+  /**
+   * validate Bloom DataMap BLOOM_COLUMNS
+   * 1. require BLOOM_COLUMNS property
+   * 2. BLOOM_COLUMNS can't contains illegal argument(empty, blank)
+   * 3. BLOOM_COLUMNS can't contains duplicate same columns
+   * 4. BLOOM_COLUMNS should be exists in table columns
+   */
+  private List<String> validateAndGetIndexedColumns(DataMapSchema dmSchema,
+      CarbonTable carbonTable) throws MalformedDataMapCommandException {
+    String bloomColumnsStr = dmSchema.getProperties().get(BLOOM_COLUMNS);
+    if (StringUtils.isBlank(bloomColumnsStr)) {
+      throw new MalformedDataMapCommandException(
+          String.format("Bloom coarse datamap require proper %s property", BLOOM_COLUMNS));
+    }
+    String[] bloomColumns = StringUtils.split(bloomColumnsStr, ",", -1);
+    List<String> bloomColumnList = new ArrayList<String>(bloomColumns.length);
+    Set<String> bloomColumnSet = new HashSet<String>(bloomColumns.length);
+    for (String bloomCol : bloomColumns) {
+      // column lookup is case-insensitive: names are lower-cased before lookup
+      CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(),
+          bloomCol.trim().toLowerCase());
+      if (null == column) {
+        throw new MalformedDataMapCommandException(
+            String.format("%s: %s does not exist in table. Please check create datamap statement",
+                BLOOM_COLUMNS, bloomCol));
+      }
+      if (!bloomColumnSet.add(column.getColName())) {
+        throw new MalformedDataMapCommandException(String.format("%s has duplicate column: %s",
+            BLOOM_COLUMNS, bloomCol));
+      }
+      bloomColumnList.add(column.getColName());
+    }
+    return bloomColumnList;
+  }
+
+  /**
+   * validate Bloom DataMap BLOOM_SIZE
+   * 1. BLOOM_SIZE property is optional, 32000 * 20 will be the default size.
+   * 2. BLOOM_SIZE should be an integer that greater than 0
+   */
+  private int validateAndGetBloomFilterSize(DataMapSchema dmSchema)
+      throws MalformedDataMapCommandException {
+    String bloomFilterSizeStr = dmSchema.getProperties().get(BLOOM_SIZE);
+    if (StringUtils.isBlank(bloomFilterSizeStr)) {
+      LOGGER.warn(
+          String.format("Bloom filter size is not configured for datamap %s, use default value %d",
+              dataMapName, DEFAULT_BLOOM_FILTER_SIZE));
+      return DEFAULT_BLOOM_FILTER_SIZE;
+    }
+    int bloomFilterSize;
+    try {
+      bloomFilterSize = Integer.parseInt(bloomFilterSizeStr);
+    } catch (NumberFormatException e) {
+      // NOTE(review): the NumberFormatException cause is dropped here; chain it
+      // if MalformedDataMapCommandException supports a cause argument
+      throw new MalformedDataMapCommandException(
+          String.format("Invalid value of bloom filter size '%s', it should be an integer",
+              bloomFilterSizeStr));
+    }
+    // todo: reconsider the boundaries of bloom filter size
+    if (bloomFilterSize <= 0) {
+      throw new MalformedDataMapCommandException(
+          String.format("Invalid value of bloom filter size '%s', it should be greater than 0",
+              bloomFilterSizeStr));
+    }
+    return bloomFilterSize;
+  }
+
+  @Override
+  public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+    // NOTE(review): log message misspells "BloomCoarseGrainDataMap" as "BloomCoarseGranDataMap"
+    LOGGER.info(
+        String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s",
+            this.dataMapName, this.carbonTable.getTableName() , writeDirectoryPath));
+    return new BloomDataMapWriter(this.carbonTable.getAbsoluteTableIdentifier(),
+        this.dataMapMeta, this.bloomFilterSize, segment, writeDirectoryPath);
+  }
+
+  @Override
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
+    List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1);
+    BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
+    try {
+      bloomDM.init(new DataMapModel(BloomDataMapWriter.genDataMapStorePath(
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segment.getSegmentNo()),
+          dataMapName)));
+    } catch (MemoryException e) {
+      throw new IOException("Error occurs while init Bloom DataMap", e);
+    }
+    dataMaps.add(bloomDM);
+    return dataMaps;
+  }
+
+  @Override
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+      throws IOException {
+    // distributable pruning is not implemented yet; returns null
+    return null;
+  }
+
+  @Override
+  public List<DataMapDistributable> toDistributable(Segment segment) {
+    // distributable pruning is not implemented yet; returns null
+    return null;
+  }
+
+  @Override
+  public void fireEvent(Event event) {
+
+  }
+
+  @Override
+  public void clear(Segment segment) {
+
+  }
+
+  @Override
+  public void clear() {
+
+  }
+
+  /**
+   * Delete the on-disk bloom index directory of every valid segment.
+   * Failures are logged and swallowed so that datamap drop can proceed.
+   */
+  @Override
+  public void deleteDatamapData() {
+    SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+    try {
+      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      for (Segment segment : validSegments) {
+        String segmentId = segment.getSegmentNo();
+        String datamapPath = CarbonTablePath.getSegmentPath(
+            carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId)
+            + File.separator + dataMapName;
+        if (FileFactory.isFileExist(datamapPath)) {
+          CarbonFile file = FileFactory.getCarbonFile(datamapPath,
+              FileFactory.getFileType(datamapPath));
+          CarbonUtil.deleteFoldersAndFilesSilent(file);
+        }
+      }
+    } catch (IOException | InterruptedException ex) {
+      LOGGER.error("drop datamap failed, failed to delete datamap directory");
+    }
+  }
+  @Override
+  public DataMapMeta getMeta() {
+    return this.dataMapMeta;
+  }
+
+  @Override
+  public DataMapLevel getDataMapType() {
+    return DataMapLevel.CG;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
new file mode 100644
index 0000000..b72f08f
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import com.google.common.hash.BloomFilter;
+
+@InterfaceAudience.Internal
+/**
+ * Serializable value object holding the bloom filter for one blocklet of one block.
+ * Instances are immutable and are shared via the datamap's in-memory multimap.
+ */
+public class BloomDMModel implements Serializable {
+  private static final long serialVersionUID = 7281578747306832771L;
+  // id of the block this bloom filter was built from
+  private final String blockId;
+  // blocklet number within the block
+  private final int blockletNo;
+  // bloom filter over the byte representation of the indexed column values
+  private final BloomFilter<byte[]> bloomFilter;
+
+  public BloomDMModel(String blockId, int blockletNo, BloomFilter<byte[]> bloomFilter) {
+    this.blockId = blockId;
+    this.blockletNo = blockletNo;
+    this.bloomFilter = bloomFilter;
+  }
+
+  public String getBlockId() {
+    return blockId;
+  }
+
+  public int getBlockletNo() {
+    return blockletNo;
+  }
+
+  public BloomFilter<byte[]> getBloomFilter() {
+    return bloomFilter;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("BloomDMModel{");
+    sb.append("blockId='").append(blockId).append('\'');
+    sb.append(", blockletNo=").append(blockletNo);
+    sb.append(", bloomFilter=").append(bloomFilter);
+    sb.append('}');
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
new file mode 100644
index 0000000..4065523
--- /dev/null
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is
+ * constructed to indicate whether a value belongs to this blocklet. Bloom filter of blocklet that
+ * belongs to same block will be written to one index file suffixed with .bloomindex. So the number
+ * of bloom index file will be equal to that of the blocks.
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapWriter extends DataMapWriter {
+ private String dataMapName;
+ private List<String> indexedColumns;
+ private int bloomFilterSize;
+ // map column name to ordinal in pages
+ private Map<String, Integer> col2Ordianl;
+ private Map<String, DataType> col2DataType;
+ private String currentBlockId;
+ private int currentBlockletId;
+ private List<String> currentDMFiles;
+ private List<DataOutputStream> currentDataOutStreams;
+ private List<ObjectOutputStream> currentObjectOutStreams;
+ private List<BloomFilter<byte[]>> indexBloomFilters;
+
+ @InterfaceAudience.Internal
+ public BloomDataMapWriter(AbsoluteTableIdentifier identifier, DataMapMeta dataMapMeta,
+ int bloomFilterSize, Segment segment, String writeDirectoryPath) {
+ super(identifier, segment, writeDirectoryPath);
+ dataMapName = dataMapMeta.getDataMapName();
+ indexedColumns = dataMapMeta.getIndexedColumns();
+ this.bloomFilterSize = bloomFilterSize;
+ col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
+ col2DataType = new HashMap<String, DataType>(indexedColumns.size());
+
+ currentDMFiles = new ArrayList<String>(indexedColumns.size());
+ currentDataOutStreams = new ArrayList<DataOutputStream>(indexedColumns.size());
+ currentObjectOutStreams = new ArrayList<ObjectOutputStream>(indexedColumns.size());
+
+ indexBloomFilters = new ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
+ }
+
+ @Override
+ public void onBlockStart(String blockId, long taskId) throws IOException {
+ this.currentBlockId = blockId;
+ this.currentBlockletId = 0;
+ currentDMFiles.clear();
+ currentDataOutStreams.clear();
+ currentObjectOutStreams.clear();
+ initDataMapFile();
+ }
+
+ @Override
+ public void onBlockEnd(String blockId) throws IOException {
+ for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+ CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId),
+ this.currentObjectOutStreams.get(indexColId));
+ commitFile(this.currentDMFiles.get(indexColId));
+ }
+ }
+
+ @Override
+ public void onBlockletStart(int blockletId) {
+ this.currentBlockletId = blockletId;
+ indexBloomFilters.clear();
+ for (int i = 0; i < indexedColumns.size(); i++) {
+ indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
+ bloomFilterSize, 0.00001d));
+ }
+ }
+
+ @Override
+ public void onBlockletEnd(int blockletId) {
+ try {
+ writeBloomDataMapFile();
+ } catch (Exception e) {
+ for (ObjectOutputStream objectOutputStream : currentObjectOutStreams) {
+ CarbonUtil.closeStreams(objectOutputStream);
+ }
+ for (DataOutputStream dataOutputStream : currentDataOutStreams) {
+ CarbonUtil.closeStreams(dataOutputStream);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ // notice that the input pages only contains the indexed columns
+ @Override
+ public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
+ throws IOException {
+ col2Ordianl.clear();
+ col2DataType.clear();
+ for (int colId = 0; colId < pages.length; colId++) {
+ String columnName = pages[colId].getColumnSpec().getFieldName().toLowerCase();
+ col2Ordianl.put(columnName, colId);
+ DataType columnType = pages[colId].getColumnSpec().getSchemaDataType();
+ col2DataType.put(columnName, columnType);
+ }
+
+ // for each row
+ for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
+ // for each indexed column
+ for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+ String indexedCol = indexedColumns.get(indexColId);
+ byte[] indexValue;
+ if (DataTypes.STRING == col2DataType.get(indexedCol)
+ || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
+ byte[] originValue = (byte[]) pages[col2Ordianl.get(indexedCol)].getData(rowId);
+ indexValue = new byte[originValue.length - 2];
+ System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 2);
+ } else {
+ Object originValue = pages[col2Ordianl.get(indexedCol)].getData(rowId);
+ indexValue = CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
+ }
+
+ indexBloomFilters.get(indexColId).put(indexValue);
+ }
+ }
+ }
+
+ private void initDataMapFile() throws IOException {
+ String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName);
+ for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+ String dmFile = dataMapDir + File.separator + this.currentBlockId
+ + '.' + indexedColumns.get(indexColId) + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX;
+ DataOutputStream dataOutStream = null;
+ ObjectOutputStream objectOutStream = null;
+ try {
+ FileFactory.createNewFile(dmFile, FileFactory.getFileType(dmFile));
+ dataOutStream = FileFactory.getDataOutputStream(dmFile,
+ FileFactory.getFileType(dmFile));
+ objectOutStream = new ObjectOutputStream(dataOutStream);
+ } catch (IOException e) {
+ CarbonUtil.closeStreams(objectOutStream, dataOutStream);
+ throw new IOException(e);
+ }
+
+ this.currentDMFiles.add(dmFile);
+ this.currentDataOutStreams.add(dataOutStream);
+ this.currentObjectOutStreams.add(objectOutStream);
+ }
+ }
+
+ private void writeBloomDataMapFile() throws IOException {
+ for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) {
+ BloomDMModel model = new BloomDMModel(this.currentBlockId, this.currentBlockletId,
+ indexBloomFilters.get(indexColId));
+ // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface.
+ // In lower version, we use default java serializer to write bloomfilter.
+ this.currentObjectOutStreams.get(indexColId).writeObject(model);
+ this.currentObjectOutStreams.get(indexColId).flush();
+ this.currentDataOutStreams.get(indexColId).flush();
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+
+ }
+
+ @Override
+ protected void commitFile(String dataMapFile) throws IOException {
+ super.commitFile(dataMapFile);
+ }
+
+ /**
+ * create and return path that will store the datamap
+ *
+ * @param dataPath patch to store the carbondata factdata
+ * @param dataMapName datamap name
+ * @return path to store the datamap
+ * @throws IOException
+ */
+ public static String genDataMapStorePath(String dataPath, String dataMapName)
+ throws IOException {
+ String dmDir = dataPath + File.separator + dataMapName;
+ Path dmPath = FileFactory.getPath(dmDir);
+ FileSystem fs = FileFactory.getFileSystem(dmPath);
+ if (!fs.exists(dmPath)) {
+ fs.mkdirs(dmPath);
+ }
+ return dmDir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
new file mode 100644
index 0000000..e7bab95
--- /dev/null
+++ b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom
+
+import java.io.{File, PrintWriter}
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+// Functional test for the bloom coarse-grain datamap: every query on the table that
+// carries the datamap must return the same result as the identical table without it.
+class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+ val inputFile = s"$resourcesPath/bloom_datamap_input.csv"
+ val normalTable = "carbon_normal"
+ val bloomDMSampleTable = "carbon_bloom"
+ val dataMapName = "bloom_dm"
+ // number of rows generated into the input csv
+ val lineNum = 500000
+
+ override protected def beforeAll(): Unit = {
+ // generate the input data once and start from clean tables
+ createFile(inputFile, line = lineNum, start = 0)
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+ }
+
+ test("test bloom datamap") {
+ // create two tables with identical schema; only the second gets the bloom datamap
+ sql(
+ s"""
+ | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+ sql(
+ s"""
+ | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT,
+ | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+ | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+ | """.stripMargin)
+ // index the 'city' and 'id' columns with the bloom datamap under test
+ sql(
+ s"""
+ | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable
+ | USING '${classOf[BloomCoarseGrainDataMapFactory].getName}'
+ | DMProperties('BLOOM_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+ """.stripMargin)
+
+ // load the same data into both tables
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+
+ // smoke queries printed for manual inspection in the test log
+ sql(s"show datamap on table $bloomDMSampleTable").show(false)
+ sql(s"select * from $bloomDMSampleTable where city = 'city_5'").show(false)
+ sql(s"select * from $bloomDMSampleTable limit 5").show(false)
+
+ // the datamap must be registered, and equal-filter / aggregate queries on the
+ // datamap table must match the normal table
+ checkExistence(sql(s"show datamap on table $bloomDMSampleTable"), true, dataMapName)
+ checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"),
+ Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, "(NA)"))
+ checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 1"),
+ sql(s"select * from $normalTable where id = 1"))
+ checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 999"),
+ sql(s"select * from $normalTable where id = 999"))
+ checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_1'"),
+ sql(s"select * from $normalTable where city = 'city_1'"))
+ checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_999'"),
+ sql(s"select * from $normalTable where city = 'city_999'"))
+ checkAnswer(sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+ s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"),
+ sql(s"select count(distinct id), count(distinct name), count(distinct city)," +
+ s" count(distinct s1), count(distinct s2) from $normalTable"))
+ checkAnswer(sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+ s" from $bloomDMSampleTable"),
+ sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" +
+ s" from $normalTable"))
+ }
+
+ // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler
+
+ override protected def afterAll(): Unit = {
+ deleteFile(inputFile)
+ sql(s"DROP TABLE IF EXISTS $normalTable")
+ sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable")
+ }
+
+ // Writes `line` csv rows starting at id `start`; skipped if the file already exists.
+ // NOTE(review): the PrintWriter is not closed if println throws -- consider try/finally.
+ private def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+ if (!new File(fileName).exists()) {
+ val write = new PrintWriter(new File(fileName))
+ for (i <- start until (start + line)) {
+ write.println(
+ s"$i,n$i,city_$i,${ Random.nextInt(80) }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+ s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }")
+ }
+ write.close()
+ }
+ }
+
+ // Best-effort cleanup of the generated input file.
+ // NOTE(review): the boolean result of file.delete() is ignored.
+ private def deleteFile(fileName: String): Unit = {
+ val file = new File(fileName)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index 1c6cee9..d70fa2e 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -106,6 +106,12 @@
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-bloom</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-store-sdk</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b86ff926/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d420ac..9e629f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -483,6 +483,7 @@
<module>streaming</module>
<module>examples/spark2</module>
<module>datamap/lucene</module>
+ <module>datamap/bloom</module>
</modules>
<build>
<plugins>
@@ -538,6 +539,7 @@
<module>streaming</module>
<module>examples/spark2</module>
<module>datamap/lucene</module>
+ <module>datamap/bloom</module>
</modules>
<build>
<plugins>