Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/06/15 10:07:59 UTC
[iotdb] branch master updated: [IOTDB-1415] Fix OOM caused by ChunkCache (#3308)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 42448cb [IOTDB-1415] Fix OOM caused by ChunkCache (#3308)
42448cb is described below
commit 42448cb80d1f949bc9dd7f642441c5bcabcb8e93
Author: Jackie Tien <Ja...@foxmail.com>
AuthorDate: Tue Jun 15 18:07:23 2021 +0800
[IOTDB-1415] Fix OOM caused by ChunkCache (#3308)
Co-authored-by: Haonan <hh...@outlook.com>
---
LICENSE-binary | 2 +-
server/pom.xml | 5 +
.../db/engine/cache/CacheHitRatioMonitor.java | 16 +-
.../engine/cache/CacheHitRatioMonitorMXBean.java | 8 +-
.../apache/iotdb/db/engine/cache/ChunkCache.java | 171 +++++-----
.../iotdb/db/engine/cache/LRULinkedHashMap.java | 138 --------
.../db/engine/cache/TimeSeriesMetadataCache.java | 358 ++++++++++-----------
.../db/query/reader/chunk/DiskChunkLoader.java | 9 +-
.../chunk/metadata/DiskChunkMetadataLoader.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 9 -
10 files changed, 261 insertions(+), 457 deletions(-)
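The heart of the change: the hand-rolled LRULinkedHashMap (guarded by an external ReadWriteLock) is replaced with a weight-bounded Caffeine LoadingCache, which handles concurrency and eviction internally. A minimal sketch of the pattern the commit adopts, using illustrative String/byte[] types and a placeholder loader in place of IoTDB's TsFileSequenceReader:

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;

    public class WeightBoundedCacheSketch {
      // Illustrative 1 MiB budget; IoTDB reads its budget from the config.
      private static final long MAX_WEIGHT_IN_BYTES = 1024 * 1024;

      private final LoadingCache<String, byte[]> cache =
          Caffeine.newBuilder()
              .maximumWeight(MAX_WEIGHT_IN_BYTES)
              // Weigh each entry by payload size so eviction tracks memory use.
              .weigher((String key, byte[] value) -> value.length)
              .recordStats()
              // The loader runs on a miss; at most one load per key at a time.
              .build(key -> loadFromDisk(key));

      public byte[] get(String key) {
        return cache.get(key); // loads on a miss, returns the cached value on a hit
      }

      private static byte[] loadFromDisk(String key) {
        // Placeholder for real I/O, e.g. TsFileSequenceReader.readMemChunk(...).
        return new byte[0];
      }
    }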
diff --git a/LICENSE-binary b/LICENSE-binary
index faa2b38..ce5c2aa 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -230,6 +230,7 @@ com.fasterxml.jackson.core:jackson-databind:2.10.0
javax.inject:javax.inject:1
net.jpountz.lz4:1.3.0
com.github.stephenc.jcip:jcip-annotations:1.0-1
+com.github.ben-manes.caffeine:caffeine:2.9.1
org.eclipse.jetty:jetty-http:9.4.24.v20191120
org.eclipse.jetty:jetty-io:9.4.24.v20191120
org.eclipse.jetty:jetty-security:9.4.24.v20191120
@@ -270,7 +271,6 @@ org.xerial.snappy:snappy-java:1.1.8.4
io.airlift.airline:0.8
net.minidev:accessors-smart:1.2
-
BSD 2-Clause
------------
jline:jline:2.14.5
diff --git a/server/pom.xml b/server/pom.xml
index 9b57f1b..7949b02 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -200,6 +200,11 @@
<version>4.3.5</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.9.1</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java
index 51e9b1b..beda7fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitor.java
@@ -61,8 +61,8 @@ public class CacheHitRatioMonitor implements CacheHitRatioMonitorMXBean, IServic
}
@Override
- public long getChunkCacheUsedMemory() {
- return ChunkCache.getInstance().getUsedMemory();
+ public long getChunkEvictionCount() {
+ return ChunkCache.getInstance().getEvictionCount();
}
@Override
@@ -71,8 +71,8 @@ public class CacheHitRatioMonitor implements CacheHitRatioMonitorMXBean, IServic
}
@Override
- public double getChunkCacheUsedMemoryProportion() {
- return ChunkCache.getInstance().getUsedMemoryProportion();
+ public double getChunkCacheAverageLoadPenalty() {
+ return ChunkCache.getInstance().getAverageLoadPenalty();
}
@Override
@@ -86,8 +86,8 @@ public class CacheHitRatioMonitor implements CacheHitRatioMonitorMXBean, IServic
}
@Override
- public long getTimeSeriesMetadataCacheUsedMemory() {
- return TimeSeriesMetadataCache.getInstance().getUsedMemory();
+ public long getTimeSeriesMetadataCacheEvictionCount() {
+ return TimeSeriesMetadataCache.getInstance().getEvictionCount();
}
@Override
@@ -96,8 +96,8 @@ public class CacheHitRatioMonitor implements CacheHitRatioMonitorMXBean, IServic
}
@Override
- public double getTimeSeriesCacheUsedMemoryProportion() {
- return TimeSeriesMetadataCache.getInstance().getUsedMemoryProportion();
+ public double getTimeSeriesCacheAverageLoadPenalty() {
+ return TimeSeriesMetadataCache.getInstance().getAverageLoadPenalty();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java
index 854fb1a..b1d48b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/CacheHitRatioMonitorMXBean.java
@@ -22,21 +22,21 @@ public interface CacheHitRatioMonitorMXBean {
double getChunkHitRatio();
- long getChunkCacheUsedMemory();
+ long getChunkEvictionCount();
long getChunkCacheMaxMemory();
- double getChunkCacheUsedMemoryProportion();
+ double getChunkCacheAverageLoadPenalty();
long getChunkCacheAverageSize();
double getTimeSeriesMetadataHitRatio();
- long getTimeSeriesMetadataCacheUsedMemory();
+ long getTimeSeriesMetadataCacheEvictionCount();
long getTimeSeriesMetadataCacheMaxMemory();
- double getTimeSeriesCacheUsedMemoryProportion();
+ double getTimeSeriesCacheAverageLoadPenalty();
long getTimeSeriesMetaDataCacheAverageSize();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
index 6a126cf..12cd0df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -28,13 +28,15 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This class is used to cache <code>Chunk</code> of <code>ChunkMetaData</code> in IoTDB. The
@@ -49,44 +51,69 @@ public class ChunkCache {
config.getAllocateMemoryForChunkCache();
private static final boolean CACHE_ENABLE = config.isMetaDataCacheEnable();
- private final LRULinkedHashMap<ChunkMetadata, Chunk> lruCache;
+ private final LoadingCache<ChunkMetadata, Chunk> lruCache;
- private final AtomicLong cacheHitNum = new AtomicLong();
- private final AtomicLong cacheRequestNum = new AtomicLong();
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final AtomicLong entryAverageSize = new AtomicLong(0);
private ChunkCache() {
if (CACHE_ENABLE) {
logger.info("ChunkCache size = " + MEMORY_THRESHOLD_IN_CHUNK_CACHE);
}
lruCache =
- new LRULinkedHashMap<ChunkMetadata, Chunk>(MEMORY_THRESHOLD_IN_CHUNK_CACHE) {
-
- /**
- * The calculation is time consuming, so we won't calculate each entry' size each time.
- * Every 100,000 entry, we will calculate the average size of the first 10 entries, and
- * use that to represent the next 99,990 entries' size.
- */
- @Override
- protected long calEntrySize(ChunkMetadata key, Chunk value) {
- long currentSize;
- if (count < 10) {
- currentSize =
- RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.sizeOf(value);
- averageSize = ((averageSize * count) + currentSize) / (++count);
- } else if (count < 100000) {
- count++;
- currentSize = averageSize;
- } else {
- averageSize =
- RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.sizeOf(value);
- count = 1;
- currentSize = averageSize;
- }
- return currentSize;
- }
- };
+ Caffeine.newBuilder()
+ .maximumWeight(MEMORY_THRESHOLD_IN_CHUNK_CACHE)
+ .weigher(
+ new Weigher<ChunkMetadata, Chunk>() {
+
+ int count = 0;
+ int averageSize = 0;
+
+ /**
+ * The calculation is time consuming, so we won't calculate each entry's size each
+ * time. Every 100,000 entries, we calculate the average size of the first 10
+ * entries and use that to represent the next 99,990 entries' sizes.
+ */
+ @Override
+ public int weigh(ChunkMetadata chunkMetadata, Chunk chunk) {
+ int currentSize;
+ if (count < 10) {
+ currentSize =
+ (int)
+ (RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ + RamUsageEstimator.sizeOf(chunk));
+ averageSize = ((averageSize * count) + currentSize) / (++count);
+ entryAverageSize.set(averageSize);
+ } else if (count < 100000) {
+ count++;
+ currentSize = averageSize;
+ } else {
+ averageSize =
+ (int)
+ (RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ + RamUsageEstimator.sizeOf(chunk));
+ count = 1;
+ currentSize = averageSize;
+ entryAverageSize.set(averageSize);
+ }
+ return currentSize;
+ }
+ })
+ .recordStats()
+ .build(
+ new CacheLoader<ChunkMetadata, Chunk>() {
+ @Override
+ public Chunk load(ChunkMetadata chunkMetadata) throws Exception {
+ try {
+ TsFileSequenceReader reader =
+ FileReaderManager.getInstance()
+ .get(chunkMetadata.getFilePath(), chunkMetadata.isClosed());
+ return reader.readMemChunk(chunkMetadata);
+ } catch (IOException e) {
+ logger.error("Something went wrong while reading {}", chunkMetadata, e);
+ throw e;
+ }
+ }
+ });
}
public static ChunkCache getInstance() {
@@ -110,42 +137,12 @@ public class ChunkCache {
chunkMetaData.getStatistics());
}
- cacheRequestNum.incrementAndGet();
-
- Chunk chunk;
- lock.readLock().lock();
- try {
- chunk = lruCache.get(chunkMetaData);
- } finally {
- lock.readLock().unlock();
- }
- if (chunk != null) {
- cacheHitNum.incrementAndGet();
- printCacheLog(true);
- } else {
- printCacheLog(false);
- TsFileSequenceReader reader =
- FileReaderManager.getInstance()
- .get(chunkMetaData.getFilePath(), chunkMetaData.isClosed());
- try {
- chunk = reader.readMemChunk(chunkMetaData);
- } catch (IOException e) {
- logger.error("something wrong happened while reading {}", reader.getFileName());
- throw e;
- }
- lock.writeLock().lock();
- try {
- if (!lruCache.containsKey(chunkMetaData)) {
- lruCache.put(chunkMetaData, chunk);
- }
- } finally {
- lock.writeLock().unlock();
- }
- }
+ Chunk chunk = lruCache.get(chunkMetaData);
if (debug) {
DEBUG_LOGGER.info("get chunk from cache whose meta data is: " + chunkMetaData);
}
+
return new Chunk(
chunk.getHeader(),
chunk.getData().duplicate(),
@@ -153,61 +150,39 @@ public class ChunkCache {
chunkMetaData.getStatistics());
}
- private void printCacheLog(boolean isHit) {
- if (!logger.isDebugEnabled()) {
- return;
- }
- logger.debug(
- "[ChunkMetaData cache {}hit] The number of requests for cache is {}, hit rate is {}.",
- isHit ? "" : "didn't ",
- cacheRequestNum.get(),
- cacheHitNum.get() * 1.0 / cacheRequestNum.get());
- }
-
public double calculateChunkHitRatio() {
- if (cacheRequestNum.get() != 0) {
- return cacheHitNum.get() * 1.0 / cacheRequestNum.get();
- } else {
- return 0;
- }
+ return lruCache.stats().hitRate();
}
- public long getUsedMemory() {
- return lruCache.getUsedMemory();
+ public long getEvictionCount() {
+ return lruCache.stats().evictionCount();
}
public long getMaxMemory() {
- return lruCache.getMaxMemory();
+ return MEMORY_THRESHOLD_IN_CHUNK_CACHE;
}
- public double getUsedMemoryProportion() {
- return lruCache.getUsedMemoryProportion();
+ public double getAverageLoadPenalty() {
+ return lruCache.stats().averageLoadPenalty();
}
public long getAverageSize() {
- return lruCache.getAverageSize();
+ return entryAverageSize.get();
}
/** clear LRUCache. */
public void clear() {
- lock.writeLock().lock();
- if (lruCache != null) {
- lruCache.clear();
- }
- lock.writeLock().unlock();
+ lruCache.invalidateAll();
+ lruCache.cleanUp();
}
public void remove(ChunkMetadata chunkMetaData) {
- lock.writeLock().lock();
- if (chunkMetaData != null) {
- lruCache.remove(chunkMetaData);
- }
- lock.writeLock().unlock();
+ lruCache.invalidate(chunkMetaData);
}
@TestOnly
public boolean isEmpty() {
- return lruCache.isEmpty();
+ return lruCache.asMap().isEmpty();
}
/** singleton pattern. */
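Note what the single lruCache.get(chunkMetaData) call above replaces: the old read-lock/miss/load/write-lock dance, including the containsKey double check. Caffeine performs the load inside its internal map computation, so concurrent readers of the same key share one load. A small self-contained demo of that property (illustrative types, not IoTDB's):

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;

    import java.util.concurrent.atomic.AtomicInteger;

    public class LoadCoalescingDemo {
      public static void main(String[] args) throws InterruptedException {
        AtomicInteger loads = new AtomicInteger();
        LoadingCache<String, String> cache =
            Caffeine.newBuilder()
                .maximumSize(100)
                .build(key -> {
                  loads.incrementAndGet(); // count how often the loader runs
                  return key.toUpperCase();
                });

        Runnable reader = () -> cache.get("chunk-1");
        Thread t1 = new Thread(reader);
        Thread t2 = new Thread(reader);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Prints 1: a concurrent or later reader reuses the first load.
        System.out.println(loads.get());
      }
    }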
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/LRULinkedHashMap.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/LRULinkedHashMap.java
deleted file mode 100644
index 6246574..0000000
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/LRULinkedHashMap.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.cache;
-
-import org.apache.iotdb.tsfile.common.cache.Accountable;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/** This class is an LRU cache. <b>Note: It's not thread safe.</b> */
-public abstract class LRULinkedHashMap<K extends Accountable, V> {
-
- private static final float LOAD_FACTOR_MAP = 0.75f;
- private static final int INITIAL_CAPACITY = 128;
- private static final float RETAIN_PERCENT = 0.9f;
- private static final int MAP_ENTRY_SIZE = 40;
-
- private final LinkedHashMap<K, V> linkedHashMap;
-
- /** maximum memory threshold. */
- private final long maxMemory;
- /** current used memory. */
- private long usedMemory;
-
- /** memory size we need to retain while the cache is full */
- private final long retainMemory;
-
- protected int count = 0;
- protected long averageSize = 0;
-
- public LRULinkedHashMap(long maxMemory) {
- this.linkedHashMap = new LinkedHashMap<>(INITIAL_CAPACITY, LOAD_FACTOR_MAP, true);
- this.maxMemory = maxMemory;
- this.retainMemory = (long) (maxMemory * RETAIN_PERCENT);
- }
-
- public V put(K key, V value) {
- long size = calEntrySize(key, value) + MAP_ENTRY_SIZE;
- key.setRamSize(size);
- usedMemory += size;
- V v = linkedHashMap.put(key, value);
- if (usedMemory > maxMemory) {
- Iterator<Entry<K, V>> iterator = linkedHashMap.entrySet().iterator();
- while (usedMemory > retainMemory && iterator.hasNext()) {
- Entry<K, V> entry = iterator.next();
- usedMemory -= entry.getKey().getRamSize();
- iterator.remove();
- }
- }
- return v;
- }
-
- public V get(K key) {
- return linkedHashMap.get(key);
- }
-
- public boolean containsKey(K key) {
- return linkedHashMap.containsKey(key);
- }
-
- public void clear() {
- linkedHashMap.clear();
- usedMemory = 0;
- count = 0;
- averageSize = 0;
- }
-
- public V remove(K key) {
- V v = linkedHashMap.remove(key);
- if (v != null && key != null) {
- usedMemory -= key.getRamSize();
- }
- return v;
- }
-
- /** approximately estimate the additional size of key and value. */
- protected abstract long calEntrySize(K key, V value);
-
- /** calculate the proportion of used memory. */
- public double getUsedMemoryProportion() {
- return usedMemory * 1.0 / maxMemory;
- }
-
- public long getUsedMemory() {
- return usedMemory;
- }
-
- public long getMaxMemory() {
- return maxMemory;
- }
-
- public long getAverageSize() {
- return averageSize;
- }
-
- public Set<Entry<K, V>> entrySet() {
- return linkedHashMap.entrySet();
- }
-
- public boolean isEmpty() {
- return linkedHashMap.isEmpty();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- return super.equals(o);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index 889ba9e..fdcea1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.tsfile.common.cache.Accountable;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -32,6 +31,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,14 +42,13 @@ import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching strategy is
@@ -61,12 +63,9 @@ public class TimeSeriesMetadataCache {
config.getAllocateMemoryForTimeSeriesMetaDataCache();
private static final boolean CACHE_ENABLE = config.isMetaDataCacheEnable();
- private final LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
+ private final LoadingCache<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
- private final AtomicLong cacheHitNum = new AtomicLong();
- private final AtomicLong cacheRequestNum = new AtomicLong();
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final AtomicLong entryAverageSize = new AtomicLong(0);
private final Map<String, WeakReference<String>> devices =
Collections.synchronizedMap(new WeakHashMap<>());
@@ -78,51 +77,80 @@ public class TimeSeriesMetadataCache {
"TimeseriesMetadataCache size = " + MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE);
}
lruCache =
- new LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata>(
- MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE) {
-
- /**
- * The calculation is time consuming, so we won't calculate each entry' size each time.
- * Every 100,000 entry, we will calculate the average size of the first 10 entries, and
- * use that to represent the next 99,990 entries' size.
- */
- @Override
- protected long calEntrySize(TimeSeriesMetadataCacheKey key, TimeseriesMetadata value) {
- long currentSize;
- if (count < 10) {
- currentSize =
- RamUsageEstimator.shallowSizeOf(key)
- + RamUsageEstimator.sizeOf(key.device)
- + RamUsageEstimator.sizeOf(key.measurement)
- + RamUsageEstimator.shallowSizeOf(value)
- + RamUsageEstimator.sizeOf(value.getMeasurementId())
- + RamUsageEstimator.shallowSizeOf(value.getStatistics())
- + (((ChunkMetadata) value.getChunkMetadataList().get(0)).calculateRamSize()
- + RamUsageEstimator.NUM_BYTES_OBJECT_REF)
- * value.getChunkMetadataList().size()
- + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList());
- averageSize = ((averageSize * count) + currentSize) / (++count);
- } else if (count < 100000) {
- count++;
- currentSize = averageSize;
- } else {
- averageSize =
- RamUsageEstimator.shallowSizeOf(key)
- + RamUsageEstimator.sizeOf(key.device)
- + RamUsageEstimator.sizeOf(key.measurement)
- + RamUsageEstimator.shallowSizeOf(value)
- + RamUsageEstimator.sizeOf(value.getMeasurementId())
- + RamUsageEstimator.shallowSizeOf(value.getStatistics())
- + (((ChunkMetadata) value.getChunkMetadataList().get(0)).calculateRamSize()
- + RamUsageEstimator.NUM_BYTES_OBJECT_REF)
- * value.getChunkMetadataList().size()
- + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList());
- count = 1;
- currentSize = averageSize;
- }
- return currentSize;
- }
- };
+ Caffeine.newBuilder()
+ .maximumWeight(MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE)
+ .weigher(
+ new Weigher<TimeSeriesMetadataCacheKey, TimeseriesMetadata>() {
+
+ int count = 0;
+ int averageSize = 0;
+
+ /**
+ * The calculation is time consuming, so we won't calculate each entry's size each
+ * time. Every 100,000 entries, we calculate the average size of the first 10
+ * entries and use that to represent the next 99,990 entries' sizes.
+ */
+ @Override
+ public int weigh(TimeSeriesMetadataCacheKey key, TimeseriesMetadata value) {
+ int currentSize;
+ if (count < 10) {
+ currentSize =
+ (int)
+ (RamUsageEstimator.shallowSizeOf(key)
+ + RamUsageEstimator.sizeOf(key.device)
+ + RamUsageEstimator.sizeOf(key.measurement)
+ + RamUsageEstimator.shallowSizeOf(value)
+ + RamUsageEstimator.sizeOf(value.getMeasurementId())
+ + RamUsageEstimator.shallowSizeOf(value.getStatistics())
+ + (((ChunkMetadata) value.getChunkMetadataList().get(0))
+ .calculateRamSize()
+ + RamUsageEstimator.NUM_BYTES_OBJECT_REF)
+ * value.getChunkMetadataList().size()
+ + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList()));
+ averageSize = ((averageSize * count) + currentSize) / (++count);
+ entryAverageSize.set(averageSize);
+ } else if (count < 100000) {
+ count++;
+ currentSize = averageSize;
+ } else {
+ averageSize =
+ (int)
+ (RamUsageEstimator.shallowSizeOf(key)
+ + RamUsageEstimator.sizeOf(key.device)
+ + RamUsageEstimator.sizeOf(key.measurement)
+ + RamUsageEstimator.shallowSizeOf(value)
+ + RamUsageEstimator.sizeOf(value.getMeasurementId())
+ + RamUsageEstimator.shallowSizeOf(value.getStatistics())
+ + (((ChunkMetadata) value.getChunkMetadataList().get(0))
+ .calculateRamSize()
+ + RamUsageEstimator.NUM_BYTES_OBJECT_REF)
+ * value.getChunkMetadataList().size()
+ + RamUsageEstimator.shallowSizeOf(value.getChunkMetadataList()));
+ count = 1;
+ currentSize = averageSize;
+ entryAverageSize.set(averageSize);
+ }
+ return currentSize;
+ }
+ })
+ .recordStats()
+ .build(
+ new CacheLoader<TimeSeriesMetadataCacheKey, TimeseriesMetadata>() {
+ @Override
+ public TimeseriesMetadata load(TimeSeriesMetadataCacheKey key) throws Exception {
+ // bloom filter part
+ TsFileSequenceReader reader =
+ FileReaderManager.getInstance().get(key.filePath, true);
+ BloomFilter bloomFilter = reader.readBloomFilter();
+ if (bloomFilter != null
+ && !bloomFilter.contains(
+ key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) {
+ return null;
+ }
+ return reader.readTimeseriesMetadata(
+ new Path(key.device, key.measurement), false);
+ }
+ });
}
public static TimeSeriesMetadataCache getInstance() {
@@ -148,20 +176,9 @@ public class TimeSeriesMetadataCache {
return reader.readTimeseriesMetadata(new Path(key.device, key.measurement), false);
}
- cacheRequestNum.incrementAndGet();
-
- TimeseriesMetadata timeseriesMetadata;
- lock.readLock().lock();
- try {
- timeseriesMetadata = lruCache.get(key);
- } finally {
- lock.readLock().unlock();
- }
+ TimeseriesMetadata timeseriesMetadata = lruCache.getIfPresent(key);
- if (timeseriesMetadata != null) {
- cacheHitNum.incrementAndGet();
- printCacheLog(true);
- } else {
+ if (timeseriesMetadata == null) {
if (debug) {
DEBUG_LOGGER.info(
"Cache miss: {}.{} in file: {}", key.device, key.measurement, key.filePath);
@@ -171,16 +188,8 @@ public class TimeSeriesMetadataCache {
synchronized (
devices.computeIfAbsent(key.device + SEPARATOR + key.filePath, WeakReference::new)) {
// double check
- lock.readLock().lock();
- try {
- timeseriesMetadata = lruCache.get(key);
- } finally {
- lock.readLock().unlock();
- }
- if (timeseriesMetadata != null) {
- cacheHitNum.incrementAndGet();
- printCacheLog(true);
- } else {
+ timeseriesMetadata = lruCache.getIfPresent(key);
+ if (timeseriesMetadata == null) {
Path path = new Path(key.device, key.measurement);
// bloom filter part
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
@@ -191,24 +200,17 @@ public class TimeSeriesMetadataCache {
}
return null;
}
- printCacheLog(false);
List<TimeseriesMetadata> timeSeriesMetadataList =
reader.readTimeseriesMetadata(path, allSensors);
// put TimeSeriesMetadata of all sensors used in this query into cache
- lock.writeLock().lock();
- try {
- timeSeriesMetadataList.forEach(
- metadata -> {
- TimeSeriesMetadataCacheKey k =
- new TimeSeriesMetadataCacheKey(
- key.filePath, key.device, metadata.getMeasurementId());
- if (!lruCache.containsKey(k)) {
- lruCache.put(k, metadata);
- }
- });
- timeseriesMetadata = lruCache.get(key);
- } finally {
- lock.writeLock().unlock();
+ for (TimeseriesMetadata metadata : timeSeriesMetadataList) {
+ TimeSeriesMetadataCacheKey k =
+ new TimeSeriesMetadataCacheKey(
+ key.filePath, key.device, metadata.getMeasurementId());
+ lruCache.put(k, metadata);
+ if (metadata.getMeasurementId().equals(key.measurement)) {
+ timeseriesMetadata = metadata;
+ }
}
}
}
@@ -261,16 +263,11 @@ public class TimeSeriesMetadataCache {
return reader.readTimeseriesMetadata(new Path(key.device, key.measurement), subSensorList);
}
- cacheRequestNum.incrementAndGet();
-
List<TimeseriesMetadata> res = new ArrayList<>();
getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
- if (!res.isEmpty()) {
- cacheHitNum.incrementAndGet();
- printCacheLog(true);
- } else {
+ if (res.isEmpty()) {
if (debug) {
DEBUG_LOGGER.info(
"Cache miss: {}.{} in file: {}", key.device, key.measurement, key.filePath);
@@ -281,10 +278,7 @@ public class TimeSeriesMetadataCache {
devices.computeIfAbsent(key.device + SEPARATOR + key.filePath, WeakReference::new)) {
// double check
getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
- if (!res.isEmpty()) {
- cacheHitNum.incrementAndGet();
- printCacheLog(true);
- } else {
+ if (res.isEmpty()) {
Path path = new Path(key.device, key.measurement);
// bloom filter part
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
@@ -295,31 +289,25 @@ public class TimeSeriesMetadataCache {
}
return Collections.emptyList();
}
- printCacheLog(false);
List<TimeseriesMetadata> timeSeriesMetadataList =
reader.readTimeseriesMetadata(path, allSensors);
+ Map<TimeSeriesMetadataCacheKey, TimeseriesMetadata> map = new HashMap<>();
// put TimeSeriesMetadata of all sensors used in this query into cache
- lock.writeLock().lock();
- try {
- timeSeriesMetadataList.forEach(
- metadata -> {
- // for root.sg1.d1.vector1.s1, key.device of vector will only return root.sg1.d1
- // metadata.getMeasurementId() will return s1, the vector1 is saved in
- // key.measurement
- // so we should concat them to get the deviceId for root.sg1.d1.vector1.s1
- TimeSeriesMetadataCacheKey k =
- new TimeSeriesMetadataCacheKey(
- key.filePath,
- key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement,
- metadata.getMeasurementId());
- if (!lruCache.containsKey(k)) {
- lruCache.put(k, metadata);
- }
- });
- getVectorTimeSeriesMetadataListFromCache(key, subSensorList, res);
- } finally {
- lock.writeLock().unlock();
- }
+ timeSeriesMetadataList.forEach(
+ metadata -> {
+ // for root.sg1.d1.vector1.s1, key.device of vector will only return root.sg1.d1
+ // metadata.getMeasurementId() will return s1, the vector1 is saved in
+ // key.measurement
+ // so we should concat them to get the deviceId for root.sg1.d1.vector1.s1
+ TimeSeriesMetadataCacheKey k =
+ new TimeSeriesMetadataCacheKey(
+ key.filePath, key.device, metadata.getMeasurementId());
+ lruCache.put(k, metadata);
+ map.put(k, metadata);
+ });
+ // We read from the local map instead of the cache in case the cache
+ // capacity is too small to contain all the sub sensors of this vector
+ getVectorTimeSeriesMetadataListFromMap(key, subSensorList, res, map);
}
}
}
@@ -354,103 +342,97 @@ public class TimeSeriesMetadataCache {
* TimeSeriesMetadataCacheKey for vector1.s2 should be {filePath: ""./data/data/seq/.......,
* device: root.sg1.d1.vector1, measurement: s2}
*/
- private void getVectorTimeSeriesMetadataListFromCache(
- TimeSeriesMetadataCacheKey key, List<String> subSensorList, List<TimeseriesMetadata> res) {
- lock.readLock().lock();
- try {
- TimeseriesMetadata timeseriesMetadata =
- lruCache.get(
- new TimeSeriesMetadataCacheKey(
- key.filePath,
- key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement,
- key.measurement));
- if (timeseriesMetadata != null) {
- res.add(timeseriesMetadata);
- for (String subSensor : subSensorList) {
- timeseriesMetadata =
- lruCache.get(
- new TimeSeriesMetadataCacheKey(
- key.filePath,
- key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement,
- subSensor));
- if (timeseriesMetadata != null) {
- res.add(timeseriesMetadata);
- } else {
- res.clear();
- break;
- }
+ private void getVectorTimeSeriesMetadataListFromMap(
+ TimeSeriesMetadataCacheKey key,
+ List<String> subSensorList,
+ List<TimeseriesMetadata> res,
+ Map<TimeSeriesMetadataCacheKey, TimeseriesMetadata> map) {
+ TimeseriesMetadata timeseriesMetadata = map.get(key);
+ if (timeseriesMetadata != null) {
+ res.add(timeseriesMetadata);
+ for (String subSensor : subSensorList) {
+ timeseriesMetadata =
+ map.get(new TimeSeriesMetadataCacheKey(key.filePath, key.device, subSensor));
+ if (timeseriesMetadata != null) {
+ res.add(timeseriesMetadata);
+ } else {
+ res.clear();
+ break;
}
}
- } finally {
- lock.readLock().unlock();
}
}
- private void printCacheLog(boolean isHit) {
- if (!logger.isDebugEnabled()) {
- return;
+ /**
+ * !!!Attention!!!
+ *
+ * <p>For a vector, e.g. root.sg1.d1.vector1(s1, s2), the TimeSeriesMetadataCacheKey for vector1
+ * should be {filePath: "./data/data/seq/......., device: root.sg1.d1.vector1, measurement:
+ * vector1}, so vector1 appears in both device and measurement. The TimeSeriesMetadataCacheKey
+ * for vector1.s1 should be {filePath: "./data/data/seq/......., device: root.sg1.d1.vector1,
+ * measurement: s1}, and the TimeSeriesMetadataCacheKey for vector1.s2 should be {filePath:
+ * "./data/data/seq/......., device: root.sg1.d1.vector1, measurement: s2}.
+ */
+ private void getVectorTimeSeriesMetadataListFromCache(
+ TimeSeriesMetadataCacheKey key, List<String> subSensorList, List<TimeseriesMetadata> res) {
+ TimeseriesMetadata timeseriesMetadata = lruCache.getIfPresent(key);
+ if (timeseriesMetadata != null) {
+ res.add(timeseriesMetadata);
+ for (String subSensor : subSensorList) {
+ timeseriesMetadata =
+ lruCache.getIfPresent(
+ new TimeSeriesMetadataCacheKey(key.filePath, key.device, subSensor));
+ if (timeseriesMetadata != null) {
+ res.add(timeseriesMetadata);
+ } else {
+ res.clear();
+ break;
+ }
+ }
}
- logger.debug(
- "[TimeSeriesMetadata cache {}hit] The number of requests for cache is {}, hit rate is {}.",
- isHit ? "" : "didn't ",
- cacheRequestNum.get(),
- cacheHitNum.get() * 1.0 / cacheRequestNum.get());
}
public double calculateTimeSeriesMetadataHitRatio() {
- if (cacheRequestNum.get() != 0) {
- return cacheHitNum.get() * 1.0 / cacheRequestNum.get();
- } else {
- return 0;
- }
+ return lruCache.stats().hitRate();
}
- public long getUsedMemory() {
- return lruCache.getUsedMemory();
+ public long getEvictionCount() {
+ return lruCache.stats().evictionCount();
}
public long getMaxMemory() {
- return lruCache.getMaxMemory();
+ return MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE;
}
- public double getUsedMemoryProportion() {
- return lruCache.getUsedMemoryProportion();
+ public double getAverageLoadPenalty() {
+ return lruCache.stats().averageLoadPenalty();
}
public long getAverageSize() {
- return lruCache.getAverageSize();
+ return entryAverageSize.get();
}
/** clear LRUCache. */
public void clear() {
- lock.writeLock().lock();
- if (lruCache != null) {
- lruCache.clear();
- }
- lock.writeLock().unlock();
+ lruCache.invalidateAll();
+ lruCache.cleanUp();
}
public void remove(TimeSeriesMetadataCacheKey key) {
- lock.writeLock().lock();
- if (key != null) {
- lruCache.remove(key);
- }
- lock.writeLock().unlock();
+ lruCache.invalidate(key);
}
@TestOnly
public boolean isEmpty() {
- return lruCache.isEmpty();
+ return lruCache.asMap().isEmpty();
}
- public static class TimeSeriesMetadataCacheKey implements Accountable {
+ public static class TimeSeriesMetadataCacheKey {
private final String filePath;
private final String device;
private final String measurement;
- private long ramSize;
-
public TimeSeriesMetadataCacheKey(String filePath, String device, String measurement) {
this.filePath = filePath;
this.device = device;
@@ -475,16 +457,6 @@ public class TimeSeriesMetadataCache {
public int hashCode() {
return Objects.hash(filePath, device, measurement);
}
-
- @Override
- public void setRamSize(long size) {
- this.ramSize = size;
- }
-
- @Override
- public long getRamSize() {
- return ramSize;
- }
}
/** singleton pattern. */
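TimeSeriesMetadataCache cannot rely on a plain CacheLoader for its main path because one disk read returns metadata for many sensors at once, so the code above checks with getIfPresent and bulk-populates with put under a per-device lock. A sketch of that pattern with illustrative types; readAllForDevice is a hypothetical stand-in for reader.readTimeseriesMetadata(path, allSensors), and locking on an interned string is for illustration only (the real code synchronizes on weakly-referenced device keys):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    import java.util.Map;

    public class BulkPopulateSketch {
      private final Cache<String, String> cache =
          Caffeine.newBuilder().maximumSize(1_000).build();

      public String get(String key, String device) {
        String value = cache.getIfPresent(key);
        if (value == null) {
          synchronized (device.intern()) {
            value = cache.getIfPresent(key); // double check under the lock
            if (value == null) {
              // One read yields entries for every sensor of the device.
              Map<String, String> all = readAllForDevice(device);
              all.forEach(cache::put); // populate siblings from the same read
              value = all.get(key);
            }
          }
        }
        return value;
      }

      private Map<String, String> readAllForDevice(String device) {
        return Map.of(device + ".s1", "meta-s1", device + ".s2", "meta-s2");
      }
    }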
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
index 36619e2..15ace5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.query.reader.chunk;
import org.apache.iotdb.db.engine.cache.ChunkCache;
-import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
@@ -30,15 +29,15 @@ import java.io.IOException;
/** To read one chunk from disk, and only used in iotdb server module */
public class DiskChunkLoader implements IChunkLoader {
- private final QueryContext context;
+ private final boolean debug;
- public DiskChunkLoader(QueryContext context) {
- this.context = context;
+ public DiskChunkLoader(boolean debug) {
+ this.debug = debug;
}
@Override
public Chunk loadChunk(ChunkMetadata chunkMetaData) throws IOException {
- return ChunkCache.getInstance().get(chunkMetaData, context.isDebug());
+ return ChunkCache.getInstance().get(chunkMetaData, debug);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 63c8c5c..32293ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -120,7 +120,7 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader {
if (chunkMetadata.needSetChunkLoader()) {
chunkMetadata.setFilePath(resource.getTsFilePath());
chunkMetadata.setClosed(resource.isClosed());
- chunkMetadata.setChunkLoader(new DiskChunkLoader(context));
+ chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug()));
}
});
}
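Taken together, the two hunks above also shorten a reference chain: ChunkMetadata objects are the keys of the ChunkCache and carry their chunk loader, so a DiskChunkLoader holding a whole QueryContext would presumably keep each query's context reachable for as long as the metadata stayed cached; keeping only the boolean debug flag breaks that edge. A hypothetical illustration of the retention pattern (names are not IoTDB's):

    public class RetentionSketch {
      static class QueryContext {
        final byte[] queryScopedState = new byte[1024 * 1024]; // should die with the query
      }

      static class LoaderHoldingContext {
        final QueryContext context; // pins the context while the loader is reachable
        LoaderHoldingContext(QueryContext context) { this.context = context; }
      }

      static class LoaderHoldingFlag {
        final boolean debug; // retains nothing query-scoped
        LoaderHoldingFlag(boolean debug) { this.debug = debug; }
      }
    }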
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 88e13ac..6a2c8e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.engine.cache.ChunkCache;
-import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -915,13 +913,6 @@ public class TSServiceImpl implements TSIService.Iface {
if (costTime >= config.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement);
}
- if (plan.isDebug()) {
- SLOW_SQL_LOGGER.info(
- "ChunkCache used memory proportion: {}\n"
- + "TimeSeriesMetadataCache used memory proportion: {}",
- ChunkCache.getInstance().getUsedMemoryProportion(),
- TimeSeriesMetadataCache.getInstance().getUsedMemoryProportion());
- }
}
}
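The removed slow-query logging relied on getUsedMemoryProportion(), which has no direct Caffeine counterpart; the replacement metrics come from recordStats() and are already exposed through CacheHitRatioMonitorMXBean above. A minimal sketch of reading those counters from any cache built with recordStats():

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.stats.CacheStats;

    public class StatsSketch {
      public static void main(String[] args) {
        Cache<String, String> cache =
            Caffeine.newBuilder().maximumSize(10).recordStats().build();
        cache.put("k", "v");
        cache.getIfPresent("k"); // hit
        cache.getIfPresent("x"); // miss

        CacheStats stats = cache.stats(); // snapshot of counters so far
        System.out.println("hit rate:         " + stats.hitRate());
        System.out.println("evictions:        " + stats.evictionCount());
        System.out.println("avg load penalty: " + stats.averageLoadPenalty() + " ns");
      }
    }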