You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/11/15 12:24:02 UTC
[iotdb] branch master updated: [IOTDB-1950] Add Bloom Filter cache for query (#4350)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5a7abfb [IOTDB-1950] Add Bloom Filter cache for query (#4350)
5a7abfb is described below
commit 5a7abfb2c3e52b49bddad535ddb68cc1ea9e92b2
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Mon Nov 15 20:23:37 2021 +0800
[IOTDB-1950] Add Bloom Filter cache for query (#4350)
---
.../resources/conf/iotdb-engine.properties | 8 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 17 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +-
.../iotdb/db/engine/cache/BloomFilterCache.java | 188 +++++++++++++++++++
.../db/engine/cache/CacheHitRatioMonitor.java | 25 +++
.../engine/cache/CacheHitRatioMonitorMXBean.java | 10 +
.../db/engine/cache/TimeSeriesMetadataCache.java | 8 +-
.../db/engine/cache/BloomFilterCacheTest.java | 208 +++++++++++++++++++++
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +
.../apache/iotdb/spark/db/EnvironmentUtils.java | 2 +
.../org/apache/iotdb/tsfile/utils/BloomFilter.java | 39 ++++
11 files changed, 503 insertions(+), 12 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ab779be..3e2da93 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -534,12 +534,12 @@ timestamp_precision=ms
### Metadata Cache Configuration
####################
-# whether to cache meta data(ChunkMetadata and TimeSeriesMetadata) or not.
+# whether to cache meta data(BloomFilter, ChunkMetadata and TimeSeriesMetadata) or not.
# Datatype: boolean
# meta_data_cache_enable=true
-# Read memory Allocation Ratio: ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet and Free Memory Used in Query.
-# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 1:2:3:4
-# chunk_timeseriesmeta_free_memory_proportion=1:2:3:4
+# Read memory allocation ratio: BloomFilterCache, ChunkCache, TimeSeriesMetadataCache, memory used for constructing QueryDataSet, and free memory used in query.
+# The parameter form is a:b:c:d:e, where a, b, c, d and e are integers. For example: 1:1:1:1:1, 1:100:200:300:400
+# chunk_timeseriesmeta_free_memory_proportion=1:100:200:300:400
# cache size for MManager.
# This cache is used to improve insert speed where all path check and TSDataType will be cached in MManager with corresponding Path.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 84c9cfe..4e53f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -119,7 +119,7 @@ public class IoTDBConfig {
private long allocateMemoryForSchema = Runtime.getRuntime().maxMemory() * 1 / 10;
/** Memory allocated for the read process besides cache */
- private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 3 / 10;
+ // 300 / 1001: the 4th term of the default read proportion 1:100:200:300:400 (which sums to 1001).
+ private long allocateMemoryForReadWithoutCache = allocateMemoryForRead * 300 / 1001;
private volatile int maxQueryDeduplicatedPathNum = 1000;
@@ -408,11 +408,14 @@ public class IoTDBConfig {
/** Whether to cache metadata (BloomFilter, ChunkMetadata and TimeSeriesMetadata) or not. */
private boolean metaDataCacheEnable = true;
+ /**
+ * Memory allocated for the BloomFilter cache in the read process. The default 1 / 1001 is the 1st
+ * term of the default read proportion 1:100:200:300:400; the chunk cache (100 / 1001) and the
+ * timeSeriesMetaData cache (200 / 1001) below take the 2nd and 3rd terms.
+ */
+ private long allocateMemoryForBloomFilterCache = allocateMemoryForRead / 1001;
+
/** Memory allocated for timeSeriesMetaData cache in read process */
- private long allocateMemoryForTimeSeriesMetaDataCache = allocateMemoryForRead / 5;
+ private long allocateMemoryForTimeSeriesMetaDataCache = allocateMemoryForRead * 200 / 1001;
/** Memory allocated for chunk cache in read process */
- private long allocateMemoryForChunkCache = allocateMemoryForRead / 10;
+ private long allocateMemoryForChunkCache = allocateMemoryForRead * 100 / 1001;
/** Whether to enable Last cache */
private boolean lastCacheEnable = true;
@@ -1722,6 +1725,14 @@ public class IoTDBConfig {
this.metaDataCacheEnable = metaDataCacheEnable;
}
+  /** Returns the memory budget configured for the bloom filter cache in the read process. */
+  public long getAllocateMemoryForBloomFilterCache() {
+    return this.allocateMemoryForBloomFilterCache;
+  }
+
+  /**
+   * Overrides the memory budget of the bloom filter cache in the read process.
+   *
+   * @param allocateMemoryForBloomFilterCache the new budget
+   */
+  public void setAllocateMemoryForBloomFilterCache(long allocateMemoryForBloomFilterCache) {
+    this.allocateMemoryForBloomFilterCache = allocateMemoryForBloomFilterCache;
+  }
+
/** Returns the memory budget configured for the TimeSeriesMetadata cache in the read process. */
public long getAllocateMemoryForTimeSeriesMetaDataCache() {
  return this.allocateMemoryForTimeSeriesMetaDataCache;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index fbd9c45..1d0ae09 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1266,12 +1266,14 @@ public class IoTDBDescriptor {
long maxMemoryAvailable = conf.getAllocateMemoryForRead();
if (proportionSum != 0) {
try {
- conf.setAllocateMemoryForChunkCache(
+ conf.setAllocateMemoryForBloomFilterCache(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
- conf.setAllocateMemoryForTimeSeriesMetaDataCache(
+ conf.setAllocateMemoryForChunkCache(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
- conf.setAllocateMemoryForReadWithoutCache(
+ conf.setAllocateMemoryForTimeSeriesMetaDataCache(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
+ conf.setAllocateMemoryForReadWithoutCache(
+ maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum);
} catch (Exception e) {
throw new RuntimeException(
"Each subsection of configuration item chunkmeta_chunk_timeseriesmeta_free_memory_proportion"
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java
new file mode 100644
index 0000000..7b65477
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/BloomFilterCache.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.cache;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Caches the {@link BloomFilter} of each TsFile so that repeated queries do not re-read it from
+ * disk. Entries are weighed by their estimated memory footprint and evicted by Caffeine once the
+ * configured budget ({@code allocateMemoryForBloomFilterCache}) is exceeded.
+ */
+public class BloomFilterCache {
+
+  private static final Logger logger = LoggerFactory.getLogger(BloomFilterCache.class);
+  private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long MEMORY_THRESHOLD_IN_BLOOM_FILTER_CACHE =
+      config.getAllocateMemoryForBloomFilterCache();
+  private static final boolean CACHE_ENABLE = config.isMetaDataCacheEnable();
+
+  /** Average entry weight as last computed by {@link #getAverageSize()}. */
+  private final AtomicLong entryAverageSize = new AtomicLong(0);
+
+  private final LoadingCache<BloomFilterCacheKey, BloomFilter> lruCache;
+
+  private BloomFilterCache() {
+    if (CACHE_ENABLE) {
+      logger.info("BloomFilterCache size = {}", MEMORY_THRESHOLD_IN_BLOOM_FILTER_CACHE);
+    }
+    lruCache =
+        Caffeine.newBuilder()
+            .maximumWeight(MEMORY_THRESHOLD_IN_BLOOM_FILTER_CACHE)
+            .weigher(
+                (Weigher<BloomFilterCacheKey, BloomFilter>)
+                    (key, bloomFilter) ->
+                        (int)
+                            (RamUsageEstimator.shallowSizeOf(key)
+                                + RamUsageEstimator.sizeOf(key.tsFilePrefixPath)
+                                + RamUsageEstimator.sizeOf(bloomFilter)))
+            .recordStats()
+            .build(
+                key -> {
+                  try {
+                    TsFileSequenceReader reader =
+                        FileReaderManager.getInstance().get(key.filePath, true);
+                    return reader.readBloomFilter();
+                  } catch (IOException e) {
+                    logger.error(
+                        "Something wrong happened in reading bloom filter in tsfile {}",
+                        key.filePath,
+                        e);
+                    throw e;
+                  }
+                });
+  }
+
+  public static BloomFilterCache getInstance() {
+    return BloomFilterCacheHolder.INSTANCE;
+  }
+
+  /** Same as {@link #get(BloomFilterCacheKey, boolean)} with debug logging disabled. */
+  public BloomFilter get(BloomFilterCacheKey key) throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Returns the bloom filter of the TsFile identified by {@code key}. On a cache miss it is read
+   * through {@link FileReaderManager}; when the metadata cache is disabled it is read directly
+   * from the file on every call.
+   *
+   * @param key identifies the TsFile whose bloom filter is requested
+   * @param debug whether to log the access to the {@code QUERY_DEBUG} logger
+   * @return the bloom filter; may be {@code null} (callers check for it), and a {@code null}
+   *     result is not stored in the cache
+   * @throws IOException if the bloom filter cannot be read from the TsFile
+   */
+  public BloomFilter get(BloomFilterCacheKey key, boolean debug) throws IOException {
+    if (!CACHE_ENABLE) {
+      TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
+      return reader.readBloomFilter();
+    }
+
+    BloomFilter bloomFilter;
+    try {
+      bloomFilter = lruCache.get(key);
+    } catch (CompletionException e) {
+      // Caffeine wraps checked exceptions thrown by the loader in a CompletionException; rethrow
+      // the original IOException so the declared contract of this method holds.
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw e;
+    }
+
+    if (debug) {
+      DEBUG_LOGGER.info("get bloomFilter from cache where filePath is: {}", key.filePath);
+    }
+
+    return bloomFilter;
+  }
+
+  /**
+   * Returns the hit rate of this bloom filter cache (the "Chunk" in the name is historical; the
+   * name is kept for existing callers).
+   */
+  public double calculateChunkHitRatio() {
+    return lruCache.stats().hitRate();
+  }
+
+  /** Returns the number of entries evicted from this cache so far. */
+  public long getEvictionCount() {
+    return lruCache.stats().evictionCount();
+  }
+
+  /** Returns the configured maximum total weight of this cache. */
+  public long getMaxMemory() {
+    return MEMORY_THRESHOLD_IN_BLOOM_FILTER_CACHE;
+  }
+
+  /** Returns Caffeine's average load penalty for this cache. */
+  public double getAverageLoadPenalty() {
+    return lruCache.stats().averageLoadPenalty();
+  }
+
+  /**
+   * Returns the average weight of one cached entry, i.e. the cache's current weighted size divided
+   * by its estimated entry count, or 0 when the cache is empty.
+   */
+  public long getAverageSize() {
+    long entryCount = lruCache.estimatedSize();
+    long totalWeight =
+        lruCache
+            .policy()
+            .eviction()
+            .map(eviction -> eviction.weightedSize().orElse(0L))
+            .orElse(0L);
+    entryAverageSize.set(entryCount == 0 ? 0 : totalWeight / entryCount);
+    return entryAverageSize.get();
+  }
+
+  /** clear LRUCache. */
+  public void clear() {
+    lruCache.invalidateAll();
+    lruCache.cleanUp();
+  }
+
+  @TestOnly
+  public void remove(BloomFilterCacheKey key) {
+    lruCache.invalidate(key);
+  }
+
+  @TestOnly
+  public BloomFilter getIfPresent(BloomFilterCacheKey key) {
+    return lruCache.getIfPresent(key);
+  }
+
+  /**
+   * Cache key for one TsFile. Equality is based on the path prefix, the file version and the
+   * compaction version parsed from the file path; {@code filePath} itself is only used to load the
+   * bloom filter.
+   */
+  public static class BloomFilterCacheKey {
+
+    // This field's size is not counted when weighing a BloomFilterCacheKey: filePath is obtained
+    // from TsFileResource, so the different BloomFilterCacheKeys of the same file share this String.
+    private final String filePath;
+    private final String tsFilePrefixPath;
+    private final long tsFileVersion;
+    // high 32 bit is compaction level, low 32 bit is merge count
+    private final long compactionVersion;
+
+    public BloomFilterCacheKey(String filePath) {
+      this.filePath = filePath;
+      Pair<String, long[]> tsFilePrefixPathAndTsFileVersionPair =
+          FilePathUtils.getTsFilePrefixPathAndTsFileVersionPair(filePath);
+      this.tsFilePrefixPath = tsFilePrefixPathAndTsFileVersionPair.left;
+      this.tsFileVersion = tsFilePrefixPathAndTsFileVersionPair.right[0];
+      this.compactionVersion = tsFilePrefixPathAndTsFileVersionPair.right[1];
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      BloomFilterCache.BloomFilterCacheKey that = (BloomFilterCache.BloomFilterCacheKey) o;
+      return tsFileVersion == that.tsFileVersion
+          && compactionVersion == that.compactionVersion
+          && tsFilePrefixPath.equals(that.tsFilePrefixPath);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tsFilePrefixPath, tsFileVersion, compactionVersion);
+    }
+  }
+
+  /** singleton pattern. */
+  private static class BloomFilterCacheHolder {
+    private static final BloomFilterCache INSTANCE = new BloomFilterCache();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java
index beda7fc..afe39ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java
@@ -105,6 +105,31 @@ public class CacheHitRatioMonitor implements CacheHitRatioMonitorMXBean, IServic
return TimeSeriesMetadataCache.getInstance().getAverageSize();
}
+ @Override
+ public double getBloomFilterHitRatio() {
+ return BloomFilterCache.getInstance().calculateChunkHitRatio();
+ }
+
+ @Override
+ public long getBloomFilterCacheEvictionCount() {
+ return BloomFilterCache.getInstance().getEvictionCount();
+ }
+
+ @Override
+ public long getBloomFilterCacheMaxMemory() {
+ return BloomFilterCache.getInstance().getMaxMemory();
+ }
+
+ @Override
+ public double getBloomFilterCacheAverageLoadPenalty() {
+ return BloomFilterCache.getInstance().getAverageLoadPenalty();
+ }
+
+ @Override
+ public long getBloomFilterCacheAverageSize() {
+ return BloomFilterCache.getInstance().getAverageSize();
+ }
+
public static CacheHitRatioMonitor getInstance() {
return instance;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java
index b1d48b7..7cc5969 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java
@@ -40,6 +40,16 @@ public interface CacheHitRatioMonitorMXBean {
long getTimeSeriesMetaDataCacheAverageSize();
+ /** Hit ratio of the bloom filter cache. */
+ double getBloomFilterHitRatio();
+
+ /** Number of entries evicted from the bloom filter cache. */
+ long getBloomFilterCacheEvictionCount();
+
+ /** Configured memory budget (maximum total weight) of the bloom filter cache. */
+ long getBloomFilterCacheMaxMemory();
+
+ /** Average load penalty of the bloom filter cache (Caffeine reports this in nanoseconds). */
+ double getBloomFilterCacheAverageLoadPenalty();
+
+ /** Average entry size as reported by the bloom filter cache. */
+ long getBloomFilterCacheAverageSize();
+
long getTotalMemTableSize();
double getFlushThershold();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 81c14c8..a13fe93 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -145,7 +145,9 @@ public class TimeSeriesMetadataCache {
Path path = new Path(key.device, key.measurement);
// bloom filter part
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
- BloomFilter bloomFilter = reader.readBloomFilter();
+ BloomFilter bloomFilter =
+ BloomFilterCache.getInstance()
+ .get(new BloomFilterCache.BloomFilterCacheKey(key.filePath), debug);
if (bloomFilter != null && !bloomFilter.contains(path.getFullPath())) {
if (debug) {
DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key);
@@ -241,7 +243,9 @@ public class TimeSeriesMetadataCache {
Path path = new Path(key.device, key.measurement);
// bloom filter part
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
- BloomFilter bloomFilter = reader.readBloomFilter();
+ BloomFilter bloomFilter =
+ BloomFilterCache.getInstance()
+ .get(new BloomFilterCache.BloomFilterCacheKey(key.filePath), debug);
if (bloomFilter != null && !bloomFilter.contains(path.getFullPath())) {
if (debug) {
DEBUG_LOGGER.info("TimeSeries meta data {} is filter by bloomFilter!", key);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/BloomFilterCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/BloomFilterCacheTest.java
new file mode 100644
index 0000000..c7f5752
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/BloomFilterCacheTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.cache;
+
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link BloomFilterCache}. Each test runs against three freshly written TsFiles (one per
+ * storage group) and compares the cached bloom filter with the one read directly from the file.
+ */
+public class BloomFilterCacheTest {
+  private List<String> pathList;
+  private int pathSize = 3;
+  private BloomFilterCache bloomFilterCache;
+
+  @Before
+  public void setUp() throws Exception {
+    // set up and create tsFile
+    pathList = new ArrayList<>();
+    for (int i = 0; i < pathSize; i++) {
+      String path =
+          String.join(
+              File.separator,
+              "target",
+              "data",
+              "data",
+              "sequence",
+              "root.sg" + (i + 1),
+              "0",
+              "0",
+              "1-0-0-0.tsfile");
+      String device = "d" + (i + 1);
+      pathList.add(path);
+      createTsFile(path, device);
+    }
+    bloomFilterCache = BloomFilterCache.getInstance();
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      // The cache is a singleton: drop its entries so the next test does not see bloom filters
+      // loaded here for the same file paths.
+      BloomFilterCache.getInstance().clear();
+      // Readers are owned by FileReaderManager; close them all here instead of in each test so
+      // the manager never hands out an already-closed reader.
+      FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+      for (String filePath : pathList) {
+        FileUtils.forceDelete(new File(filePath));
+      }
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testGet() throws IOException {
+    for (String filePath : pathList) {
+      BloomFilter bloomFilter =
+          bloomFilterCache.get(new BloomFilterCache.BloomFilterCacheKey(filePath));
+      TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true);
+      BloomFilter bloomFilter1 = reader.readBloomFilter();
+      Assert.assertEquals(bloomFilter1, bloomFilter);
+    }
+  }
+
+  @Test
+  public void testRemove() throws IOException {
+    String path = pathList.get(0);
+    BloomFilterCache.BloomFilterCacheKey key = new BloomFilterCache.BloomFilterCacheKey(path);
+    BloomFilter bloomFilter = bloomFilterCache.get(key);
+    TsFileSequenceReader reader = FileReaderManager.getInstance().get(path, true);
+    BloomFilter bloomFilter1 = reader.readBloomFilter();
+    Assert.assertEquals(bloomFilter1, bloomFilter);
+    bloomFilterCache.remove(key);
+    Assert.assertNull(bloomFilterCache.getIfPresent(key));
+  }
+
+  @Test
+  public void testClear() throws IOException {
+    for (String path : pathList) {
+      BloomFilterCache.BloomFilterCacheKey key = new BloomFilterCache.BloomFilterCacheKey(path);
+      BloomFilter bloomFilter = bloomFilterCache.get(key);
+      TsFileSequenceReader reader = FileReaderManager.getInstance().get(path, true);
+      BloomFilter bloomFilter1 = reader.readBloomFilter();
+      Assert.assertEquals(bloomFilter1, bloomFilter);
+    }
+    bloomFilterCache.clear();
+    for (String path : pathList) {
+      BloomFilterCache.BloomFilterCacheKey key = new BloomFilterCache.BloomFilterCacheKey(path);
+      Assert.assertNull(bloomFilterCache.getIfPresent(key));
+    }
+  }
+
+  /**
+   * construct tsFile for test
+   *
+   * @param path tsFile path
+   * @param device device in tsFile
+   */
+  private void createTsFile(String path, String device) throws Exception {
+    try {
+      File f = FSFactoryProducer.getFSFactory().getFile(path);
+      if (f.exists() && !f.delete()) {
+        throw new RuntimeException("can not delete " + f.getAbsolutePath());
+      }
+      Schema schema = new Schema();
+      String sensorPrefix = "sensor_";
+      // The bloom filter depends only on the series paths, so a small tablet is enough; writing
+      // millions of rows per file only slows the test down.
+      int rowNum = 1000;
+      // the number of values to include in the tablet
+      int sensorNum = 10;
+      List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+      // add measurements into file schema (all with INT64 data type)
+      for (int i = 0; i < sensorNum; i++) {
+        String measurementId = sensorPrefix + (i + 1);
+        UnaryMeasurementSchema measurementSchema =
+            new UnaryMeasurementSchema(measurementId, TSDataType.INT64, TSEncoding.TS_2DIFF);
+        measurementSchemas.add(measurementSchema);
+        schema.registerTimeseries(new Path(device, measurementId), measurementSchema);
+      }
+      // add measurements into TSFileWriter
+      try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
+        // construct the tablet
+        Tablet tablet = new Tablet(device, measurementSchemas);
+        long[] timestamps = tablet.timestamps;
+        Object[] values = tablet.values;
+        long timestamp = 1;
+        long value = 1000000L;
+        for (int r = 0; r < rowNum; r++, value++) {
+          int row = tablet.rowSize++;
+          timestamps[row] = timestamp++;
+          for (int i = 0; i < sensorNum; i++) {
+            long[] sensor = (long[]) values[i];
+            sensor[row] = value;
+          }
+          // write Tablet to TsFile
+          if (tablet.rowSize == tablet.getMaxRowNumber()) {
+            tsFileWriter.write(tablet);
+            tablet.reset();
+          }
+        }
+        // write Tablet to TsFile
+        if (tablet.rowSize != 0) {
+          tsFileWriter.write(tablet);
+          tablet.reset();
+        }
+      }
+    } catch (Exception e) {
+      throw new Exception("meet error in TsFileWrite with tablet", e);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index a8d10df..34784c2 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
@@ -142,6 +143,7 @@ public class EnvironmentUtils {
if (config.isMetaDataCacheEnable()) {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
+ BloomFilterCache.getInstance().clear();
}
// close metadata
IoTDB.metaManager.clear();
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java
index afdfc99..99bc6b0 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/EnvironmentUtils.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -120,6 +121,7 @@ public class EnvironmentUtils {
if (config.isMetaDataCacheEnable()) {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
+ BloomFilterCache.getInstance().clear();
}
// close metadata
IoTDB.metaManager.clear();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BloomFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BloomFilter.java
index 74c54db..4117a5f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BloomFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BloomFilter.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.tsfile.utils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import java.util.Arrays;
import java.util.BitSet;
+import java.util.Objects;
public class BloomFilter {
@@ -127,6 +129,26 @@ public class BloomFilter {
return bits.toByteArray();
}
+  /**
+   * Two bloom filters are equal when they have the same class, size, number of hash functions, bit
+   * set and (element-wise) hash functions.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    BloomFilter other = (BloomFilter) o;
+    if (size != other.size || hashFunctionSize != other.hashFunctionSize) {
+      return false;
+    }
+    return Objects.equals(bits, other.bits) && Arrays.equals(func, other.func);
+  }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(size, hashFunctionSize, bits, func);
+ }
+
private class HashFunction {
private int cap;
@@ -140,5 +162,22 @@ public class BloomFilter {
/**
 * Maps {@code value} to a bit index in [0, cap) (assuming cap > 0).
 *
 * <p>Math.abs(Integer.MIN_VALUE) is still negative, so in that single case the remainder can be
 * negative; it is shifted back into range. Every other input keeps exactly the index it had
 * before, so bloom filters already persisted in TsFiles stay valid.
 */
public int hash(String value) {
  int index = Math.abs(Murmur128Hash.hash(value, seed)) % cap;
  return index < 0 ? index + cap : index;
}
+
+    /** Hash functions are equal when they share the same class, capacity and seed. */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null || o.getClass() != getClass()) {
+        return false;
+      }
+      HashFunction other = (HashFunction) o;
+      return seed == other.seed && cap == other.cap;
+    }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cap, seed);
+ }
}
}