Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/05/18 07:53:59 UTC

svn commit: r1744356 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/ main/java/org/apache/jackrabbit/oak/segment/file/ test/java/org/apache/jackrabbit/oak/segment/

Author: mduerig
Date: Wed May 18 07:53:59 2016
New Revision: 1744356

URL: http://svn.apache.org/viewvc?rev=1744356&view=rev
Log:
OAK-4277: Finalise de-duplication caches
- Move individual caches to top level to make configuration, management, monitoring and testing easier
- Introduce manager for deduplication caches
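
For illustration (not part of the commit): after this change a SegmentWriter
no longer takes a node cache but creates its own WriterCacheManager
internally. A minimal sketch of the new wiring, assuming an already opened
SegmentStore "store"; SegmentBufferWriter doubles as the
WriteOperationHandler, as in RecordTest at the end of this commit.

    import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;

    import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
    import org.apache.jackrabbit.oak.segment.SegmentWriter;

    // "test" is an arbitrary writer id; the three-argument constructors
    // are the ones exercised by RecordTest below.
    SegmentBufferWriter bufferWriter =
        new SegmentBufferWriter(store, LATEST_VERSION, "test");
    SegmentWriter writer =
        new SegmentWriter(store, LATEST_VERSION, bufferWriter);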

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java?rev=1744356&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java Wed May 18 07:53:59 2016
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// FIXME OAK-4277: Finalise de-duplication caches
+// implement configuration, monitoring and management
+// add unit tests
+// document, nullability
+public class NodeCache {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
+
+    private final int capacity;
+    private final List<Map<String, RecordId>> nodes;
+
+    private int size;
+
+    private final Set<Integer> muteDepths = newHashSet();
+
+    public static final Supplier<NodeCache> factory(final int capacity, final int maxDepth) {
+        return new Supplier<NodeCache>() {
+            @Override
+            public NodeCache get() {
+                return new NodeCache(capacity, maxDepth);
+            }
+        };
+    }
+
+    public static final Supplier<NodeCache> empty() {
+        return new Supplier<NodeCache>() {
+            @Override
+            public NodeCache get() {
+                return new NodeCache(0, 0) {
+                    @Override
+                    public synchronized void put(String key, RecordId value, int depth) { }
+
+                    @Override
+                    public synchronized RecordId get(String key) { return null; }
+                };
+            }
+        };
+    }
+
+    public NodeCache(int capacity, int maxDepth) {
+        checkArgument(capacity > 0);
+        checkArgument(maxDepth > 0);
+        this.capacity = capacity;
+        this.nodes = newArrayList();
+        for (int k = 0; k < maxDepth; k++) {
+            nodes.add(new HashMap<String, RecordId>());
+        }
+    }
+
+    public synchronized void put(String key, RecordId value, int depth) {
+        // FIXME OAK-4277: Finalise de-duplication caches
+        // Validate and optimise the eviction strategy.
+        // Nodes with many children should probably get a boost to
+        // protect them from preemptive eviction. Also it might be
+        // necessary to implement pinning (e.g. for checkpoints).
+        while (size >= capacity) {
+            int d = nodes.size() - 1;
+            int removed = nodes.remove(d).size();
+            size -= removed;
+            if (removed > 0) {
+                // FIXME OAK-4165: Too verbose logging during revision gc
+                LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
+                    "New size is {}", d, size + removed, capacity, size);
+            }
+        }
+
+        if (depth < nodes.size()) {
+            if (nodes.get(depth).put(key, value) == null) {
+                size++;
+            }
+        } else {
+            if (muteDepths.add(depth)) {
+                LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
+                    key, value, depth, nodes.size());
+            }
+        }
+    }
+
+    public synchronized RecordId get(String key) {
+        for (Map<String, RecordId> map : nodes) {
+            if (!map.isEmpty()) {
+                RecordId recordId = map.get(key);
+                if (recordId != null) {
+                    return recordId;
+                }
+            }
+        }
+        return null;
+    }
+
+}
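
A quick behavioural sketch of the new NodeCache (the RecordId values rootId,
aId and deepId are hypothetical; capacity and depth are deliberately tiny
for illustration):

    // At most 4 entries, spread over the depth maps 0..2 (maxDepth 3).
    NodeCache cache = new NodeCache(4, 3);
    cache.put("/", rootId, 0);        // stored in the depth-0 map
    cache.put("/a", aId, 1);          // stored in the depth-1 map
    cache.put("/a/b/c", deepId, 5);   // depth >= 3: not cached, logged once
    cache.get("/a");                  // returns aId; lookup scans depth 0 upwards
    // Once size reaches capacity, put() discards whole maps starting with
    // the deepest one, so shallow, widely shared nodes survive longest.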

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java Wed May 18 07:53:59 2016
@@ -19,200 +19,56 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Suppliers.memoize;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Maps.newConcurrentMap;
-import static com.google.common.collect.Sets.newHashSet;
-
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 // FIXME OAK-4277: Finalise de-duplication caches
-// implement monitoring for this cache
+// implement configuration, monitoring and management
 // add unit tests
+// document, nullability
 public class RecordCache<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(RecordCache.class);
-    // FIXME OAK-4277: Finalise de-duplication caches
-    // make this configurable
-
-    private final ConcurrentMap<Integer, Supplier<Cache<T>>> generations = newConcurrentMap();
-
-    public abstract static class Cache<T> {
-        public static <T> Cache<T> disabled() {
-            return new Cache<T>() {
-                @Override void put(T key, RecordId value) { }
-                @Override void put(T key, RecordId value, int cost) { }
-                @Override RecordId get(T key) { return null; }
-                @Override void clear() { }
-            };
-        }
-        abstract void put(T key, RecordId value);
-        abstract void put(T key, RecordId value, int cost);
-        abstract RecordId get(T key);
-        abstract void clear();
-    }
+    private final Map<T, RecordId> records;
 
-    public static <T> RecordCache<T> disabled() {
-        return new RecordCache<T>() {
-            @Override public Cache<T> generation(int generation) { return Cache.disabled(); }
-            @Override public void clear(int generation) { }
-            @Override public void clear() { }
+    public static final <T> Supplier<RecordCache<T>> factory(final int size) {
+        return new Supplier<RecordCache<T>>() {
+            @Override
+            public RecordCache<T> get() {
+                return new RecordCache<>(size);
+            }
         };
     }
 
-    protected Cache<T> getCache(int generation) {
-        return Cache.disabled();
-    }
-
-    public Cache<T> generation(final int generation) {
-        // Preemptive check to limit the number of wasted Supplier instances
-        if (!generations.containsKey(generation)) {
-            generations.putIfAbsent(generation, memoize(new Supplier<Cache<T>>() {
-                @Override
-                public Cache<T> get() {
-                    return getCache(generation);
-                }
-            }));
-        }
-        return generations.get(generation).get();
-    }
-
-    public void put(Cache<T> cache, int generation) {
-        generations.put(generation, Suppliers.ofInstance(cache));
-    }
-
-    public void clearUpTo(int maxGen) {
-        Iterator<Integer> it = generations.keySet().iterator();
-        while (it.hasNext()) {
-            Integer gen =  it.next();
-            if (gen <= maxGen) {
-                it.remove();
+    public static final <T> Supplier<RecordCache<T>> empty() {
+        return new Supplier<RecordCache<T>>() {
+            @Override
+            public RecordCache<T> get() {
+                return new RecordCache<T>(0) {
+                    @Override
+                    public synchronized void put(T key, RecordId value) { }
+
+                    @Override
+                    public synchronized RecordId get(T key) { return null; }
+                };
             }
-        }
-    }
-
-    public void clear(int generation) {
-        generations.remove(generation);
+        };
     }
 
-    public void clear() {
-        generations.clear();
+    public RecordCache(final int size) {
+        records = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
+                return size() >= size;
+            }
+        };
     }
 
-    public static final class LRUCache<T> extends Cache<T> {
-        private final Map<T, RecordId> map;
-
-        public LRUCache(final int size) {
-            map = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
-                    return size() >= size;
-                }
-            };
-        }
-
-        @Override
-        public synchronized void put(T key, RecordId value) {
-            map.put(key, value);
-        }
-
-        @Override
-        public void put(T key, RecordId value, int cost) {
-            throw new UnsupportedOperationException("Cannot put with a cost");
-        }
-
-        @Override
-        public synchronized RecordId get(T key) {
-            return map.get(key);
-        }
-
-        @Override
-        public synchronized void clear() {
-            map.clear();
-        }
+    public synchronized void put(T key, RecordId value) {
+        records.put(key, value);
     }
 
-    public static final class DeduplicationCache<T> extends Cache<T> {
-        private final int capacity;
-        private final List<Map<T, RecordId>> maps;
-
-        private int size;
-
-        private final Set<Integer> muteDepths = newHashSet();
-
-        public DeduplicationCache(int capacity, int maxDepth) {
-            checkArgument(capacity > 0);
-            checkArgument(maxDepth > 0);
-            this.capacity = capacity;
-            this.maps = newArrayList();
-            for (int k = 0; k < maxDepth; k++) {
-                maps.add(new HashMap<T, RecordId>());
-            }
-        }
-
-        @Override
-        public void put(T key, RecordId value) {
-            throw new UnsupportedOperationException("Cannot put without a cost");
-        }
-
-        @Override
-        public synchronized void put(T key, RecordId value, int cost) {
-            // FIXME OAK-4277: Finalise de-duplication caches
-            // Validate and optimise the eviction strategy.
-            // Nodes with many children should probably get a boost to
-            // protecting them from preemptive eviction. Also it might be
-            // necessary to implement pinning (e.g. for checkpoints).
-            while (size >= capacity) {
-                int d = maps.size() - 1;
-                int removed = maps.remove(d).size();
-                size -= removed;
-                if (removed > 0) {
-                    // FIXME OAK-4165: Too verbose logging during revision gc
-                    LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
-                        "New size is {}", d, size + removed, capacity, size);
-                }
-            }
-
-            if (cost < maps.size()) {
-                if (maps.get(cost).put(key, value) == null) {
-                    size++;
-                }
-            } else {
-                if (muteDepths.add(cost)) {
-                    LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
-                        key, value, cost, maps.size());
-                }
-            }
-        }
-
-        @Override
-        public synchronized RecordId get(T key) {
-            for (Map<T, RecordId> map : maps) {
-                if (!map.isEmpty()) {
-                    RecordId recordId = map.get(key);
-                    if (recordId != null) {
-                        return recordId;
-                    }
-                }
-            }
-            return null;
-        }
-
-        @Override
-        public synchronized void clear() {
-            maps.clear();
-        }
-
+    public synchronized RecordId get(T key) {
+        return records.get(key);
     }
 }
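
The simplified RecordCache is a plain LRU over an access-ordered
LinkedHashMap. A sketch of its behaviour (idA, idB and idC are hypothetical
RecordId instances); note that removeEldestEntry fires as soon as size()
reaches the configured size, so a cache created with size 3 retains at most
two entries:

    RecordCache<String> cache = new RecordCache<>(3);
    cache.put("a", idA);
    cache.put("b", idB);   // size() == 2, below the eviction threshold of 3
    cache.get("a");        // access order: "a" is now most recently used
    cache.put("c", idC);   // size() reaches 3: eldest entry "b" is evicted
    // cache.get("a") -> idA, cache.get("b") -> null, cache.get("c") -> idC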

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed May 18 07:53:59 2016
@@ -56,6 +56,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.jcr.PropertyType;
 
+import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.io.Closeables;
@@ -63,7 +64,6 @@ import org.apache.jackrabbit.oak.api.Blo
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
-import org.apache.jackrabbit.oak.segment.RecordCache.Cache;
 import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -86,47 +86,7 @@ public class SegmentWriter {
 
     static final int BLOCK_SIZE = 1 << 12; // 4kB
 
-    private static final int STRING_RECORDS_CACHE_SIZE = Integer.getInteger(
-            "oak.segment.writer.stringsCacheSize", 15000);
-
-    /**
-     * Cache of recently stored string records, used to avoid storing duplicates
-     * of frequently occurring data.
-     */
-    private final RecordCache<String> stringCache =
-        STRING_RECORDS_CACHE_SIZE <= 0
-            ? RecordCache.<String>disabled()
-            : new RecordCache<String>() {
-                @Override
-                protected Cache<String> getCache(int generation) {
-                    return new LRUCache<String>(STRING_RECORDS_CACHE_SIZE);
-                }
-        };
-
-    private static final int TPL_RECORDS_CACHE_SIZE = Integer.getInteger(
-            "oak.segment.writer.templatesCacheSize", 3000);
-
-    /**
-     * Cache of recently stored template records, used to avoid storing
-     * duplicates of frequently occurring data.
-     */
-    private final RecordCache<Template> templateCache =
-        TPL_RECORDS_CACHE_SIZE <= 0
-            ? RecordCache.<Template>disabled()
-            : new RecordCache<Template>() {
-            @Override
-            protected Cache<Template> getCache(int generation) {
-                return new LRUCache<Template>(TPL_RECORDS_CACHE_SIZE);
-            }
-        };
-
-    private final RecordCache<String> nodeCache;
-
-    // FIXME OAK-4277: Finalise de-duplication caches
-    // Do we need a deduplication cache also for binaries?
-    // Probably/preferably not as long binaries are already de-duplicated
-    // by rewriting its list of block ids and because we should recommend
-    // using a data store for big binaries.
+    private final WriterCacheManager cacheManager;
 
     private final SegmentStore store;
 
@@ -144,38 +104,19 @@ public class SegmentWriter {
      * @param store      store to write to
      * @param version    segment version to write
      * @param writeOperationHandler  handler for write operations.
-     * @param nodeCache  de-duplication cache for nodes
      */
-    public SegmentWriter(SegmentStore store, SegmentVersion version, WriteOperationHandler writeOperationHandler,
-            RecordCache<String> nodeCache) {
+    public SegmentWriter(SegmentStore store, SegmentVersion version,
+                         WriteOperationHandler writeOperationHandler) {
         this.store = store;
         this.version = version;
         this.writeOperationHandler = writeOperationHandler;
-        this.nodeCache = nodeCache;
-    }
-
-    /**
-     * Create a new instance of a {@code SegmentWriter}. Note the thread safety properties
-     * pointed out in the class comment.
-     *
-     * @param store      store to write to
-     * @param version    segment version to write
-     * @param writeOperationHandler  handler for write operations.
-     */
-    public SegmentWriter(SegmentStore store, SegmentVersion version, WriteOperationHandler writeOperationHandler) {
-        this(store, version, writeOperationHandler, new RecordCache<String>());
+        this.cacheManager = new WriterCacheManager();
     }
 
     // FIXME OAK-4277: Finalise de-duplication caches
-    // There should be a cleaner way for adding the cached nodes from the compactor
-    public void addCachedNodes(int generation, Cache<String> cache) {
-        nodeCache.put(cache, generation);
-
-        // FIXME OAK-4277: Finalise de-duplication caches
-        // Find a better way to evict the cache from within the cache itself
-        stringCache.clearUpTo(generation - 1);
-        templateCache.clearUpTo(generation - 1);
-        nodeCache.clearUpTo(generation - 1);
+    // There should be a cleaner way to control the deduplication caches across gc generations
+    public void evictCaches(Predicate<Integer> evict) {
+        cacheManager.evictCaches(evict);
     }
 
     public void flush() throws IOException {
@@ -290,14 +231,18 @@ public class SegmentWriter {
     }
 
     /**
-     * Write a node state, unless cancelled
+     * Write a node state, unless cancelled, using a dedicated write operation handler
      * @param state   node state to write
+     * @param writeOperationHandler  the write operation handler through which all write calls
+     *                               induced by this call are routed.
      * @param cancel  supplier to signal cancellation of this write operation
      * @return segment node state equal to {@code state} or {@code null} if cancelled.
      * @throws IOException
      */
     @CheckForNull
-    public SegmentNodeState writeNode(final NodeState state, Supplier<Boolean> cancel)
+    public SegmentNodeState writeNode(final NodeState state,
+                                      WriteOperationHandler writeOperationHandler,
+                                      Supplier<Boolean> cancel)
     throws IOException {
         try {
             return new SegmentNodeState(writeOperationHandler.execute(new SegmentWriteOperation(cancel) {
@@ -331,9 +276,9 @@ public class SegmentWriter {
 
         private final Supplier<Boolean> cancel;
         private SegmentBufferWriter writer;
-        private Cache<String> stringCache;
-        private Cache<Template> templateCache;
-        private Cache<String> nodeCache;
+        private RecordCache<String> stringCache;
+        private RecordCache<Template> templateCache;
+        private NodeCache nodeCache;
 
         protected SegmentWriteOperation(Supplier<Boolean> cancel) {
             this.cancel = cancel;
@@ -350,9 +295,9 @@ public class SegmentWriter {
             checkState(this.writer == null);
             this.writer = writer;
             int generation = writer.getGeneration();
-            this.stringCache = SegmentWriter.this.stringCache.generation(generation);
-            this.templateCache = SegmentWriter.this.templateCache.generation(generation);
-            this.nodeCache = SegmentWriter.this.nodeCache.generation(generation);
+            this.stringCache = cacheManager.getStringCache(generation);
+            this.templateCache = cacheManager.getTemplateCache(generation);
+            this.nodeCache = cacheManager.getNodeCache(generation);
             return this;
         }
 

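writeNode now routes all induced writes through the handler passed in, which
also pins the gc generation whose de-duplication caches are used. A sketch of
the compaction-style call, mirroring the FileStore change below (store,
version, head and newGeneration are assumed to be in scope):

    SegmentBufferWriter bufferWriter =
        new SegmentBufferWriter(store, version, "c", newGeneration);
    Supplier<Boolean> cancel = Suppliers.ofInstance(false);  // never cancel
    SegmentNodeState compacted = writer.writeNode(head, bufferWriter, cancel);
    if (compacted == null) {
        // the cancel supplier returned true mid-write
    } else {
        bufferWriter.flush();
    }
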
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java?rev=1744356&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java Wed May 18 07:53:59 2016
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.collect.Maps.newConcurrentMap;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// FIXME OAK-4277: Finalise de-duplication caches
+// implement configuration, monitoring and management
+// add unit tests
+// document, nullability
+public class WriterCacheManager {
+    private static final Logger LOG = LoggerFactory.getLogger(WriterCacheManager.class);
+
+    private static final int STRING_RECORDS_CACHE_SIZE = Integer.getInteger(
+            "oak.segment.writer.stringsCacheSize", 15000);
+
+    /**
+     * Cache of recently stored string records, used to avoid storing duplicates
+     * of frequently occurring data.
+     */
+    private final Generation<RecordCache<String>> stringCaches =
+            new Generation<>(STRING_RECORDS_CACHE_SIZE <= 0
+                    ? RecordCache.<String>empty()
+                    : RecordCache.<String>factory(STRING_RECORDS_CACHE_SIZE));
+
+    private static final int TPL_RECORDS_CACHE_SIZE = Integer.getInteger(
+            "oak.segment.writer.templatesCacheSize", 3000);
+
+    /**
+     * Cache of recently stored template records, used to avoid storing
+     * duplicates of frequently occurring data.
+     */
+    private final Generation<RecordCache<Template>> templateCaches =
+            new Generation<>(TPL_RECORDS_CACHE_SIZE <= 0
+                    ? RecordCache.<Template>empty()
+                    : RecordCache.<Template>factory(TPL_RECORDS_CACHE_SIZE));
+
+    private final Generation<NodeCache> nodeCaches =
+            new Generation<>(NodeCache.factory(1000000, 20));
+
+    private static class Generation<T> {
+        private final ConcurrentMap<Integer, Supplier<T>> generations = newConcurrentMap();
+        private final Supplier<T> cacheFactory;
+
+        Generation(Supplier<T> cacheFactory) {
+            this.cacheFactory = cacheFactory;
+        }
+
+        T getGeneration(final int generation) {
+            // Preemptive check to limit the number of wasted (Memoizing)Supplier instances
+            if (!generations.containsKey(generation)) {
+                generations.putIfAbsent(generation, memoize(cacheFactory));
+            }
+            return generations.get(generation).get();
+        }
+
+        void evictGenerations(Predicate<Integer> evict) {
+            Iterator<Integer> it = generations.keySet().iterator();
+            while (it.hasNext()) {
+                if (evict.apply(it.next())) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    // FIXME OAK-4277 replace with GC monitor (improved with notification of failed and interrupted compaction)
+    public void evictCaches(Predicate<Integer> evict) {
+        stringCaches.evictGenerations(evict);
+        templateCaches.evictGenerations(evict);
+        nodeCaches.evictGenerations(evict);
+    }
+
+    public RecordCache<String> getStringCache(int generation) {
+        return stringCaches.getGeneration(generation);
+    }
+
+    public RecordCache<Template> getTemplateCache(int generation) {
+        return templateCaches.getGeneration(generation);
+    }
+
+    public NodeCache getNodeCache(int generation) {
+        return nodeCaches.getGeneration(generation);
+    }
+
+}
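
Usage sketch for the new manager (generation 7 is an arbitrary example):
caches are created lazily and memoized per gc generation, and whole
generations are dropped via a predicate on the generation number.

    import com.google.common.base.Predicate;

    WriterCacheManager cacheManager = new WriterCacheManager();

    // Created lazily on first access, memoized afterwards:
    RecordCache<String> strings = cacheManager.getStringCache(7);
    assert strings == cacheManager.getStringCache(7);

    // Evict every generation older than 7, e.g. after a successful gc:
    cacheManager.evictCaches(new Predicate<Integer>() {
        @Override
        public boolean apply(Integer generation) {
            return generation < 7;
        }
    });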

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed May 18 07:53:59 2016
@@ -69,8 +69,6 @@ import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
-import org.apache.jackrabbit.oak.segment.RecordCache;
-import org.apache.jackrabbit.oak.segment.RecordCache.DeduplicationCache;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
@@ -82,7 +80,6 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentVersion;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
@@ -1061,22 +1058,6 @@ public class FileStore implements Segmen
         gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
         Stopwatch watch = Stopwatch.createStarted();
 
-        // FIXME OAK-4277: Finalise de-duplication caches
-        // Make the capacity and initial depth of the deduplication cache configurable
-        final DeduplicationCache<String> nodeCache = new DeduplicationCache<String>(1000000, 20);
-
-        // FIXME OAK-4279: Rework offline compaction
-        // This way of compacting has no progress logging
-        final int gcGeneration = tracker.getGcGeneration() + 1;
-        SegmentWriter writer = new SegmentWriter(this, version,
-            new SegmentBufferWriter(this, version, "c", gcGeneration),
-            new RecordCache<String>() {
-                @Override
-                protected Cache<String> getCache(int generation) {
-                    return nodeCache;
-                }
-            });
-
         SegmentNodeState before = getHead();
         long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
                 .getChildNodeCount(Long.MAX_VALUE);
@@ -1088,8 +1069,11 @@ public class FileStore implements Segmen
                     GC_COUNT, existing);
         }
 
+        final int newGeneration = tracker.getGcGeneration() + 1;
+        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
+                this, version, "c", newGeneration);
         Supplier<Boolean> cancel = newCancelCompactionCondition();
-        SegmentNodeState after = compact(writer, before, cancel);
+        SegmentNodeState after = compact(bufferWriter, before, cancel);
         if (after == null) {
             gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
             return false;
@@ -1101,15 +1085,14 @@ public class FileStore implements Segmen
         try {
             int cycles = 0;
             boolean success = false;
-            while (cycles++ < gcOptions.getRetryCount()
-                && !(success = setHead(before, after))) {
+            while (cycles++ < gcOptions.getRetryCount() && !(success = setHead(before, after))) {
                 // Some other concurrent changes have been made.
                 // Rebase (and compact) those changes on top of the
                 // compacted state before retrying to set the head.
                 gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
                     "Compacting these commits. Cycle {}", GC_COUNT, cycles);
                 SegmentNodeState head = getHead();
-                after = compact(writer, head, cancel);
+                after = compact(bufferWriter, head, cancel);
                 if (after == null) {
                     gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
                     return false;
@@ -1119,21 +1102,13 @@ public class FileStore implements Segmen
                         GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
                 before = head;
             }
-            if (success) {
-                tracker.getWriter().addCachedNodes(gcGeneration, nodeCache);
-                // FIXME OAK-4285: Align cleanup of segment id tables with the new cleanup strategy
-                // ith clean brutal we need to remove those ids that have been cleaned
-                // i.e. those whose segment was from an old generation
-                tracker.clearSegmentIdTables(Predicates.<SegmentId>alwaysFalse());
-                // FIXME OAK-4283: Align GCMonitor API with implementation
-                // Refactor GCMonitor: there is no more compaction map stats
-                gcMonitor.compacted(new long[]{}, new long[]{}, new long[]{});
-            } else {
+            if (!success) {
                 gcMonitor.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
                         GC_COUNT, cycles - 1);
                 if (gcOptions.getForceAfterFail()) {
                     gcMonitor.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
-                    if (!forceCompact(writer, cancel)) {
+                    success = forceCompact(bufferWriter, cancel);
+                    if (!success) {
                         gcMonitor.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
                             "Most likely compaction didn't get exclusive access to the store or was " +
                             "prematurely cancelled. Cleaning up.",
@@ -1141,17 +1116,44 @@ public class FileStore implements Segmen
                         cleanup(new Predicate<Integer>() {
                             @Override
                             public boolean apply(Integer generation) {
-                                return generation == gcGeneration;
+                                return generation == newGeneration;
                             }
                         });
-                        return false;
                     }
                 }
             }
 
-            gcMonitor.info("TarMK GC #{}: compaction completed in {} ({} ms), after {} cycles",
-                    GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
-            return true;
+            if (success) {
+                tracker.getWriter().evictCaches(new Predicate<Integer>() {
+                    @Override
+                    public boolean apply(Integer generation) {
+                        return generation < newGeneration;
+                    }
+                });
+
+                // FIXME OAK-4285: Align cleanup of segment id tables with the new cleanup strategy
+                // With clean brutal we need to remove those ids that have been cleaned
+                // i.e. those whose segment was from an old generation
+                tracker.clearSegmentIdTables(Predicates.<SegmentId>alwaysFalse());
+
+                // FIXME OAK-4283: Align GCMonitor API with implementation
+                // Refactor GCMonitor: there is no more compaction map stats
+                gcMonitor.compacted(new long[]{}, new long[]{}, new long[]{});
+
+                gcMonitor.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
+                        GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
+                return true;
+            } else {
+                tracker.getWriter().evictCaches(new Predicate<Integer>() {
+                    @Override
+                    public boolean apply(Integer generation) {
+                        return generation == newGeneration;
+                    }
+                });
+                gcMonitor.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
+                        GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
+                return false;
+            }
         } catch (InterruptedException e) {
             gcMonitor.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
             currentThread().interrupt();
@@ -1162,22 +1164,22 @@ public class FileStore implements Segmen
         }
     }
 
-    private static SegmentNodeState compact(SegmentWriter writer, NodeState node,
-                                            Supplier<Boolean> cancel)
+    private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState node,
+                                     Supplier<Boolean> cancel)
     throws IOException {
-        SegmentNodeState compacted = writer.writeNode(node, cancel);
+        SegmentNodeState compacted = tracker.getWriter().writeNode(node, bufferWriter, cancel);
         if (compacted != null) {
-            writer.flush();
+            bufferWriter.flush();
         }
         return compacted;
     }
 
-    private boolean forceCompact(SegmentWriter writer, Supplier<Boolean> cancel)
+    private boolean forceCompact(SegmentBufferWriter bufferWriter, Supplier<Boolean> cancel)
     throws InterruptedException, IOException {
         if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) {
             try {
                 SegmentNodeState head = getHead();
-                SegmentNodeState after = compact(writer, head, cancel);
+                SegmentNodeState after = compact(bufferWriter, head, cancel);
                 if (after == null) {
                     gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
                     return false;

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java Wed May 18 07:53:59 2016
@@ -30,6 +30,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.segment.ListRecord.LEVEL_SIZE;
 import static org.apache.jackrabbit.oak.segment.Segment.readString;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -428,7 +429,8 @@ public class RecordTest {
     @Test
     public void testCancel() throws IOException {
         NodeBuilder builder = EMPTY_NODE.builder();
-        NodeState state = writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(true));
+        SegmentBufferWriter bufferWriter = new SegmentBufferWriter(store, LATEST_VERSION, "test");
+        NodeState state = writer.writeNode(builder.getNodeState(), bufferWriter, Suppliers.ofInstance(true));
         assertNull(state);
     }