Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/08/04 14:13:48 UTC

svn commit: r1880568 - in /jackrabbit/oak/trunk: oak-run/ oak-segment-remote/ oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/ oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persisten...

Author: adulceanu
Date: Tue Aug  4 14:13:48 2020
New Revision: 1880568

URL: http://svn.apache.org/viewvc?rev=1880568&view=rev
Log:
OAK-7744 - Persistent cache for the Segment Node Store

Added:
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
Modified:
    jackrabbit/oak/trunk/oak-run/pom.xml
    jackrabbit/oak/trunk/oak-segment-remote/pom.xml
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
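
For orientation before the individual diffs: PersistentDiskCache and PersistentRedisCache are chained cache levels (a disk miss falls through to Redis via linkWith), and CachingPersistence wraps an existing SegmentNodeStorePersistence with such a chain, as wired up in RemotePersistentCacheService and SegmentNodeStoreRegistrar below. A minimal wiring sketch, using only constructors and calls that appear in this commit; the cache directory and Redis endpoint are illustrative, and the numeric Redis arguments simply mirror the defaults declared in Configuration.java:

    import java.io.File;

    import org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache;
    import org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentRedisCache;
    import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
    import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence;

    public class PersistentCacheWiringSketch {

        // Wraps a remote persistence with a two-level cache: local disk first, then Redis.
        public static SegmentNodeStorePersistence wrap(SegmentNodeStorePersistence remotePersistence) {
            // Level 1: local disk cache, 512 MB (DEFAULT_MAX_CACHE_SIZE_MB).
            PersistentDiskCache diskCache = new PersistentDiskCache(new File("segment-cache"), 512);

            // Level 2: Redis cache, consulted only when the disk cache misses.
            // Arguments: host, port, expire seconds, socket timeout, connection timeout,
            // min connections, max connections, max total connections.
            PersistentRedisCache redisCache = new PersistentRedisCache(
                    "localhost", 6379, 172800, 10, 50, 10, 100, 200);

            diskCache.linkWith(redisCache);

            // CachingPersistence is expected to serve reads from the cache chain and
            // fall back to the wrapped remote persistence on a miss.
            return new CachingPersistence(diskCache, remotePersistence);
        }
    }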

Modified: jackrabbit/oak/trunk/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/pom.xml?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-run/pom.xml Tue Aug  4 14:13:48 2020
@@ -35,6 +35,7 @@
     <groovy.version>2.4.17</groovy.version>
     <!--
       Size History:
+      + 55 MB Add support for 3rd level cache (GRANITE-30735)
       + 54 MB AWS support for segment-tar (OAK-8827)
       + 52 MB AWS java sdk update (OAK-8875)
       + 51 MB AWS java sdk update (OAK-7536)
@@ -45,7 +46,7 @@
       + 41 MB build failing on the release profile (OAK-6250)
       + 38 MB. Initial value. Current 35MB plus a 10%
     -->
-    <max.jar.size>54000000</max.jar.size>
+    <max.jar.size>57671680</max.jar.size>
   </properties>
 
   <build>
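
(For reference, the new max.jar.size of 57671680 bytes is exactly 55 * 1024 * 1024, i.e. the 55 MB recorded in the size-history entry added above.)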

Modified: jackrabbit/oak/trunk/oak-segment-remote/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/pom.xml?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/pom.xml Tue Aug  4 14:13:48 2020
@@ -39,11 +39,18 @@
                 <!-- OAK-7182 -->${guava.osgi.import},
                   org.apache.jackrabbit.oak.segment.spi*,
                   !org.apache.jackrabbit.oak.segment*,
+				  !net.sf.cglib.asm.util,
+                  !org.apache.tools.ant,
+                  !org.apache.tools.ant.types,
                   *
               </Import-Package>
               <Export-Package>
-              	org.apache.jackrabbit.oak.segment.remote*
+              	org.apache.jackrabbit.oak.segment.remote
               </Export-Package>
+              <Embed-Dependency>
+                org.apache.servicemix.bundles.*,
+                commons-pool2
+              </Embed-Dependency>
             </instructions>
           </configuration>
         </plugin>
@@ -108,6 +115,23 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <!-- Redis client for segment cache -->
+       <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.jedis</artifactId>
+            <version>3.3.0_1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.cglib</artifactId>
+            <version>3.1_1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+            <version>2.6.2</version>
+        </dependency>
         
         <!-- Test dependencies -->
         <dependency>
@@ -120,5 +144,10 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.kstyrc</groupId>
+            <artifactId>embedded-redis</artifactId>
+            <version>0.6</version>
+        </dependency>
   </dependencies>
 </project>

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,89 @@
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.Configuration.PID;
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache.DEFAULT_MAX_CACHE_SIZE_MB;
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentRedisCache.DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(
+        pid = {PID},
+        name = "Apache Jackrabbit Oak Remote Persistent Cache Service",
+        description = "Persistent cache for the Oak Segment Node Store")
+public @interface Configuration {
+
+    String PID = "org.apache.jackrabbit.oak.segment.remote.RemotePersistentCacheService";
+
+    @AttributeDefinition(
+            name = "Disk cache persistence",
+            description = "Boolean value indicating that the local disk persisted cache should be used for segment store"
+    )
+    boolean diskCacheEnabled() default false;
+
+    @AttributeDefinition(
+            name = "Disk cache persistence directory",
+            description = "Path on the file system where disk cache persistence data will be stored."
+    )
+    String diskCacheDirectory() default "";
+
+    @AttributeDefinition(
+            name = "Disk cache persistence maximum size",
+            description = "Disk cache size (in MB). Default value is " + DEFAULT_MAX_CACHE_SIZE_MB
+    )
+    int diskCacheMaxSizeMB() default DEFAULT_MAX_CACHE_SIZE_MB;
+
+    @AttributeDefinition(
+            name = "Redis cache persistence",
+            description = "Boolean value indicating that the redis persisted cache should be used for segment store"
+    )
+    boolean redisCacheEnabled() default false;
+
+    @AttributeDefinition(
+            name = "Redis cache persistence host",
+            description = "Remote redis server host"
+    )
+    String redisCacheHost() default "";
+
+    @AttributeDefinition(
+            name = "Redis cache persistence port",
+            description = "Remote redis server port (0 = default)"
+    )
+    int redisCachePort() default 0;
+
+    @AttributeDefinition(
+            name = "Redis cache persistence entries expiry interval",
+            description = "Number of seconds to keep the entries in the cache. Default value is " + DEFAULT_REDIS_CACHE_EXPIRE_SECONDS
+    )
+    int redisCacheExpireSeconds() default DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
+
+    @AttributeDefinition(
+            name = "Redis socket timeout",
+            description = "Number of seconds to wait for response for request"
+    )
+    int redisSocketTimeout() default 10;
+
+    @AttributeDefinition(
+            name = "Redis connection timeout",
+            description = "Number of seconds to wait for redis connection to be established"
+    )
+    int redisConnectionTimeout() default 50;
+
+    @AttributeDefinition(
+            name = "Redis Minimum Connections",
+            description = "Minimum number of established connections that need to be kept in the pool"
+    )
+    int redisMinConnections() default 10;
+
+    @AttributeDefinition(
+            name = "Redis Maximum Connections",
+            description = "Maximum number of connections required by the business"
+    )
+    int redisMaxConnections() default 100;
+
+    @AttributeDefinition(
+            name = "Redis Maximum Total Connections",
+            description = "Includes the number of idle connections as a surplus"
+    )
+    int redisMaxTotalConnections() default 200;
+}

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
+import java.util.Comparator;
+import java.util.Spliterator;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+public class PersistentDiskCache extends AbstractPersistentCache {
+    private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class);
+    public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512;
+
+    private final File directory;
+    private final long maxCacheSizeBytes;
+
+    final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
+
+    private static final Comparator<Path> sortedByAccessTime = (path1, path2) -> {
+        try {
+            FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime();
+            FileTime lastAccessFile2 = Files.readAttributes(path2, BasicFileAttributes.class).lastAccessTime();
+            return lastAccessFile1.compareTo(lastAccessFile2);
+        } catch (IOException e) {
+            logger.error("A problem occurred while cleaning up the cache: ", e);
+        }
+        return 0;
+    };
+
+    public PersistentDiskCache(File directory, int cacheMaxSizeMB) {
+        this.directory = directory;
+        this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L;
+
+        if (!directory.exists()) {
+            directory.mkdirs();
+        }
+
+        cacheSize.set(FileUtils.sizeOfDirectory(directory));
+    }
+
+    @Override
+    public Buffer readSegment(long msb, long lsb) {
+        String segmentId = new UUID(msb, lsb).toString();
+        File segmentFile = new File(directory, segmentId);
+
+        if (segmentFile.exists()) {
+            try (FileInputStream fis = new FileInputStream(segmentFile);
+                 FileChannel channel = fis.getChannel()) {
+                int length = (int) channel.size();
+
+                Buffer buffer = Buffer.allocateDirect(length);
+                if (buffer.readFully(channel, 0) < length) {
+                    throw new EOFException();
+                }
+                buffer.flip();
+
+                return buffer;
+            } catch (FileNotFoundException e) {
+                logger.info("Segment {} deleted from file system!", segmentId);
+            } catch (IOException e) {
+                logger.error("Error loading segment {} from cache: {}", segmentId, e);
+            }
+        }
+
+        if (nextCache != null) {
+            return nextCache.readSegment(msb, lsb);
+        }
+
+        return null;
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        return new File(directory, new UUID(msb, lsb).toString()).exists();
+    }
+
+    @Override
+    public void writeSegment(long msb, long lsb, Buffer buffer) {
+        String segmentId = new UUID(msb, lsb).toString();
+        File segmentFile = new File(directory, segmentId);
+        Buffer bufferCopy = buffer.duplicate();
+
+        Runnable task = () -> {
+            if (lockSegmentWrite(segmentId)) {
+                try (FileChannel channel = new FileOutputStream(segmentFile).getChannel()) {
+                    int fileSize = bufferCopy.write(channel);
+                    cacheSize.addAndGet(fileSize);
+                } catch (Throwable t) {
+                    logger.error("Error writing segment {} to cache: {}", segmentId, t);
+                } finally {
+                    unlockSegmentWrite(segmentId);
+                }
+            }
+            cleanUp();
+        };
+
+        executor.execute(task);
+
+        if (nextCache != null) {
+            nextCache.writeSegment(msb, lsb, buffer);
+        }
+    }
+
+    private boolean isCacheFull() {
+        return cacheSize.get() >= maxCacheSizeBytes;
+    }
+
+    @Override
+    public void cleanUp() {
+        if (!cleanupInProgress.getAndSet(true)) {
+            try {
+                cleanUpInternal();
+            } finally {
+                cleanupInProgress.set(false);
+            }
+        }
+    }
+
+    private void cleanUpInternal() {
+        if (isCacheFull()) {
+            try {
+                Stream<Path> segmentsPaths = Files.walk(directory.toPath())
+                        .sorted(sortedByAccessTime)
+                        .filter(filePath -> !filePath.toFile().isDirectory());
+
+                StreamConsumer.forEach(segmentsPaths, (path, breaker) -> {
+
+                    if (cacheSize.get() > maxCacheSizeBytes * 0.66) {
+                        cacheSize.addAndGet(-path.toFile().length());
+                        path.toFile().delete();
+                    } else {
+                        breaker.stop();
+                    }
+                });
+            } catch (IOException e) {
+                logger.error("A problem occurred while cleaning up the cache: ", e);
+            }
+        }
+    }
+
+    static class StreamConsumer {
+
+        public static class Breaker {
+            private boolean shouldBreak = false;
+
+            public void stop() {
+                shouldBreak = true;
+            }
+
+            boolean get() {
+                return shouldBreak;
+            }
+        }
+
+        public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) {
+            Spliterator<T> spliterator = stream.spliterator();
+            boolean hadNext = true;
+            Breaker breaker = new Breaker();
+
+            while (hadNext && !breaker.get()) {
+                hadNext = spliterator.tryAdvance(elem -> {
+                    consumer.accept(elem, breaker);
+                });
+            }
+        }
+    }
+}
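
The StreamConsumer/Breaker helper above gives the eviction loop in cleanUpInternal an early exit without resorting to exceptions. A standalone usage sketch, assuming a class in the same package (the nested helper is package-private) and purely illustrative values:

    package org.apache.jackrabbit.oak.segment.remote.persistentcache;

    import java.util.stream.Stream;

    public class StreamConsumerExample {

        public static void main(String[] args) {
            // Prints 1, 2 and 3; the value 4 trips the breaker, so 5 is never pulled.
            PersistentDiskCache.StreamConsumer.forEach(Stream.of(1, 2, 3, 4, 5), (value, breaker) -> {
                if (value > 3) {
                    breaker.stop(); // tryAdvance is not called again once the breaker is set
                } else {
                    System.out.println(value);
                }
            });
        }
    }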

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.UUID;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.exceptions.JedisException;
+
+public class PersistentRedisCache extends AbstractPersistentCache {
+    private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class);
+    public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2;
+
+    private static final String REDIS_PREFIX = "SEGMENT";
+    private final int redisExpireSeconds;
+    private JedisPool redisPool;
+
+    public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout, int redisConnectionTimeout,
+                                int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections) {
+        this.redisExpireSeconds = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+
+        if (redisPort == 0) {
+            redisPort = 6379;
+        }
+
+        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+        jedisPoolConfig.setTestOnBorrow(true);
+        jedisPoolConfig.setMaxWaitMillis(redisSocketTimeout);
+        jedisPoolConfig.setMinIdle(redisMinConnections);
+        jedisPoolConfig.setMaxIdle(redisMaxConnections);
+        jedisPoolConfig.setMaxTotal(redisMaxTotalConnections);
+
+        this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout, redisSocketTimeout, null, Protocol.DEFAULT_DATABASE, null);
+    }
+
+    @Override
+    public Buffer readSegment(long msb, long lsb) {
+        String segmentId = new UUID(msb, lsb).toString();
+
+        try(Jedis redis = redisPool.getResource()) {
+            final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes());
+            if (bytes != null) {
+                Buffer buffer = Buffer.allocateDirect(bytes.length);
+
+                buffer.put(bytes);
+                buffer.flip();
+                return buffer;
+            }
+        } catch (JedisException e) {
+            logger.error("Error loading segment {} from cache: {}", segmentId, e);
+        }
+
+        if (nextCache != null) {
+            return nextCache.readSegment(msb, lsb);
+        }
+
+        return null;
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        String segmentId = new UUID(msb, lsb).toString();
+
+        try(Jedis redis = redisPool.getResource()) {
+            return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes());
+        } catch (JedisException e) {
+            logger.error("Error checking segment existence {} in cache: {}", segmentId, e);
+        }
+
+        return false;
+    }
+
+    @Override
+    public void writeSegment(long msb, long lsb, Buffer buffer){
+        String segmentId = new UUID(msb, lsb).toString();
+        Buffer bufferCopy = buffer.duplicate();
+
+        Runnable task = () -> {
+            if (lockSegmentWrite(segmentId)) {
+                final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                try (WritableByteChannel channel = Channels.newChannel(bos); Jedis redis = redisPool.getResource()) {
+                    while (bufferCopy.hasRemaining()) {
+                        bufferCopy.write(channel);
+                    }
+                    final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes();
+                    redis.set(key, bos.toByteArray());
+                    redis.expire(key, redisExpireSeconds);
+                    cacheSize.addAndGet(bos.size());
+                } catch (Throwable t) {
+                    logger.error("Error writing segment {} to cache: {}", segmentId, t);
+                } finally {
+                    unlockSegmentWrite(segmentId);
+                }
+            }
+        };
+
+        executor.execute(task);
+
+        if (nextCache != null) {
+            nextCache.writeSegment(msb, lsb, buffer);
+        }
+    }
+
+    @Override
+    public void cleanUp() {
+        // do nothing
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,68 @@
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+
+import com.google.common.io.Closer;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+@Component(
+        configurationPolicy = ConfigurationPolicy.REQUIRE,
+        configurationPid = {Configuration.PID})
+public class RemotePersistentCacheService {
+    private ServiceRegistration registration;
+
+    private PersistentCache persistentCache;
+
+    private final Closer closer = Closer.create();
+
+    @Activate
+    public void activate(ComponentContext context, Configuration config) throws IOException {
+        persistentCache = createPersistentCache(config, closer);
+        registration = context.getBundleContext().registerService(PersistentCache.class.getName(), persistentCache, new Properties());
+    }
+
+    @Deactivate
+    public void deactivate() throws IOException {
+        if (registration != null) {
+            registration.unregister();
+            registration = null;
+        }
+        closeQuietly(closer);
+        persistentCache = null;
+    }
+
+    private static PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+        if (configuration.diskCacheEnabled()) {
+            PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB());
+            closer.register(persistentDiskCache);
+
+            if (configuration.redisCacheEnabled()) {
+                PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
+                        configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections());
+                persistentDiskCache.linkWith(redisCache);
+                closer.register(redisCache);
+            }
+
+            return persistentDiskCache;
+        } else if (configuration.redisCacheEnabled()) {
+            PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
+                    configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections());
+            closer.register(redisCache);
+
+            return redisCache;
+        }
+
+        return null;
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.*;
+
+public abstract class AbstractPersistentCacheTest {
+
+    protected static final int SEGMENTS = 1000;
+    protected static final int THREADS = 50;
+    protected static final int SEGMENTS_PER_THREAD = SEGMENTS / THREADS;
+    protected static final int TIMEOUT_COUNT = 50;
+
+    protected static final Executor executor = Executors.newFixedThreadPool(THREADS);
+
+    protected static final Consumer<BiConsumer<Integer, Integer>> runConcurrently = r -> {
+        for (int i = 0; i < THREADS; ++i) {
+            int finalI = i;
+            executor.execute(() -> {
+                for (int j = finalI * SEGMENTS_PER_THREAD; j < (finalI + 1) * SEGMENTS_PER_THREAD; ++j) {
+                    r.accept(finalI, j);
+                }
+            });
+        }
+    };
+
+    protected AbstractPersistentCache persistentCache;
+
+    final AtomicInteger errors = new AtomicInteger(0);
+    final AtomicInteger done = new AtomicInteger(0);
+    int count; // for checking timeouts
+
+    protected Consumer<Supplier<Boolean>> waitWhile = (r -> {
+        for (count = 0; r.get() && count < TIMEOUT_COUNT; ++count) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+            }
+        }
+    });
+
+    @Test
+    public void writeAndReadManySegments() throws Exception {
+        final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
+        final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);
+
+        for (int i = 0; i < SEGMENTS; ++i) {
+            testSegments.add(TestSegment.createSegment());
+        }
+
+        for (int i = 0; i < THREADS; ++i) {
+            final Map<String, Buffer> segmentsReadThisThread = new HashMap<>(SEGMENTS_PER_THREAD);
+            segmentsRead.add(segmentsReadThisThread);
+        }
+
+        runConcurrently.accept((nThread, nSegment) -> {
+            TestSegment segment = testSegments.get(nSegment);
+            long[] id = segment.getSegmentId();
+            try {
+                persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer());
+            } catch (Throwable t) {
+                errors.incrementAndGet();
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+        waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+
+        assertEquals("Errors have occurred while writing", 0, errors.get());
+        assertNoTimeout();
+
+        done.set(0);
+        runConcurrently.accept((nThread, nSegment) -> {
+            final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
+            final TestSegment segment = testSegments.get(nSegment);
+            final long[] id = segment.getSegmentId();
+            try {
+                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+                segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+
+        assertNoTimeout();
+        assertEquals("Errors have occurred while reading", 0, errors.get());
+
+        for (int i = 0; i < THREADS; ++i) {
+            for (int j = i * SEGMENTS_PER_THREAD; j < (i + 1) * SEGMENTS_PER_THREAD; ++j) {
+                TestSegment testSegment = testSegments.get(j);
+                Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(i);
+                long[] segmentReadId = testSegment.getSegmentId();
+                Buffer segmentRead = segmentsReadThisThread.get(new UUID(segmentReadId[0], segmentReadId[1]).toString());
+                if (segmentRead == null) {
+                    errors.incrementAndGet();
+                    continue;
+                }
+                assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
+            }
+        }
+        assertEquals("Segment(s) not found in cache", 0, errors.get());
+    }
+
+    @Test
+    public void testNonExisting() throws Exception {
+        final Random random = new Random();
+        final long[] segmentIds = random.longs(2 * SEGMENTS).toArray();
+        final AtomicInteger containsFailures = new AtomicInteger(0);
+        final AtomicInteger readFailures = new AtomicInteger(0);
+
+        runConcurrently.accept((nThread, nSegment) -> {
+            try {
+                long msb = segmentIds[2 * nSegment];
+                long lsb = segmentIds[2 * nSegment + 1];
+                if (persistentCache.containsSegment(msb, lsb)) {
+                    containsFailures.incrementAndGet();
+                }
+                if (persistentCache.readSegment(msb, lsb) != null) {
+                    readFailures.incrementAndGet();
+                }
+            } catch (Throwable t) {
+                errors.incrementAndGet();
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+
+        assertEquals("exceptions occurred", 0, errors.get());
+        assertNoTimeout();
+        assertEquals("containsSegment failed", 0, containsFailures.get());
+        assertEquals("readSegment failed", 0, readFailures.get());
+    }
+
+    @Test
+    public void testExisting() throws Exception {
+        final TestSegment testSegment = TestSegment.createSegment();
+        final long[] segmentId = testSegment.getSegmentId();
+        persistentCache.writeSegment(segmentId[0], segmentId[1], testSegment.getSegmentBuffer());
+        final AtomicInteger containsFailures = new AtomicInteger(0);
+        final AtomicInteger readFailures = new AtomicInteger(0);
+
+        // We need this to give the cache's write thread pool time to start the thread
+        Thread.sleep(1000);
+
+        waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+        assertNoTimeout();
+        assertEquals(0, persistentCache.getWritesPending());
+
+        runConcurrently.accept((nThread, nSegment) -> {
+            try {
+                if (!persistentCache.containsSegment(segmentId[0], segmentId[1])) {
+                    containsFailures.incrementAndGet();
+                }
+                if (persistentCache.readSegment(segmentId[0], segmentId[1]) == null) {
+                    readFailures.incrementAndGet();
+                }
+            } catch (Throwable t) {
+                errors.incrementAndGet();
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+
+        assertEquals("Exceptions occurred", 0, errors.get());
+        assertNoTimeout();
+        assertEquals("containsSegment failed", 0, containsFailures.get());
+        assertEquals("readSegment failed", 0, readFailures.get());
+    }
+
+    @Test
+    public void testConcurrentWritesSameSegment() throws Exception {
+        final TestSegment testSegment = TestSegment.createSegment();
+        long[] segmentId = testSegment.getSegmentId();
+
+        runConcurrently.accept((nThread, nSegment) -> {
+            try {
+                persistentCache.writeSegment(segmentId[0], segmentId[1], testSegment.getSegmentBuffer());
+            } catch (Throwable t) {
+                errors.incrementAndGet();
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+
+        Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1]);
+        assertNotNull("The segment was not found", segmentRead);
+        assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
+    }
+
+    protected static class TestSegment {
+        public static int UUID_LEN = 2 * Long.SIZE;
+        public static int SEGMENT_LEN = 256 * 1024;
+
+        private static final Random random = new Random();
+
+        private final byte[] segmentId;
+        private final byte[] segmentBytes;
+
+        protected static TestSegment createSegment() {
+            return new TestSegment(createSegmentIdBytes(), createSegmentBytes());
+        }
+
+        private static byte[] createSegmentBytes() {
+            byte[] ret = new byte[SEGMENT_LEN];
+            random.nextBytes(ret);
+            return ret;
+        }
+
+        private static byte[] createSegmentIdBytes() {
+            byte[] ret = new byte[UUID_LEN];
+            random.nextBytes(ret);
+            return ret;
+        }
+
+        protected long[] getSegmentId() {
+            final Buffer buffer = Buffer.allocate(segmentId.length);
+            buffer.put(segmentId);
+            long[] ret = new long[2];
+            ret[0] = buffer.getLong(0);
+            ret[1] = buffer.getLong(8);
+            return ret;
+        }
+
+        protected Buffer getSegmentBuffer() {
+            return Buffer.wrap(segmentBytes);
+        }
+
+        private TestSegment(byte[] segmentId, byte[] segmentBytes) {
+            this.segmentId = segmentId;
+            this.segmentBytes = segmentBytes;
+        }
+
+        protected byte[] getSegmentBytes() {
+            return segmentBytes;
+        }
+    }
+
+    protected static void assertSegmentBufferEquals(Buffer expected, Buffer actual) {
+        expected.rewind();
+        actual.rewind();
+        assertEquals("Segment size is different", TestSegment.SEGMENT_LEN, actual.remaining());
+        for (int i = 0; i < TestSegment.SEGMENT_LEN; ++i) {
+            assertEquals("Difference in byte buffer", expected.get(i), actual.get(i));
+        }
+    }
+
+    protected void assertNoTimeout() {
+        assertTrue("Wait timeout reached", count < TIMEOUT_COUNT);
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PersistentDiskCacheTest extends AbstractPersistentCacheTest {
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+
+    @Before
+    public void setUp() throws Exception {
+        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024);
+    }
+
+    @Test
+    public void cleanupTest() throws Exception {
+        persistentCache.close();
+        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0);
+        final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
+        final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);
+
+        for (int i = 0; i < SEGMENTS; ++i) {
+            testSegments.add(TestSegment.createSegment());
+        }
+
+        for (int i = 0; i < THREADS; ++i) {
+            final Map<String, Buffer> segmentsReadThisThread = new HashMap<>(SEGMENTS_PER_THREAD);
+            segmentsRead.add(segmentsReadThisThread);
+        }
+
+        runConcurrently.accept((nThread, nSegment) -> {
+            TestSegment segment = testSegments.get(nSegment);
+            long[] id = segment.getSegmentId();
+            try {
+                persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer());
+            } catch (Throwable t) {
+                errors.incrementAndGet();
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+        waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+
+        assertEquals("Errors have occurred while writing", 0, errors.get());
+        assertNoTimeout();
+
+        done.set(0);
+        waitWhile.accept(() -> ((PersistentDiskCache) persistentCache).cleanupInProgress.get());
+
+        persistentCache.cleanUp();
+
+        runConcurrently.accept((nThread, nSegment) -> {
+            final TestSegment segment = testSegments.get(nSegment);
+            final long[] id = segment.getSegmentId();
+            try {
+                final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
+                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+                segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
+            } catch (Throwable t) {
+                errors.incrementAndGet();
+            } finally {
+                done.incrementAndGet();
+            }
+        });
+
+        waitWhile.accept(() -> done.get() < SEGMENTS);
+
+        assertNoTimeout();
+        assertEquals("Errors have occurred while reading", 0, errors.get());
+        errors.set(0);
+
+        for (int i = 0; i < THREADS; ++i) {
+            for (int j = i * SEGMENTS_PER_THREAD; j < (i + 1) * SEGMENTS_PER_THREAD; ++j) {
+                TestSegment testSegment = testSegments.get(j);
+                byte[] testSegmentBytes = testSegment.getSegmentBytes();
+                Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(i);
+                long[] segmentReadId = testSegment.getSegmentId();
+                Buffer segmentRead = segmentsReadThisThread.get(new UUID(segmentReadId[0], segmentReadId[1]).toString());
+                if (segmentRead == null) {
+                    errors.incrementAndGet();
+                }
+            }
+        }
+        assertEquals("Segment(s) not cleaned up in cache", 0, SEGMENTS - errors.get());
+    }
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.junit.After;
+import org.junit.Before;
+import redis.embedded.RedisServer;
+
+public class PersistentRedisCacheTest extends AbstractPersistentCacheTest {
+
+    private RedisServer redisServer;
+
+    @Before
+    public void setUp() throws Exception {
+        redisServer = RedisServer.builder().build();
+        redisServer.start();
+        int port = redisServer.ports().get(0);
+        persistentCache = new PersistentRedisCache(
+                "localhost",
+                port,
+                -1,
+                10000,
+                50,
+                10,
+                2000,
+                200000
+        );
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        redisServer.stop();
+    }
+}
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java Tue Aug  4 14:13:48 2020
@@ -48,6 +48,7 @@ import org.apache.jackrabbit.api.stats.T
 import org.apache.jackrabbit.oak.osgi.OsgiUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
@@ -254,6 +255,12 @@ public class SegmentNodeStoreFactory {
         boolean splitPersistence() default false;
 
         @AttributeDefinition(
+                name = "Cache persistence",
+                description = "Boolean value indicating that the persisted cache should be used for the custom segment store"
+        )
+        boolean cachePersistence() default false;
+
+        @AttributeDefinition(
             name = "Backup directory",
             description = "Directory (relative to current working directory) for storing repository backups. " +
                 "Defaults to 'repository.home/segmentstore-backup'."
@@ -315,6 +322,13 @@ public class SegmentNodeStoreFactory {
     )
     private volatile SegmentNodeStorePersistence segmentStore;
 
+    @Reference(
+        cardinality = ReferenceCardinality.OPTIONAL,
+        policy = ReferencePolicy.STATIC,
+        policyOption = ReferencePolicyOption.GREEDY
+    )
+    private volatile PersistentCache persistentCache;
+
     @Reference
     private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
 
@@ -334,6 +348,7 @@ public class SegmentNodeStoreFactory {
             configuration,
             blobStore,
             segmentStore,
+            persistentCache,
             getRoleStatisticsProvider(statisticsProvider, configuration.role()),
             registrations,
             whiteboard,
@@ -376,6 +391,7 @@ public class SegmentNodeStoreFactory {
         Configuration configuration,
         BlobStore blobStore,
         SegmentNodeStorePersistence segmentStore,
+        PersistentCache persistentCache,
         StatisticsProvider statisticsProvider,
         Closer closer,
         Whiteboard whiteboard,
@@ -539,6 +555,11 @@ public class SegmentNodeStoreFactory {
             }
 
             @Override
+            public boolean hasCachePersistence() {
+                return configuration.cachePersistence();
+            }
+
+            @Override
             public boolean registerDescriptors() {
                 return configuration.registerDescriptors();
             }
@@ -607,6 +628,11 @@ public class SegmentNodeStoreFactory {
             }
 
             @Override
+            public PersistentCache getPersistentCache() {
+                return persistentCache;
+            }
+
+            @Override
             public BundleContext getBundleContext() {
                 return context.getBundleContext();
             }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java Tue Aug  4 14:13:48 2020
@@ -25,14 +25,8 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo.getOrCreateId;
 
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.io.Closer;
+
 import org.apache.jackrabbit.commons.SimpleValueFactory;
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
@@ -50,9 +44,17 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGCMBean;
-import org.apache.jackrabbit.oak.segment.file.*;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.FileStoreGCMonitor;
+import org.apache.jackrabbit.oak.segment.file.FileStoreStatsMBean;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.segment.file.MetricsIOMonitor;
+import org.apache.jackrabbit.oak.segment.file.MetricsRemoteStoreMonitor;
 import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
 import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -76,6 +78,13 @@ import org.osgi.framework.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 class SegmentNodeStoreRegistrar {
 
     static SegmentNodeStore registerSegmentNodeStore(Configuration cfg) throws IOException {
@@ -136,6 +145,8 @@ class SegmentNodeStoreRegistrar {
 
         boolean hasSplitPersistence();
 
+        boolean hasCachePersistence();
+
         boolean registerDescriptors();
 
         boolean dispatchChanges();
@@ -160,6 +171,8 @@ class SegmentNodeStoreRegistrar {
 
         SegmentNodeStorePersistence getSegmentNodeStorePersistence();
 
+        PersistentCache getPersistentCache();
+
         BundleContext getBundleContext();
 
     }
@@ -231,16 +244,22 @@ class SegmentNodeStoreRegistrar {
         }
 
         if (cfg.hasCustomSegmentStore() && cfg.getSegmentNodeStorePersistence() != null) {
+            SegmentNodeStorePersistence customPersistence = cfg.getSegmentNodeStorePersistence();
+
+            if (cfg.hasCachePersistence()) {
+                cfg.getLogger().info("Using persistent cache for the custom persistence [{}]", customPersistence);
+                customPersistence = new CachingPersistence(cfg.getPersistentCache(), customPersistence);
+            }
+
             if (cfg.hasSplitPersistence()) {
-                cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", cfg.getSegmentNodeStorePersistence());
+                cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", customPersistence);
                 cfg.getSplitPersistenceDirectory().mkdirs();
-                SegmentNodeStorePersistence roPersistence = cfg.getSegmentNodeStorePersistence();
                 SegmentNodeStorePersistence rwPersistence = new TarPersistence(cfg.getSplitPersistenceDirectory());
-                SegmentNodeStorePersistence persistence = new SplitPersistence(roPersistence, rwPersistence);
+                SegmentNodeStorePersistence persistence = new SplitPersistence(customPersistence, rwPersistence);
                 builder.withCustomPersistence(persistence);
             } else {
-                cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", cfg.getSegmentNodeStorePersistence());
-                builder.withCustomPersistence(cfg.getSegmentNodeStorePersistence());
+                cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", customPersistence);
+                builder.withCustomPersistence(customPersistence);
             }
         }
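To make the decoration order above easier to follow: when both options are enabled, the custom persistence is first wrapped in a CachingPersistence and the result is then used as the read-only half of a SplitPersistence. A condensed sketch of just this branch, assuming it sits next to the registrar's nested Configuration interface (the helper method name and local variable are illustrative, not part of the commit):

    import java.io.IOException;
    import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
    import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
    import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
    import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence;
    import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence;

    // Condensed view of the custom-persistence branch above; 'cfg' is the registrar's
    // Configuration and 'builder' the FileStoreBuilder being set up.
    static void applyCustomPersistence(Configuration cfg, FileStoreBuilder builder) throws IOException {
        SegmentNodeStorePersistence persistence = cfg.getSegmentNodeStorePersistence();
        if (cfg.hasCachePersistence()) {
            // segment reads are first served from the persistent cache, then from the custom store
            persistence = new CachingPersistence(cfg.getPersistentCache(), persistence);
        }
        if (cfg.hasSplitPersistence()) {
            // reads come from the (cached) custom persistence, new writes go to a local TAR store
            cfg.getSplitPersistenceDirectory().mkdirs();
            persistence = new SplitPersistence(persistence, new TarPersistence(cfg.getSplitPersistenceDirectory()));
        }
        builder.withCustomPersistence(persistence);
    }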
 

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java Tue Aug  4 14:13:48 2020
@@ -38,13 +38,12 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.DEFAULT_MAX_FILE_SIZE;
 import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
 
-import java.io.File;
-import java.io.IOException;
-
 import com.google.common.io.Closer;
+
 import org.apache.jackrabbit.oak.osgi.OsgiUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
@@ -64,6 +63,9 @@ import org.osgi.service.metatype.annotat
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+
 /**
  * An OSGi wrapper for the segment node store.
  */
@@ -252,6 +254,12 @@ public class SegmentNodeStoreService {
         boolean splitPersistence() default false;
 
         @AttributeDefinition(
+                name = "Cache persistence",
+                description = "Boolean value indicating whether the persistent cache should be used for the custom segment store"
+        )
+        boolean cachePersistence() default false;
+
+        @AttributeDefinition(
             name = "Backup directory",
             description = "Directory (relative to current working directory) for storing repository backups. " +
                 "Defaults to 'repository.home/segmentstore-backup'."
@@ -295,6 +303,13 @@ public class SegmentNodeStoreService {
     )
     private volatile SegmentNodeStorePersistence segmentStore;
 
+    @Reference(
+        cardinality = ReferenceCardinality.OPTIONAL,
+        policy = ReferencePolicy.STATIC,
+        policyOption = ReferencePolicyOption.GREEDY
+    )
+    private volatile PersistentCache persistentCache;
+
     @Reference
     private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
 
@@ -303,7 +318,7 @@ public class SegmentNodeStoreService {
     @Activate
     public void activate(ComponentContext context, Configuration configuration) throws IOException {
         OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
-        registerSegmentStore(context, configuration, blobStore, segmentStore, statisticsProvider, closer, whiteboard, log);
+        registerSegmentStore(context, configuration, blobStore, segmentStore, persistentCache, statisticsProvider, closer, whiteboard, log);
     }
 
     private static SegmentNodeStore registerSegmentStore(
@@ -311,6 +326,7 @@ public class SegmentNodeStoreService {
         Configuration configuration,
         BlobStore blobStore,
         SegmentNodeStorePersistence segmentStore,
+        PersistentCache persistentCache,
         StatisticsProvider statisticsProvider,
         Closer closer,
         Whiteboard whiteboard,
@@ -489,6 +505,11 @@ public class SegmentNodeStoreService {
             }
 
             @Override
+            public boolean hasCachePersistence() {
+                return configuration.cachePersistence();
+            }
+
+            @Override
             public boolean registerDescriptors() {
                 return true;
             }
@@ -557,6 +578,11 @@ public class SegmentNodeStoreService {
             }
 
             @Override
+            public PersistentCache getPersistentCache() {
+                return persistentCache;
+            }
+
+            @Override
             public BundleContext getBundleContext() {
                 return context.getBundleContext();
             }
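The reference above is optional and greedy, so the service keeps working when no cache is available: persistentCache simply stays null and the registrar never wraps the persistence. To enable caching, a PersistentCache implementation has to be published as an OSGi service and cachePersistence set to true in the SegmentNodeStoreService configuration. A minimal, purely hypothetical provider (not part of this commit) could look like this:

    import org.apache.jackrabbit.oak.commons.Buffer;
    import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
    import org.osgi.service.component.annotations.Component;

    // Hypothetical no-op provider, shown only to illustrate how the optional
    // PersistentCache reference declared above can be satisfied; it always misses.
    @Component(service = PersistentCache.class)
    public class NoopPersistentCache implements PersistentCache {

        @Override
        public Buffer readSegment(long msb, long lsb) {
            return null;                         // cache miss: the delegate reader is consulted instead
        }

        @Override
        public boolean containsSegment(long msb, long lsb) {
            return false;
        }

        @Override
        public void writeSegment(long msb, long lsb, Buffer buffer) {
            // discard: a real implementation would persist the buffer
        }

        @Override
        public void cleanUp() {
            // nothing to purge
        }
    }

With such a service bound and cachePersistence=true, registerSegmentStore passes the cache through to the registrar, which wraps the custom persistence in a CachingPersistence as shown earlier.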

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
+
+    public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
+
+    protected ExecutorService executor;
+    protected AtomicLong cacheSize = new AtomicLong(0);
+    protected PersistentCache nextCache;
+    protected final HashSet<String> writesPending;
+
+    public AbstractPersistentCache() {
+        executor = Executors.newFixedThreadPool(THREADS);
+        writesPending = new HashSet<>();
+    }
+
+    public PersistentCache linkWith(PersistentCache nextCache) {
+        this.nextCache = nextCache;
+        return nextCache;
+    }
+
+    @Override
+    public void close() {
+        try {
+            executor.shutdown();
+            if (executor.awaitTermination(60, SECONDS)) {
+                logger.debug("The persistent cache scheduler was successfully shut down");
+            } else {
+                logger.warn("The persistent cache scheduler is taking too long to shut down");
+            }
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted while shutting down the persistent cache scheduler", e);
+            currentThread().interrupt();
+        }
+    }
+
+    public int getWritesPending() {
+        synchronized (writesPending) {
+            return writesPending.size();
+        }
+    }
+
+    protected boolean lockSegmentWrite(String segmentId) {
+        synchronized (writesPending) {
+            return writesPending.add(segmentId);
+        }
+    }
+
+    protected void unlockSegmentWrite(String segmentId) {
+        synchronized (writesPending) {
+            writesPending.remove(segmentId);
+        }
+    }
+}
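Concrete caches extend this class and implement the PersistentCache methods; the executor and the writesPending set support asynchronous, de-duplicated write-back, and linkWith chains a second-level cache behind the first. A rough in-memory sketch under those assumptions (class name and map-based storage are illustrative only; the real implementations added by this commit live in a separate module and are not reproduced here):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import org.apache.jackrabbit.oak.commons.Buffer;
    import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;

    // Illustrative only: an in-heap cache showing how a subclass can use the inherited
    // executor, write locks and nextCache link. A production cache would copy the buffer
    // contents before handing them to another thread and would bound its size in cleanUp().
    public class InMemoryPersistentCache extends AbstractPersistentCache {

        private final ConcurrentMap<String, Buffer> segments = new ConcurrentHashMap<>();

        @Override
        public Buffer readSegment(long msb, long lsb) {
            Buffer buffer = segments.get(msb + ":" + lsb);
            if (buffer == null && nextCache != null) {
                buffer = nextCache.readSegment(msb, lsb);   // fall through to a linked cache, if any
            }
            return buffer;
        }

        @Override
        public boolean containsSegment(long msb, long lsb) {
            return segments.containsKey(msb + ":" + lsb)
                    || (nextCache != null && nextCache.containsSegment(msb, lsb));
        }

        @Override
        public void writeSegment(long msb, long lsb, Buffer buffer) {
            String id = msb + ":" + lsb;
            if (lockSegmentWrite(id)) {                     // skip if a write for this segment is already pending
                executor.execute(() -> {
                    try {
                        segments.put(id, buffer);
                    } finally {
                        unlockSegmentWrite(id);
                    }
                });
            }
        }

        @Override
        public void cleanUp() {
            segments.clear();                               // stand-in for a size- or age-based eviction policy
        }
    }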

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class CachingArchiveManager implements SegmentArchiveManager {
+
+    private final SegmentArchiveManager delegate;
+
+    private final PersistentCache persistentCache;
+
+    public CachingArchiveManager(PersistentCache persistentCache, SegmentArchiveManager delegate) {
+        this.delegate = delegate;
+        this.persistentCache = persistentCache;
+    }
+
+    @Override
+    public @NotNull List<String> listArchives() throws IOException {
+        return delegate.listArchives();
+    }
+
+    @Override
+    public @Nullable SegmentArchiveReader open(@NotNull String archiveName) throws IOException {
+        return new CachingSegmentArchiveReader(persistentCache, delegate.open(archiveName));
+    }
+
+    @Override
+    public @Nullable SegmentArchiveReader forceOpen(String archiveName) throws IOException {
+        return new CachingSegmentArchiveReader(persistentCache, delegate.forceOpen(archiveName));
+    }
+
+    @Override
+    public @NotNull SegmentArchiveWriter create(@NotNull String archiveName) throws IOException {
+        return delegate.create(archiveName);
+    }
+
+    @Override
+    public boolean delete(@NotNull String archiveName) {
+        return delegate.delete(archiveName);
+    }
+
+    @Override
+    public boolean renameTo(@NotNull String from, @NotNull String to) {
+        return delegate.renameTo(from, to);
+    }
+
+    @Override
+    public void copyFile(@NotNull String from, @NotNull String to) throws IOException {
+        delegate.copyFile(from, to);
+    }
+
+    @Override
+    public boolean exists(@NotNull String archiveName) {
+        return delegate.exists(archiveName);
+    }
+
+    @Override
+    public void recoverEntries(@NotNull String archiveName, @NotNull LinkedHashMap<UUID, byte[]> entries) throws IOException {
+        delegate.recoverEntries(archiveName, entries);
+    }
+
+    @Override
+    public void backup(@NotNull String archiveName, @NotNull String backupArchiveName,
+            @NotNull Set<UUID> recoveredEntries) throws IOException {
+        delegate.backup(archiveName, backupArchiveName, recoveredEntries);
+    }
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+
+import java.io.IOException;
+
+public class CachingPersistence implements SegmentNodeStorePersistence {
+
+    private final SegmentNodeStorePersistence delegate;
+
+    private final PersistentCache persistentCache;
+
+    public CachingPersistence(PersistentCache persistentCache, SegmentNodeStorePersistence delegate) {
+        this.delegate = delegate;
+        this.persistentCache = persistentCache;
+    }
+
+    @Override
+    public SegmentArchiveManager createArchiveManager(boolean memoryMapping, boolean offHeapAccess, IOMonitor ioMonitor,
+            FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) throws IOException {
+        return new CachingArchiveManager(persistentCache, delegate.createArchiveManager(memoryMapping, offHeapAccess, ioMonitor, fileStoreMonitor, remoteStoreMonitor));
+    }
+
+    @Override
+    public boolean segmentFilesExist() {
+        return delegate.segmentFilesExist();
+    }
+
+    @Override
+    public JournalFile getJournalFile() {
+        return delegate.getJournalFile();
+    }
+
+    @Override
+    public GCJournalFile getGCJournalFile() throws IOException {
+        return delegate.getGCJournalFile();
+    }
+
+    @Override
+    public ManifestFile getManifestFile() throws IOException {
+        return delegate.getManifestFile();
+    }
+
+    @Override
+    public RepositoryLock lockRepository() throws IOException {
+        return delegate.lockRepository();
+    }
+
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+public class CachingSegmentArchiveReader implements SegmentArchiveReader {
+
+    @NotNull
+    private final PersistentCache persistentCache;
+
+    @NotNull
+    private final SegmentArchiveReader delegate;
+
+    public CachingSegmentArchiveReader(
+            @NotNull PersistentCache persistentCache,
+            @NotNull SegmentArchiveReader delegate) {
+        this.persistentCache = persistentCache;
+        this.delegate = delegate;
+    }
+
+    @Override
+    @Nullable
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        Buffer buffer = persistentCache.readSegment(msb, lsb);
+        if (buffer == null) {
+            buffer = delegate.readSegment(msb, lsb);
+            if (buffer != null) {
+                persistentCache.writeSegment(msb, lsb, buffer);
+            }
+        }
+        return buffer;
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        if (persistentCache.containsSegment(msb, lsb)) {
+            return true;
+        } else {
+            return delegate.containsSegment(msb, lsb);
+        }
+    }
+
+    @Override
+    public List<SegmentArchiveEntry> listSegments() {
+        return delegate.listSegments();
+    }
+
+    @Override
+    @Nullable
+    public Buffer getGraph() throws IOException {
+        return delegate.getGraph();
+    }
+
+    @Override
+    public boolean hasGraph() {
+        return delegate.hasGraph();
+    }
+
+    @Override
+    @NotNull
+    public Buffer getBinaryReferences() throws IOException {
+        return delegate.getBinaryReferences();
+    }
+
+    @Override
+    public long length() {
+        return delegate.length();
+    }
+
+    @Override
+    @NotNull
+    public String getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    @Override
+    public int getEntrySize(int size) {
+        return delegate.getEntrySize(size);
+    }
+}
\ No newline at end of file
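The reader above implements a plain read-through pattern: readSegment first asks the PersistentCache and only on a miss falls back to the delegate, writing whatever the delegate returned back into the cache. A short usage sketch, assuming an existing cache implementation, a delegate SegmentArchiveManager and an archive name (all three are placeholders, not values from this commit):

    // 'cache' is any PersistentCache and 'remoteManager' any SegmentArchiveManager; both are assumed to exist.
    SegmentArchiveManager manager = new CachingArchiveManager(cache, remoteManager);
    SegmentArchiveReader reader = manager.open("data00000a.tar");   // wrapped in a CachingSegmentArchiveReader

    Buffer first = reader.readSegment(msb, lsb);    // miss: read from the delegate, then written to the cache
    Buffer second = reader.readSegment(msb, lsb);   // hit: served from the persistent cache

containsSegment checks the cache first as well, so a cached segment is reported as present without consulting the delegate.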

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java Tue Aug  4 14:13:48 2020
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This interface represents a cache which survives segment store restarts.
+ * The cache is agnostic to any archive structure. Segments are identified
+ * only by their UUIDs, specified as the msb and lsb parts of the segment id.
+ */
+public interface PersistentCache {
+
+    /**
+     * Reads the segment from cache.
+     *
+     * @param msb the most significant bits of the identifier of the segment
+     * @param lsb the least significant bits of the identifier of the segment
+     * @return byte buffer containing the segment data, or {@code null} if the segment is not present in the cache
+     */
+    @Nullable
+    Buffer readSegment(long msb, long lsb);
+
+    /**
+     * Checks if the segment exists in the cache.
+     *
+     * @param msb the most significant bits of the identifier of the segment
+     * @param lsb the least significant bits of the identifier of the segment
+     * @return {@code true} if the segment exists
+     */
+    boolean containsSegment(long msb, long lsb);
+
+    /**
+     * Writes the segment to the cache.
+     *
+     * @param msb the most significant bits of the identifier of the segment
+     * @param lsb the least significant bits of the identifier of the segment
+     * @param buffer the byte buffer containing the segment data
+     */
+    void writeSegment(long msb, long lsb, Buffer buffer);
+
+    /**
+     * Purges the cache entries according to the implementation policy (e.g. maximum
+     * cache size, maximum number of entries, etc.)
+     */
+    void cleanUp();
+}