You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/08/04 14:13:58 UTC
svn commit: r1880569 - in /jackrabbit/oak/trunk: oak-run/
oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
oak-segment-...
Author: adulceanu
Date: Tue Aug 4 14:13:58 2020
New Revision: 1880569
URL: http://svn.apache.org/viewvc?rev=1880569&view=rev
Log:
OAK-7744 - Persistent cache for the Segment Node Store
Added cache metrics
Contribution by Miroslav Smiljanic
Added:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java
Modified:
jackrabbit/oak/trunk/oak-run/pom.xml
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
jackrabbit/oak/trunk/oak-segment-tar/pom.xml
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
Modified: jackrabbit/oak/trunk/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/pom.xml?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-run/pom.xml Tue Aug 4 14:13:58 2020
@@ -35,7 +35,7 @@
<groovy.version>2.4.17</groovy.version>
<!--
Size History:
- + 55 MB Add support for 3rd level cache (GRANITE-30735)
+ + 55 MB Add support for segment persistent cache (OAK-7744)
+ 54 MB AWS support for segment-tar (OAK-8827)
+ 52 MB AWS java sdk update (OAK-8875)
+ 51 MB AWS java sdk update (OAK-7536)
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java Tue Aug 4 14:13:58 2020
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
import static org.apache.jackrabbit.oak.segment.remote.persistentcache.Configuration.PID;
@@ -58,6 +75,12 @@ public @interface Configuration {
int redisCacheExpireSeconds() default DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
@AttributeDefinition(
+ name = "Redis cache db index",
+ description = "Redis cache db index (see Jedis#select(int))"
+ )
+ int redisDBIndex() default 1;
+
+ @AttributeDefinition(
name = "Redis socket timeout",
description = "Number of seconds to wait for response for request"
)
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java Tue Aug 4 14:13:58 2020
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * This {@code IOMonitor} implementation registers the following monitoring endpoints
+ * with the Metrics library if available:
+ * <ul>
+ * <li>{@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES}:
+ * a meter metric for the number of bytes read from the segment disk cache</li>
+ * <li>{@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES}:
+ * a meter metric for the number of bytes written to the segment disk cache</li>
+ * <li>{@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME}:
+ * a timer metric for the time spent reading from the segment disk cache</li>
+ * <li>{@link #OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME}:
+ * a timer metric for the time spent writing to the segment disk cache</li>
+ * </ul>
+ */
+public class DiskCacheIOMonitor extends IOMonitorAdapter {
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES = "oak.segment.cache.disk.segment-read-bytes";
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES = "oak.segment.cache.disk.segment-write-bytes";
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME = "oak.segment.cache.disk.segment-read-time";
+ public static final String OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME = "oak.segment.cache.disk.segment-write-time";
+
+ private final MeterStats segmentReadBytes;
+ private final MeterStats segmentWriteBytes;
+ private final TimerStats segmentReadTime;
+ private final TimerStats segmentWriteTime;
+
+ public DiskCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
+ segmentReadBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
+ segmentWriteBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
+ segmentReadTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
+ segmentWriteTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
+ }
+
+ @Override
+ public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
+ segmentReadBytes.mark(length);
+ segmentReadTime.update(elapsed, NANOSECONDS);
+ }
+
+ @Override
+ public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
+ segmentWriteBytes.mark(length);
+ segmentWriteTime.update(elapsed, NANOSECONDS);
+ }
+}
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java Tue Aug 4 14:13:58 2020
@@ -17,9 +17,12 @@
*/
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,19 +40,25 @@ import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
public class PersistentDiskCache extends AbstractPersistentCache {
private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class);
public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512;
+ public static final String NAME = "Segment Disk Cache";
private final File directory;
private final long maxCacheSizeBytes;
+ private final IOMonitor diskCacheIOMonitor;
final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
+ final AtomicLong evictionCount = new AtomicLong();
+
private static final Comparator<Path> sortedByAccessTime = (path1, path2) -> {
try {
FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime();
@@ -61,43 +70,53 @@ public class PersistentDiskCache extends
return 0;
};
- public PersistentDiskCache(File directory, int cacheMaxSizeMB) {
+ public PersistentDiskCache(File directory, int cacheMaxSizeMB, IOMonitor diskCacheIOMonitor) {
this.directory = directory;
this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L;
-
+ this.diskCacheIOMonitor = diskCacheIOMonitor;
if (!directory.exists()) {
directory.mkdirs();
}
- cacheSize.set(FileUtils.sizeOfDirectory(directory));
+ segmentCacheStats = new SegmentCacheStats(
+ NAME,
+ () -> maxCacheSizeBytes,
+ () -> Long.valueOf(directory.listFiles().length),
+ () -> FileUtils.sizeOfDirectory(directory),
+ () -> evictionCount.get());
}
@Override
- public Buffer readSegment(long msb, long lsb) {
- String segmentId = new UUID(msb, lsb).toString();
- File segmentFile = new File(directory, segmentId);
+ protected Buffer readSegmentInternal(long msb, long lsb) {
+ try {
+ String segmentId = new UUID(msb, lsb).toString();
+ File segmentFile = new File(directory, segmentId);
- if (segmentFile.exists()) {
- try (FileInputStream fis = new FileInputStream(segmentFile);
- FileChannel channel = fis.getChannel()) {
- int length = (int) channel.size();
-
- Buffer buffer = Buffer.allocateDirect(length);
- if (buffer.readFully(channel, 0) < length) {
- throw new EOFException();
- }
- buffer.flip();
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ if (segmentFile.exists()) {
+ diskCacheIOMonitor.beforeSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length());
+ try (FileInputStream fis = new FileInputStream(segmentFile); FileChannel channel = fis.getChannel()) {
+ int length = (int) channel.size();
+
+ Buffer buffer = Buffer.allocateDirect(length);
+ if (buffer.readFully(channel, 0) < length) {
+ throw new EOFException();
+ }
- return buffer;
- } catch (FileNotFoundException e) {
- logger.info("Segment {} deleted from file system!", segmentId);
- } catch (IOException e) {
- logger.error("Error loading segment {} from cache: {}", segmentId, e);
- }
- }
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ diskCacheIOMonitor.afterSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length(), elapsed);
- if (nextCache != null) {
- return nextCache.readSegment(msb, lsb);
+ buffer.flip();
+
+ return buffer;
+ } catch (FileNotFoundException e) {
+ logger.info("Segment {} deleted from file system!", segmentId);
+ } catch (IOException e) {
+ logger.error("Error loading segment {} from cache:", segmentId, e);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Exception while reading segment {} from the cache:", new UUID(msb, lsb), e);
}
return null;
@@ -162,6 +181,7 @@ public class PersistentDiskCache extends
if (cacheSize.get() > maxCacheSizeBytes * 0.66) {
cacheSize.addAndGet(-path.toFile().length());
path.toFile().delete();
+ evictionCount.incrementAndGet();
} else {
breaker.stop();
}
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java Tue Aug 4 14:13:58 2020
@@ -17,34 +17,45 @@
*/
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.exceptions.JedisException;
+import redis.clients.jedis.params.SetParams;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.StringReader;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.util.Properties;
import java.util.UUID;
-
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
-import redis.clients.jedis.Protocol;
-import redis.clients.jedis.exceptions.JedisException;
+import java.util.concurrent.TimeUnit;
public class PersistentRedisCache extends AbstractPersistentCache {
private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class);
public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2;
+ public static final String NAME = "Segment Redis Cache";
private static final String REDIS_PREFIX = "SEGMENT";
- private final int redisExpireSeconds;
+ private final IOMonitor redisCacheIOMonitor;
private JedisPool redisPool;
+ private SetParams setParamsWithExpire;
- public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout, int redisConnectionTimeout,
- int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections) {
- this.redisExpireSeconds = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+ public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout,
+ int redisConnectionTimeout, int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections,
+ int redisDBIndex, IOMonitor redisCacheIOMonitor) {
+ this.redisCacheIOMonitor = redisCacheIOMonitor;
+ int redisExpireSeconds1 = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+ setParamsWithExpire = SetParams.setParams().ex(redisExpireSeconds1);
if (redisPort == 0) {
redisPort = 6379;
@@ -57,28 +68,79 @@ public class PersistentRedisCache extend
jedisPoolConfig.setMaxIdle(redisMaxConnections);
jedisPoolConfig.setMaxTotal(redisMaxTotalConnections);
- this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout, redisSocketTimeout, null, Protocol.DEFAULT_DATABASE, null);
+ this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout,
+ redisSocketTimeout, null, redisDBIndex, null);
+ this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount,
+ this::getCurrentWeight, this::getNumberOfEvictedKeys);
+ }
+
+ private long getCacheElementCount() {
+ try(Jedis redis = redisPool.getResource()) {
+ return redis.dbSize();
+ } catch (JedisException e) {
+ logger.error("Error getting number of elements in redis", e);
+ }
+
+ return -1;
+ }
+
+ private long getRedisMaxMemory() {
+ try{
+ return Long.parseLong(getRedisProperty("memory", "maxmemory"));
+ } catch (JedisException | IOException e) {
+ logger.error("Error getting redis configuration value for 'maxmemory'", e);
+ }
+ return -1;
+ }
+
+ private long getCurrentWeight() {
+ try{
+ return Long.parseLong(getRedisProperty("memory", "used_memory"));
+ } catch (JedisException | IOException e) {
+ logger.error("Error getting number of elements in redis", e);
+ }
+ return -1;
+ }
+
+ private long getNumberOfEvictedKeys() {
+ try{
+ return Long.parseLong(getRedisProperty("stats", "evicted_keys"));
+ } catch (JedisException | IOException e) {
+ logger.error("Error getting number of evicted elements in redis", e);
+ }
+ return -1;
+ }
+
+ private String getRedisProperty(String section, String propertyName) throws IOException {
+ try(Jedis redis = redisPool.getResource()) {
+ String redisInfoString = redis.info(section);
+ Properties props = new Properties();
+ props.load(new StringReader(redisInfoString));
+ return (String) props.get(propertyName);
+ }
}
@Override
- public Buffer readSegment(long msb, long lsb) {
+ protected Buffer readSegmentInternal(long msb, long lsb) {
String segmentId = new UUID(msb, lsb).toString();
+ Stopwatch stopwatch = Stopwatch.createStarted();
try(Jedis redis = redisPool.getResource()) {
+ redisCacheIOMonitor.beforeSegmentRead(null, msb, lsb, 0);
+
final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes());
if (bytes != null) {
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ redisCacheIOMonitor.afterSegmentRead(null, msb, lsb, bytes.length, elapsed);
+
Buffer buffer = Buffer.allocateDirect(bytes.length);
buffer.put(bytes);
buffer.flip();
return buffer;
}
- } catch (JedisException e) {
- logger.error("Error loading segment {} from cache: {}", segmentId, e);
- }
-
- if (nextCache != null) {
- return nextCache.readSegment(msb, lsb);
+ } catch (Exception e) {
+ logger.error("Error loading segment {} from cache", segmentId, e);
}
return null;
@@ -88,7 +150,7 @@ public class PersistentRedisCache extend
public boolean containsSegment(long msb, long lsb) {
String segmentId = new UUID(msb, lsb).toString();
- try(Jedis redis = redisPool.getResource()) {
+ try (Jedis redis = redisPool.getResource()) {
return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes());
} catch (JedisException e) {
logger.error("Error checking segment existence {} in cache: {}", segmentId, e);
@@ -98,7 +160,7 @@ public class PersistentRedisCache extend
}
@Override
- public void writeSegment(long msb, long lsb, Buffer buffer){
+ public void writeSegment(long msb, long lsb, Buffer buffer) {
String segmentId = new UUID(msb, lsb).toString();
Buffer bufferCopy = buffer.duplicate();
@@ -110,8 +172,7 @@ public class PersistentRedisCache extend
bufferCopy.write(channel);
}
final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes();
- redis.set(key, bos.toByteArray());
- redis.expire(key, redisExpireSeconds);
+ redis.set(key, bos.toByteArray(), setParamsWithExpire);
cacheSize.addAndGet(bos.size());
} catch (Throwable t) {
logger.error("Error writing segment {} to cache: {}", segmentId, t);
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java Tue Aug 4 14:13:58 2020
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * This {@code IOMonitor} implementation registers the following monitoring endpoints
+ * with the Metrics library if available:
+ * <ul>
+ * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES}:
+ * a meter metric for the number of bytes read from the segment redis cache</li>
+ * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES}:
+ * a meter metric for the number of bytes written to the segment redis cache</li>
+ * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME}:
+ * a timer metric for the time spent reading from the segment redis cache</li>
+ * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME}:
+ * a timer metric for the time spent writing to the segment redis cache</li>
+ * </ul>
+ */
+public class RedisCacheIOMonitor extends IOMonitorAdapter {
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES = "oak.segment.cache.redis.segment-read-bytes";
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES = "oak.segment.cache.redis.segment-write-bytes";
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME = "oak.segment.cache.redis.segment-read-time";
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME = "oak.segment.cache.redis.segment-write-time";
+
+ private final MeterStats segmentReadBytes;
+ private final MeterStats segmentWriteBytes;
+ private final TimerStats segmentReadTime;
+ private final TimerStats segmentWriteTime;
+
+ public RedisCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
+ segmentReadBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
+ segmentWriteBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
+ segmentReadTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
+ segmentWriteTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
+ }
+
+ @Override
+ public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
+ segmentReadBytes.mark(length);
+ segmentReadTime.update(elapsed, NANOSECONDS);
+ }
+
+ @Override
+ public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
+ segmentWriteBytes.mark(length);
+ segmentWriteTime.update(elapsed, NANOSECONDS);
+ }
+}
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java Tue Aug 4 14:13:58 2020
@@ -1,16 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
import com.google.common.io.Closer;
+import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import java.io.File;
import java.io.IOException;
@@ -26,8 +51,14 @@ public class RemotePersistentCacheServic
private final Closer closer = Closer.create();
+ private OsgiWhiteboard osgiWhiteboard;
+
+ @Reference
+ private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
+
@Activate
public void activate(ComponentContext context, Configuration config) throws IOException {
+ osgiWhiteboard = new OsgiWhiteboard(context.getBundleContext());
persistentCache = createPersistentCache(config, closer);
registration = context.getBundleContext().registerService(PersistentCache.class.getName(), persistentCache, new Properties());
}
@@ -42,24 +73,47 @@ public class RemotePersistentCacheServic
persistentCache = null;
}
- private static PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+ /**
+ * Registers the given registration with this component's {@code closer}, so that
+ * {@link Registration#unregister()} is invoked when the closer is closed.
+ *
+ * @param registration the registration to undo when the closer is closed
+ */
+ protected void registerCloseable(final Registration registration) {
+ closer.register(registration::unregister);
+ }
+
+ /**
+ * Registers {@code bean} as an MBean by delegating to
+ * {@code WhiteboardUtils.registerMBean} with the whiteboard created in {@code activate}.
+ *
+ * @param clazz the MBean interface under which the bean is registered
+ * @param bean the MBean instance
+ * @param type the MBean type, passed through unchanged
+ * @param name the MBean name, passed through unchanged
+ * @return the registration handle returned by {@code WhiteboardUtils.registerMBean}
+ */
+ protected <T> Registration registerMBean(Class<T> clazz, T bean, String type, String name) {
+ return WhiteboardUtils.registerMBean(osgiWhiteboard, clazz, bean, type, name);
+ }
+
+ private PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+
+ RoleStatisticsProvider roleStatisticsProvider = new RoleStatisticsProvider(statisticsProvider, "remote_persistence");
+
+ DiskCacheIOMonitor diskCacheIOMonitor = new DiskCacheIOMonitor(roleStatisticsProvider);
+ RedisCacheIOMonitor redisCacheIOMonitor = new RedisCacheIOMonitor(roleStatisticsProvider);
+
if (configuration.diskCacheEnabled()) {
- PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB());
+ PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB(), diskCacheIOMonitor);
closer.register(persistentDiskCache);
+ CacheStatsMBean diskCacheStatsMBean = persistentDiskCache.getCacheStats();
+ registerCloseable(registerMBean(CacheStatsMBean.class, diskCacheStatsMBean, CacheStats.TYPE, diskCacheStatsMBean.getName()));
+
if (configuration.redisCacheEnabled()) {
PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
- configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections());
+ configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
persistentDiskCache.linkWith(redisCache);
closer.register(redisCache);
+
+ CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
+ registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
}
return persistentDiskCache;
} else if (configuration.redisCacheEnabled()) {
PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
- configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections());
+ configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
closer.register(redisCache);
+ CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
+ registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
+
return redisCache;
}
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java Tue Aug 4 14:13:58 2020
@@ -104,7 +104,7 @@ public abstract class AbstractPersistent
final TestSegment segment = testSegments.get(nSegment);
final long[] id = segment.getSegmentId();
try {
- final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+ final Buffer segmentRead = persistentCache.readSegment(id[0], id[1], () -> null);
segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
} finally {
done.incrementAndGet();
@@ -146,7 +146,7 @@ public abstract class AbstractPersistent
if (persistentCache.containsSegment(msb, lsb)) {
containsFailures.incrementAndGet();
}
- if (persistentCache.readSegment(msb, lsb) != null) {
+ if (persistentCache.readSegment(msb, lsb, () -> null) != null) {
readFailures.incrementAndGet();
}
} catch (Throwable t) {
@@ -184,7 +184,7 @@ public abstract class AbstractPersistent
if (!persistentCache.containsSegment(segmentId[0], segmentId[1])) {
containsFailures.incrementAndGet();
}
- if (persistentCache.readSegment(segmentId[0], segmentId[1]) == null) {
+ if (persistentCache.readSegment(segmentId[0], segmentId[1], () -> null) == null) {
readFailures.incrementAndGet();
}
} catch (Throwable t) {
@@ -219,7 +219,7 @@ public abstract class AbstractPersistent
waitWhile.accept(() -> done.get() < SEGMENTS);
- Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1]);
+ Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1], () -> null);
assertNotNull("The segment was not found", segmentRead);
assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
}
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java Tue Aug 4 14:13:58 2020
@@ -18,19 +18,29 @@
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import java.io.File;
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class PersistentDiskCacheTest extends AbstractPersistentCacheTest {
@@ -39,13 +49,13 @@ public class PersistentDiskCacheTest ext
@Before
public void setUp() throws Exception {
- persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024);
+ persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024, new IOMonitorAdapter());
}
@Test
public void cleanupTest() throws Exception {
persistentCache.close();
- persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0);
+ persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0, new IOMonitorAdapter());
final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);
@@ -86,7 +96,7 @@ public class PersistentDiskCacheTest ext
final long[] id = segment.getSegmentId();
try {
final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
- final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+ final Buffer segmentRead = persistentCache.readSegment(id[0], id[1], () -> null);
segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
} catch (Throwable t) {
errors.incrementAndGet();
@@ -115,4 +125,30 @@ public class PersistentDiskCacheTest ext
}
assertEquals("Segment(s) not cleaned up in cache", 0, SEGMENTS - errors.get());
}
+
+ /**
+ * Verifies that the IO monitor passed to {@link PersistentDiskCache} is notified only
+ * when a segment is actually read from the cache folder: never on a miss, and exactly
+ * once (before and after) for a read that finds the segment file.
+ */
+ @Test
+ public void testIOMonitor() throws IOException {
+ IOMonitorAdapter ioMonitorAdapter = Mockito.mock(IOMonitorAdapter.class);
+
+ // Replace the cache created in setUp() with one that reports to the mocked monitor.
+ persistentCache.close();
+ File cacheFolder = temporaryFolder.newFolder();
+ persistentCache = new PersistentDiskCache(cacheFolder, 0, ioMonitorAdapter);
+
+ UUID segmentUUID = UUID.randomUUID();
+
+ // The loader returns null, so the miss cannot populate the cache.
+ persistentCache.readSegment(segmentUUID.getMostSignificantBits(), segmentUUID.getLeastSignificantBits(), () -> null);
+
+ //Segment not in cache, monitor methods not invoked
+ verify(ioMonitorAdapter, never()).beforeSegmentRead(any(), anyLong(), anyLong(), anyInt());
+ verify(ioMonitorAdapter, never()).afterSegmentRead(any(), anyLong(), anyLong(), anyInt(), anyLong());
+
+ //place segment in disk cache
+ // An empty file named after the segment UUID stands in for a cached segment
+ // (presumably this matches the cache's on-disk naming - the verifications below rely on it).
+ File segmentFile = new File(cacheFolder, segmentUUID.toString());
+ segmentFile.createNewFile();
+
+ persistentCache.readSegment(segmentUUID.getMostSignificantBits(), segmentUUID.getLeastSignificantBits(), () -> null);
+
+ // Exactly one before/after notification, carrying the segment file and its id.
+ verify(ioMonitorAdapter, times(1)).beforeSegmentRead(eq(segmentFile), eq(segmentUUID.getMostSignificantBits()), eq(segmentUUID.getLeastSignificantBits()), anyInt());
+ verify(ioMonitorAdapter, times(1)).afterSegmentRead(eq(segmentFile), eq(segmentUUID.getMostSignificantBits()), eq(segmentUUID.getLeastSignificantBits()), anyInt(), anyLong());
+ }
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java Tue Aug 4 14:13:58 2020
@@ -17,19 +17,37 @@
*/
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
import redis.embedded.RedisServer;
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class PersistentRedisCacheTest extends AbstractPersistentCacheTest {
private RedisServer redisServer;
+ private IOMonitorAdapter ioMonitorAdapter;
@Before
public void setUp() throws Exception {
redisServer = RedisServer.builder().build();
redisServer.start();
int port = redisServer.ports().get(0);
+ ioMonitorAdapter = Mockito.mock(IOMonitorAdapter.class);
+
persistentCache = new PersistentRedisCache(
"localhost",
port,
@@ -38,7 +56,9 @@ public class PersistentRedisCacheTest ex
50,
10,
2000,
- 200000
+ 200000,
+ 0,
+ ioMonitorAdapter
);
}
@@ -46,4 +66,26 @@ public class PersistentRedisCacheTest ex
public void tearDown() throws Exception {
redisServer.stop();
}
+
+ /**
+ * Verifies that the IO monitor passed to {@link PersistentRedisCache} sees no
+ * {@code afterSegmentRead} call on a cache miss and exactly one after the segment
+ * has been written to the cache and read back.
+ */
+ @Test
+ public void testIOMonitor() throws IOException, InterruptedException {
+
+ UUID segmentUUID = UUID.randomUUID();
+ long msb = segmentUUID.getMostSignificantBits();
+ long lsb = segmentUUID.getLeastSignificantBits();
+
+ // The loader returns null, so the miss cannot populate the cache.
+ persistentCache.readSegment(msb, lsb, () -> null);
+
+ //Segment not in cache, monitor methods not invoked
+ verify(ioMonitorAdapter, never()).afterSegmentRead(any(), anyLong(), anyLong(), anyInt(), anyLong());
+
+ // NOTE(review): getBytes() uses the platform default charset; harmless here because
+ // the content is never compared, but an explicit charset would be more deterministic.
+ persistentCache.writeSegment(msb, lsb, Buffer.wrap("segment_content".getBytes()));
+
+ // NOTE(review): presumably gives an asynchronous write time to reach Redis before the
+ // read below - confirm; a fixed sleep can be flaky on slow machines.
+ Thread.sleep(300);
+
+ persistentCache.readSegment(msb, lsb, () -> null);
+
+ verify(ioMonitorAdapter, times(1)).afterSegmentRead(any(), eq(msb), eq(lsb), anyInt(), anyLong());
+ }
+
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-segment-tar/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/pom.xml?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/pom.xml Tue Aug 4 14:13:58 2020
@@ -48,7 +48,8 @@
org.apache.jackrabbit.oak.segment.spi,
org.apache.jackrabbit.oak.segment.spi.monitor,
org.apache.jackrabbit.oak.segment.spi.persistence,
- org.apache.jackrabbit.oak.segment.spi.persistence.split
+ org.apache.jackrabbit.oak.segment.spi.persistence.split,
+ org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache
</Export-Package>
<Embed-Dependency>
netty-*,
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java Tue Aug 4 14:13:58 2020
@@ -47,6 +47,7 @@ import org.apache.jackrabbit.api.stats.R
import org.apache.jackrabbit.api.stats.TimeSeries;
import org.apache.jackrabbit.oak.osgi.OsgiUtil;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -641,51 +642,6 @@ public class SegmentNodeStoreFactory {
}
private static StatisticsProvider getRoleStatisticsProvider(StatisticsProvider delegate, String role) {
- RepositoryStatistics repositoryStatistics = new RepositoryStatistics() {
-
- @Override
- public TimeSeries getTimeSeries(Type type) {
- return getTimeSeries(type.name(), type.isResetValueEachSecond());
- }
-
- @Override
- public TimeSeries getTimeSeries(String type, boolean resetValueEachSecond) {
- return delegate.getStats().getTimeSeries(addRoleToName(type, role), resetValueEachSecond);
- }
- };
-
- return new StatisticsProvider() {
-
- @Override
- public RepositoryStatistics getStats() {
- return repositoryStatistics;
- }
-
- @Override
- public MeterStats getMeter(String name, StatsOptions options) {
- return delegate.getMeter(addRoleToName(name, role), options);
- }
-
- @Override
- public CounterStats getCounterStats(String name, StatsOptions options) {
- return delegate.getCounterStats(addRoleToName(name, role), options);
- }
-
- @Override
- public TimerStats getTimer(String name, StatsOptions options) {
- return delegate.getTimer(addRoleToName(name, role), options);
- }
-
- @Override
- public HistogramStats getHistogram(String name, StatsOptions options) {
- return delegate.getHistogram(addRoleToName(name, role), options);
- }
-
- };
+ return new RoleStatisticsProvider(delegate, role);
}
-
- private static String addRoleToName(String name, String role) {
- return role + '.' + name;
- }
-
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java Tue Aug 4 14:13:58 2020
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.monitor;
+
+import org.apache.jackrabbit.api.stats.RepositoryStatistics;
+import org.apache.jackrabbit.api.stats.TimeSeries;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+
+/**
+ * A {@link StatisticsProvider} decorator that prefixes every statistic name with a
+ * role before delegating to the wrapped provider. A statistic requested as
+ * {@code name} is looked up in the delegate as {@code role + '.' + name}.
+ */
+public class RoleStatisticsProvider implements StatisticsProvider{
+
+ private final StatisticsProvider delegate;
+ private final String role;
+ private final RepositoryStatistics repositoryStatistics;
+
+ /**
+ * @param delegate the provider that actually creates and stores the statistics
+ * @param role the prefix put in front of every statistic name
+ */
+ public RoleStatisticsProvider(StatisticsProvider delegate, String role) {
+ this.delegate = delegate;
+ this.role = role;
+
+ // View over the delegate's repository statistics that applies the same role
+ // prefix to time series names.
+ this.repositoryStatistics = new RepositoryStatistics() {
+
+ @Override
+ public TimeSeries getTimeSeries(Type type) {
+ // Route through the name-based overload so the role prefix is applied.
+ return getTimeSeries(type.name(), type.isResetValueEachSecond());
+ }
+
+ @Override
+ public TimeSeries getTimeSeries(String type, boolean resetValueEachSecond) {
+ return delegate.getStats().getTimeSeries(addRoleToName(type, role), resetValueEachSecond);
+ }
+ };
+ }
+
+ /**
+ * @return the role-prefixing view over the delegate's repository statistics
+ */
+ @Override
+ public RepositoryStatistics getStats() {
+ return repositoryStatistics;
+ }
+
+ /**
+ * @return the delegate's meter registered under the role-prefixed name
+ */
+ @Override
+ public MeterStats getMeter(String name, StatsOptions options) {
+ return delegate.getMeter(addRoleToName(name, role), options);
+ }
+
+ /**
+ * @return the delegate's counter registered under the role-prefixed name
+ */
+ @Override
+ public CounterStats getCounterStats(String name, StatsOptions options) {
+ return delegate.getCounterStats(addRoleToName(name, role), options);
+ }
+
+ /**
+ * @return the delegate's timer registered under the role-prefixed name
+ */
+ @Override
+ public TimerStats getTimer(String name, StatsOptions options) {
+ return delegate.getTimer(addRoleToName(name, role), options);
+ }
+
+ /**
+ * @return the delegate's histogram registered under the role-prefixed name
+ */
+ @Override
+ public HistogramStats getHistogram(String name, StatsOptions options) {
+ return delegate.getHistogram(addRoleToName(name, role), options);
+ }
+
+ /**
+ * Builds the name used in the delegate: {@code role}, a dot, then {@code name}.
+ */
+ private static String addRoleToName(String name, String role) {
+ return role + '.' + name;
+ }
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java Tue Aug 4 14:13:58 2020
@@ -17,18 +17,26 @@
*/
package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
@@ -39,17 +47,83 @@ public abstract class AbstractPersistent
protected PersistentCache nextCache;
protected final HashSet<String> writesPending;
+ protected SegmentCacheStats segmentCacheStats;
+
public AbstractPersistentCache() {
executor = Executors.newFixedThreadPool(THREADS);
writesPending = new HashSet<>();
}
- public PersistentCache linkWith(PersistentCache nextCache) {
+ public PersistentCache linkWith(AbstractPersistentCache nextCache) {
this.nextCache = nextCache;
return nextCache;
}
+ /**
+ * Reads a segment, consulting this cache first and falling back to the linked
+ * cache (if one was set with {@code linkWith}) or, otherwise, to {@code loader}.
+ * <p>
+ * A hit increments the hit counter and returns immediately. A miss increments the
+ * miss counter and invokes the fallback; a non-null result is recorded as a
+ * successful load and written to this cache via {@code writeSegment}. A {@code null}
+ * result from the fallback is returned as is and records no load statistics.
+ * <p>
+ * Exceptions thrown inside the fallback path are logged and counted as load
+ * exceptions; they are not propagated. In that case the method returns whatever had
+ * been loaded before the failure, which is {@code null} if the fallback itself threw.
+ *
+ * @param msb the most significant bits of the identifier of the segment
+ * @param lsb the least significant bits of the identifier of the segment
+ * @param loader called on a cache miss when no next cache is linked
+ * @return the segment data, or {@code null} if it could not be found or loaded
+ */
 @Override
+ public Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader) {
+ Buffer segment = readSegmentInternal(msb, lsb);
+ if (segment != null) {
+ segmentCacheStats.hitCount.incrementAndGet();
+ return segment;
+ }
+ segmentCacheStats.missCount.incrementAndGet();
+
+ // Either use the next cache or the 'loader'
+ Callable<Buffer> nextLoader = nextCache != null
+ ? () -> nextCache.readSegment(msb, lsb, loader)
+ : loader;
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ segment = nextLoader.call();
+
+ if (segment != null) {
+ // Record the load before writing back, so the measured time covers only the load.
+ recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), true);
+ writeSegment(msb, lsb, segment);
+ }
+
+ return segment;
+ } catch (Exception t) {
+ logger.error("Exception while loading segment {} from remote store or linked cache", new UUID(msb, lsb), t);
+ recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), false);
+ }
+ // Reached only after an exception was logged above.
+ return segment;
+ }
+
+ /**
+ * Reads the segment from this cache only.
+ * If the segment is not found, this method does not query the next cache that was set with {@link #linkWith(AbstractPersistentCache)}.
+ *
+ * @param msb the most significant bits of the identifier of the segment
+ * @param lsb the least significant bits of the identifier of the segment
+ * @return byte buffer containing the segment data or null if the segment doesn't exist
+ */
+ protected abstract Buffer readSegmentInternal(long msb, long lsb);
+
+ /**
+ * Records the time spent loading data from an external source after a cache miss,
+ * and counts the load as either successful or failed.
+ *
+ * @param loadTime load time in nanoseconds
+ * @param successful indicates whether loading of the segment into cache was successful
+ */
+ protected final void recordCacheLoadTimeInternal(long loadTime, boolean successful) {
+ if (successful) {
+ segmentCacheStats.loadSuccessCount.incrementAndGet();
+ } else {
+ segmentCacheStats.loadExceptionCount.incrementAndGet();
+ }
+ // The elapsed time is accumulated for successful and failed loads alike.
+ segmentCacheStats.loadTime.addAndGet(loadTime);
+ }
+
+ /**
+ * Returns the statistics object that holds the hit, miss and load counters
+ * updated by this cache.
+ * <p>
+ * NOTE(review): {@code segmentCacheStats} is not assigned by this class's constructor;
+ * presumably every subclass sets it during construction - confirm, otherwise the
+ * {@code @NotNull} contract does not hold.
+ *
+ * @return Statistics for this cache.
+ */
+ @NotNull
+ public AbstractCacheStats getCacheStats() {
+ return segmentCacheStats;
+ }
+
+ @Override
public void close() {
try {
executor.shutdown();
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java Tue Aug 4 14:13:58 2020
@@ -45,14 +45,7 @@ public class CachingSegmentArchiveReader
@Override
@Nullable
public Buffer readSegment(long msb, long lsb) throws IOException {
- Buffer buffer = persistentCache.readSegment(msb, lsb);
- if (buffer == null) {
- buffer = delegate.readSegment(msb, lsb);
- if (buffer != null) {
- persistentCache.writeSegment(msb, lsb, buffer);
- }
- }
- return buffer;
+ return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
}
@Override
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java Tue Aug 4 14:13:58 2020
@@ -18,8 +18,11 @@
package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import java.util.concurrent.Callable;
+
/**
* This interface represents a cache which survives segment store restarts.
* The cache is agnostic to any archive structure. Segments are only
@@ -32,10 +35,11 @@ public interface PersistentCache {
*
* @param msb the most significant bits of the identifier of the segment
* @param lsb the least significant bits of the identifier of the segment
+ * @param loader invoked via {@code loader.call()} on a cache miss to retrieve the missing segment
* @return byte buffer containing the segment data or null if the segment doesn't exist
*/
@Nullable
- Buffer readSegment(long msb, long lsb);
+ Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader);
/**
* Check if the segment exists in the cache.
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java Tue Aug 4 14:13:58 2020
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import com.google.common.cache.CacheStats;
+import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Cache statistics for a persistent segment cache.
+ * <p>
+ * Size-related values (maximum weight, element count, current weight, eviction count)
+ * are read on demand from the suppliers given at construction time. The hit, miss and
+ * load counters are package-private atomics that the cache implementation in this
+ * package updates directly.
+ */
+public class SegmentCacheStats extends AbstractCacheStats {
+ private final @NotNull Supplier<Long> maximumWeight;
+
+ @NotNull
+ private final Supplier<Long> elementCount;
+
+ @NotNull
+ final Supplier<Long> currentWeight;
+
+ /** Number of loads after a cache miss that produced a segment. */
+ @NotNull
+ final AtomicLong loadSuccessCount = new AtomicLong();
+
+ /** Number of loads after a cache miss that ended with an exception. */
+ @NotNull
+ final AtomicInteger loadExceptionCount = new AtomicInteger();
+
+ /** Accumulated load time in nanoseconds. */
+ @NotNull
+ final AtomicLong loadTime = new AtomicLong();
+
+ @NotNull
+ final Supplier<Long> evictionCount;
+
+ /** Number of reads answered from the cache. */
+ @NotNull
+ final AtomicLong hitCount = new AtomicLong();
+
+ /** Number of reads not answered from the cache. */
+ @NotNull
+ final AtomicLong missCount = new AtomicLong();
+
+ /**
+ * @param name the name of the cache these statistics belong to
+ * @param maximumWeight supplies the maximum total weight of the cache
+ * @param elementCount supplies the number of elements currently in the cache
+ * @param currentWeight supplies the current weight of the cache
+ * @param evictionCount supplies the number of evictions so far
+ * @throws NullPointerException if any of the suppliers is {@code null}
+ */
+ public SegmentCacheStats(@NotNull String name,
+ @NotNull Supplier<Long> maximumWeight,
+ @NotNull Supplier<Long> elementCount,
+ @NotNull Supplier<Long> currentWeight,
+ @NotNull Supplier<Long> evictionCount) {
+ super(name);
+ // Validate all suppliers eagerly: a missing one should fail here, at construction,
+ // rather than on the first statistics read.
+ this.maximumWeight = checkNotNull(maximumWeight);
+ this.elementCount = checkNotNull(elementCount);
+ this.currentWeight = checkNotNull(currentWeight);
+ this.evictionCount = checkNotNull(evictionCount);
+ }
+
+ /**
+ * @return a snapshot of the current counter values
+ */
+ @Override
+ protected CacheStats getCurrentStats() {
+ return new CacheStats(
+ hitCount.get(),
+ missCount.get(),
+ loadSuccessCount.get(),
+ loadExceptionCount.get(),
+ loadTime.get(),
+ evictionCount.get()
+ );
+ }
+
+ /**
+ * @return the element count reported by the supplier given at construction
+ */
+ @Override
+ public long getElementCount() {
+ return elementCount.get();
+ }
+
+ /**
+ * @return the maximum weight reported by the supplier given at construction
+ */
+ @Override
+ public long getMaxTotalWeight() {
+ return maximumWeight.get();
+ }
+
+ /**
+ * @return the current weight reported by the supplier given at construction
+ */
+ @Override
+ public long estimateCurrentWeight() {
+ return currentWeight.get();
+ }
+}
+
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java Tue Aug 4 14:13:58 2020
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Internal(since = "1.0.0")
+@Version("1.0.0")
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.annotations.Internal;
+import org.osgi.annotation.versioning.Version;
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java Tue Aug 4 14:13:58 2020
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PersistentCacheStatsTest {
+
+ /**
+ * Checks hit count, miss count, load-exception count and total load time of a
+ * single cache for three reads: a segment already in the cache, a segment
+ * provided by the loader, and a loader that throws.
+ */
+ @Test
+ public void testCacheStats() {
+ AbstractPersistentCache cache = new PersistentCacheImpl();
+
+ // close the cache even when one of the assertions below fails
+ try {
+ cache.writeSegment(1, 1, Buffer.wrap(new byte[]{1}));
+
+ //segment in cache
+ Buffer segment = cache.readSegment(1, 1, () -> null);
+ assertNotNull(segment);
+
+ assertEquals(1, cache.getCacheStats().getHitCount());
+ long loadTime = cache.getCacheStats().getTotalLoadTime();
+ assertEquals(0, loadTime);
+
+ //segment not in cache but loaded from remote store
+ segment = cache.readSegment(0, 0, () -> Buffer.wrap(new byte[]{0}));
+ assertNotNull(segment);
+ assertEquals(1, cache.getCacheStats().getMissCount());
+ loadTime = cache.getCacheStats().getTotalLoadTime();
+ assertTrue(loadTime > 0);
+
+ //segment not in cache and exception while loading from remote store
+ segment = cache.readSegment(2, 2, () -> {
+ throw new Exception();
+ });
+ assertNull(segment);
+ long loadTime2 = cache.getCacheStats().getTotalLoadTime();
+ assertTrue(loadTime2 > loadTime);
+ assertEquals(2, cache.getCacheStats().getMissCount());
+ assertEquals(1, cache.getCacheStats().getLoadExceptionCount());
+ } finally {
+ cache.close();
+ }
+ }
+
+ /**
+ * Checks the statistics of two caches chained with {@code linkWith}: a
+ * segment found in neither cache, in the first cache, in the linked cache,
+ * a segment provided by the loader, a loader that throws, and finally a
+ * linked cache whose read throws.
+ */
+ @Test
+ public void testCacheStatsForLinkedCaches() {
+ AbstractPersistentCache cache1 = new PersistentCacheImpl();
+ AbstractPersistentCache cache2 = new PersistentCacheImpl();
+ // cache2 is re-pointed to a throwing cache near the end of the test; keep
+ // a handle on the first instance so that it gets closed as well
+ final AbstractPersistentCache firstLinkedCache = cache2;
+
+ try {
+ cache1.linkWith(cache2);
+
+ //segment not in either cache
+ Buffer segment = cache1.readSegment(0, 0, () -> null);
+
+ assertNull(segment);
+ assertEquals(1, cache1.getCacheStats().getMissCount());
+ assertEquals(1, cache2.getCacheStats().getMissCount());
+ assertEquals(0, cache1.getCacheStats().getHitCount());
+ assertEquals(0, cache2.getCacheStats().getHitCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+ //segment in first cache
+ cache1.writeSegment(1, 1, Buffer.wrap(new byte[]{1}));
+ segment = cache1.readSegment(1, 1, () -> null);
+
+ assertNotNull(segment);
+ assertEquals(1, cache1.getCacheStats().getMissCount());
+ assertEquals(1, cache2.getCacheStats().getMissCount());
+ assertEquals(1, cache1.getCacheStats().getHitCount());
+ assertEquals(0, cache2.getCacheStats().getHitCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+ //segment in second cache
+ cache2.writeSegment(2, 2, Buffer.wrap(new byte[]{2}));
+ segment = cache1.readSegment(2, 2, () -> null);
+
+ assertNotNull(segment);
+ assertEquals(2, cache1.getCacheStats().getMissCount());
+ assertEquals(1, cache2.getCacheStats().getMissCount());
+ assertEquals(1, cache1.getCacheStats().getHitCount());
+ assertEquals(1, cache2.getCacheStats().getHitCount());
+ assertEquals(1, cache1.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+ //segment loaded from the remote storage
+ segment = cache1.readSegment(3, 3, () -> Buffer.wrap(new byte[]{3}));
+
+ assertNotNull(segment);
+ assertEquals(3, cache1.getCacheStats().getMissCount());
+ assertEquals(2, cache2.getCacheStats().getMissCount());
+ assertEquals(1, cache1.getCacheStats().getHitCount());
+ assertEquals(1, cache2.getCacheStats().getHitCount());
+ assertEquals(2, cache1.segmentCacheStats.getLoadCount());
+ assertEquals(1, cache2.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+ //exception while loading segment from the remote storage
+ segment = cache1.readSegment(4, 4, () -> {
+ throw new Exception();
+ });
+
+ assertNull(segment);
+ assertEquals(4, cache1.getCacheStats().getMissCount());
+ assertEquals(3, cache2.getCacheStats().getMissCount());
+ assertEquals(1, cache1.getCacheStats().getHitCount());
+ assertEquals(1, cache2.getCacheStats().getHitCount());
+ assertEquals(2, cache1.segmentCacheStats.getLoadCount());
+ assertEquals(2, cache2.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+ assertEquals(1, cache2.segmentCacheStats.getLoadExceptionCount());
+
+ //linked cache throws exception
+ cache2 = new PersistentCacheImpl() {
+ @Override
+ protected Buffer readSegmentInternal(long msb, long lsb) {
+ throw new RuntimeException();
+ }
+ };
+ cache1.linkWith(cache2);
+ segment = cache1.readSegment(5, 5, () -> Buffer.wrap(new byte[]{5}));
+
+ assertNull(segment);
+ assertEquals(5, cache1.getCacheStats().getMissCount());
+ assertEquals(0, cache2.getCacheStats().getMissCount());
+ assertEquals(1, cache1.getCacheStats().getHitCount());
+ assertEquals(0, cache2.getCacheStats().getHitCount());
+ assertEquals(3, cache1.segmentCacheStats.getLoadCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+ assertEquals(1, cache1.segmentCacheStats.getLoadExceptionCount());
+ assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+ } finally {
+ // close every cache created by this test, also after a failed assertion
+ cache2.close();
+ if (cache2 != firstLinkedCache) {
+ firstLinkedCache.close();
+ }
+ cache1.close();
+ }
+ }
+
+ /**
+ * Minimal in-memory cache used by the tests of this class. Segments are kept
+ * in a {@link HashMap} keyed by the {@link UUID} built from the msb/lsb pair.
+ * The element, weight and eviction counters only feed the
+ * {@link SegmentCacheStats} suppliers; this class itself never updates them.
+ */
+ static class PersistentCacheImpl extends AbstractPersistentCache {
+ final HashMap<UUID, Buffer> segments = new HashMap<>();
+
+ final long maximumWeight = Long.MAX_VALUE;
+ final AtomicLong elementCount = new AtomicLong();
+ final AtomicLong currentWeight = new AtomicLong();
+ final AtomicLong evictionCount = new AtomicLong();
+
+ /**
+ * Installs a {@link SegmentCacheStats} instance named "stats" that is
+ * backed by the fields of this cache.
+ */
+ public PersistentCacheImpl() {
+ segmentCacheStats = new SegmentCacheStats("stats", () -> maximumWeight, elementCount::get, currentWeight::get, evictionCount::get);
+ }
+
+ /**
+ * @return {@code true} if a segment was stored under the given msb/lsb pair
+ */
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ return segments.containsKey(new UUID(msb, lsb));
+ }
+
+ /**
+ * Stores the buffer under the given msb/lsb pair, replacing any previous
+ * entry for that pair.
+ */
+ @Override
+ public void writeSegment(long msb, long lsb, Buffer buffer) {
+ segments.put(new UUID(msb, lsb), buffer);
+ }
+
+ /**
+ * Nothing to clean up for the in-memory map.
+ */
+ @Override
+ public void cleanUp() {
+
+ }
+
+ /**
+ * @return the buffer stored under the given msb/lsb pair, or {@code null}
+ * if there is none
+ */
+ @Override
+ protected Buffer readSegmentInternal(long msb, long lsb) {
+ return segments.get(new UUID(msb, lsb));
+ }
+ }
+}