You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/08/04 14:13:58 UTC

svn commit: r1880569 - in /jackrabbit/oak/trunk: oak-run/ oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/ oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/ oak-segment-...

Author: adulceanu
Date: Tue Aug  4 14:13:58 2020
New Revision: 1880569

URL: http://svn.apache.org/viewvc?rev=1880569&view=rev
Log:
OAK-7744 - Persistent cache for the Segment Node Store
Added cache metrics
Contribution by Miroslav Smiljanic

Added:
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java
Modified:
    jackrabbit/oak/trunk/oak-run/pom.xml
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
    jackrabbit/oak/trunk/oak-segment-tar/pom.xml
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java

Modified: jackrabbit/oak/trunk/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/pom.xml?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-run/pom.xml Tue Aug  4 14:13:58 2020
@@ -35,7 +35,7 @@
     <groovy.version>2.4.17</groovy.version>
     <!--
       Size History:
-      + 55 MB Add support for 3rd level cache (GRANITE-30735)
+      + 55 MB Add support for segment persistent cache (OAK-7744)
       + 54 MB AWS support for segment-tar (OAK-8827)
       + 52 MB AWS java sdk update (OAK-8875)
       + 51 MB AWS java sdk update (OAK-7536)

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java Tue Aug  4 14:13:58 2020
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 package org.apache.jackrabbit.oak.segment.remote.persistentcache;
 
 import static org.apache.jackrabbit.oak.segment.remote.persistentcache.Configuration.PID;
