You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/06/07 07:32:19 UTC
svn commit: r1747160 - in
/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment:
./ file/
Author: mduerig
Date: Tue Jun 7 07:32:19 2016
New Revision: 1747160
URL: http://svn.apache.org/viewvc?rev=1747160&view=rev
Log:
OAK-4277: Finalise de-duplication caches
- Invalidate caches through (an extension of) the GCMonitor instead of relying on explicit method calls
- Refactor NodeCache and RecordCache
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java Tue Jun 7 07:32:19 2016
@@ -22,6 +22,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.api.Type.BINARY;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.RecordCache.newRecordCache;
import java.io.IOException;
import java.io.InputStream;
@@ -117,8 +118,7 @@ public class Compactor {
}
}
- private final RecordCache<RecordId> cache = RecordCache.<RecordId> factory(
- cacheSize).get();
+ private final RecordCache<RecordId> cache = newRecordCache(cacheSize);
public Compactor(SegmentReader reader, SegmentWriter writer,
BlobStore blobStore, Supplier<Boolean> cancel, SegmentGCOptions gc) {
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java Tue Jun 7 07:32:19 2016
@@ -28,6 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
import com.google.common.base.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,95 +39,120 @@ import org.slf4j.LoggerFactory;
// implement configuration, monitoring and management
// add unit tests
// document, nullability
-public class NodeCache {
- private static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
-
- private final int capacity;
- private final List<Map<String, RecordId>> nodes;
+public abstract class NodeCache {
- private int size;
+ public abstract void put(String key, RecordId value, int depth);
- private final Set<Integer> muteDepths = newHashSet();
+ @CheckForNull
+ public abstract RecordId get(String key);
- public static final Supplier<NodeCache> factory(final int capacity, final int maxDepth) {
- return new Supplier<NodeCache>() {
- @Override
- public NodeCache get() {
- return new NodeCache(capacity, maxDepth);
- }
- };
+ @Nonnull
+ public static NodeCache newNodeCache(int capacity, int maxDepth) {
+ if (capacity <= 0) {
+ return new NodeCache.Empty();
+ } else {
+ return new NodeCache.Default(capacity, maxDepth);
+ }
}
- public static final Supplier<NodeCache> empty() {
- return new Supplier<NodeCache>() {
- @Override
- public NodeCache get() {
- return new NodeCache() {
- @Override
- public synchronized void put(String key, RecordId value, int depth) { }
-
- @Override
- public synchronized RecordId get(String key) { return null; }
- };
- }
- };
+ @Nonnull
+ public static Supplier<NodeCache> factory(int capacity, int maxDepth) {
+ if (capacity <= 0) {
+ return NodeCache.Empty.supplier();
+ } else {
+ return NodeCache.Default.supplier(capacity, maxDepth);
+ }
}
- /* Internal constructor used by empty() only */
- private NodeCache() {
- this.capacity = 0;
- this.nodes = null;
+ private static class Empty extends NodeCache {
+ static final Supplier<NodeCache> supplier() {
+ return new Supplier<NodeCache>() {
+ @Override
+ public NodeCache get() {
+ return new NodeCache.Empty();
+ }
+ };
+ }
+
+ @Override
+ public synchronized void put(String key, RecordId value, int depth) { }
+
+ @Override
+ public synchronized RecordId get(String key) { return null; }
}
- public NodeCache(int capacity, int maxDepth) {
- checkArgument(capacity > 0);
- checkArgument(maxDepth > 0);
- this.capacity = capacity;
- this.nodes = newArrayList();
- for (int k = 0; k < maxDepth; k++) {
- nodes.add(new HashMap<String, RecordId>());
+ private static class Default extends NodeCache {
+ private static final Logger LOG = LoggerFactory.getLogger(Default.class);
+
+ private final int capacity;
+ private final List<Map<String, RecordId>> nodes;
+
+ private int size;
+
+ private final Set<Integer> muteDepths = newHashSet();
+
+ static final Supplier<NodeCache> supplier(final int capacity, final int size) {
+ return new Supplier<NodeCache>() {
+ @Override
+ public NodeCache get() {
+ return new NodeCache.Default(capacity, size);
+ }
+ };
}
- }
- public synchronized void put(String key, RecordId value, int depth) {
- // FIXME OAK-4277: Finalise de-duplication caches
- // Validate and optimise the eviction strategy.
- // Nodes with many children should probably get a boost to
- // protecting them from preemptive eviction. Also it might be
- // necessary to implement pinning (e.g. for checkpoints).
- while (size >= capacity) {
- int d = nodes.size() - 1;
- int removed = nodes.remove(d).size();
- size -= removed;
- if (removed > 0) {
- // FIXME OAK-4165: Too verbose logging during revision gc
- LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
- "New size is {}", d, size + removed, capacity, size);
+ Default(int capacity, int maxDepth) {
+ checkArgument(capacity > 0);
+ checkArgument(maxDepth > 0);
+ this.capacity = capacity;
+ this.nodes = newArrayList();
+ for (int k = 0; k < maxDepth; k++) {
+ nodes.add(new HashMap<String, RecordId>());
}
}
- if (depth < nodes.size()) {
- if (nodes.get(depth).put(key, value) == null) {
- size++;
+ @Override
+ public synchronized void put(String key, RecordId value, int depth) {
+ // FIXME OAK-4277: Finalise de-duplication caches
+ // Validate and optimise the eviction strategy.
+ // Nodes with many children should probably get a boost to
+ // protecting them from preemptive eviction. Also it might be
+ // necessary to implement pinning (e.g. for checkpoints).
+ while (size >= capacity) {
+ int d = nodes.size() - 1;
+ int removed = nodes.remove(d).size();
+ size -= removed;
+ if (removed > 0) {
+ // FIXME OAK-4165: Too verbose logging during revision gc
+ LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
+ "New size is {}", d, size + removed, capacity, size);
+ }
}
- } else {
- if (muteDepths.add(depth)) {
- LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
- key, value, depth, nodes.size());
+
+ if (depth < nodes.size()) {
+ if (nodes.get(depth).put(key, value) == null) {
+ size++;
+ }
+ } else {
+ if (muteDepths.add(depth)) {
+ LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
+ key, value, depth, nodes.size());
+ }
}
}
- }
- public synchronized RecordId get(String key) {
- for (Map<String, RecordId> map : nodes) {
- if (!map.isEmpty()) {
- RecordId recordId = map.get(key);
- if (recordId != null) {
- return recordId;
+ @Override
+ public synchronized RecordId get(String key) {
+ for (Map<String, RecordId> map : nodes) {
+ if (!map.isEmpty()) {
+ RecordId recordId = map.get(key);
+ if (recordId != null) {
+ return recordId;
+ }
}
}
+ return null;
}
- return null;
+
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java Tue Jun 7 07:32:19 2016
@@ -22,53 +22,86 @@ package org.apache.jackrabbit.oak.segmen
import java.util.LinkedHashMap;
import java.util.Map;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
import com.google.common.base.Supplier;
// FIXME OAK-4277: Finalise de-duplication caches
// implement configuration, monitoring and management
// add unit tests
// document, nullability
-public class RecordCache<T> {
- private final Map<T, RecordId> records;
+public abstract class RecordCache<T> {
- public static final <T> Supplier<RecordCache<T>> factory(final int size) {
- return new Supplier<RecordCache<T>>() {
- @Override
- public RecordCache<T> get() {
- return new RecordCache<>(size);
- }
- };
- }
+ public abstract void put(T key, RecordId value);
+
+ @CheckForNull
+ public abstract RecordId get(T key);
- public static final <T> Supplier<RecordCache<T>> empty() {
- return new Supplier<RecordCache<T>>() {
- @Override
- public RecordCache<T> get() {
- return new RecordCache<T>(0) {
- @Override
- public synchronized void put(T key, RecordId value) { }
-
- @Override
- public synchronized RecordId get(T key) { return null; }
- };
- }
- };
+ @Nonnull
+ public static <T> RecordCache<T> newRecordCache(int size) {
+ if (size <= 0) {
+ return new Empty<>();
+ } else {
+ return new Default<>(size);
+ }
}
- public RecordCache(final int size) {
- records = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
- @Override
- protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
- return size() >= size;
- }
- };
+ @Nonnull
+ public static <T> Supplier<RecordCache<T>> factory(int size) {
+ if (size <= 0) {
+ return Empty.supplier();
+ } else {
+ return Default.supplier(size);
+ }
}
- public synchronized void put(T key, RecordId value) {
- records.put(key, value);
+ private static class Empty<T> extends RecordCache<T> {
+ static final <T> Supplier<RecordCache<T>> supplier() {
+ return new Supplier<RecordCache<T>>() {
+ @Override
+ public RecordCache<T> get() {
+ return new Empty<>();
+ }
+ };
+ }
+
+ @Override
+ public synchronized void put(T key, RecordId value) { }
+
+ @Override
+ public synchronized RecordId get(T key) { return null; }
}
- public synchronized RecordId get(T key) {
- return records.get(key);
+ private static class Default<T> extends RecordCache<T> {
+ private final Map<T, RecordId> records;
+
+ static final <T> Supplier<RecordCache<T>> supplier(final int size) {
+ return new Supplier<RecordCache<T>>() {
+ @Override
+ public RecordCache<T> get() {
+ return new Default<>(size);
+ }
+ };
+ }
+
+ Default(final int size) {
+ records = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
+ return size() >= size;
+ }
+ };
+ }
+
+ @Override
+ public synchronized void put(T key, RecordId value) {
+ records.put(key, value);
+ }
+
+ @Override
+ public synchronized RecordId get(T key) {
+ return records.get(key);
+ }
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Tue Jun 7 07:32:19 2016
@@ -57,7 +57,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jcr.PropertyType;
-import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.Closeables;
@@ -127,12 +126,6 @@ public class SegmentWriter {
this.writeOperationHandler = checkNotNull(writeOperationHandler);
}
- // FIXME OAK-4277: Finalise de-duplication caches
- // There should be a cleaner way to control the deduplication caches across gc generations
- public void evictCaches(Predicate<Integer> evict) {
- cacheManager.evictCaches(evict);
- }
-
public void flush() throws IOException {
writeOperationHandler.flush();
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java Tue Jun 7 07:32:19 2016
@@ -20,13 +20,13 @@
package org.apache.jackrabbit.oak.segment;
import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.Integer.getInteger;
import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
import javax.annotation.Nonnull;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager.Empty;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.http.HttpStore;
import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -38,11 +38,6 @@ import org.apache.jackrabbit.oak.segment
* was specified (default).
*/
public final class SegmentWriterBuilder {
- private static final int STRING_RECORDS_CACHE_SIZE = getInteger(
- "oak.segment.writer.stringsCacheSize", 15000);
-
- private static final int TPL_RECORDS_CACHE_SIZE = getInteger(
- "oak.segment.writer.templatesCacheSize", 3000);
@Nonnull
private final String name;
@@ -56,16 +51,7 @@ public final class SegmentWriterBuilder
private boolean pooled = false;
@Nonnull
- private WriterCacheManager cacheManager = WriterCacheManager.Default.create(
- STRING_RECORDS_CACHE_SIZE <= 0
- ? RecordCache.<String>empty()
- : RecordCache.<String>factory(STRING_RECORDS_CACHE_SIZE),
- TPL_RECORDS_CACHE_SIZE <= 0
- ? RecordCache.<Template>empty()
- : RecordCache.<Template>factory(TPL_RECORDS_CACHE_SIZE),
- // FIXME OAK-4277: Finalise de-duplication caches: make sizes and depth configurable
- NodeCache.factory(1000000, 20));
-
+ private WriterCacheManager cacheManager = new WriterCacheManager.Default();
private SegmentWriterBuilder(@Nonnull String name) { this.name = checkNotNull(name); }
@@ -131,7 +117,7 @@ public final class SegmentWriterBuilder
@Nonnull
public SegmentWriterBuilder withoutCache() {
- this.cacheManager = WriterCacheManager.Empty.create();
+ this.cacheManager = Empty.INSTANCE;
return this;
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java Tue Jun 7 07:32:19 2016
@@ -22,6 +22,8 @@ package org.apache.jackrabbit.oak.segmen
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.collect.Maps.newConcurrentMap;
+import static java.lang.Integer.getInteger;
+import static org.apache.jackrabbit.oak.segment.RecordCache.newRecordCache;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
@@ -35,28 +37,30 @@ import com.google.common.base.Supplier;
// implement configuration, monitoring and management
// add unit tests
// document, nullability
-public interface WriterCacheManager {
+public abstract class WriterCacheManager {
+ private static final int DEFAULT_STRING_CACHE_SIZE = getInteger(
+ "oak.tar.stringsCacheSize", 15000);
- @Nonnull
- RecordCache<String> getStringCache(int generation);
+ private static final int DEFAULT_TEMPLATE_CACHE_SIZE = getInteger(
+ "oak.tar.templatesCacheSize", 3000);
@Nonnull
- RecordCache<Template> getTemplateCache(int generation);
+ public abstract RecordCache<String> getStringCache(int generation);
@Nonnull
- NodeCache getNodeCache(int generation);
+ public abstract RecordCache<Template> getTemplateCache(int generation);
- void evictCaches(Predicate<Integer> generations);
+ @Nonnull
+ public abstract NodeCache getNodeCache(int generation);
- class Empty implements WriterCacheManager {
- private static final WriterCacheManager EMPTY = new Empty();
- private final RecordCache<String> stringCache = RecordCache.<String>empty().get();
- private final RecordCache<Template> templateCache = RecordCache.<Template>empty().get();
- private final NodeCache nodeCache = NodeCache.empty().get();
+ public static class Empty extends WriterCacheManager {
+ public static final WriterCacheManager INSTANCE = new Empty();
- public static WriterCacheManager create() { return EMPTY; }
+ private final RecordCache<String> stringCache = newRecordCache(0);
+ private final RecordCache<Template> templateCache = newRecordCache(0);
+ private final NodeCache nodeCache = NodeCache.newNodeCache(0, 0);
- private Empty(){}
+ private Empty() {}
@Override
public RecordCache<String> getStringCache(int generation) {
@@ -72,12 +76,9 @@ public interface WriterCacheManager {
public NodeCache getNodeCache(int generation) {
return nodeCache;
}
-
- @Override
- public void evictCaches(Predicate<Integer> generations) { }
}
- class Default implements WriterCacheManager {
+ public static class Default extends WriterCacheManager {
/**
* Cache of recently stored string records, used to avoid storing duplicates
* of frequently occurring data.
@@ -96,14 +97,7 @@ public interface WriterCacheManager {
*/
private final Generation<NodeCache> nodeCaches;
- public static WriterCacheManager create(
- @Nonnull Supplier<RecordCache<String>> stringCacheFactory,
- @Nonnull Supplier<RecordCache<Template>> templateCacheFactory,
- @Nonnull Supplier<NodeCache> nodeCacheFactory) {
- return new Default(stringCacheFactory, templateCacheFactory, nodeCacheFactory);
- }
-
- private Default(
+ public Default(
@Nonnull Supplier<RecordCache<String>> stringCacheFactory,
@Nonnull Supplier<RecordCache<Template>> templateCacheFactory,
@Nonnull Supplier<NodeCache> nodeCacheFactory) {
@@ -112,6 +106,12 @@ public interface WriterCacheManager {
this.nodeCaches = new Generation<>(nodeCacheFactory);
}
+ public Default() {
+ this(RecordCache.<String>factory(DEFAULT_STRING_CACHE_SIZE),
+ RecordCache.<Template>factory(DEFAULT_TEMPLATE_CACHE_SIZE),
+ NodeCache.factory(1000000, 20));
+ }
+
private static class Generation<T> {
private final ConcurrentMap<Integer, Supplier<T>> generations = newConcurrentMap();
private final Supplier<T> cacheFactory;
@@ -156,8 +156,7 @@ public interface WriterCacheManager {
return nodeCaches.getGeneration(generation);
}
- @Override
- public void evictCaches(Predicate<Integer> generations) {
+ protected void evictCaches(Predicate<Integer> generations) {
stringCaches.evictGenerations(generations);
templateCaches.evictGenerations(generations);
nodeCaches.evictGenerations(generations);
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Tue Jun 7 07:32:19 2016
@@ -39,6 +39,8 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
import static org.apache.jackrabbit.oak.segment.SegmentWriterBuilder.segmentWriterBuilder;
+import static org.apache.jackrabbit.oak.segment.file.GCListener.Status.FAILURE;
+import static org.apache.jackrabbit.oak.segment.file.GCListener.Status.SUCCESS;
import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
import java.io.Closeable;
@@ -91,10 +93,9 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentVersion;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.WriterCacheManager;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager.Empty;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,9 +195,9 @@ public class FileStore implements Segmen
private final SegmentVersion version;
/**
- * {@code GCMonitor} monitoring this instance's gc progress
+ * {@code GcListener} listening to this instance's gc progress
*/
- private final GCMonitor gcMonitor;
+ private final GCListener gcListener;
/**
* Represents the approximate size on disk of the repository.
@@ -259,11 +260,12 @@ public class FileStore implements Segmen
.with(version)
.withGeneration(getGeneration)
.withWriterPool()
+ .with(builder.getCacheManager())
.build(this);
this.directory = builder.getDirectory();
this.maxFileSize = builder.getMaxFileSize() * MB;
this.memoryMapping = builder.getMemoryMapping();
- this.gcMonitor = builder.getGcMonitor();
+ this.gcListener = builder.getGcListener();
this.gcOptions = builder.getGcOptions();
Map<Integer, Map<Character, File>> map = collectFiles(directory);
@@ -374,7 +376,7 @@ public class FileStore implements Segmen
}
public void maybeCompact(boolean cleanup) throws IOException {
- gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
+ gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
Runtime runtime = Runtime.getRuntime();
long avail = runtime.totalMemory() - runtime.freeMemory();
@@ -383,7 +385,7 @@ public class FileStore implements Segmen
long delta = 0;
long needed = delta * gcOptions.getMemoryThreshold();
if (needed >= avail) {
- gcMonitor.skipped(
+ gcListener.skipped(
"TarMK GC #{}: not enough available memory {} ({} bytes), needed {} ({} bytes)," +
" last merge delta {} ({} bytes), so skipping compaction for now",
GC_COUNT,
@@ -400,22 +402,22 @@ public class FileStore implements Segmen
int gainThreshold = gcOptions.getGainThreshold();
boolean sufficientEstimatedGain = true;
if (gainThreshold <= 0) {
- gcMonitor.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
+ gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
GC_COUNT, gainThreshold);
} else if (gcOptions.isPaused()) {
- gcMonitor.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
+ gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
} else {
- gcMonitor.info("TarMK GC #{}: estimation started", GC_COUNT);
+ gcListener.info("TarMK GC #{}: estimation started", GC_COUNT);
Supplier<Boolean> shutdown = newShutdownSignal();
CompactionGainEstimate estimate = estimateCompactionGain(shutdown);
if (shutdown.get()) {
- gcMonitor.info("TarMK GC #{}: estimation interrupted. Skipping compaction.", GC_COUNT);
+ gcListener.info("TarMK GC #{}: estimation interrupted. Skipping compaction.", GC_COUNT);
}
long gain = estimate.estimateCompactionGain();
sufficientEstimatedGain = gain >= gainThreshold;
if (sufficientEstimatedGain) {
- gcMonitor.info(
+ gcListener.info(
"TarMK GC #{}: estimation completed in {} ({} ms). " +
"Gain is {}% or {}/{} ({}/{} bytes), so running compaction",
GC_COUNT, watch, watch.elapsed(MILLISECONDS), gain,
@@ -423,12 +425,12 @@ public class FileStore implements Segmen
estimate.getReachableSize(), estimate.getTotalSize());
} else {
if (estimate.getTotalSize() == 0) {
- gcMonitor.skipped(
+ gcListener.skipped(
"TarMK GC #{}: estimation completed in {} ({} ms). " +
"Skipping compaction for now as repository consists of a single tar file only",
GC_COUNT, watch, watch.elapsed(MILLISECONDS));
} else {
- gcMonitor.skipped(
+ gcListener.skipped(
"TarMK GC #{}: estimation completed in {} ({} ms). " +
"Gain is {}% or {}/{} ({}/{} bytes), so skipping compaction for now",
GC_COUNT, watch, watch.elapsed(MILLISECONDS), gain,
@@ -444,7 +446,7 @@ public class FileStore implements Segmen
cleanupNeeded.set(cleanup);
}
} else {
- gcMonitor.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
+ gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
}
}
}
@@ -652,7 +654,7 @@ public class FileStore implements Segmen
fileStoreLock.writeLock().lock();
try {
- gcMonitor.info("TarMK GC #{}: cleanup started. Current repository size is {} ({} bytes)",
+ gcListener.info("TarMK GC #{}: cleanup started. Current repository size is {} ({} bytes)",
GC_COUNT, humanReadableByteCount(initialSize), initialSize);
newWriter();
@@ -681,14 +683,14 @@ public class FileStore implements Segmen
log.info("{}: size of bulk references/reclaim set {}/{}",
reader, bulkRefs.size(), reclaim.size());
if (shutdown) {
- gcMonitor.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+ gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
break;
}
}
for (TarReader reader : cleaned.keySet()) {
cleaned.put(reader, reader.sweep(reclaim));
if (shutdown) {
- gcMonitor.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+ gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
break;
}
}
@@ -723,7 +725,7 @@ public class FileStore implements Segmen
for (TarReader oldReader : oldReaders) {
closeAndLogOnFail(oldReader);
File file = oldReader.getFile();
- gcMonitor.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
+ gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
toRemove.addLast(file);
}
@@ -731,8 +733,8 @@ public class FileStore implements Segmen
approximateSize.set(finalSize);
stats.reclaimed(initialSize - finalSize);
// FIXME OAK-4106: Reclaimed size reported by FileStore.cleanup is off
- gcMonitor.cleaned(initialSize - finalSize, finalSize);
- gcMonitor.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
+ gcListener.cleaned(initialSize - finalSize, finalSize);
+ gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
" and space reclaimed {} ({} bytes).",
GC_COUNT, watch, watch.elapsed(MILLISECONDS),
humanReadableByteCount(finalSize), finalSize,
@@ -833,7 +835,7 @@ public class FileStore implements Segmen
* @return {@code true} if compaction succeeded, {@code false} otherwise.
*/
public boolean compact() throws IOException {
- gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
+ gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
Stopwatch watch = Stopwatch.createStarted();
SegmentNodeState before = segmentReader.readHeadState();
@@ -842,7 +844,7 @@ public class FileStore implements Segmen
if (existing > 1) {
// FIXME OAK-4371: Overly zealous warning about checkpoints on compaction
// Make the number of checkpoints configurable above which the warning should be issued?
- gcMonitor.warn(
+ gcListener.warn(
"TarMK GC #{}: compaction found {} checkpoints, you might need to run checkpoint cleanup",
GC_COUNT, existing);
}
@@ -853,11 +855,11 @@ public class FileStore implements Segmen
Supplier<Boolean> cancel = newCancelCompactionCondition();
SegmentNodeState after = compact(bufferWriter, before, cancel);
if (after == null) {
- gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ gcListener.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
return false;
}
- gcMonitor.info("TarMK GC #{}: compacted {} to {}",
+ gcListener.info("TarMK GC #{}: compacted {} to {}",
GC_COUNT, before.getRecordId(), after.getRecordId());
try {
@@ -868,27 +870,27 @@ public class FileStore implements Segmen
// Some other concurrent changes have been made.
// Rebase (and compact) those changes on top of the
// compacted state before retrying to set the head.
- gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
+ gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
"Compacting these commits. Cycle {}", GC_COUNT, cycles);
SegmentNodeState head = segmentReader.readHeadState();
after = compact(bufferWriter, head, cancel);
if (after == null) {
- gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ gcListener.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
return false;
}
- gcMonitor.info("TarMK GC #{}: compacted {} against {} to {}",
+ gcListener.info("TarMK GC #{}: compacted {} against {} to {}",
GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
before = head;
}
if (!success) {
- gcMonitor.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
+ gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
GC_COUNT, cycles - 1);
if (gcOptions.getForceAfterFail()) {
- gcMonitor.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
+ gcListener.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
success = forceCompact(bufferWriter, cancel);
if (!success) {
- gcMonitor.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
+ gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
"Most likely compaction didn't get exclusive access to the store or was " +
"prematurely cancelled. Cleaning up.",
GC_COUNT);
@@ -903,42 +905,27 @@ public class FileStore implements Segmen
}
if (success) {
- segmentWriter.evictCaches(new Predicate<Integer>() {
- @Override
- public boolean apply(Integer generation) {
- return generation < newGeneration;
- }
- });
-
// FIXME OAK-4285: Align cleanup of segment id tables with the new cleanup strategy
// With clean brutal we need to remove those ids that have been cleaned
// i.e. those whose segment was from an old generation
tracker.clearSegmentIdTables(Predicates.<SegmentId>alwaysFalse());
- // FIXME OAK-4283: Align GCMonitor API with implementation
- // Refactor GCMonitor: there is no more compaction map stats
- gcMonitor.compacted(new long[]{}, new long[]{}, new long[]{});
-
- gcMonitor.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
+ gcListener.compacted(SUCCESS, newGeneration);
+ gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
return true;
} else {
- segmentWriter.evictCaches(new Predicate<Integer>() {
- @Override
- public boolean apply(Integer generation) {
- return generation == newGeneration;
- }
- });
- gcMonitor.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
+ gcListener.compacted(FAILURE, newGeneration);
+ gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
return false;
}
} catch (InterruptedException e) {
- gcMonitor.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
+ gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
currentThread().interrupt();
return false;
} catch (Exception e) {
- gcMonitor.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
+ gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
return false;
}
}
@@ -948,7 +935,7 @@ public class FileStore implements Segmen
throws IOException {
if (gcOptions.isOffline()) {
SegmentWriter writer = new SegmentWriter(this, segmentReader,
- blobStore, tracker, WriterCacheManager.Empty.create(), bufferWriter);
+ blobStore, tracker, Empty.INSTANCE, bufferWriter);
return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
.compact(EMPTY_NODE, head, EMPTY_NODE);
} else {
@@ -968,13 +955,13 @@ public class FileStore implements Segmen
SegmentNodeState after = compact(bufferWriter,
segmentReader.readNode(base), cancel);
if (after == null) {
- gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ gcListener.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
return null;
} else {
return after.getRecordId();
}
} catch (IOException e) {
- gcMonitor.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
+ gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
return null;
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java Tue Jun 7 07:32:19 2016
@@ -31,11 +31,13 @@ import java.io.IOException;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentVersion;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager;
import org.apache.jackrabbit.oak.segment.compaction.LoggingGCMonitor;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore;
@@ -78,6 +80,66 @@ public class FileStoreBuilder {
@Nonnull
private SegmentGCOptions gcOptions = SegmentGCOptions.DEFAULT;
+ @Nonnull
+ private GCListener gcListener;
+
+ @Nonnull
+ private final WriterCacheManager cacheManager = new WriterCacheManager.Default() {{
+ gcListener = new GCListener() {
+ @Override
+ public void info(String message, Object... arguments) {
+ gcMonitor.info(message, arguments);
+ }
+
+ @Override
+ public void warn(String message, Object... arguments) {
+ gcMonitor.warn(message, arguments);
+ }
+
+ @Override
+ public void error(String message, Exception exception) {
+ gcMonitor.error(message, exception);
+ }
+
+ @Override
+ public void skipped(String reason, Object... arguments) {
+ gcMonitor.skipped(reason, arguments);
+ }
+
+ @Override
+ public void compacted(long[] segmentCounts, long[] recordCounts, long[] compactionMapWeights) {
+ gcMonitor.compacted(segmentCounts, recordCounts, compactionMapWeights);
+ }
+
+ @Override
+ public void cleaned(long reclaimedSize, long currentSize) {
+ gcMonitor.cleaned(reclaimedSize, currentSize);
+ }
+
+ @Override
+ public void compacted(@Nonnull Status status, final int newGeneration) {
+ switch (status) {
+ case SUCCESS:
+ evictCaches(new Predicate<Integer>() {
+ @Override
+ public boolean apply(Integer generation) {
+ return generation < newGeneration;
+ }
+ });
+ break;
+ case FAILURE:
+ evictCaches(new Predicate<Integer>() {
+ @Override
+ public boolean apply(Integer generation) {
+ return generation == newGeneration;
+ }
+ });
+ break;
+ }
+ }
+ };
+ }};
+
@CheckForNull
private TarRevisions revisions;
@@ -277,8 +339,8 @@ public class FileStoreBuilder {
}
@Nonnull
- DelegatingGCMonitor getGcMonitor() {
- return gcMonitor;
+ GCListener getGcListener() {
+ return gcListener;
}
@Nonnull
@@ -301,4 +363,9 @@ public class FileStoreBuilder {
checkState(revisions != null, "File store not yet built");
return revisions;
}
+
+ @Nonnull
+ WriterCacheManager getCacheManager() {
+ return cacheManager;
+ }
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java?rev=1747160&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java Tue Jun 7 07:32:19 2016
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+
+// FIXME OAK-4283: Align GCMonitor API with implementation: Unify with GCMonitor
+public interface GCListener extends GCMonitor {
+ enum Status {SUCCESS, FAILURE}
+ void compacted(@Nonnull Status status, int generation);
+}