You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/08/04 14:13:48 UTC
svn commit: r1880568 - in /jackrabbit/oak/trunk: oak-run/ oak-segment-remote/
oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persisten...
Author: adulceanu
Date: Tue Aug 4 14:13:48 2020
New Revision: 1880568
URL: http://svn.apache.org/viewvc?rev=1880568&view=rev
Log:
OAK-7744 - Persistent cache for the Segment Node Store
Added:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
Modified:
jackrabbit/oak/trunk/oak-run/pom.xml
jackrabbit/oak/trunk/oak-segment-remote/pom.xml
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
Modified: jackrabbit/oak/trunk/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/pom.xml?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-run/pom.xml Tue Aug 4 14:13:48 2020
@@ -35,6 +35,7 @@
<groovy.version>2.4.17</groovy.version>
<!--
Size History:
+ + 55 MB Add support for 3rd level cache (GRANITE-30735)
+ 54 MB AWS support for segment-tar (OAK-8827)
+ 52 MB AWS java sdk update (OAK-8875)
+ 51 MB AWS java sdk update (OAK-7536)
@@ -45,7 +46,7 @@
+ 41 MB build failing on the release profile (OAK-6250)
+ 38 MB. Initial value. Current 35MB plus a 10%
-->
- <max.jar.size>54000000</max.jar.size>
+ <max.jar.size>57671680</max.jar.size>
</properties>
<build>
Modified: jackrabbit/oak/trunk/oak-segment-remote/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/pom.xml?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/pom.xml Tue Aug 4 14:13:48 2020
@@ -39,11 +39,18 @@
<!-- OAK-7182 -->${guava.osgi.import},
org.apache.jackrabbit.oak.segment.spi*,
!org.apache.jackrabbit.oak.segment*,
+ !net.sf.cglib.asm.util,
+ !org.apache.tools.ant,
+ !org.apache.tools.ant.types,
*
</Import-Package>
<Export-Package>
- org.apache.jackrabbit.oak.segment.remote*
+ org.apache.jackrabbit.oak.segment.remote
</Export-Package>
+ <Embed-Dependency>
+ org.apache.servicemix.bundles.*,
+ commons-pool2
+ </Embed-Dependency>
</instructions>
</configuration>
</plugin>
@@ -108,6 +115,23 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+
+ <!-- Redis client for segment cache -->
+ <dependency>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.jedis</artifactId>
+ <version>3.3.0_1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.cglib</artifactId>
+ <version>3.1_1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>2.6.2</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
@@ -120,5 +144,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <!-- Embedded Redis server; listed under the test dependencies, so keep it off the compile/runtime classpath -->
+ <groupId>com.github.kstyrc</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,89 @@
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.Configuration.PID;
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache.DEFAULT_MAX_CACHE_SIZE_MB;
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentRedisCache.DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+/**
+ * OSGi metatype definition for the remote persistent cache service. The values
+ * declared here are read by RemotePersistentCacheService to decide which caches
+ * (disk, Redis, or disk linked with Redis) to create and how to configure them.
+ */
+@ObjectClassDefinition(
+ pid = {PID},
+ name = "Apache Jackrabbit Oak Remote Persistent Cache Service",
+ description = "Persistent cache for the Oak Segment Node Store")
+public @interface Configuration {
+
+ // Configuration PID; RemotePersistentCacheService uses it as its configurationPid.
+ String PID = "org.apache.jackrabbit.oak.segment.remote.RemotePersistentCacheService";
+
+ // When true, RemotePersistentCacheService creates a PersistentDiskCache.
+ @AttributeDefinition(
+ name = "Disk cache persistence",
+ description = "Boolean value indicating that the local disk persisted cache should be used for segment store"
+ )
+ boolean diskCacheEnabled() default false;
+
+ // Directory handed to PersistentDiskCache; the cache creates it if it does not exist.
+ @AttributeDefinition(
+ name = "Disk cache persistence directory",
+ description = "Path on the file system where disk cache persistence data will be stored."
+ )
+ String diskCacheDirectory() default "";
+
+ // Upper bound for the disk cache, in MB (converted to bytes by PersistentDiskCache).
+ @AttributeDefinition(
+ name = "Disk cache persistence maximum size",
+ description = "Disk cache size (in MB). Default value is " + DEFAULT_MAX_CACHE_SIZE_MB
+ )
+ int diskCacheMaxSizeMB() default DEFAULT_MAX_CACHE_SIZE_MB;
+
+ // When true, a PersistentRedisCache is created; if the disk cache is also enabled,
+ // the Redis cache is linked behind it.
+ @AttributeDefinition(
+ name = "Redis cache persistence",
+ description = "Boolean value indicating that the redis persisted cache should be used for segment store"
+ )
+ boolean redisCacheEnabled() default false;
+
+ // Host name of the Redis server.
+ @AttributeDefinition(
+ name = "Redis cache persistence host",
+ description = "Remote redis server host"
+ )
+ String redisCacheHost() default "";
+
+ // Port of the Redis server; PersistentRedisCache replaces 0 with 6379.
+ @AttributeDefinition(
+ name = "Redis cache persistence port",
+ description = "Remote redis server port (0 = default)"
+ )
+ int redisCachePort() default 0;
+
+ // Expiry applied to every key written to Redis; PersistentRedisCache falls back
+ // to the default when the value is negative.
+ @AttributeDefinition(
+ name = "Redis cache persistence entries expiry interval",
+ description = "Number of seconds to keep the entries in the cache. Default value is " + DEFAULT_REDIS_CACHE_EXPIRE_SECONDS
+ )
+ int redisCacheExpireSeconds() default DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
+
+ // NOTE(review): PersistentRedisCache passes this value unchanged to
+ // JedisPoolConfig.setMaxWaitMillis and to the JedisPool constructor - confirm
+ // whether the unit really is seconds, as the description says, or milliseconds.
+ @AttributeDefinition(
+ name = "Redis socket timeout",
+ description = "Number of seconds to wait for response for request"
+ )
+ int redisSocketTimeout() default 10;
+
+ // NOTE(review): passed unchanged to the JedisPool constructor - confirm the unit
+ // (seconds per the description vs. what the Jedis API expects).
+ @AttributeDefinition(
+ name = "Redis connection timeout",
+ description = "Number of seconds to wait for redis connection to be established"
+ )
+ int redisConnectionTimeout() default 50;
+
+ // Used by PersistentRedisCache as the pool's minimum number of idle connections (setMinIdle).
+ @AttributeDefinition(
+ name = "Redis Minimum Connections",
+ description = "Minimum number of established connections that need to be kept in the pool"
+ )
+ int redisMinConnections() default 10;
+
+ // Used by PersistentRedisCache as the pool's maximum number of idle connections (setMaxIdle).
+ @AttributeDefinition(
+ name = "Redis Maximum Connections",
+ description = "Maximum number of connections required by the business"
+ )
+ int redisMaxConnections() default 100;
+
+ // Used by PersistentRedisCache as the pool's total connection limit (setMaxTotal).
+ @AttributeDefinition(
+ name = "Redis Maximum Total Connections",
+ description = "Includes the number of idle connections as a surplus"
+ )
+ int redisMaxTotalConnections() default 200;
+}
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
+import java.util.Comparator;
+import java.util.Spliterator;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+/**
+ * Persistent cache that stores every segment as one file, named after the
+ * segment UUID, inside a local directory.
+ * <p>
+ * Writes are performed asynchronously on the executor inherited from
+ * {@link AbstractPersistentCache}. After each write the cache checks whether it
+ * has reached its maximum size and, if so, deletes files in ascending order of
+ * last access time until the size drops to 66% of the maximum.
+ */
+public class PersistentDiskCache extends AbstractPersistentCache {
+    private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class);
+    public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512;
+
+    private final File directory;
+    private final long maxCacheSizeBytes;
+
+    /** Guards {@link #cleanUp()} so that at most one clean-up pass runs at a time. */
+    final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
+
+    /**
+     * Orders paths by ascending last access time. If the attributes of either
+     * path cannot be read the two paths are treated as equal.
+     */
+    private static final Comparator<Path> sortedByAccessTime = (path1, path2) -> {
+        try {
+            FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime();
+            FileTime lastAccessFile2 = Files.readAttributes(path2, BasicFileAttributes.class).lastAccessTime();
+            return lastAccessFile1.compareTo(lastAccessFile2);
+        } catch (IOException e) {
+            logger.error("A problem occurred while cleaning up the cache: ", e);
+        }
+        return 0;
+    };
+
+    /**
+     * @param directory      directory holding the cached segment files; created if missing
+     * @param cacheMaxSizeMB maximum size of the cache, in MB
+     */
+    public PersistentDiskCache(File directory, int cacheMaxSizeMB) {
+        this.directory = directory;
+        this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L;
+
+        if (!directory.exists()) {
+            directory.mkdirs();
+        }
+
+        // Start from whatever is already on disk from a previous run.
+        cacheSize.set(FileUtils.sizeOfDirectory(directory));
+    }
+
+    /**
+     * Reads the segment from its file, falling back to the next cache in the
+     * chain (if any) when the file is missing or cannot be read.
+     *
+     * @return the segment content, or {@code null} if no cache in the chain has it
+     */
+    @Override
+    public Buffer readSegment(long msb, long lsb) {
+        String segmentId = new UUID(msb, lsb).toString();
+        File segmentFile = new File(directory, segmentId);
+
+        if (segmentFile.exists()) {
+            try (FileInputStream fis = new FileInputStream(segmentFile);
+                 FileChannel channel = fis.getChannel()) {
+                int length = (int) channel.size();
+
+                Buffer buffer = Buffer.allocateDirect(length);
+                if (buffer.readFully(channel, 0) < length) {
+                    throw new EOFException();
+                }
+                buffer.flip();
+
+                return buffer;
+            } catch (FileNotFoundException e) {
+                // The file may be removed by a concurrent clean-up between exists() and open.
+                logger.info("Segment {} deleted from file system!", segmentId);
+            } catch (IOException e) {
+                // The trailing throwable is logged as the exception, so it needs no placeholder.
+                logger.error("Error loading segment {} from cache", segmentId, e);
+            }
+        }
+
+        if (nextCache != null) {
+            return nextCache.readSegment(msb, lsb);
+        }
+
+        return null;
+    }
+
+    /** Returns {@code true} if a file for the given segment exists in the cache directory. */
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        return new File(directory, new UUID(msb, lsb).toString()).exists();
+    }
+
+    /**
+     * Schedules an asynchronous write of the segment to its file, followed by a
+     * clean-up check, and forwards the segment to the next cache in the chain.
+     */
+    @Override
+    public void writeSegment(long msb, long lsb, Buffer buffer) {
+        String segmentId = new UUID(msb, lsb).toString();
+        File segmentFile = new File(directory, segmentId);
+        // Duplicate so the asynchronous write does not move the caller's buffer position.
+        Buffer bufferCopy = buffer.duplicate();
+
+        Runnable task = () -> {
+            if (lockSegmentWrite(segmentId)) {
+                try (FileOutputStream fos = new FileOutputStream(segmentFile);
+                     FileChannel channel = fos.getChannel()) {
+                    // A single write() is not guaranteed to drain the buffer.
+                    long fileSize = 0;
+                    while (bufferCopy.hasRemaining()) {
+                        fileSize += bufferCopy.write(channel);
+                    }
+                    cacheSize.addAndGet(fileSize);
+                } catch (Throwable t) {
+                    logger.error("Error writing segment {} to cache", segmentId, t);
+                } finally {
+                    unlockSegmentWrite(segmentId);
+                }
+            }
+            cleanUp();
+        };
+
+        executor.execute(task);
+
+        if (nextCache != null) {
+            nextCache.writeSegment(msb, lsb, buffer);
+        }
+    }
+
+    private boolean isCacheFull() {
+        return cacheSize.get() >= maxCacheSizeBytes;
+    }
+
+    /** Runs a clean-up pass unless another one is already in progress. */
+    @Override
+    public void cleanUp() {
+        if (!cleanupInProgress.getAndSet(true)) {
+            try {
+                cleanUpInternal();
+            } finally {
+                cleanupInProgress.set(false);
+            }
+        }
+    }
+
+    /**
+     * If the cache is full, deletes the least recently accessed files until the
+     * tracked size is at most 66% of the maximum.
+     */
+    private void cleanUpInternal() {
+        if (!isCacheFull()) {
+            return;
+        }
+
+        // Files.walk holds an open directory handle, so the stream must be closed.
+        // Directories are filtered out before sorting so only files are compared.
+        try (Stream<Path> segmentsPaths = Files.walk(directory.toPath())
+                .filter(filePath -> !filePath.toFile().isDirectory())
+                .sorted(sortedByAccessTime)) {
+
+            StreamConsumer.forEach(segmentsPaths, (path, breaker) -> {
+                if (cacheSize.get() > maxCacheSizeBytes * 0.66) {
+                    File segmentFile = path.toFile();
+                    // Read the length first: it is 0 once the file is gone.
+                    long length = segmentFile.length();
+                    // Only account for space that was actually reclaimed.
+                    if (segmentFile.delete()) {
+                        cacheSize.addAndGet(-length);
+                    }
+                } else {
+                    breaker.stop();
+                }
+            });
+        } catch (IOException | java.io.UncheckedIOException e) {
+            // Files.walk reports I/O failures during traversal as UncheckedIOException.
+            logger.error("A problem occurred while cleaning up the cache: ", e);
+        }
+    }
+
+    /** Helper for iterating over a stream with the option to stop early. */
+    static class StreamConsumer {
+
+        /** Flag handed to the consumer so that it can request the iteration to stop. */
+        public static class Breaker {
+            private boolean shouldBreak = false;
+
+            public void stop() {
+                shouldBreak = true;
+            }
+
+            boolean get() {
+                return shouldBreak;
+            }
+        }
+
+        /**
+         * Feeds the elements of {@code stream} to {@code consumer} one by one until
+         * the stream is exhausted or the consumer calls {@link Breaker#stop()}.
+         */
+        public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) {
+            Spliterator<T> spliterator = stream.spliterator();
+            boolean hadNext = true;
+            Breaker breaker = new Breaker();
+
+            while (hadNext && !breaker.get()) {
+                hadNext = spliterator.tryAdvance(elem -> {
+                    consumer.accept(elem, breaker);
+                });
+            }
+        }
+    }
+}
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.UUID;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.exceptions.JedisException;
+
+public class PersistentRedisCache extends AbstractPersistentCache {
+ private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class);
+ public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2;
+
+ private static final String REDIS_PREFIX = "SEGMENT";
+ private final int redisExpireSeconds;
+ private JedisPool redisPool;
+
+ public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout, int redisConnectionTimeout,
+ int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections) {
+ this.redisExpireSeconds = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+
+ if (redisPort == 0) {
+ redisPort = 6379;
+ }
+
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ jedisPoolConfig.setTestOnBorrow(true);
+ jedisPoolConfig.setMaxWaitMillis(redisSocketTimeout);
+ jedisPoolConfig.setMinIdle(redisMinConnections);
+ jedisPoolConfig.setMaxIdle(redisMaxConnections);
+ jedisPoolConfig.setMaxTotal(redisMaxTotalConnections);
+
+ this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout, redisSocketTimeout, null, Protocol.DEFAULT_DATABASE, null);
+ }
+
+ @Override
+ public Buffer readSegment(long msb, long lsb) {
+ String segmentId = new UUID(msb, lsb).toString();
+
+ try(Jedis redis = redisPool.getResource()) {
+ final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes());
+ if (bytes != null) {
+ Buffer buffer = Buffer.allocateDirect(bytes.length);
+
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
+ } catch (JedisException e) {
+ logger.error("Error loading segment {} from cache: {}", segmentId, e);
+ }
+
+ if (nextCache != null) {
+ return nextCache.readSegment(msb, lsb);
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ String segmentId = new UUID(msb, lsb).toString();
+
+ try(Jedis redis = redisPool.getResource()) {
+ return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes());
+ } catch (JedisException e) {
+ logger.error("Error checking segment existence {} in cache: {}", segmentId, e);
+ }
+
+ return false;
+ }
+
+ @Override
+ public void writeSegment(long msb, long lsb, Buffer buffer){
+ String segmentId = new UUID(msb, lsb).toString();
+ Buffer bufferCopy = buffer.duplicate();
+
+ Runnable task = () -> {
+ if (lockSegmentWrite(segmentId)) {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (WritableByteChannel channel = Channels.newChannel(bos); Jedis redis = redisPool.getResource()) {
+ while (bufferCopy.hasRemaining()) {
+ bufferCopy.write(channel);
+ }
+ final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes();
+ redis.set(key, bos.toByteArray());
+ redis.expire(key, redisExpireSeconds);
+ cacheSize.addAndGet(bos.size());
+ } catch (Throwable t) {
+ logger.error("Error writing segment {} to cache: {}", segmentId, t);
+ } finally {
+ unlockSegmentWrite(segmentId);
+ }
+ }
+ };
+
+ executor.execute(task);
+
+ if (nextCache != null) {
+ nextCache.writeSegment(msb, lsb, buffer);
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ // do nothing
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,68 @@
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+
+import com.google.common.io.Closer;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * OSGi component that builds the persistent cache described by
+ * {@link Configuration} and registers it as a {@link PersistentCache} service.
+ * <p>
+ * When both caches are enabled the disk cache is registered and the Redis cache
+ * is linked behind it; when only one is enabled that cache is registered; when
+ * neither is enabled no service is registered.
+ */
+@Component(
+        configurationPolicy = ConfigurationPolicy.REQUIRE,
+        configurationPid = {Configuration.PID})
+public class RemotePersistentCacheService {
+    private ServiceRegistration registration;
+
+    private PersistentCache persistentCache;
+
+    private final Closer closer = Closer.create();
+
+    /**
+     * Creates the configured cache and, if there is one, registers it as a service.
+     */
+    @Activate
+    public void activate(ComponentContext context, Configuration config) throws IOException {
+        persistentCache = createPersistentCache(config, closer);
+        // registerService rejects a null service object, so skip registration
+        // when neither the disk nor the Redis cache is enabled.
+        if (persistentCache != null) {
+            registration = context.getBundleContext().registerService(PersistentCache.class.getName(), persistentCache, new Properties());
+        }
+    }
+
+    /**
+     * Unregisters the service (if it was registered) and closes every cache
+     * created during activation.
+     */
+    @Deactivate
+    public void deactivate() throws IOException {
+        if (registration != null) {
+            registration.unregister();
+            registration = null;
+        }
+        closeQuietly(closer);
+        persistentCache = null;
+    }
+
+    /**
+     * Builds the cache chain for the given configuration and registers every
+     * created cache with {@code closer}.
+     *
+     * @return the head of the cache chain, or {@code null} if no cache is enabled
+     */
+    private static PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+        if (configuration.diskCacheEnabled()) {
+            PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB());
+            closer.register(persistentDiskCache);
+
+            if (configuration.redisCacheEnabled()) {
+                PersistentRedisCache redisCache = createRedisCache(configuration);
+                persistentDiskCache.linkWith(redisCache);
+                closer.register(redisCache);
+            }
+
+            return persistentDiskCache;
+        }
+
+        if (configuration.redisCacheEnabled()) {
+            PersistentRedisCache redisCache = createRedisCache(configuration);
+            closer.register(redisCache);
+
+            return redisCache;
+        }
+
+        return null;
+    }
+
+    /** Creates a Redis cache from the Redis-related configuration attributes. */
+    private static PersistentRedisCache createRedisCache(Configuration configuration) {
+        return new PersistentRedisCache(
+                configuration.redisCacheHost(),
+                configuration.redisCachePort(),
+                configuration.redisCacheExpireSeconds(),
+                configuration.redisSocketTimeout(),
+                configuration.redisConnectionTimeout(),
+                configuration.redisMinConnections(),
+                configuration.redisMaxConnections(),
+                configuration.redisMaxTotalConnections());
+    }
+}
Added: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.*;
+
+public abstract class AbstractPersistentCacheTest {
+
+ // Total number of segments handled by each concurrent test.
+ protected static final int SEGMENTS = 1000;
+ // Number of tasks submitted by runConcurrently and size of the shared thread pool.
+ protected static final int THREADS = 50;
+ // Number of consecutive segment indexes processed by each task.
+ protected static final int SEGMENTS_PER_THREAD = SEGMENTS / THREADS;
+ // Maximum number of 100 ms polling rounds performed by waitWhile.
+ protected static final int TIMEOUT_COUNT = 50;
+
+ // Thread pool shared by all tests of this class.
+ protected static final Executor executor = Executors.newFixedThreadPool(THREADS);
+
+ // Submits THREADS tasks to the executor; task i invokes the callback with (i, j)
+ // for every segment index j in [i * SEGMENTS_PER_THREAD, (i + 1) * SEGMENTS_PER_THREAD).
+ // Returns right after submitting; it does not wait for the tasks to finish.
+ protected static final Consumer<BiConsumer<Integer, Integer>> runConcurrently = r -> {
+ for (int i = 0; i < THREADS; ++i) {
+ int finalI = i;
+ executor.execute(() -> {
+ for (int j = finalI * SEGMENTS_PER_THREAD; j < (finalI + 1) * SEGMENTS_PER_THREAD; ++j) {
+ r.accept(finalI, j);
+ }
+ });
+ }
+ };
+
+ // Cache under test.
+ protected AbstractPersistentCache persistentCache;
+
+ // Number of callbacks that failed.
+ final AtomicInteger errors = new AtomicInteger(0);
+ // Number of callbacks that completed, successfully or not.
+ final AtomicInteger done = new AtomicInteger(0);
+ int count; // for checking timeouts
+
+    /**
+     * Polls the given condition every 100 ms and returns once it becomes false or
+     * after TIMEOUT_COUNT rounds; the number of rounds is left in {@code count}.
+     * If the waiting thread is interrupted, the interrupt flag is restored and the
+     * wait ends early instead of silently discarding the interruption.
+     */
+    protected Consumer<Supplier<Boolean>> waitWhile = (r -> {
+        for (count = 0; r.get() && count < TIMEOUT_COUNT; ++count) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+    });
+
+ // Writes SEGMENTS distinct segments concurrently, reads them back concurrently
+ // and verifies that every segment read equals the one written.
+ @Test
+ public void writeAndReadManySegments() throws Exception {
+ final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
+ final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);
+
+ for (int i = 0; i < SEGMENTS; ++i) {
+ testSegments.add(TestSegment.createSegment());
+ }
+
+ // One result map per task, so that the tasks never write to a shared map.
+ for (int i = 0; i < THREADS; ++i) {
+ final Map<String, Buffer> segmentsReadThisThread = new HashMap<>(SEGMENTS_PER_THREAD);
+ segmentsRead.add(segmentsReadThisThread);
+ }
+
+ // Phase 1: write all segments.
+ runConcurrently.accept((nThread, nSegment) -> {
+ TestSegment segment = testSegments.get(nSegment);
+ long[] id = segment.getSegmentId();
+ try {
+ persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer());
+ } catch (Throwable t) {
+ errors.incrementAndGet();
+ } finally {
+ done.incrementAndGet();
+ }
+ });
+
+ // Wait until every write call has returned and the cache reports no pending writes.
+ waitWhile.accept(() -> done.get() < SEGMENTS);
+ waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+
+ assertEquals("Errors have occurred while writing", 0, errors.get());
+ assertNoTimeout();
+
+ // Phase 2: read all segments back, reusing the completion counter.
+ done.set(0);
+ runConcurrently.accept((nThread, nSegment) -> {
+ final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
+ final TestSegment segment = testSegments.get(nSegment);
+ final long[] id = segment.getSegmentId();
+ try {
+ final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+ segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
+ } finally {
+ done.incrementAndGet();
+ }
+ });
+
+ waitWhile.accept(() -> done.get() < SEGMENTS);
+
+ assertNoTimeout();
+ assertEquals("Errors have occurred while reading", 0, errors.get());
+
+ // Phase 3: compare what was read with what was written; a missing segment
+ // is counted as an error and reported after the loop.
+ for (int i = 0; i < THREADS; ++i) {
+ for (int j = i * SEGMENTS_PER_THREAD; j < (i + 1) * SEGMENTS_PER_THREAD; ++j) {
+ TestSegment testSegment = testSegments.get(j);
+ Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(i);
+ long[] segmentReadId = testSegment.getSegmentId();
+ Buffer segmentRead = segmentsReadThisThread.get(new UUID(segmentReadId[0], segmentReadId[1]).toString());
+ if (segmentRead == null) {
+ errors.incrementAndGet();
+ continue;
+ }
+ assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
+ }
+ }
+ assertEquals("Segment(s) not found in cache", 0, errors.get());
+ }
+
+ // Queries the cache concurrently for random segment ids that were never written
+ // and expects containsSegment to return false and readSegment to return null.
+ @Test
+ public void testNonExisting() throws Exception {
+ final Random random = new Random();
+ // Two random longs (msb, lsb) per segment index.
+ final long[] segmentIds = random.longs(2 * SEGMENTS).toArray();
+ final AtomicInteger containsFailures = new AtomicInteger(0);
+ final AtomicInteger readFailures = new AtomicInteger(0);
+
+ runConcurrently.accept((nThread, nSegment) -> {
+ try {
+ long msb = segmentIds[2 * nSegment];
+ long lsb = segmentIds[2 * nSegment + 1];
+ if (persistentCache.containsSegment(msb, lsb)) {
+ containsFailures.incrementAndGet();
+ }
+ if (persistentCache.readSegment(msb, lsb) != null) {
+ readFailures.incrementAndGet();
+ }
+ } catch (Throwable t) {
+ errors.incrementAndGet();
+ } finally {
+ done.incrementAndGet();
+ }
+ });
+
+ waitWhile.accept(() -> done.get() < SEGMENTS);
+
+ assertEquals("exceptions occurred", 0, errors.get());
+ assertNoTimeout();
+ assertEquals("containsSegment failed", 0, containsFailures.get());
+ assertEquals("readSegment failed", 0, readFailures.get());
+ }
+
+ // Writes a single segment, waits for the write to complete and then checks
+ // concurrently that containsSegment and readSegment both find it.
+ @Test
+ public void testExisting() throws Exception {
+ final TestSegment testSegment = TestSegment.createSegment();
+ final long[] segmentId = testSegment.getSegmentId();
+ persistentCache.writeSegment(segmentId[0], segmentId[1], testSegment.getSegmentBuffer());
+ final AtomicInteger containsFailures = new AtomicInteger(0);
+ final AtomicInteger readFailures = new AtomicInteger(0);
+
+ // We need this to give the cache's write thread pool time to start the thread
+ Thread.sleep(1000);
+
+ waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+ assertNoTimeout();
+ assertEquals(0, persistentCache.getWritesPending());
+
+ // Every callback queries the same segment; the segment index is not used.
+ runConcurrently.accept((nThread, nSegment) -> {
+ try {
+ if (!persistentCache.containsSegment(segmentId[0], segmentId[1])) {
+ containsFailures.incrementAndGet();
+ }
+ if (persistentCache.readSegment(segmentId[0], segmentId[1]) == null) {
+ readFailures.incrementAndGet();
+ }
+ } catch (Throwable t) {
+ errors.incrementAndGet();
+ } finally {
+ done.incrementAndGet();
+ }
+ });
+
+ waitWhile.accept(() -> done.get() < SEGMENTS);
+
+ assertEquals("Exceptions occurred", 0, errors.get());
+ assertNoTimeout();
+ assertEquals("containsSegment failed", 0, containsFailures.get());
+ assertEquals("readSegment failed", 0, readFailures.get());
+ }
+
+ /**
+  * Writes the same segment concurrently from all worker threads and verifies that the
+  * segment can afterwards be read back unchanged.
+  */
+ @Test
+ public void testConcurrentWritesSameSegment() throws Exception {
+     final TestSegment testSegment = TestSegment.createSegment();
+     final long[] segmentId = testSegment.getSegmentId();
+
+     runConcurrently.accept((nThread, nSegment) -> {
+         try {
+             persistentCache.writeSegment(segmentId[0], segmentId[1], testSegment.getSegmentBuffer());
+         } catch (Throwable t) {
+             errors.incrementAndGet();
+         } finally {
+             done.incrementAndGet();
+         }
+     });
+
+     waitWhile.accept(() -> done.get() < SEGMENTS);
+     assertNoTimeout();
+     // Exceptions are swallowed by the workers; surface them here like the sibling tests do
+     assertEquals("Errors have occurred while writing", 0, errors.get());
+
+     // Same drain step as testExisting: do not read back while the cache still reports pending writes
+     waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+     assertNoTimeout();
+
+     final Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1]);
+     assertNotNull("The segment was not found", segmentRead);
+     assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
+ }
+
+ /**
+  * Test fixture holding a randomly generated segment id together with randomly generated
+  * segment content.
+  */
+ protected static class TestSegment {
+     /** Length of a segment id in bytes: two longs (most and least significant bits). */
+     public static final int UUID_LEN = 2 * Long.BYTES;
+     /** Length of the generated segment content in bytes. */
+     public static final int SEGMENT_LEN = 256 * 1024;
+
+     private static final Random random = new Random();
+
+     private final byte[] segmentId;
+     private final byte[] segmentBytes;
+
+     private TestSegment(byte[] segmentId, byte[] segmentBytes) {
+         this.segmentId = segmentId;
+         this.segmentBytes = segmentBytes;
+     }
+
+     /**
+      * Creates a segment with a random id and random content.
+      *
+      * @return a new test segment
+      */
+     protected static TestSegment createSegment() {
+         return new TestSegment(createSegmentIdBytes(), createSegmentBytes());
+     }
+
+     private static byte[] createSegmentBytes() {
+         byte[] ret = new byte[SEGMENT_LEN];
+         random.nextBytes(ret);
+         return ret;
+     }
+
+     private static byte[] createSegmentIdBytes() {
+         byte[] ret = new byte[UUID_LEN];
+         random.nextBytes(ret);
+         return ret;
+     }
+
+     /**
+      * Decodes the id bytes into two longs.
+      *
+      * @return a two element array: index 0 is read from byte offset 0, index 1 from byte offset 8
+      */
+     protected long[] getSegmentId() {
+         final Buffer buffer = Buffer.wrap(segmentId);
+         return new long[] {buffer.getLong(0), buffer.getLong(Long.BYTES)};
+     }
+
+     /**
+      * @return a new buffer wrapping the segment content (the backing array is shared, not copied)
+      */
+     protected Buffer getSegmentBuffer() {
+         return Buffer.wrap(segmentBytes);
+     }
+
+     /**
+      * @return the backing array of the segment content (not a copy)
+      */
+     protected byte[] getSegmentBytes() {
+         return segmentBytes;
+     }
+ }
+
+ protected static void assertSegmentBufferEquals(Buffer expected, Buffer actual) {
+ expected.rewind();
+ actual.rewind();
+ assertEquals("Segment size is different", TestSegment.SEGMENT_LEN, actual.remaining());
+ for (int i = 0; i < TestSegment.SEGMENT_LEN; ++i) {
+ assertEquals("Difference in byte buffer", expected.get(i), actual.get(i));
+ }
+ }
+
+ protected void assertNoTimeout() {
+ assertTrue("Wait timeout reached", count < TIMEOUT_COUNT);
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCacheTest.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * Runs the shared persistent cache tests against {@link PersistentDiskCache} and adds a
+  * disk-specific test for cache clean-up.
+  */
+ public class PersistentDiskCacheTest extends AbstractPersistentCacheTest {
+
+     @Rule
+     public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+
+     @Before
+     public void setUp() throws Exception {
+         persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024);
+     }
+
+     /**
+      * Replaces the default cache with one created with a second constructor argument of 0,
+      * writes all test segments, triggers a clean-up and expects that none of the segments
+      * can be read back afterwards.
+      */
+     @Test
+     public void cleanupTest() throws Exception {
+         persistentCache.close();
+         persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0);
+         final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
+         final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);
+
+         for (int i = 0; i < SEGMENTS; ++i) {
+             testSegments.add(TestSegment.createSegment());
+         }
+
+         for (int i = 0; i < THREADS; ++i) {
+             final Map<String, Buffer> segmentsReadThisThread = new HashMap<>(SEGMENTS_PER_THREAD);
+             segmentsRead.add(segmentsReadThisThread);
+         }
+
+         runConcurrently.accept((nThread, nSegment) -> {
+             TestSegment segment = testSegments.get(nSegment);
+             long[] id = segment.getSegmentId();
+             try {
+                 persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer());
+             } catch (Throwable t) {
+                 errors.incrementAndGet();
+             } finally {
+                 done.incrementAndGet();
+             }
+         });
+
+         waitWhile.accept(() -> done.get() < SEGMENTS);
+         waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
+
+         assertEquals("Errors have occurred while writing", 0, errors.get());
+         assertNoTimeout();
+
+         done.set(0);
+         waitWhile.accept(() -> ((PersistentDiskCache) persistentCache).cleanupInProgress.get());
+
+         persistentCache.cleanUp();
+
+         runConcurrently.accept((nThread, nSegment) -> {
+             final TestSegment segment = testSegments.get(nSegment);
+             final long[] id = segment.getSegmentId();
+             try {
+                 final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
+                 final Buffer segmentRead = persistentCache.readSegment(id[0], id[1]);
+                 segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
+             } catch (Throwable t) {
+                 errors.incrementAndGet();
+             } finally {
+                 done.incrementAndGet();
+             }
+         });
+
+         waitWhile.accept(() -> done.get() < SEGMENTS);
+
+         assertNoTimeout();
+         assertEquals("Errors have occurred while reading", 0, errors.get());
+
+         // Count the segments that could no longer be read, using a dedicated counter instead of
+         // overloading the shared error counter
+         int segmentsCleanedUp = 0;
+         for (int i = 0; i < THREADS; ++i) {
+             final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(i);
+             for (int j = i * SEGMENTS_PER_THREAD; j < (i + 1) * SEGMENTS_PER_THREAD; ++j) {
+                 final long[] segmentReadId = testSegments.get(j).getSegmentId();
+                 final String key = new UUID(segmentReadId[0], segmentReadId[1]).toString();
+                 if (segmentsReadThisThread.get(key) == null) {
+                     segmentsCleanedUp++;
+                 }
+             }
+         }
+         assertEquals("Segment(s) not cleaned up in cache", SEGMENTS, segmentsCleanedUp);
+     }
+ }
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCacheTest.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.junit.After;
+import org.junit.Before;
+import redis.embedded.RedisServer;
+
+ /**
+  * Runs the shared persistent cache tests against {@link PersistentRedisCache}, backed by an
+  * embedded Redis server that is started before and stopped after every test.
+  */
+ public class PersistentRedisCacheTest extends AbstractPersistentCacheTest {
+
+     private RedisServer redisServer;
+
+     @Before
+     public void setUp() throws Exception {
+         redisServer = RedisServer.builder().build();
+         redisServer.start();
+         int port = redisServer.ports().get(0);
+         persistentCache = new PersistentRedisCache(
+                 "localhost",
+                 port,
+                 -1,
+                 10000,
+                 50,
+                 10,
+                 2000,
+                 200000
+         );
+     }
+
+     @After
+     public void tearDown() throws Exception {
+         // @After also runs when setUp failed; avoid masking that failure with an NPE
+         if (redisServer != null) {
+             redisServer.stop();
+             redisServer = null;
+         }
+     }
+ }
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java Tue Aug 4 14:13:48 2020
@@ -48,6 +48,7 @@ import org.apache.jackrabbit.api.stats.T
import org.apache.jackrabbit.oak.osgi.OsgiUtil;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
@@ -254,6 +255,12 @@ public class SegmentNodeStoreFactory {
boolean splitPersistence() default false;
@AttributeDefinition(
+ name = "Cache persistence",
+ description = "Boolean value indicating that the persisted cache should be used for the custom segment store"
+ )
+ boolean cachePersistence() default false;
+
+ @AttributeDefinition(
name = "Backup directory",
description = "Directory (relative to current working directory) for storing repository backups. " +
"Defaults to 'repository.home/segmentstore-backup'."
@@ -315,6 +322,13 @@ public class SegmentNodeStoreFactory {
)
private volatile SegmentNodeStorePersistence segmentStore;
+ @Reference(
+ cardinality = ReferenceCardinality.OPTIONAL,
+ policy = ReferencePolicy.STATIC,
+ policyOption = ReferencePolicyOption.GREEDY
+ )
+ private volatile PersistentCache persistentCache;
+
@Reference
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
@@ -334,6 +348,7 @@ public class SegmentNodeStoreFactory {
configuration,
blobStore,
segmentStore,
+ persistentCache,
getRoleStatisticsProvider(statisticsProvider, configuration.role()),
registrations,
whiteboard,
@@ -376,6 +391,7 @@ public class SegmentNodeStoreFactory {
Configuration configuration,
BlobStore blobStore,
SegmentNodeStorePersistence segmentStore,
+ PersistentCache persistentCache,
StatisticsProvider statisticsProvider,
Closer closer,
Whiteboard whiteboard,
@@ -539,6 +555,11 @@ public class SegmentNodeStoreFactory {
}
@Override
+ public boolean hasCachePersistence() {
+ return configuration.cachePersistence();
+ }
+
+ @Override
public boolean registerDescriptors() {
return configuration.registerDescriptors();
}
@@ -607,6 +628,11 @@ public class SegmentNodeStoreFactory {
}
@Override
+ public PersistentCache getPersistentCache() {
+ return persistentCache;
+ }
+
+ @Override
public BundleContext getBundleContext() {
return context.getBundleContext();
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java Tue Aug 4 14:13:48 2020
@@ -25,14 +25,8 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
import static org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo.getOrCreateId;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.io.Closer;
+
import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
@@ -50,9 +44,17 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC;
import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGCMBean;
-import org.apache.jackrabbit.oak.segment.file.*;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.FileStoreGCMonitor;
+import org.apache.jackrabbit.oak.segment.file.FileStoreStatsMBean;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.segment.file.MetricsIOMonitor;
+import org.apache.jackrabbit.oak.segment.file.MetricsRemoteStoreMonitor;
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -76,6 +78,13 @@ import org.osgi.framework.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
class SegmentNodeStoreRegistrar {
static SegmentNodeStore registerSegmentNodeStore(Configuration cfg) throws IOException {
@@ -136,6 +145,8 @@ class SegmentNodeStoreRegistrar {
boolean hasSplitPersistence();
+ boolean hasCachePersistence();
+
boolean registerDescriptors();
boolean dispatchChanges();
@@ -160,6 +171,8 @@ class SegmentNodeStoreRegistrar {
SegmentNodeStorePersistence getSegmentNodeStorePersistence();
+ PersistentCache getPersistentCache();
+
BundleContext getBundleContext();
}
@@ -231,16 +244,22 @@ class SegmentNodeStoreRegistrar {
}
if (cfg.hasCustomSegmentStore() && cfg.getSegmentNodeStorePersistence() != null) {
+ SegmentNodeStorePersistence customPersistence = cfg.getSegmentNodeStorePersistence();
+
+ if (cfg.hasCachePersistence()) {
+ cfg.getLogger().info("Using persistent cache for the custom persistence [{}]", customPersistence);
+ customPersistence = new CachingPersistence(cfg.getPersistentCache(), customPersistence);
+ }
+
if (cfg.hasSplitPersistence()) {
- cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", cfg.getSegmentNodeStorePersistence());
+ cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", customPersistence);
cfg.getSplitPersistenceDirectory().mkdirs();
- SegmentNodeStorePersistence roPersistence = cfg.getSegmentNodeStorePersistence();
SegmentNodeStorePersistence rwPersistence = new TarPersistence(cfg.getSplitPersistenceDirectory());
- SegmentNodeStorePersistence persistence = new SplitPersistence(roPersistence, rwPersistence);
+ SegmentNodeStorePersistence persistence = new SplitPersistence(customPersistence, rwPersistence);
builder.withCustomPersistence(persistence);
} else {
- cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", cfg.getSegmentNodeStorePersistence());
- builder.withCustomPersistence(cfg.getSegmentNodeStorePersistence());
+ cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", customPersistence);
+ builder.withCustomPersistence(customPersistence);
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java?rev=1880568&r1=1880567&r2=1880568&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java Tue Aug 4 14:13:48 2020
@@ -38,13 +38,12 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.DEFAULT_MAX_FILE_SIZE;
import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
-import java.io.File;
-import java.io.IOException;
-
import com.google.common.io.Closer;
+
import org.apache.jackrabbit.oak.osgi.OsgiUtil;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
@@ -64,6 +63,9 @@ import org.osgi.service.metatype.annotat
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+
/**
* An OSGi wrapper for the segment node store.
*/
@@ -252,6 +254,12 @@ public class SegmentNodeStoreService {
boolean splitPersistence() default false;
@AttributeDefinition(
+ name = "Cache persistence",
+ description = "Boolean value indicating that the persisted cache should be used for the custom segment store"
+ )
+ boolean cachePersistence() default false;
+
+ @AttributeDefinition(
name = "Backup directory",
description = "Directory (relative to current working directory) for storing repository backups. " +
"Defaults to 'repository.home/segmentstore-backup'."
@@ -295,6 +303,13 @@ public class SegmentNodeStoreService {
)
private volatile SegmentNodeStorePersistence segmentStore;
+ @Reference(
+ cardinality = ReferenceCardinality.OPTIONAL,
+ policy = ReferencePolicy.STATIC,
+ policyOption = ReferencePolicyOption.GREEDY
+ )
+ private volatile PersistentCache persistentCache;
+
@Reference
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
@@ -303,7 +318,7 @@ public class SegmentNodeStoreService {
@Activate
public void activate(ComponentContext context, Configuration configuration) throws IOException {
OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
- registerSegmentStore(context, configuration, blobStore, segmentStore, statisticsProvider, closer, whiteboard, log);
+ registerSegmentStore(context, configuration, blobStore, segmentStore, persistentCache, statisticsProvider, closer, whiteboard, log);
}
private static SegmentNodeStore registerSegmentStore(
@@ -311,6 +326,7 @@ public class SegmentNodeStoreService {
Configuration configuration,
BlobStore blobStore,
SegmentNodeStorePersistence segmentStore,
+ PersistentCache persistentCache,
StatisticsProvider statisticsProvider,
Closer closer,
Whiteboard whiteboard,
@@ -489,6 +505,11 @@ public class SegmentNodeStoreService {
}
@Override
+ public boolean hasCachePersistence() {
+ return configuration.cachePersistence();
+ }
+
+ @Override
public boolean registerDescriptors() {
return true;
}
@@ -557,6 +578,11 @@ public class SegmentNodeStoreService {
}
@Override
+ public PersistentCache getPersistentCache() {
+ return persistentCache;
+ }
+
+ @Override
public BundleContext getBundleContext() {
return context.getBundleContext();
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+ /**
+  * Base class for {@link PersistentCache} implementations. It owns a fixed-size thread pool,
+  * an optional link to a next cache and a set of segment ids that currently have a write in
+  * progress.
+  */
+ public abstract class AbstractPersistentCache implements PersistentCache, Closeable {
+     private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);
+
+     /** Size of the thread pool; read from the system property "oak.segment.cache.threads", default 10. */
+     public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);
+
+     /** Thread pool created in the constructor and shut down in {@link #close()}. */
+     protected ExecutorService executor;
+     /** Size counter, initially 0; not modified by this class. */
+     protected AtomicLong cacheSize = new AtomicLong(0);
+     /** The cache set through {@link #linkWith(PersistentCache)}, or {@code null} if none was set. */
+     protected PersistentCache nextCache;
+     /** Ids of segments with a write in progress; every access synchronizes on this set. */
+     protected final HashSet<String> writesPending;
+
+     public AbstractPersistentCache() {
+         writesPending = new HashSet<>();
+         executor = Executors.newFixedThreadPool(THREADS);
+     }
+
+     /**
+      * Sets the next cache of this cache.
+      *
+      * @param nextCache the cache to link with
+      * @return the {@code nextCache} argument, which allows chaining of calls
+      */
+     public PersistentCache linkWith(PersistentCache nextCache) {
+         this.nextCache = nextCache;
+         return this.nextCache;
+     }
+
+     /**
+      * Shuts down the thread pool and waits up to 60 seconds for it to terminate. If the wait
+      * is interrupted, the interrupt flag of the current thread is restored.
+      */
+     @Override
+     public void close() {
+         executor.shutdown();
+         try {
+             final boolean terminated = executor.awaitTermination(60, SECONDS);
+             if (!terminated) {
+                 logger.warn("The persistent cache scheduler takes too long to shut down");
+             } else {
+                 logger.debug("The persistent cache scheduler was successfully shut down");
+             }
+         } catch (InterruptedException e) {
+             logger.warn("Interrupt while shutting down the persistent cache scheduler", e);
+             currentThread().interrupt();
+         }
+     }
+
+     /**
+      * @return the number of segment ids currently registered as having a write in progress
+      */
+     public int getWritesPending() {
+         final int pending;
+         synchronized (writesPending) {
+             pending = writesPending.size();
+         }
+         return pending;
+     }
+
+     /**
+      * Registers a write in progress for the given segment id.
+      *
+      * @param segmentId the id of the segment about to be written
+      * @return {@code true} if the id was not registered before, {@code false} otherwise
+      */
+     protected boolean lockSegmentWrite(String segmentId) {
+         final boolean acquired;
+         synchronized (writesPending) {
+             acquired = writesPending.add(segmentId);
+         }
+         return acquired;
+     }
+
+     /**
+      * Removes the given segment id from the set of writes in progress.
+      *
+      * @param segmentId the id of the segment whose write has finished
+      */
+     protected void unlockSegmentWrite(String segmentId) {
+         synchronized (writesPending) {
+             writesPending.remove(segmentId);
+         }
+     }
+ }
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * A {@link SegmentArchiveManager} that forwards every operation to a delegate and wraps the
+ * archive readers returned by the delegate in a {@link CachingSegmentArchiveReader} bound to
+ * the given {@link PersistentCache}.
+ */
+public class CachingArchiveManager implements SegmentArchiveManager {
+
+    private final SegmentArchiveManager delegate;
+
+    private final PersistentCache persistentCache;
+
+    public CachingArchiveManager(PersistentCache persistentCache, SegmentArchiveManager delegate) {
+        this.delegate = delegate;
+        this.persistentCache = persistentCache;
+    }
+
+    @Override
+    public @NotNull List<String> listArchives() throws IOException {
+        return delegate.listArchives();
+    }
+
+    @Override
+    public @Nullable SegmentArchiveReader open(@NotNull String archiveName) throws IOException {
+        return wrap(delegate.open(archiveName));
+    }
+
+    @Override
+    public @Nullable SegmentArchiveReader forceOpen(String archiveName) throws IOException {
+        return wrap(delegate.forceOpen(archiveName));
+    }
+
+    /**
+     * Wraps a reader obtained from the delegate with the caching reader.
+     *
+     * @param reader the reader returned by the delegate, possibly {@code null}
+     * @return the caching reader, or {@code null} if the delegate returned {@code null}, so that
+     *         callers still see the "archive could not be opened" signal
+     */
+    private @Nullable SegmentArchiveReader wrap(@Nullable SegmentArchiveReader reader) {
+        if (reader == null) {
+            return null;
+        }
+        return new CachingSegmentArchiveReader(persistentCache, reader);
+    }
+
+    @Override
+    public @NotNull SegmentArchiveWriter create(@NotNull String archiveName) throws IOException {
+        return delegate.create(archiveName);
+    }
+
+    @Override
+    public boolean delete(@NotNull String archiveName) {
+        return delegate.delete(archiveName);
+    }
+
+    @Override
+    public boolean renameTo(@NotNull String from, @NotNull String to) {
+        return delegate.renameTo(from, to);
+    }
+
+    @Override
+    public void copyFile(@NotNull String from, @NotNull String to) throws IOException {
+        delegate.copyFile(from, to);
+    }
+
+    @Override
+    public boolean exists(@NotNull String archiveName) {
+        return delegate.exists(archiveName);
+    }
+
+    @Override
+    public void recoverEntries(@NotNull String archiveName, @NotNull LinkedHashMap<UUID, byte[]> entries) throws IOException {
+        delegate.recoverEntries(archiveName, entries);
+    }
+
+    @Override
+    public void backup(@NotNull String archiveName, @NotNull String backupArchiveName,
+            @NotNull Set<UUID> recoveredEntries) throws IOException {
+        delegate.backup(archiveName, backupArchiveName, recoveredEntries);
+    }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+
+import java.io.IOException;
+
+public class CachingPersistence implements SegmentNodeStorePersistence {
+
+ private final SegmentNodeStorePersistence delegate;
+
+ private final PersistentCache persistentCache;
+
+ public CachingPersistence(PersistentCache persistentCache, SegmentNodeStorePersistence delegate) {
+ this.delegate = delegate;
+ this.persistentCache = persistentCache;
+ }
+
+ @Override
+ public SegmentArchiveManager createArchiveManager(boolean memoryMapping, boolean offHeapAccess, IOMonitor ioMonitor,
+ FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) throws IOException {
+ return new CachingArchiveManager(persistentCache, delegate.createArchiveManager(memoryMapping, offHeapAccess, ioMonitor, fileStoreMonitor, remoteStoreMonitor));
+ }
+
+ @Override
+ public boolean segmentFilesExist() {
+ return delegate.segmentFilesExist();
+ }
+
+ @Override
+ public JournalFile getJournalFile() {
+ return delegate.getJournalFile();
+ }
+
+ @Override
+ public GCJournalFile getGCJournalFile() throws IOException {
+ return delegate.getGCJournalFile();
+ }
+
+ @Override
+ public ManifestFile getManifestFile() throws IOException {
+ return delegate.getManifestFile();
+ }
+
+ @Override
+ public RepositoryLock lockRepository() throws IOException {
+ return delegate.lockRepository();
+ }
+
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+ /**
+  * A {@link SegmentArchiveReader} decorator that consults a
+  * {@link PersistentCache} before falling back to the wrapped reader.
+  * <p>
+  * Only {@link #readSegment(long, long)} and {@link #containsSegment(long, long)}
+  * involve the cache; every other method is forwarded to the wrapped reader.
+  */
+ public class CachingSegmentArchiveReader implements SegmentArchiveReader {
+
+     @NotNull
+     private final PersistentCache persistentCache;
+
+     @NotNull
+     private final SegmentArchiveReader delegate;
+
+     /**
+      * @param persistentCache cache looked up first and filled on misses
+      * @param delegate        reader used when the cache has no answer
+      */
+     public CachingSegmentArchiveReader(
+             @NotNull PersistentCache persistentCache,
+             @NotNull SegmentArchiveReader delegate) {
+         this.persistentCache = persistentCache;
+         this.delegate = delegate;
+     }
+
+     /**
+      * Returns the segment from the cache if present. Otherwise reads it from
+      * the wrapped reader and, when found there, writes it to the cache before
+      * returning it.
+      *
+      * @return the segment data, or {@code null} if neither the cache nor the
+      *         wrapped reader has it
+      */
+     @Override
+     @Nullable
+     public Buffer readSegment(long msb, long lsb) throws IOException {
+         Buffer cached = persistentCache.readSegment(msb, lsb);
+         if (cached != null) {
+             return cached;
+         }
+
+         Buffer loaded = delegate.readSegment(msb, lsb);
+         if (loaded == null) {
+             return null;
+         }
+
+         persistentCache.writeSegment(msb, lsb, loaded);
+         return loaded;
+     }
+
+     /**
+      * Returns {@code true} if the cache or, failing that, the wrapped reader
+      * reports the segment as present.
+      */
+     @Override
+     public boolean containsSegment(long msb, long lsb) {
+         // NOTE(review): the cache identifies segments by id only and is agnostic
+         // to archives, so a cache hit does not show that the segment lives in
+         // this particular archive - confirm callers do not depend on that.
+         return persistentCache.containsSegment(msb, lsb)
+                 || delegate.containsSegment(msb, lsb);
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     public List<SegmentArchiveEntry> listSegments() {
+         final List<SegmentArchiveEntry> entries = delegate.listSegments();
+         return entries;
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     @Nullable
+     public Buffer getGraph() throws IOException {
+         final Buffer graph = delegate.getGraph();
+         return graph;
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     public boolean hasGraph() {
+         final boolean graphPresent = delegate.hasGraph();
+         return graphPresent;
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     @NotNull
+     public Buffer getBinaryReferences() throws IOException {
+         final Buffer references = delegate.getBinaryReferences();
+         return references;
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     public long length() {
+         final long archiveLength = delegate.length();
+         return archiveLength;
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     @NotNull
+     public String getName() {
+         final String archiveName = delegate.getName();
+         return archiveName;
+     }
+
+     /** Closes the wrapped reader only; the cache is left untouched. */
+     @Override
+     public void close() throws IOException {
+         delegate.close();
+     }
+
+     /** Forwarded to the wrapped reader. */
+     @Override
+     public int getEntrySize(int size) {
+         final int entrySize = delegate.getEntrySize(size);
+         return entrySize;
+     }
+ }
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java?rev=1880568&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java Tue Aug 4 14:13:48 2020
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This interface represents a cache which survives segment store restarts.
+ * The cache is agnostic to any archive structure. Segments are only
+ * identified by their UUIDs, specified as msb and lsb parts of the segment id.
+ */
+public interface PersistentCache {
+
+ /**
+ * Reads the segment from cache.
+ *
+ * @param msb the most significant bits of the identifier of the segment
+ * @param lsb the least significant bits of the identifier of the segment
+ * @return byte buffer containing the segment data or null if the segment doesn't exist
+ */
+ @Nullable
+ Buffer readSegment(long msb, long lsb);
+
+ /**
+ * Check if the segment exists in the cache.
+ *
+ * @param msb the most significant bits of the identifier of the segment
+ * @param lsb the least significant bits of the identifier of the segment
+ * @return {@code true} if the segment exists
+ */
+ boolean containsSegment(long msb, long lsb);
+
+ /**
+ * Writes the segment to the cache.
+ *
+ * @param msb the most significant bits of the identifier of the segment
+ * @param lsb the least significant bits of the identifier of the segment
+ * @param buffer the byte buffer containing the segment data
+ */
+ void writeSegment(long msb, long lsb, Buffer buffer);
+
+ /**
+ * Purges the cache entries according to the implementation policy (e.g. maximum
+ * cache size, maximum number of entries, etc.)
+ */
+ void cleanUp();
+}