@@ -58,6 +75,12 @@ public @interface Configuration {
     int redisCacheExpireSeconds() default DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
 
     @AttributeDefinition(
+            name = "Redis cache db index",
+            description = "Redis cache db index (see Jedis#select(int))"
+    )
+    int redisDBIndex() default 1;
+
+    @AttributeDefinition(
             name = "Redis socket timeout",
             description = "Number of seconds to wait for response for request"
     )

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java Tue Aug  4 14:13:58 2020
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * This {@code IOMonitor} implementations registers the following monitoring endpoints
+ * with the Metrics library if available:
+ * <ul>
+ *     <li>{@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES}:
+ *          a meter metrics for the number of bytes read from segment disk cache</li>
+ *     <li>{@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES}:
+ *          a meter metrics for the number of bytes written to segment disk cache</li>
+ *     <li>{@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME}:
+ *          a timer metrics for the time spent reading from segment disk cache</li>
+ *     <li>{@link #OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME}:
+ *          a timer metrics for the time spent writing to segment disk cache</li>
+ * </ul>
+ */
+public class DiskCacheIOMonitor extends IOMonitorAdapter {
+    public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES = "oak.segment.cache.disk.segment-read-bytes";
+    public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES = "oak.segment.cache.disk.segment-write-bytes";
+    public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME = "oak.segment.cache.disk.segment-read-time";
+    public static final String OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME = "oak.segment.cache.disk.segment-write-time";
+
+    private final MeterStats segmentReadBytes;
+    private final MeterStats segmentWriteBytes;
+    private final TimerStats segmentReadTime;
+    private final TimerStats segmentWriteTime;
+
+    public DiskCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
+        segmentReadBytes = statisticsProvider.getMeter(
+                OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
+        segmentWriteBytes = statisticsProvider.getMeter(
+                OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
+        segmentReadTime = statisticsProvider.getTimer(
+                OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
+        segmentWriteTime = statisticsProvider.getTimer(
+                OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
+    }
+
+    @Override
+    public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
+        segmentReadBytes.mark(length);
+        segmentReadTime.update(elapsed, NANOSECONDS);
+    }
+
+    @Override
+    public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
+        segmentWriteBytes.mark(length);
+        segmentWriteTime.update(elapsed, NANOSECONDS);
+    }
+}

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java Tue Aug  4 14:13:58 2020
@@ -17,9 +17,12 @@
  */
 package org.apache.jackrabbit.oak.segment.remote.persistentcache;
 
+import com.google.common.base.Stopwatch;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,19 +40,25 @@ import java.nio.file.attribute.FileTime;
 import java.util.Comparator;
 import java.util.Spliterator;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
 public class PersistentDiskCache extends AbstractPersistentCache {
     private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class);
     public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512;
+    public static final String NAME = "Segment Disk Cache";
 
     private final File directory;
     private final long maxCacheSizeBytes;
+    private final IOMonitor diskCacheIOMonitor;
 
     final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
 
+    final AtomicLong evictionCount = new AtomicLong();
+
     private static final Comparator<Path> sortedByAccessTime = (path1, path2) -> {
         try {
             FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime();
@@ -61,43 +70,53 @@ public class PersistentDiskCache extends
         return 0;
     };
 
-    public PersistentDiskCache(File directory, int cacheMaxSizeMB) {
+    public PersistentDiskCache(File directory, int cacheMaxSizeMB, IOMonitor diskCacheIOMonitor) {
         this.directory = directory;
         this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L;
-
+        this.diskCacheIOMonitor = diskCacheIOMonitor;
         if (!directory.exists()) {
             directory.mkdirs();
         }
 
-        cacheSize.set(FileUtils.sizeOfDirectory(directory));
+        segmentCacheStats = new SegmentCacheStats(
+                NAME,
+                () -> maxCacheSizeBytes,
+                () -> Long.valueOf(directory.listFiles().length),
+                () -> FileUtils.sizeOfDirectory(directory),
+                () -> evictionCount.get());
     }
 
     @Override
-    public Buffer readSegment(long msb, long lsb) {
-        String segmentId = new UUID(msb, lsb).toString();
-        File segmentFile = new File(directory, segmentId);
+    protected Buffer readSegmentInternal(long msb, long lsb) {
+        try {
+            String segmentId = new UUID(msb, lsb).toString();
+            File segmentFile = new File(directory, segmentId);
 
-        if (segmentFile.exists()) {
-            try (FileInputStream fis = new FileInputStream(segmentFile);
-                 FileChannel channel = fis.getChannel()) {
-                int length = (int) channel.size();
-
-                Buffer buffer = Buffer.allocateDirect(length);
-                if (buffer.readFully(channel, 0) < length) {
-                    throw new EOFException();
-                }
-                buffer.flip();
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            if (segmentFile.exists()) {
+                diskCacheIOMonitor.beforeSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length());
+                try (FileInputStream fis = new FileInputStream(segmentFile); FileChannel channel = fis.getChannel()) {
+                    int length = (int) channel.size();
+
+                    Buffer buffer = Buffer.allocateDirect(length);
+                    if (buffer.readFully(channel, 0) < length) {
+                        throw new EOFException();
+                    }
 
-                return buffer;
-            } catch (FileNotFoundException e) {
-                logger.info("Segment {} deleted from file system!", segmentId);
-            } catch (IOException e) {
-                logger.error("Error loading segment {} from cache: {}", segmentId, e);
-            }
-        }
+                    long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+                    diskCacheIOMonitor.afterSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length(), elapsed);
 
-        if (nextCache != null) {
-            return nextCache.readSegment(msb, lsb);
+                    buffer.flip();
+
+                    return buffer;
+                } catch (FileNotFoundException e) {
+                    logger.info("Segment {} deleted from file system!", segmentId);
+                } catch (IOException e) {
+                    logger.error("Error loading segment {} from cache:", segmentId, e);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Exception while reading segment {} from the cache:", new UUID(msb, lsb), e);
         }
 
         return null;
@@ -162,6 +181,7 @@ public class PersistentDiskCache extends
                     if (cacheSize.get() > maxCacheSizeBytes * 0.66) {
                         cacheSize.addAndGet(-path.toFile().length());
                         path.toFile().delete();
+                        evictionCount.incrementAndGet();
                     } else {
                         breaker.stop();
                     }

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java Tue Aug  4 14:13:58 2020
@@ -17,34 +17,45 @@
  */
 package org.apache.jackrabbit.oak.segment.remote.persistentcache;
 
+import com.google.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.exceptions.JedisException;
+import redis.clients.jedis.params.SetParams;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.StringReader;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Properties;
 import java.util.UUID;
-
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
-import redis.clients.jedis.Protocol;
-import redis.clients.jedis.exceptions.JedisException;
+import java.util.concurrent.TimeUnit;
 
 public class PersistentRedisCache extends AbstractPersistentCache {
     private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class);
     public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2;
+    public static final String NAME = "Segment Redis Cache";
 
     private static final String REDIS_PREFIX = "SEGMENT";
-    private final int redisExpireSeconds;
+    private final IOMonitor redisCacheIOMonitor;
     private JedisPool redisPool;
+    private SetParams setParamsWithExpire;
 
-    public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout, int redisConnectionTimeout,
-                                int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections) {
-        this.redisExpireSeconds = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+    public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout,
+            int redisConnectionTimeout, int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections,
+            int redisDBIndex, IOMonitor redisCacheIOMonitor) {
+        this.redisCacheIOMonitor = redisCacheIOMonitor;
+        int redisExpireSeconds1 = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+        setParamsWithExpire = SetParams.setParams().ex(redisExpireSeconds1);
 
         if (redisPort == 0) {
             redisPort = 6379;
@@ -57,28 +68,79 @@ public class PersistentRedisCache extend
         jedisPoolConfig.setMaxIdle(redisMaxConnections);
         jedisPoolConfig.setMaxTotal(redisMaxTotalConnections);
 
-        this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout, redisSocketTimeout, null, Protocol.DEFAULT_DATABASE, null);
+        this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout,
+                redisSocketTimeout, null, redisDBIndex, null);
+        this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount,
+                this::getCurrentWeight, this::getNumberOfEvictedKeys);
+    }
+
+    private long getCacheElementCount() {
+        try(Jedis redis = redisPool.getResource()) {
+            return redis.dbSize();
+        } catch (JedisException e) {
+            logger.error("Error getting number of elements in redis", e);
+        }
+
+        return -1;
+    }
+
+    private long getRedisMaxMemory() {
+        try{
+            return Long.parseLong(getRedisProperty("memory",   "maxmemory"));
+        } catch (JedisException | IOException e) {
+            logger.error("Error getting redis configuration value for 'maxmemory'", e);
+        }
+        return -1;
+    }
+
+    private long getCurrentWeight() {
+        try{
+            return Long.parseLong(getRedisProperty("memory",   "used_memory"));
+        } catch (JedisException | IOException e) {
+            logger.error("Error getting number of elements in redis", e);
+        }
+        return -1;
+    }
+
+    private long getNumberOfEvictedKeys() {
+        try{
+            return Long.parseLong(getRedisProperty("stats",   "evicted_keys"));
+        } catch (JedisException | IOException e) {
+            logger.error("Error getting number of evicted elements in redis", e);
+        }
+        return -1;
+    }
+
+    private String getRedisProperty(String section, String propertyName) throws IOException {
+        try(Jedis redis = redisPool.getResource()) {
+            String redisInfoString = redis.info(section);
+            Properties props = new Properties();
+            props.load(new StringReader(redisInfoString));
+            return (String) props.get(propertyName);
+        }
     }
 
     @Override
-    public Buffer readSegment(long msb, long lsb) {
+    protected Buffer readSegmentInternal(long msb, long lsb) {
         String segmentId = new UUID(msb, lsb).toString();
 
+        Stopwatch stopwatch = Stopwatch.createStarted();
         try(Jedis redis = redisPool.getResource()) {
+            redisCacheIOMonitor.beforeSegmentRead(null, msb, lsb, 0);
+
             final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes());
             if (bytes != null) {
+                long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+                redisCacheIOMonitor.afterSegmentRead(null, msb, lsb, bytes.length, elapsed);
+
                 Buffer buffer = Buffer.allocateDirect(bytes.length);
 
                 buffer.put(bytes);
                 buffer.flip();
                 return buffer;
             }
-        } catch (JedisException e) {
-            logger.error("Error loading segment {} from cache: {}", segmentId, e);
-        }
-
-        if (nextCache != null) {
-            return nextCache.readSegment(msb, lsb);
+        } catch (Exception e) {
+            logger.error("Error loading segment {} from cache", segmentId, e);
         }
 
         return null;
@@ -88,7 +150,7 @@ public class PersistentRedisCache extend
     public boolean containsSegment(long msb, long lsb) {
         String segmentId = new UUID(msb, lsb).toString();
 
-        try(Jedis redis = redisPool.getResource()) {
+        try (Jedis redis = redisPool.getResource()) {
             return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes());
         } catch (JedisException e) {
             logger.error("Error checking segment existence {} in cache: {}", segmentId, e);
@@ -98,7 +160,7 @@ public class PersistentRedisCache extend
     }
 
     @Override
-    public void writeSegment(long msb, long lsb, Buffer buffer){
+    public void writeSegment(long msb, long lsb, Buffer buffer) {
         String segmentId = new UUID(msb, lsb).toString();
         Buffer bufferCopy = buffer.duplicate();
 
@@ -110,8 +172,7 @@ public class PersistentRedisCache extend
                         bufferCopy.write(channel);
                     }
                     final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes();
-                    redis.set(key, bos.toByteArray());
-                    redis.expire(key, redisExpireSeconds);
+                    redis.set(key, bos.toByteArray(), setParamsWithExpire);
                     cacheSize.addAndGet(bos.size());
                 } catch (Throwable t) {
                     logger.error("Error writing segment {} to cache: {}", segmentId, t);

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java Tue Aug  4 14:13:58 2020
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * This {@code IOMonitor} implementations registers the following monitoring endpoints
+ * with the Metrics library if available:
+ * <ul>
+ *     <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES}:
+ *          a meter metrics for the number of bytes read from segment redis cache</li>
+ *     <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES}:
+ *          a meter metrics for the number of bytes written to segment redis cache</li>
+ *     <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME}:
+ *          a timer metrics for the time spent reading from segment redis cache</li>
+ *     <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME}:
+ *          a timer metrics for the time spent writing to segment redis cache</li>
+ * </ul>
+ */
+public class RedisCacheIOMonitor extends IOMonitorAdapter {
+    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES = "oak.segment.cache.redis.segment-read-bytes";
+    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES = "oak.segment.cache.redis.segment-write-bytes";
+    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME = "oak.segment.cache.redis.segment-read-time";
+    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME = "oak.segment.cache.redis.segment-write-time";
+
+    private final MeterStats segmentReadBytes;
+    private final MeterStats segmentWriteBytes;
+    private final TimerStats segmentReadTime;
+    private final TimerStats segmentWriteTime;
+
+    public RedisCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
+        segmentReadBytes = statisticsProvider.getMeter(
+                OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
+        segmentWriteBytes = statisticsProvider.getMeter(
+                OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
+        segmentReadTime = statisticsProvider.getTimer(
+                OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
+        segmentWriteTime = statisticsProvider.getTimer(
+                OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
+    }
+
+    @Override
+    public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
+        segmentReadBytes.mark(length);
+        segmentReadTime.update(elapsed, NANOSECONDS);
+    }
+
+    @Override
+    public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
+        segmentWriteBytes.mark(length);
+        segmentWriteTime.update(elapsed, NANOSECONDS);
+    }
+}

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java Tue Aug  4 14:13:58 2020
@@ -1,16 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 package org.apache.jackrabbit.oak.segment.remote.persistentcache;
 
 import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
 
 import com.google.common.io.Closer;
 
+import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.ConfigurationPolicy;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 
 import java.io.File;
 import java.io.IOException;
@@ -26,8 +51,14 @@ public class RemotePersistentCacheServic
 
     private final Closer closer = Closer.create();
 
+    private OsgiWhiteboard osgiWhiteboard;
+
+    @Reference
+    private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
+
     @Activate
     public void activate(ComponentContext context, Configuration config) throws IOException {
+        osgiWhiteboard = new OsgiWhiteboard(context.getBundleContext());
         persistentCache = createPersistentCache(config, closer);
         registration = context.getBundleContext().registerService(PersistentCache.class.getName(), persistentCache, new Properties());
     }
@@ -42,24 +73,47 @@ public class RemotePersistentCacheServic
         persistentCache = null;
     }
 
-    private static PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+    protected void registerCloseable(final Registration registration) {
+        closer.register(registration::unregister);
+    }
+
+    protected  <T> Registration registerMBean(Class<T> clazz, T bean, String type, String name) {
+        return WhiteboardUtils.registerMBean(osgiWhiteboard, clazz, bean, type, name);
+    }
+
+    private PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+
+        RoleStatisticsProvider roleStatisticsProvider = new RoleStatisticsProvider(statisticsProvider, "remote_persistence");
+
+        DiskCacheIOMonitor diskCacheIOMonitor = new DiskCacheIOMonitor(roleStatisticsProvider);
+        RedisCacheIOMonitor redisCacheIOMonitor = new RedisCacheIOMonitor(roleStatisticsProvider);
+
         if (configuration.diskCacheEnabled()) {
-            PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB());
+            PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB(), diskCacheIOMonitor);
             closer.register(persistentDiskCache);
 
+            CacheStatsMBean diskCacheStatsMBean = persistentDiskCache.getCacheStats();
+            registerCloseable(registerMBean(CacheStatsMBean.class, diskCacheStatsMBean, CacheStats.TYPE, diskCacheStatsMBean.getName()));
+
             if (configuration.redisCacheEnabled()) {
                 PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
-                        configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections());
+                        configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
                 persistentDiskCache.linkWith(redisCache);
                 closer.register(redisCache);
+
+                CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
+                registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
             }
 
             return persistentDiskCache;
         } else if (configuration.redisCacheEnabled()) {
             PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
-                    configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections());
+                    configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
             closer.register(redisCache);
 
+            CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
+            registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
+
             return redisCache;
         }
 

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java Tue Aug  4 14:13:58 2020
@@ -104,7 +104,7 @@ public abstract class AbstractPersistent
             final TestSegment segment = testSegments.get(nSegment);
             final long[] id = segment.getSegmentId();
             try {
-                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1], () -> null);
                 segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
             } finally {
                 done.incrementAndGet();
@@ -146,7 +146,7 @@ public abstract class AbstractPersistent
                 if (persistentCache.containsSegment(msb, lsb)) {
                     containsFailures.incrementAndGet();
                 }
-                if (persistentCache.readSegment(msb, lsb) != null) {
+                if (persistentCache.readSegment(msb, lsb, () -> null) != null) {
                     readFailures.incrementAndGet();
                 }
             } catch (Throwable t) {
@@ -184,7 +184,7 @@ public abstract class AbstractPersistent
                 if (!persistentCache.containsSegment(segmentId[0], segmentId[1])) {
                     containsFailures.incrementAndGet();
                 }
-                if (persistentCache.readSegment(segmentId[0], segmentId[1]) == null) {
+                if (persistentCache.readSegment(segmentId[0], segmentId[1], () -> null) == null) {
                     readFailures.incrementAndGet();
                 }
             } catch (Throwable t) {
@@ -219,7 +219,7 @@ public abstract class AbstractPersistent
 
         waitWhile.accept(() -> done.get() < SEGMENTS);
 
-        Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1]);
+        Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1], () -> null);
         assertNotNull("The segment was not found", segmentRead);
         assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
     }

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java Tue Aug  4 14:13:58 2020
@@ -18,19 +18,29 @@
 package org.apache.jackrabbit.oak.segment.remote.persistentcache;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.File;
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class PersistentDiskCacheTest extends AbstractPersistentCacheTest {
 
@@ -39,13 +49,13 @@ public class PersistentDiskCacheTest ext
 
     @Before
     public void setUp() throws Exception {
-        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024);
+        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024, new IOMonitorAdapter());
     }
 
     @Test
     public void cleanupTest() throws Exception {
         persistentCache.close();
-        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0);
+        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0, new IOMonitorAdapter());
         final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
         final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);
 
