You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/06/07 07:32:19 UTC

svn commit: r1747160 - in /jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment: ./ file/

Author: mduerig
Date: Tue Jun  7 07:32:19 2016
New Revision: 1747160

URL: http://svn.apache.org/viewvc?rev=1747160&view=rev
Log:
OAK-4277: Finalise de-duplication caches
- Invalidate caches through an (extension of) the GCMonitor instead of relying on explicit method calls
- Refactor NodeCache and RecordCache

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java Tue Jun  7 07:32:19 2016
@@ -22,6 +22,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.RecordCache.newRecordCache;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -117,8 +118,7 @@ public class Compactor {
         }
     }
 
-    private final RecordCache<RecordId> cache = RecordCache.<RecordId> factory(
-            cacheSize).get();
+    private final RecordCache<RecordId> cache = newRecordCache(cacheSize);
 
     public Compactor(SegmentReader reader, SegmentWriter writer,
             BlobStore blobStore, Supplier<Boolean> cancel, SegmentGCOptions gc) {

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java Tue Jun  7 07:32:19 2016
@@ -28,6 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
 import com.google.common.base.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,95 +39,120 @@ import org.slf4j.LoggerFactory;
 // implement configuration, monitoring and management
 // add unit tests
 // document, nullability
-public class NodeCache {
-    private static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
-
-    private final int capacity;
-    private final List<Map<String, RecordId>> nodes;
+public abstract class NodeCache {
 
-    private int size;
+    public abstract void put(String key, RecordId value, int depth);
 
-    private final Set<Integer> muteDepths = newHashSet();
+    @CheckForNull
+    public abstract RecordId get(String key);
 
-    public static final Supplier<NodeCache> factory(final int capacity, final int maxDepth) {
-        return new Supplier<NodeCache>() {
-            @Override
-            public NodeCache get() {
-                return new NodeCache(capacity, maxDepth);
-            }
-        };
+    @Nonnull
+    public static NodeCache newNodeCache(int capacity, int maxDepth) {
+        if (capacity <= 0) {
+            return new NodeCache.Empty();
+        } else {
+            return new NodeCache.Default(capacity, maxDepth);
+        }
     }
 
-    public static final Supplier<NodeCache> empty() {
-        return new Supplier<NodeCache>() {
-            @Override
-            public NodeCache get() {
-                return new NodeCache() {
-                    @Override
-                    public synchronized void put(String key, RecordId value, int depth) { }
-
-                    @Override
-                    public synchronized RecordId get(String key) { return null; }
-                };
-            }
-        };
+    @Nonnull
+    public static Supplier<NodeCache> factory(int capacity, int maxDepth) {
+        if (capacity <= 0) {
+            return NodeCache.Empty.supplier();
+        } else {
+            return NodeCache.Default.supplier(capacity, maxDepth);
+        }
     }
 
-    /* Internal constructor used by empty() only */
-    private NodeCache() {
-        this.capacity = 0;
-        this.nodes = null;
+    private static class Empty extends NodeCache {
+        static final Supplier<NodeCache> supplier() {
+            return  new Supplier<NodeCache>() {
+                @Override
+                public NodeCache get() {
+                    return new NodeCache.Empty();
+                }
+            };
+        }
+
+        @Override
+        public synchronized void put(String key, RecordId value, int depth) { }
+
+        @Override
+        public synchronized RecordId get(String key) { return null; }
     }
 
-    public NodeCache(int capacity, int maxDepth) {
-        checkArgument(capacity > 0);
-        checkArgument(maxDepth > 0);
-        this.capacity = capacity;
-        this.nodes = newArrayList();
-        for (int k = 0; k < maxDepth; k++) {
-            nodes.add(new HashMap<String, RecordId>());
+    private static class Default extends NodeCache {
+        private static final Logger LOG = LoggerFactory.getLogger(Default.class);
+
+        private final int capacity;
+        private final List<Map<String, RecordId>> nodes;
+
+        private int size;
+
+        private final Set<Integer> muteDepths = newHashSet();
+
+        static final Supplier<NodeCache> supplier(final int capacity, final int size) {
+            return new Supplier<NodeCache>() {
+                @Override
+                public NodeCache get() {
+                    return new NodeCache.Default(capacity, size);
+                }
+            };
         }
-    }
 
-    public synchronized void put(String key, RecordId value, int depth) {
-        // FIXME OAK-4277: Finalise de-duplication caches
-        // Validate and optimise the eviction strategy.
-        // Nodes with many children should probably get a boost to
-        // protecting them from preemptive eviction. Also it might be
-        // necessary to implement pinning (e.g. for checkpoints).
-        while (size >= capacity) {
-            int d = nodes.size() - 1;
-            int removed = nodes.remove(d).size();
-            size -= removed;
-            if (removed > 0) {
-                // FIXME OAK-4165: Too verbose logging during revision gc
-                LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
-                    "New size is {}", d, size + removed, capacity, size);
+        Default(int capacity, int maxDepth) {
+            checkArgument(capacity > 0);
+            checkArgument(maxDepth > 0);
+            this.capacity = capacity;
+            this.nodes = newArrayList();
+            for (int k = 0; k < maxDepth; k++) {
+                nodes.add(new HashMap<String, RecordId>());
             }
         }
 
-        if (depth < nodes.size()) {
-            if (nodes.get(depth).put(key, value) == null) {
-                size++;
+        @Override
+        public synchronized void put(String key, RecordId value, int depth) {
+            // FIXME OAK-4277: Finalise de-duplication caches
+            // Validate and optimise the eviction strategy.
+            // Nodes with many children should probably get a boost to
+            // protecting them from preemptive eviction. Also it might be
+            // necessary to implement pinning (e.g. for checkpoints).
+            while (size >= capacity) {
+                int d = nodes.size() - 1;
+                int removed = nodes.remove(d).size();
+                size -= removed;
+                if (removed > 0) {
+                    // FIXME OAK-4165: Too verbose logging during revision gc
+                    LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
+                            "New size is {}", d, size + removed, capacity, size);
+                }
             }
-        } else {
-            if (muteDepths.add(depth)) {
-                LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
-                    key, value, depth, nodes.size());
+
+            if (depth < nodes.size()) {
+                if (nodes.get(depth).put(key, value) == null) {
+                    size++;
+                }
+            } else {
+                if (muteDepths.add(depth)) {
+                    LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
+                            key, value, depth, nodes.size());
+                }
             }
         }
-    }
 
-    public synchronized RecordId get(String key) {
-        for (Map<String, RecordId> map : nodes) {
-            if (!map.isEmpty()) {
-                RecordId recordId = map.get(key);
-                if (recordId != null) {
-                    return recordId;
+        @Override
+        public synchronized RecordId get(String key) {
+            for (Map<String, RecordId> map : nodes) {
+                if (!map.isEmpty()) {
+                    RecordId recordId = map.get(key);
+                    if (recordId != null) {
+                        return recordId;
+                    }
                 }
             }
+            return null;
         }
-        return null;
+
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java Tue Jun  7 07:32:19 2016
@@ -22,53 +22,86 @@ package org.apache.jackrabbit.oak.segmen
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
 import com.google.common.base.Supplier;
 
 // FIXME OAK-4277: Finalise de-duplication caches
 // implement configuration, monitoring and management
 // add unit tests
 // document, nullability
-public class RecordCache<T> {
-    private final Map<T, RecordId> records;
+public abstract class RecordCache<T> {
 
-    public static final <T> Supplier<RecordCache<T>> factory(final int size) {
-        return new Supplier<RecordCache<T>>() {
-            @Override
-            public RecordCache<T> get() {
-                return new RecordCache<>(size);
-            }
-        };
-    }
+    public abstract void put(T key, RecordId value);
+
+    @CheckForNull
+    public abstract RecordId get(T key);
 
-    public static final <T> Supplier<RecordCache<T>> empty() {
-        return new Supplier<RecordCache<T>>() {
-            @Override
-            public RecordCache<T> get() {
-                return new RecordCache<T>(0) {
-                    @Override
-                    public synchronized void put(T key, RecordId value) { }
-
-                    @Override
-                    public synchronized RecordId get(T key) { return null; }
-                };
-            }
-        };
+    @Nonnull
+    public static <T> RecordCache<T> newRecordCache(int size) {
+        if (size <= 0) {
+            return new Empty<>();
+        } else {
+            return new Default<>(size);
+        }
     }
 
-    public RecordCache(final int size) {
-        records = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
-                return size() >= size;
-            }
-        };
+    @Nonnull
+    public static <T> Supplier<RecordCache<T>> factory(int size) {
+        if (size <= 0) {
+            return Empty.supplier();
+        } else {
+            return Default.supplier(size);
+        }
     }
 
-    public synchronized void put(T key, RecordId value) {
-        records.put(key, value);
+    private static class Empty<T> extends RecordCache<T> {
+        static final <T> Supplier<RecordCache<T>> supplier() {
+            return  new Supplier<RecordCache<T>>() {
+                @Override
+                public RecordCache<T> get() {
+                    return new Empty<>();
+                }
+            };
+        }
+
+        @Override
+        public synchronized void put(T key, RecordId value) { }
+
+        @Override
+        public synchronized RecordId get(T key) { return null; }
     }
 
-    public synchronized RecordId get(T key) {
-        return records.get(key);
+    private static class Default<T> extends RecordCache<T> {
+        private final Map<T, RecordId> records;
+
+        static final <T> Supplier<RecordCache<T>> supplier(final int size) {
+            return new Supplier<RecordCache<T>>() {
+                @Override
+                public RecordCache<T> get() {
+                    return new Default<>(size);
+                }
+            };
+        }
+
+        Default(final int size) {
+            records = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
+                    return size() >= size;
+                }
+            };
+        }
+
+        @Override
+        public synchronized void put(T key, RecordId value) {
+            records.put(key, value);
+        }
+
+        @Override
+        public synchronized RecordId get(T key) {
+            return records.get(key);
+        }
     }
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Tue Jun  7 07:32:19 2016
@@ -57,7 +57,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.jcr.PropertyType;
 
-import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.io.Closeables;
@@ -127,12 +126,6 @@ public class SegmentWriter {
         this.writeOperationHandler = checkNotNull(writeOperationHandler);
     }
 
-    // FIXME OAK-4277: Finalise de-duplication caches
-    // There should be a cleaner way to control the deduplication caches across gc generations
-    public void evictCaches(Predicate<Integer> evict) {
-        cacheManager.evictCaches(evict);
-    }
-
     public void flush() throws IOException {
         writeOperationHandler.flush();
     }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriterBuilder.java Tue Jun  7 07:32:19 2016
@@ -20,13 +20,13 @@
 package org.apache.jackrabbit.oak.segment;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.Integer.getInteger;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
 
 import javax.annotation.Nonnull;
 
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager.Empty;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.http.HttpStore;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -38,11 +38,6 @@ import org.apache.jackrabbit.oak.segment
  * was specified (default).
  */
 public final class SegmentWriterBuilder {
-    private static final int STRING_RECORDS_CACHE_SIZE = getInteger(
-            "oak.segment.writer.stringsCacheSize", 15000);
-
-    private static final int TPL_RECORDS_CACHE_SIZE = getInteger(
-            "oak.segment.writer.templatesCacheSize", 3000);
 
     @Nonnull
     private final String name;
@@ -56,16 +51,7 @@ public final class SegmentWriterBuilder
     private boolean pooled = false;
 
     @Nonnull
-    private WriterCacheManager cacheManager = WriterCacheManager.Default.create(
-        STRING_RECORDS_CACHE_SIZE <= 0
-            ? RecordCache.<String>empty()
-            : RecordCache.<String>factory(STRING_RECORDS_CACHE_SIZE),
-        TPL_RECORDS_CACHE_SIZE <= 0
-            ? RecordCache.<Template>empty()
-            : RecordCache.<Template>factory(TPL_RECORDS_CACHE_SIZE),
-        // FIXME OAK-4277: Finalise de-duplication caches: make sizes and depth configurable
-        NodeCache.factory(1000000, 20));
-
+    private WriterCacheManager cacheManager = new WriterCacheManager.Default();
 
     private SegmentWriterBuilder(@Nonnull String name) { this.name = checkNotNull(name); }
 
@@ -131,7 +117,7 @@ public final class SegmentWriterBuilder
 
     @Nonnull
     public SegmentWriterBuilder withoutCache() {
-        this.cacheManager = WriterCacheManager.Empty.create();
+        this.cacheManager = Empty.INSTANCE;
         return this;
     }
 

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java Tue Jun  7 07:32:19 2016
@@ -22,6 +22,8 @@ package org.apache.jackrabbit.oak.segmen
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Suppliers.memoize;
 import static com.google.common.collect.Maps.newConcurrentMap;
+import static java.lang.Integer.getInteger;
+import static org.apache.jackrabbit.oak.segment.RecordCache.newRecordCache;
 
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
@@ -35,28 +37,30 @@ import com.google.common.base.Supplier;
 // implement configuration, monitoring and management
 // add unit tests
 // document, nullability
-public interface WriterCacheManager {
+public abstract class WriterCacheManager {
+    private static final int DEFAULT_STRING_CACHE_SIZE = getInteger(
+            "oak.tar.stringsCacheSize", 15000);
 
-    @Nonnull
-    RecordCache<String> getStringCache(int generation);
+    private static final int DEFAULT_TEMPLATE_CACHE_SIZE = getInteger(
+            "oak.tar.templatesCacheSize", 3000);
 
     @Nonnull
-    RecordCache<Template> getTemplateCache(int generation);
+    public abstract RecordCache<String> getStringCache(int generation);
 
     @Nonnull
-    NodeCache getNodeCache(int generation);
+    public abstract RecordCache<Template> getTemplateCache(int generation);
 
-    void evictCaches(Predicate<Integer> generations);
+    @Nonnull
+    public abstract NodeCache getNodeCache(int generation);
 
-    class Empty implements WriterCacheManager {
-        private static final WriterCacheManager EMPTY = new Empty();
-        private final RecordCache<String> stringCache = RecordCache.<String>empty().get();
-        private final RecordCache<Template> templateCache = RecordCache.<Template>empty().get();
-        private final NodeCache nodeCache = NodeCache.empty().get();
+    public static class Empty extends WriterCacheManager {
+        public static final WriterCacheManager INSTANCE = new Empty();
 
-        public static WriterCacheManager create() { return EMPTY; }
+        private final RecordCache<String> stringCache = newRecordCache(0);
+        private final RecordCache<Template> templateCache = newRecordCache(0);
+        private final NodeCache nodeCache = NodeCache.newNodeCache(0, 0);
 
-        private Empty(){}
+        private Empty() {}
 
         @Override
         public RecordCache<String> getStringCache(int generation) {
@@ -72,12 +76,9 @@ public interface WriterCacheManager {
         public NodeCache getNodeCache(int generation) {
             return nodeCache;
         }
-
-        @Override
-        public void evictCaches(Predicate<Integer> generations) { }
     }
 
-    class Default implements WriterCacheManager {
+    public static class Default extends WriterCacheManager {
         /**
          * Cache of recently stored string records, used to avoid storing duplicates
          * of frequently occurring data.
@@ -96,14 +97,7 @@ public interface WriterCacheManager {
          */
         private final Generation<NodeCache> nodeCaches;
 
-        public static WriterCacheManager create(
-                @Nonnull Supplier<RecordCache<String>> stringCacheFactory,
-                @Nonnull Supplier<RecordCache<Template>> templateCacheFactory,
-                @Nonnull Supplier<NodeCache> nodeCacheFactory) {
-            return new Default(stringCacheFactory, templateCacheFactory, nodeCacheFactory);
-        }
-
-        private Default(
+        public Default(
                 @Nonnull Supplier<RecordCache<String>> stringCacheFactory,
                 @Nonnull Supplier<RecordCache<Template>> templateCacheFactory,
                 @Nonnull Supplier<NodeCache> nodeCacheFactory) {
@@ -112,6 +106,12 @@ public interface WriterCacheManager {
             this.nodeCaches = new Generation<>(nodeCacheFactory);
         }
 
+        public Default() {
+            this(RecordCache.<String>factory(DEFAULT_STRING_CACHE_SIZE),
+                 RecordCache.<Template>factory(DEFAULT_TEMPLATE_CACHE_SIZE),
+                 NodeCache.factory(1000000, 20));
+        }
+
         private static class Generation<T> {
             private final ConcurrentMap<Integer, Supplier<T>> generations = newConcurrentMap();
             private final Supplier<T> cacheFactory;
@@ -156,8 +156,7 @@ public interface WriterCacheManager {
             return nodeCaches.getGeneration(generation);
         }
 
-        @Override
-        public void evictCaches(Predicate<Integer> generations) {
+        protected void evictCaches(Predicate<Integer> generations) {
             stringCaches.evictGenerations(generations);
             templateCaches.evictGenerations(generations);
             nodeCaches.evictGenerations(generations);

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Tue Jun  7 07:32:19 2016
@@ -39,6 +39,8 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
 import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
 import static org.apache.jackrabbit.oak.segment.SegmentWriterBuilder.segmentWriterBuilder;
+import static org.apache.jackrabbit.oak.segment.file.GCListener.Status.FAILURE;
+import static org.apache.jackrabbit.oak.segment.file.GCListener.Status.SUCCESS;
 import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
 
 import java.io.Closeable;
@@ -91,10 +93,9 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentVersion;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.WriterCacheManager;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager.Empty;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -194,9 +195,9 @@ public class FileStore implements Segmen
     private final SegmentVersion version;
 
     /**
-     * {@code GCMonitor} monitoring this instance's gc progress
+     * {@code GcListener} listening to this instance's gc progress
      */
-    private final GCMonitor gcMonitor;
+    private final GCListener gcListener;
 
     /**
      * Represents the approximate size on disk of the repository.
@@ -259,11 +260,12 @@ public class FileStore implements Segmen
                 .with(version)
                 .withGeneration(getGeneration)
                 .withWriterPool()
+                .with(builder.getCacheManager())
                 .build(this);
         this.directory = builder.getDirectory();
         this.maxFileSize = builder.getMaxFileSize() * MB;
         this.memoryMapping = builder.getMemoryMapping();
-        this.gcMonitor = builder.getGcMonitor();
+        this.gcListener = builder.getGcListener();
         this.gcOptions = builder.getGcOptions();
 
         Map<Integer, Map<Character, File>> map = collectFiles(directory);
@@ -374,7 +376,7 @@ public class FileStore implements Segmen
     }
 
     public void maybeCompact(boolean cleanup) throws IOException {
-        gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
+        gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
 
         Runtime runtime = Runtime.getRuntime();
         long avail = runtime.totalMemory() - runtime.freeMemory();
@@ -383,7 +385,7 @@ public class FileStore implements Segmen
         long delta = 0;
         long needed = delta * gcOptions.getMemoryThreshold();
         if (needed >= avail) {
-            gcMonitor.skipped(
+            gcListener.skipped(
                     "TarMK GC #{}: not enough available memory {} ({} bytes), needed {} ({} bytes)," +
                     " last merge delta {} ({} bytes), so skipping compaction for now",
                     GC_COUNT,
@@ -400,22 +402,22 @@ public class FileStore implements Segmen
         int gainThreshold = gcOptions.getGainThreshold();
         boolean sufficientEstimatedGain = true;
         if (gainThreshold <= 0) {
-            gcMonitor.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
+            gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
                     GC_COUNT, gainThreshold);
         } else if (gcOptions.isPaused()) {
-            gcMonitor.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
+            gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
         } else {
-            gcMonitor.info("TarMK GC #{}: estimation started", GC_COUNT);
+            gcListener.info("TarMK GC #{}: estimation started", GC_COUNT);
             Supplier<Boolean> shutdown = newShutdownSignal();
             CompactionGainEstimate estimate = estimateCompactionGain(shutdown);
             if (shutdown.get()) {
-                gcMonitor.info("TarMK GC #{}: estimation interrupted. Skipping compaction.", GC_COUNT);
+                gcListener.info("TarMK GC #{}: estimation interrupted. Skipping compaction.", GC_COUNT);
             }
 
             long gain = estimate.estimateCompactionGain();
             sufficientEstimatedGain = gain >= gainThreshold;
             if (sufficientEstimatedGain) {
-                gcMonitor.info(
+                gcListener.info(
                     "TarMK GC #{}: estimation completed in {} ({} ms). " +
                     "Gain is {}% or {}/{} ({}/{} bytes), so running compaction",
                         GC_COUNT, watch, watch.elapsed(MILLISECONDS), gain,
@@ -423,12 +425,12 @@ public class FileStore implements Segmen
                         estimate.getReachableSize(), estimate.getTotalSize());
             } else {
                 if (estimate.getTotalSize() == 0) {
-                    gcMonitor.skipped(
+                    gcListener.skipped(
                             "TarMK GC #{}: estimation completed in {} ({} ms). " +
                             "Skipping compaction for now as repository consists of a single tar file only",
                             GC_COUNT, watch, watch.elapsed(MILLISECONDS));
                 } else {
-                    gcMonitor.skipped(
+                    gcListener.skipped(
                         "TarMK GC #{}: estimation completed in {} ({} ms). " +
                         "Gain is {}% or {}/{} ({}/{} bytes), so skipping compaction for now",
                             GC_COUNT, watch, watch.elapsed(MILLISECONDS), gain,
@@ -444,7 +446,7 @@ public class FileStore implements Segmen
                     cleanupNeeded.set(cleanup);
                 }
             } else {
-                gcMonitor.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
+                gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
             }
         }
     }
@@ -652,7 +654,7 @@ public class FileStore implements Segmen
 
         fileStoreLock.writeLock().lock();
         try {
-            gcMonitor.info("TarMK GC #{}: cleanup started. Current repository size is {} ({} bytes)",
+            gcListener.info("TarMK GC #{}: cleanup started. Current repository size is {} ({} bytes)",
                     GC_COUNT, humanReadableByteCount(initialSize), initialSize);
 
             newWriter();
@@ -681,14 +683,14 @@ public class FileStore implements Segmen
             log.info("{}: size of bulk references/reclaim set {}/{}",
                     reader, bulkRefs.size(), reclaim.size());
             if (shutdown) {
-                gcMonitor.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+                gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
                 break;
             }
         }
         for (TarReader reader : cleaned.keySet()) {
             cleaned.put(reader, reader.sweep(reclaim));
             if (shutdown) {
-                gcMonitor.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+                gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
                 break;
             }
         }
@@ -723,7 +725,7 @@ public class FileStore implements Segmen
         for (TarReader oldReader : oldReaders) {
             closeAndLogOnFail(oldReader);
             File file = oldReader.getFile();
-            gcMonitor.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
+            gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
             toRemove.addLast(file);
         }
 
@@ -731,8 +733,8 @@ public class FileStore implements Segmen
         approximateSize.set(finalSize);
         stats.reclaimed(initialSize - finalSize);
         // FIXME OAK-4106: Reclaimed size reported by FileStore.cleanup is off
-        gcMonitor.cleaned(initialSize - finalSize, finalSize);
-        gcMonitor.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
+        gcListener.cleaned(initialSize - finalSize, finalSize);
+        gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
                 " and space reclaimed {} ({} bytes).",
                 GC_COUNT, watch, watch.elapsed(MILLISECONDS),
                 humanReadableByteCount(finalSize), finalSize,
@@ -833,7 +835,7 @@ public class FileStore implements Segmen
      * @return {@code true} if compaction succeeded, {@code false} otherwise.
      */
     public boolean compact() throws IOException {
-        gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
+        gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
         Stopwatch watch = Stopwatch.createStarted();
 
         SegmentNodeState before = segmentReader.readHeadState();
@@ -842,7 +844,7 @@ public class FileStore implements Segmen
         if (existing > 1) {
             // FIXME OAK-4371: Overly zealous warning about checkpoints on compaction
             // Make the number of checkpoints configurable above which the warning should be issued?
-            gcMonitor.warn(
+            gcListener.warn(
                     "TarMK GC #{}: compaction found {} checkpoints, you might need to run checkpoint cleanup",
                     GC_COUNT, existing);
         }
@@ -853,11 +855,11 @@ public class FileStore implements Segmen
         Supplier<Boolean> cancel = newCancelCompactionCondition();
         SegmentNodeState after = compact(bufferWriter, before, cancel);
         if (after == null) {
-            gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+            gcListener.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
             return false;
         }
 
-        gcMonitor.info("TarMK GC #{}: compacted {} to {}",
+        gcListener.info("TarMK GC #{}: compacted {} to {}",
                 GC_COUNT, before.getRecordId(), after.getRecordId());
 
         try {
@@ -868,27 +870,27 @@ public class FileStore implements Segmen
                 // Some other concurrent changes have been made.
                 // Rebase (and compact) those changes on top of the
                 // compacted state before retrying to set the head.
-                gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
+                gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
                     "Compacting these commits. Cycle {}", GC_COUNT, cycles);
                 SegmentNodeState head = segmentReader.readHeadState();
                 after = compact(bufferWriter, head, cancel);
                 if (after == null) {
-                    gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+                    gcListener.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
                     return false;
                 }
 
-                gcMonitor.info("TarMK GC #{}: compacted {} against {} to {}",
+                gcListener.info("TarMK GC #{}: compacted {} against {} to {}",
                         GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
                 before = head;
             }
             if (!success) {
-                gcMonitor.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
+                gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
                         GC_COUNT, cycles - 1);
                 if (gcOptions.getForceAfterFail()) {
-                    gcMonitor.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
+                    gcListener.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
                     success = forceCompact(bufferWriter, cancel);
                     if (!success) {
-                        gcMonitor.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
+                        gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
                             "Most likely compaction didn't get exclusive access to the store or was " +
                             "prematurely cancelled. Cleaning up.",
                             GC_COUNT);
@@ -903,42 +905,27 @@ public class FileStore implements Segmen
             }
 
             if (success) {
-                segmentWriter.evictCaches(new Predicate<Integer>() {
-                    @Override
-                    public boolean apply(Integer generation) {
-                        return generation < newGeneration;
-                    }
-                });
-
                 // FIXME OAK-4285: Align cleanup of segment id tables with the new cleanup strategy
                 // ith clean brutal we need to remove those ids that have been cleaned
                 // i.e. those whose segment was from an old generation
                 tracker.clearSegmentIdTables(Predicates.<SegmentId>alwaysFalse());
 
-                // FIXME OAK-4283: Align GCMonitor API with implementation
-                // Refactor GCMonitor: there is no more compaction map stats
-                gcMonitor.compacted(new long[]{}, new long[]{}, new long[]{});
-
-                gcMonitor.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
+                gcListener.compacted(SUCCESS, newGeneration);
+                gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
                         GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
                 return true;
             } else {
-                segmentWriter.evictCaches(new Predicate<Integer>() {
-                    @Override
-                    public boolean apply(Integer generation) {
-                        return generation == newGeneration;
-                    }
-                });
-                gcMonitor.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
+                gcListener.compacted(FAILURE, newGeneration);
+                gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
                         GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
                 return false;
             }
         } catch (InterruptedException e) {
-            gcMonitor.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
+            gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
             currentThread().interrupt();
             return false;
         } catch (Exception e) {
-            gcMonitor.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
+            gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
             return false;
         }
     }
@@ -948,7 +935,7 @@ public class FileStore implements Segmen
     throws IOException {
         if (gcOptions.isOffline()) {
             SegmentWriter writer = new SegmentWriter(this, segmentReader,
-                    blobStore, tracker, WriterCacheManager.Empty.create(), bufferWriter);
+                    blobStore, tracker, Empty.INSTANCE, bufferWriter);
             return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
                     .compact(EMPTY_NODE, head, EMPTY_NODE);
         } else {
@@ -968,13 +955,13 @@ public class FileStore implements Segmen
                         SegmentNodeState after = compact(bufferWriter,
                                 segmentReader.readNode(base), cancel);
                         if (after == null) {
-                            gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+                            gcListener.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
                             return null;
                         } else {
                             return after.getRecordId();
                         }
                     } catch (IOException e) {
-                        gcMonitor.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
+                        gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
                         return null;
                     }
                 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java?rev=1747160&r1=1747159&r2=1747160&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java Tue Jun  7 07:32:19 2016
@@ -31,11 +31,13 @@ import java.io.IOException;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.segment.SegmentVersion;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.LoggingGCMonitor;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore;
@@ -78,6 +80,66 @@ public class FileStoreBuilder {
     @Nonnull
     private SegmentGCOptions gcOptions = SegmentGCOptions.DEFAULT;
 
+    @Nonnull
+    private GCListener gcListener;
+
+    @Nonnull
+    private final WriterCacheManager cacheManager = new WriterCacheManager.Default() {{
+        gcListener = new GCListener() {
+            @Override
+            public void info(String message, Object... arguments) {
+                gcMonitor.info(message, arguments);
+            }
+
+            @Override
+            public void warn(String message, Object... arguments) {
+                gcMonitor.warn(message, arguments);
+            }
+
+            @Override
+            public void error(String message, Exception exception) {
+                gcMonitor.error(message, exception);
+            }
+
+            @Override
+            public void skipped(String reason, Object... arguments) {
+                gcMonitor.skipped(reason, arguments);
+            }
+
+            @Override
+            public void compacted(long[] segmentCounts, long[] recordCounts, long[] compactionMapWeights) {
+                gcMonitor.compacted(segmentCounts, recordCounts, compactionMapWeights);
+            }
+
+            @Override
+            public void cleaned(long reclaimedSize, long currentSize) {
+                gcMonitor.cleaned(reclaimedSize, currentSize);
+            }
+
+            @Override
+            public void compacted(@Nonnull Status status, final int newGeneration) {
+                switch (status) {
+                    case SUCCESS:
+                        evictCaches(new Predicate<Integer>() {
+                            @Override
+                            public boolean apply(Integer generation) {
+                                return generation < newGeneration;
+                            }
+                        });
+                        break;
+                    case FAILURE:
+                        evictCaches(new Predicate<Integer>() {
+                            @Override
+                            public boolean apply(Integer generation) {
+                                return generation == newGeneration;
+                            }
+                        });
+                        break;
+                }
+            }
+        };
+    }};
+
     @CheckForNull
     private TarRevisions revisions;
 
@@ -277,8 +339,8 @@ public class FileStoreBuilder {
     }
 
     @Nonnull
-    DelegatingGCMonitor getGcMonitor() {
-        return gcMonitor;
+    GCListener getGcListener() {
+        return gcListener;
     }
 
     @Nonnull
@@ -301,4 +363,9 @@ public class FileStoreBuilder {
         checkState(revisions != null, "File store not yet built");
         return revisions;
     }
+
+    @Nonnull
+    WriterCacheManager getCacheManager() {
+        return cacheManager;
+    }
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java?rev=1747160&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCListener.java Tue Jun  7 07:32:19 2016
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+
+// FIXME OAK-4283: Align GCMonitor API with implementation: Unify with GCMonitor
+public interface GCListener extends GCMonitor {
+    enum Status {SUCCESS, FAILURE}
+    void compacted(@Nonnull Status status, int generation);
+}