@@ -86,7 +96,7 @@ public class PersistentDiskCacheTest ext
             final long[] id = segment.getSegmentId();
             try {
                 final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
-                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1], () -> null);
                 segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
             } catch (Throwable t) {
                 errors.incrementAndGet();
@@ -115,4 +125,30 @@ public class PersistentDiskCacheTest ext
         }
         assertEquals("Segment(s) not cleaned up in cache", 0, SEGMENTS - errors.get());
     }
+
+    @Test
+    public void testIOMonitor() throws IOException {
+        IOMonitorAdapter ioMonitorAdapter = Mockito.mock(IOMonitorAdapter.class);
+
+        persistentCache.close();
+        File cacheFolder = temporaryFolder.newFolder();
+        persistentCache = new PersistentDiskCache(cacheFolder, 0, ioMonitorAdapter);
+
+        UUID segmentUUID = UUID.randomUUID();
+
+        persistentCache.readSegment(segmentUUID.getMostSignificantBits(), segmentUUID.getLeastSignificantBits(), () -> null);
+
+        //Segment not in cache, monitor methods not invoked
+        verify(ioMonitorAdapter, never()).beforeSegmentRead(any(), anyLong(), anyLong(), anyInt());
+        verify(ioMonitorAdapter, never()).afterSegmentRead(any(), anyLong(), anyLong(), anyInt(), anyLong());
+
+        //place segment in disk cache
+        File segmentFile = new File(cacheFolder, segmentUUID.toString());
+        segmentFile.createNewFile();
+
+        persistentCache.readSegment(segmentUUID.getMostSignificantBits(), segmentUUID.getLeastSignificantBits(), () -> null);
+
+        verify(ioMonitorAdapter, times(1)).beforeSegmentRead(eq(segmentFile), eq(segmentUUID.getMostSignificantBits()), eq(segmentUUID.getLeastSignificantBits()), anyInt());
+        verify(ioMonitorAdapter, times(1)).afterSegmentRead(eq(segmentFile), eq(segmentUUID.getMostSignificantBits()), eq(segmentUUID.getLeastSignificantBits()), anyInt(), anyLong());
+    }
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java Tue Aug  4 14:13:58 2020
@@ -17,19 +17,37 @@
  */
 package org.apache.jackrabbit.oak.segment.remote.persistentcache;
 
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
 import redis.embedded.RedisServer;
 
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 public class PersistentRedisCacheTest extends AbstractPersistentCacheTest {
 
     private RedisServer redisServer;
+    private IOMonitorAdapter ioMonitorAdapter;
 
     @Before
     public void setUp() throws Exception {
         redisServer = RedisServer.builder().build();
         redisServer.start();
         int port = redisServer.ports().get(0);
+        ioMonitorAdapter = Mockito.mock(IOMonitorAdapter.class);
+
         persistentCache = new PersistentRedisCache(
                 "localhost",
                 port,
@@ -38,7 +56,9 @@ public class PersistentRedisCacheTest ex
                 50,
                 10,
                 2000,
-                200000
+                200000,
+                0,
+                ioMonitorAdapter
         );
     }
 
@@ -46,4 +66,26 @@ public class PersistentRedisCacheTest ex
     public void tearDown() throws Exception {
         redisServer.stop();
     }
+
+    @Test
+    public void testIOMonitor() throws IOException, InterruptedException {
+
+        UUID segmentUUID = UUID.randomUUID();
+        long msb = segmentUUID.getMostSignificantBits();
+        long lsb = segmentUUID.getLeastSignificantBits();
+
+        persistentCache.readSegment(msb, lsb, () -> null);
+
+        //Segment not in cache, monitor methods not invoked
+        verify(ioMonitorAdapter, never()).afterSegmentRead(any(), anyLong(), anyLong(), anyInt(), anyLong());
+
+        persistentCache.writeSegment(msb, lsb, Buffer.wrap("segment_content".getBytes()));
+
+        Thread.sleep(300);
+
+        persistentCache.readSegment(msb, lsb, () -> null);
+
+        verify(ioMonitorAdapter, times(1)).afterSegmentRead(any(), eq(msb), eq(lsb), anyInt(), anyLong());
+    }
+
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-segment-tar/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/pom.xml?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/pom.xml Tue Aug  4 14:13:58 2020
@@ -48,7 +48,8 @@
                         	org.apache.jackrabbit.oak.segment.spi,
                             org.apache.jackrabbit.oak.segment.spi.monitor,
                             org.apache.jackrabbit.oak.segment.spi.persistence,
-                            org.apache.jackrabbit.oak.segment.spi.persistence.split
+                            org.apache.jackrabbit.oak.segment.spi.persistence.split,
+                            org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache
                         </Export-Package>
                         <Embed-Dependency>
                             netty-*,

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java Tue Aug  4 14:13:58 2020
@@ -47,6 +47,7 @@ import org.apache.jackrabbit.api.stats.R
 import org.apache.jackrabbit.api.stats.TimeSeries;
 import org.apache.jackrabbit.oak.osgi.OsgiUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
 import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -641,51 +642,6 @@ public class SegmentNodeStoreFactory {
     }
 
     private static StatisticsProvider getRoleStatisticsProvider(StatisticsProvider delegate, String role) {
-        RepositoryStatistics repositoryStatistics = new RepositoryStatistics() {
-
-            @Override
-            public TimeSeries getTimeSeries(Type type) {
-                return getTimeSeries(type.name(), type.isResetValueEachSecond());
-            }
-
-            @Override
-            public TimeSeries getTimeSeries(String type, boolean resetValueEachSecond) {
-                return delegate.getStats().getTimeSeries(addRoleToName(type, role), resetValueEachSecond);
-            }
-        };
-
-        return new StatisticsProvider() {
-
-            @Override
-            public RepositoryStatistics getStats() {
-                return repositoryStatistics;
-            }
-
-            @Override
-            public MeterStats getMeter(String name, StatsOptions options) {
-                return delegate.getMeter(addRoleToName(name, role), options);
-            }
-
-            @Override
-            public CounterStats getCounterStats(String name, StatsOptions options) {
-                return delegate.getCounterStats(addRoleToName(name, role), options);
-            }
-
-            @Override
-            public TimerStats getTimer(String name, StatsOptions options) {
-                return delegate.getTimer(addRoleToName(name, role), options);
-            }
-
-            @Override
-            public HistogramStats getHistogram(String name, StatsOptions options) {
-                return delegate.getHistogram(addRoleToName(name, role), options);
-            }
-
-        };
+        return new RoleStatisticsProvider(delegate, role);
     }
-
-    private static String addRoleToName(String name, String role) {
-        return role + '.' + name;
-    }
-
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java Tue Aug  4 14:13:58 2020
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.monitor;
+
+import org.apache.jackrabbit.api.stats.RepositoryStatistics;
+import org.apache.jackrabbit.api.stats.TimeSeries;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+
+public class RoleStatisticsProvider implements StatisticsProvider{
+
+    private final StatisticsProvider delegate;
+    private final String role;
+    private final RepositoryStatistics repositoryStatistics;
+
+    public RoleStatisticsProvider(StatisticsProvider delegate, String role) {
+        this.delegate = delegate;
+        this.role = role;
+
+        this.repositoryStatistics = new RepositoryStatistics() {
+
+            @Override
+            public TimeSeries getTimeSeries(Type type) {
+                return getTimeSeries(type.name(), type.isResetValueEachSecond());
+            }
+
+            @Override
+            public TimeSeries getTimeSeries(String type, boolean resetValueEachSecond) {
+                return delegate.getStats().getTimeSeries(addRoleToName(type, role), resetValueEachSecond);
+            }
+        };
+    }
+
+    @Override
+    public RepositoryStatistics getStats() {
+        return repositoryStatistics;
+    }
+
+    @Override
+    public MeterStats getMeter(String name, StatsOptions options) {
+        return delegate.getMeter(addRoleToName(name, role), options);
+    }
+
+    @Override
+    public CounterStats getCounterStats(String name, StatsOptions options) {
+        return delegate.getCounterStats(addRoleToName(name, role), options);
+    }
+
+    @Override
+    public TimerStats getTimer(String name, StatsOptions options) {
+        return delegate.getTimer(addRoleToName(name, role), options);
+    }
+
+    @Override
+    public HistogramStats getHistogram(String name, StatsOptions options) {
+        return delegate.getHistogram(addRoleToName(name, role), options);
+    }
+
+    private static String addRoleToName(String name, String role) {
+        return role + '.' + name;
+    }
+}

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java Tue Aug  4 14:13:58 2020
@@ -17,18 +17,26 @@
  */
 package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
 
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
 
@@ -39,17 +47,83 @@ public abstract class AbstractPersistent
     protected PersistentCache nextCache;
     protected final HashSet<String> writesPending;
 
+    protected SegmentCacheStats segmentCacheStats;
+
     public AbstractPersistentCache() {
         executor = Executors.newFixedThreadPool(THREADS);
         writesPending = new HashSet<>();
     }
 
-    public PersistentCache linkWith(PersistentCache nextCache) {
+    public PersistentCache linkWith(AbstractPersistentCache nextCache) {
         this.nextCache = nextCache;
         return nextCache;
     }
 
     @Override
+    public Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader) {
+        Buffer segment = readSegmentInternal(msb, lsb);
+        if (segment != null) {
+            segmentCacheStats.hitCount.incrementAndGet();
+            return segment;
+        }
+        segmentCacheStats.missCount.incrementAndGet();
+
+        // Either use the next cache or the 'loader'
+        Callable<Buffer> nextLoader = nextCache != null
+                ? () -> nextCache.readSegment(msb, lsb, loader)
+                : loader;
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            segment = nextLoader.call();
+
+            if (segment != null) {
+                recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), true);
+                writeSegment(msb, lsb, segment);
+            }
+
+            return segment;
+        } catch (Exception t) {
+            logger.error("Exception while loading segment {} from remote store or linked cache", new UUID(msb, lsb), t);
+            recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), false);
+        }
+        return segment;
+    }
+
+    /**
+     * Reads the segment from the cache.
+     * If segment is not found, this method does not query next cache that was set with {@link #linkWith(AbstractPersistentCache)}
+     *
+     * @param msb the most significant bits of the identifier of the segment
+     * @param lsb the least significant bits of the identifier of the segment
+     * @return byte buffer containing the segment data or null if the segment doesn't exist
+     */
+    protected abstract Buffer readSegmentInternal(long msb, long lsb);
+
+    /**
+     * Records time spent to load data from external source, after cache miss.
+     *
+     * @param loadTime   load time in nanoseconds
+     * @param successful indicates whether loading of the segment into cache was successful
+     */
+    protected final void recordCacheLoadTimeInternal(long loadTime, boolean successful) {
+        if (successful) {
+            segmentCacheStats.loadSuccessCount.incrementAndGet();
+        } else {
+            segmentCacheStats.loadExceptionCount.incrementAndGet();
+        }
+        segmentCacheStats.loadTime.addAndGet(loadTime);
+    }
+
+    /**
+     * @return Statistics for this cache.
+     */
+    @NotNull
+    public AbstractCacheStats getCacheStats() {
+        return segmentCacheStats;
+    }
+
+    @Override
     public void close() {
         try {
             executor.shutdown();

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java Tue Aug  4 14:13:58 2020
@@ -45,14 +45,7 @@ public class CachingSegmentArchiveReader
     @Override
     @Nullable
     public Buffer readSegment(long msb, long lsb) throws IOException {
-        Buffer buffer = persistentCache.readSegment(msb, lsb);
-        if (buffer == null) {
-            buffer = delegate.readSegment(msb, lsb);
-            if (buffer != null) {
-                persistentCache.writeSegment(msb, lsb, buffer);
-            }
-        }
-        return buffer;
+        return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb));
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java?rev=1880569&r1=1880568&r2=1880569&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java Tue Aug  4 14:13:58 2020
@@ -18,8 +18,11 @@
 package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.concurrent.Callable;
+
 /**
  * This interface represents a cache which survives segment store restarts.
  * The cache is agnostic to any archive structure. Segments are only
@@ -32,10 +35,11 @@ public interface PersistentCache {
      *
      * @param msb the most significant bits of the identifier of the segment
      * @param lsb the least significant bits of the identifier of the segment
+     * @param loader in case of cache miss, with {@code loader.call()} missing element will be retrieved
      * @return byte buffer containing the segment data or null if the segment doesn't exist
      */
     @Nullable
-    Buffer readSegment(long msb, long lsb);
+    Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader);
 
     /**
      * Check if the segment exists in the cache.

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java Tue Aug  4 14:13:58 2020
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import com.google.common.cache.CacheStats;
+import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class SegmentCacheStats extends AbstractCacheStats {
+    private final @NotNull Supplier<Long> maximumWeight;
+
+    @NotNull
+    private final Supplier<Long> elementCount;
+
+    @NotNull
+    final Supplier<Long> currentWeight;
+
+    @NotNull
+    final AtomicLong loadSuccessCount = new AtomicLong();
+
+    @NotNull
+    final AtomicInteger loadExceptionCount = new AtomicInteger();
+
+    @NotNull
+    final AtomicLong loadTime = new AtomicLong();
+
+    @NotNull
+    final Supplier<Long> evictionCount;
+
+    @NotNull
+    final AtomicLong hitCount = new AtomicLong();
+
+    @NotNull
+    final AtomicLong missCount = new AtomicLong();
+
+    public SegmentCacheStats(@NotNull String name,
+                             @NotNull Supplier<Long> maximumWeight,
+                             @NotNull Supplier<Long> elementCount,
+                             @NotNull Supplier<Long> currentWeight,
+                             @NotNull Supplier<Long> evictionCount) {
+        super(name);
+        this.maximumWeight = maximumWeight;
+        this.elementCount = checkNotNull(elementCount);
+        this.currentWeight = checkNotNull(currentWeight);
+        this.evictionCount = evictionCount;
+    }
+
+    @Override
+    protected CacheStats getCurrentStats() {
+        return new CacheStats(
+                hitCount.get(),
+                missCount.get(),
+                loadSuccessCount.get(),
+                loadExceptionCount.get(),
+                loadTime.get(),
+                evictionCount.get()
+        );
+    }
+
+    @Override
+    public long getElementCount() {
+        return elementCount.get();
+    }
+
+    @Override
+    public long getMaxTotalWeight() {
+        return maximumWeight.get();
+    }
+
+    @Override
+    public long estimateCurrentWeight() {
+        return currentWeight.get();
+    }
+}
+

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java Tue Aug  4 14:13:58 2020
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Internal(since = "1.0.0")
+@Version("1.0.0")
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.annotations.Internal;
+import org.osgi.annotation.versioning.Version;
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java?rev=1880569&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java Tue Aug  4 14:13:58 2020
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PersistentCacheStatsTest {
+
+    @Test
+    public void testCacheStats() {
+        AbstractPersistentCache cache = new PersistentCacheImpl();
+
+        cache.writeSegment(1, 1, Buffer.wrap(new byte[]{1}));
+
+        //segment in cache
+        Buffer segment = cache.readSegment(1, 1, () -> null);
+        assertNotNull(segment);
+
+        assertEquals(1, cache.getCacheStats().getHitCount());
+        long loadTime = cache.getCacheStats().getTotalLoadTime();
+        assertEquals(0, loadTime);
+
+
+        //segment not in cache but loaded from remote store
+        segment = cache.readSegment(0, 0, () -> Buffer.wrap(new byte[]{0}));
+        assertNotNull(segment);
+        assertEquals(1, cache.getCacheStats().getMissCount());
+        loadTime = cache.getCacheStats().getTotalLoadTime();
+        assertTrue(loadTime > 0);
+
+        //segment not in cache and exception while loading from remote store
+        segment = cache.readSegment(2, 2, () -> {
+            throw new Exception();
+        });
+        assertNull(segment);
+        long loadTime2 = cache.getCacheStats().getTotalLoadTime();
+        assertTrue(loadTime2 > loadTime);
+        assertEquals(2, cache.getCacheStats().getMissCount());
+        assertEquals(1, cache.getCacheStats().getLoadExceptionCount());
+
+        cache.close();
+    }
+
+    @Test
+    public void testCacheStatsForLinkedCaches() {
+        AbstractPersistentCache cache1 = new PersistentCacheImpl();
+        AbstractPersistentCache cache2 = new PersistentCacheImpl();
+
+        cache1.linkWith(cache2);
+
+        //segment not in either cache
+        Buffer segment = cache1.readSegment(0 ,0, () -> null);
+
+        assertNull(segment);
+        assertEquals(1, cache1.getCacheStats().getMissCount());
+        assertEquals(1, cache2.getCacheStats().getMissCount());
+        assertEquals(0, cache1.getCacheStats().getHitCount());
+        assertEquals(0, cache2.getCacheStats().getHitCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+        //segment in first cache
+        cache1.writeSegment(1, 1, Buffer.wrap(new byte[]{1}));
+        segment = cache1.readSegment(1 ,1, () -> null);
+
+        assertNotNull(segment);
+        assertEquals(1, cache1.getCacheStats().getMissCount());
+        assertEquals(1, cache2.getCacheStats().getMissCount());
+        assertEquals(1, cache1.getCacheStats().getHitCount());
+        assertEquals(0, cache2.getCacheStats().getHitCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+        //segment in second cache
+        cache2.writeSegment(2, 2, Buffer.wrap(new byte[]{2}));
+        segment = cache1.readSegment(2 ,2, () -> null);
+
+        assertNotNull(segment);
+        assertEquals(2, cache1.getCacheStats().getMissCount());
+        assertEquals(1, cache2.getCacheStats().getMissCount());
+        assertEquals(1, cache1.getCacheStats().getHitCount());
+        assertEquals(1, cache2.getCacheStats().getHitCount());
+        assertEquals(1, cache1.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+        //segment loaded from the remote storage
+        segment = cache1.readSegment(3 ,3, () -> Buffer.wrap(new byte[]{3}));
+
+        assertNotNull(segment);
+        assertEquals(3, cache1.getCacheStats().getMissCount());
+        assertEquals(2, cache2.getCacheStats().getMissCount());
+        assertEquals(1, cache1.getCacheStats().getHitCount());
+        assertEquals(1, cache2.getCacheStats().getHitCount());
+        assertEquals(2, cache1.segmentCacheStats.getLoadCount());
+        assertEquals(1, cache2.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+        //exception while loading segment from the remote storage
+        segment = cache1.readSegment(4 ,4, () -> {
+            throw new Exception();
+        });
+
+        assertNull(segment);
+        assertEquals(4, cache1.getCacheStats().getMissCount());
+        assertEquals(3, cache2.getCacheStats().getMissCount());
+        assertEquals(1, cache1.getCacheStats().getHitCount());
+        assertEquals(1, cache2.getCacheStats().getHitCount());
+        assertEquals(2, cache1.segmentCacheStats.getLoadCount());
+        assertEquals(2, cache2.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount());
+        assertEquals(1, cache2.segmentCacheStats.getLoadExceptionCount());
+
+        //linked cache throws exception
+        cache2 = new PersistentCacheImpl(){
+            @Override
+            protected Buffer readSegmentInternal(long msb, long lsb) {
+                throw new RuntimeException();
+            }
+        };
+        cache1.linkWith(cache2);
+        segment = cache1.readSegment(5 ,5, () -> Buffer.wrap(new byte[]{5}));
+
+        assertNull(segment);
+        assertEquals(5, cache1.getCacheStats().getMissCount());
+        assertEquals(0, cache2.getCacheStats().getMissCount());
+        assertEquals(1, cache1.getCacheStats().getHitCount());
+        assertEquals(0, cache2.getCacheStats().getHitCount());
+        assertEquals(3, cache1.segmentCacheStats.getLoadCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadCount());
+        assertEquals(1, cache1.segmentCacheStats.getLoadExceptionCount());
+        assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount());
+
+        cache2.close();
+        cache1.close();
+    }
+
+    class PersistentCacheImpl extends AbstractPersistentCache {
+        HashMap<UUID, Buffer> segments = new HashMap<>();
+
+        public PersistentCacheImpl() {
+            segmentCacheStats = new SegmentCacheStats("stats", () -> maximumWeight, () -> elementCount.get(), () -> currentWeight.get(), () -> evictionCount.get());
+        }
+
+        long maximumWeight = Long.MAX_VALUE;
+        AtomicLong elementCount = new AtomicLong();
+        AtomicLong currentWeight = new AtomicLong();
+        AtomicLong evictionCount = new AtomicLong();
+
+        void AbstractPersistentCache() {
+            segmentCacheStats = new SegmentCacheStats("stats", () -> maximumWeight, () -> elementCount.get(), () -> currentWeight.get(), () -> evictionCount.get());
+        }
+
+        @Override
+        public boolean containsSegment(long msb, long lsb) {
+            return segments.containsKey(new UUID(msb, lsb));
+        }
+
+        @Override
+        public void writeSegment(long msb, long lsb, Buffer buffer) {
+            segments.put(new UUID(msb, lsb), buffer);
+        }
+
+        @Override
+        public void cleanUp() {
+
+        }
+
+        @Override
+        protected Buffer readSegmentInternal(long msb, long lsb) {
+            return segments.get(new UUID(msb, lsb));
+        }
+    }
+}