Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/05/18 07:53:59 UTC
svn commit: r1744356 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/
main/java/org/apache/jackrabbit/oak/segment/file/
test/java/org/apache/jackrabbit/oak/segment/
Author: mduerig
Date: Wed May 18 07:53:59 2016
New Revision: 1744356
URL: http://svn.apache.org/viewvc?rev=1744356&view=rev
Log:
OAK-4277: Finalise de-duplication caches
- Move individual caches to top level to make configuration, management, monitoring and testing easier
- Introduce manager for deduplication caches
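For illustration only (not part of this commit), the sketch below shows how a writer might use the new per-generation de-duplication caches described above; writeStringRecord() is a hypothetical stand-in for the real record writing path in SegmentWriter.

    import com.google.common.base.Predicate;
    import org.apache.jackrabbit.oak.segment.RecordCache;
    import org.apache.jackrabbit.oak.segment.RecordId;
    import org.apache.jackrabbit.oak.segment.WriterCacheManager;

    class CacheUsageSketch {
        private final WriterCacheManager cacheManager = new WriterCacheManager();

        // Look up a string record in the cache of the current gc generation before writing it.
        RecordId writeString(String value, int generation) {
            RecordCache<String> strings = cacheManager.getStringCache(generation);
            RecordId id = strings.get(value);
            if (id == null) {
                id = writeStringRecord(value);     // hypothetical: the real write happens in SegmentWriter
                strings.put(value, id);
            }
            return id;
        }

        // After compaction succeeded for newGeneration, caches of older generations can be dropped.
        void onCompacted(final int newGeneration) {
            cacheManager.evictCaches(new Predicate<Integer>() {
                @Override
                public boolean apply(Integer generation) {
                    return generation < newGeneration;
                }
            });
        }

        private RecordId writeStringRecord(String value) {
            throw new UnsupportedOperationException("placeholder for the real write path");
        }
    }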
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java?rev=1744356&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/NodeCache.java Wed May 18 07:53:59 2016
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// FIXME OAK-4277: Finalise de-duplication caches
+// implement configuration, monitoring and management
+// add unit tests
+// document, nullability
+public class NodeCache {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
+
+ private final int capacity;
+ private final List<Map<String, RecordId>> nodes;
+
+ private int size;
+
+ private final Set<Integer> muteDepths = newHashSet();
+
+ public static final Supplier<NodeCache> factory(final int capacity, final int maxDepth) {
+ return new Supplier<NodeCache>() {
+ @Override
+ public NodeCache get() {
+ return new NodeCache(capacity, maxDepth);
+ }
+ };
+ }
+
+ public static final Supplier<NodeCache> empty() {
+ return new Supplier<NodeCache>() {
+ @Override
+ public NodeCache get() {
+ return new NodeCache(0, 0) {
+ @Override
+ public synchronized void put(String key, RecordId value, int depth) { }
+
+ @Override
+ public synchronized RecordId get(String key) { return null; }
+ };
+ }
+ };
+ }
+
+ public NodeCache(int capacity, int maxDepth) {
+ checkArgument(capacity > 0);
+ checkArgument(maxDepth > 0);
+ this.capacity = capacity;
+ this.nodes = newArrayList();
+ for (int k = 0; k < maxDepth; k++) {
+ nodes.add(new HashMap<String, RecordId>());
+ }
+ }
+
+ public synchronized void put(String key, RecordId value, int depth) {
+ // FIXME OAK-4277: Finalise de-duplication caches
+ // Validate and optimise the eviction strategy.
+ // Nodes with many children should probably get a boost to
+ // protect them from preemptive eviction. Also it might be
+ // necessary to implement pinning (e.g. for checkpoints).
+ while (size >= capacity) {
+ int d = nodes.size() - 1;
+ int removed = nodes.remove(d).size();
+ size -= removed;
+ if (removed > 0) {
+ // FIXME OAK-4165: Too verbose logging during revision gc
+ LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
+ "New size is {}", d, size + removed, capacity, size);
+ }
+ }
+
+ if (depth < nodes.size()) {
+ if (nodes.get(depth).put(key, value) == null) {
+ size++;
+ }
+ } else {
+ if (muteDepths.add(depth)) {
+ LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
+ key, value, depth, nodes.size());
+ }
+ }
+ }
+
+ public synchronized RecordId get(String key) {
+ for (Map<String, RecordId> map : nodes) {
+ if (!map.isEmpty()) {
+ RecordId recordId = map.get(key);
+ if (recordId != null) {
+ return recordId;
+ }
+ }
+ }
+ return null;
+ }
+
+}
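Purely as an illustration of the depth-based eviction above (not part of the commit), assuming small capacity and depth values; idA, idB and idC stand in for real RecordIds:

    static void nodeCacheSketch(RecordId idA, RecordId idB, RecordId idC) {
        NodeCache cache = new NodeCache(2, 3);   // capacity 2, one map per depth 0, 1 and 2

        cache.put("/a",     idA, 0);             // goes into the depth-0 map
        cache.put("/a/b/c", idC, 2);             // goes into the depth-2 map
        cache.put("/a/b",   idB, 1);             // capacity reached: the deepest map (depth 2) is dropped first

        assert cache.get("/a") == idA;           // lookups scan the maps from depth 0 upwards
        assert cache.get("/a/b/c") == null;      // evicted together with the depth-2 map
    }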
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java Wed May 18 07:53:59 2016
@@ -19,200 +19,56 @@
package org.apache.jackrabbit.oak.segment;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Suppliers.memoize;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Maps.newConcurrentMap;
-import static com.google.common.collect.Sets.newHashSet;
-
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
// FIXME OAK-4277: Finalise de-duplication caches
-// implement monitoring for this cache
+// implement configuration, monitoring and management
// add unit tests
+// document, nullability
public class RecordCache<T> {
- private static final Logger LOG = LoggerFactory.getLogger(RecordCache.class);
- // FIXME OAK-4277: Finalise de-duplication caches
- // make this configurable
-
- private final ConcurrentMap<Integer, Supplier<Cache<T>>> generations = newConcurrentMap();
-
- public abstract static class Cache<T> {
- public static <T> Cache<T> disabled() {
- return new Cache<T>() {
- @Override void put(T key, RecordId value) { }
- @Override void put(T key, RecordId value, int cost) { }
- @Override RecordId get(T key) { return null; }
- @Override void clear() { }
- };
- }
- abstract void put(T key, RecordId value);
- abstract void put(T key, RecordId value, int cost);
- abstract RecordId get(T key);
- abstract void clear();
- }
+ private final Map<T, RecordId> records;
- public static <T> RecordCache<T> disabled() {
- return new RecordCache<T>() {
- @Override public Cache<T> generation(int generation) { return Cache.disabled(); }
- @Override public void clear(int generation) { }
- @Override public void clear() { }
+ public static final <T> Supplier<RecordCache<T>> factory(final int size) {
+ return new Supplier<RecordCache<T>>() {
+ @Override
+ public RecordCache<T> get() {
+ return new RecordCache<>(size);
+ }
};
}
- protected Cache<T> getCache(int generation) {
- return Cache.disabled();
- }
-
- public Cache<T> generation(final int generation) {
- // Preemptive check to limit the number of wasted Supplier instances
- if (!generations.containsKey(generation)) {
- generations.putIfAbsent(generation, memoize(new Supplier<Cache<T>>() {
- @Override
- public Cache<T> get() {
- return getCache(generation);
- }
- }));
- }
- return generations.get(generation).get();
- }
-
- public void put(Cache<T> cache, int generation) {
- generations.put(generation, Suppliers.ofInstance(cache));
- }
-
- public void clearUpTo(int maxGen) {
- Iterator<Integer> it = generations.keySet().iterator();
- while (it.hasNext()) {
- Integer gen = it.next();
- if (gen <= maxGen) {
- it.remove();
+ public static final <T> Supplier<RecordCache<T>> empty() {
+ return new Supplier<RecordCache<T>>() {
+ @Override
+ public RecordCache<T> get() {
+ return new RecordCache<T>(0) {
+ @Override
+ public synchronized void put(T key, RecordId value) { }
+
+ @Override
+ public synchronized RecordId get(T key) { return null; }
+ };
}
- }
- }
-
- public void clear(int generation) {
- generations.remove(generation);
+ };
}
- public void clear() {
- generations.clear();
+ public RecordCache(final int size) {
+ records = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
+ return size() >= size;
+ }
+ };
}
- public static final class LRUCache<T> extends Cache<T> {
- private final Map<T, RecordId> map;
-
- public LRUCache(final int size) {
- map = new LinkedHashMap<T, RecordId>(size * 4 / 3, 0.75f, true) {
- @Override
- protected boolean removeEldestEntry(Map.Entry<T, RecordId> eldest) {
- return size() >= size;
- }
- };
- }
-
- @Override
- public synchronized void put(T key, RecordId value) {
- map.put(key, value);
- }
-
- @Override
- public void put(T key, RecordId value, int cost) {
- throw new UnsupportedOperationException("Cannot put with a cost");
- }
-
- @Override
- public synchronized RecordId get(T key) {
- return map.get(key);
- }
-
- @Override
- public synchronized void clear() {
- map.clear();
- }
+ public synchronized void put(T key, RecordId value) {
+ records.put(key, value);
}
- public static final class DeduplicationCache<T> extends Cache<T> {
- private final int capacity;
- private final List<Map<T, RecordId>> maps;
-
- private int size;
-
- private final Set<Integer> muteDepths = newHashSet();
-
- public DeduplicationCache(int capacity, int maxDepth) {
- checkArgument(capacity > 0);
- checkArgument(maxDepth > 0);
- this.capacity = capacity;
- this.maps = newArrayList();
- for (int k = 0; k < maxDepth; k++) {
- maps.add(new HashMap<T, RecordId>());
- }
- }
-
- @Override
- public void put(T key, RecordId value) {
- throw new UnsupportedOperationException("Cannot put without a cost");
- }
-
- @Override
- public synchronized void put(T key, RecordId value, int cost) {
- // FIXME OAK-4277: Finalise de-duplication caches
- // Validate and optimise the eviction strategy.
- // Nodes with many children should probably get a boost to
- // protecting them from preemptive eviction. Also it might be
- // necessary to implement pinning (e.g. for checkpoints).
- while (size >= capacity) {
- int d = maps.size() - 1;
- int removed = maps.remove(d).size();
- size -= removed;
- if (removed > 0) {
- // FIXME OAK-4165: Too verbose logging during revision gc
- LOG.info("Evicted cache at depth {} as size {} reached capacity {}. " +
- "New size is {}", d, size + removed, capacity, size);
- }
- }
-
- if (cost < maps.size()) {
- if (maps.get(cost).put(key, value) == null) {
- size++;
- }
- } else {
- if (muteDepths.add(cost)) {
- LOG.info("Not caching {} -> {} as depth {} reaches or exceeds the maximum of {}",
- key, value, cost, maps.size());
- }
- }
- }
-
- @Override
- public synchronized RecordId get(T key) {
- for (Map<T, RecordId> map : maps) {
- if (!map.isEmpty()) {
- RecordId recordId = map.get(key);
- if (recordId != null) {
- return recordId;
- }
- }
- }
- return null;
- }
-
- @Override
- public synchronized void clear() {
- maps.clear();
- }
-
+ public synchronized RecordId get(T key) {
+ return records.get(key);
}
}
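A minimal usage sketch (illustrative, not part of this commit) of the reworked RecordCache, which is now a synchronized LRU map backed by an access-ordered LinkedHashMap; the RecordId parameters are placeholders:

    static void recordCacheSketch(RecordId idA, RecordId idB) {
        RecordCache<String> strings = new RecordCache<>(1000);

        strings.put("frequently stored value", idA);
        RecordId hit  = strings.get("frequently stored value");   // idA while still cached
        RecordId miss = strings.get("never stored");              // null

        // Once the configured size is reached, the least recently used entries are dropped.
        strings.put("another value", idB);
    }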
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed May 18 07:53:59 2016
@@ -56,6 +56,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.jcr.PropertyType;
+import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.io.Closeables;
@@ -63,7 +64,6 @@ import org.apache.jackrabbit.oak.api.Blo
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
-import org.apache.jackrabbit.oak.segment.RecordCache.Cache;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -86,47 +86,7 @@ public class SegmentWriter {
static final int BLOCK_SIZE = 1 << 12; // 4kB
- private static final int STRING_RECORDS_CACHE_SIZE = Integer.getInteger(
- "oak.segment.writer.stringsCacheSize", 15000);
-
- /**
- * Cache of recently stored string records, used to avoid storing duplicates
- * of frequently occurring data.
- */
- private final RecordCache<String> stringCache =
- STRING_RECORDS_CACHE_SIZE <= 0
- ? RecordCache.<String>disabled()
- : new RecordCache<String>() {
- @Override
- protected Cache<String> getCache(int generation) {
- return new LRUCache<String>(STRING_RECORDS_CACHE_SIZE);
- }
- };
-
- private static final int TPL_RECORDS_CACHE_SIZE = Integer.getInteger(
- "oak.segment.writer.templatesCacheSize", 3000);
-
- /**
- * Cache of recently stored template records, used to avoid storing
- * duplicates of frequently occurring data.
- */
- private final RecordCache<Template> templateCache =
- TPL_RECORDS_CACHE_SIZE <= 0
- ? RecordCache.<Template>disabled()
- : new RecordCache<Template>() {
- @Override
- protected Cache<Template> getCache(int generation) {
- return new LRUCache<Template>(TPL_RECORDS_CACHE_SIZE);
- }
- };
-
- private final RecordCache<String> nodeCache;
-
- // FIXME OAK-4277: Finalise de-duplication caches
- // Do we need a deduplication cache also for binaries?
- // Probably/preferably not as long binaries are already de-duplicated
- // by rewriting its list of block ids and because we should recommend
- // using a data store for big binaries.
+ private final WriterCacheManager cacheManager;
private final SegmentStore store;
@@ -144,38 +104,19 @@ public class SegmentWriter {
* @param store store to write to
* @param version segment version to write
* @param writeOperationHandler handler for write operations.
- * @param nodeCache de-duplication cache for nodes
*/
- public SegmentWriter(SegmentStore store, SegmentVersion version, WriteOperationHandler writeOperationHandler,
- RecordCache<String> nodeCache) {
+ public SegmentWriter(SegmentStore store, SegmentVersion version,
+ WriteOperationHandler writeOperationHandler) {
this.store = store;
this.version = version;
this.writeOperationHandler = writeOperationHandler;
- this.nodeCache = nodeCache;
- }
-
- /**
- * Create a new instance of a {@code SegmentWriter}. Note the thread safety properties
- * pointed out in the class comment.
- *
- * @param store store to write to
- * @param version segment version to write
- * @param writeOperationHandler handler for write operations.
- */
- public SegmentWriter(SegmentStore store, SegmentVersion version, WriteOperationHandler writeOperationHandler) {
- this(store, version, writeOperationHandler, new RecordCache<String>());
+ this.cacheManager = new WriterCacheManager();
}
// FIXME OAK-4277: Finalise de-duplication caches
- // There should be a cleaner way for adding the cached nodes from the compactor
- public void addCachedNodes(int generation, Cache<String> cache) {
- nodeCache.put(cache, generation);
-
- // FIXME OAK-4277: Finalise de-duplication caches
- // Find a better way to evict the cache from within the cache itself
- stringCache.clearUpTo(generation - 1);
- templateCache.clearUpTo(generation - 1);
- nodeCache.clearUpTo(generation - 1);
+ // There should be a cleaner way to control the deduplication caches across gc generations
+ public void evictCaches(Predicate<Integer> evict) {
+ cacheManager.evictCaches(evict);
}
public void flush() throws IOException {
@@ -290,14 +231,18 @@ public class SegmentWriter {
}
/**
- * Write a node state, unless cancelled
+ * Write a node state, unless cancelled, using a dedicated write operation handler
* @param state node state to write
+ * @param writeOperationHandler the write operation handler through which all write calls
+ * induced by this call are routed.
* @param cancel supplier to signal cancellation of this write operation
* @return segment node state equal to {@code state} or {@code null} if cancelled.
* @throws IOException
*/
@CheckForNull
- public SegmentNodeState writeNode(final NodeState state, Supplier<Boolean> cancel)
+ public SegmentNodeState writeNode(final NodeState state,
+ WriteOperationHandler writeOperationHandler,
+ Supplier<Boolean> cancel)
throws IOException {
try {
return new SegmentNodeState(writeOperationHandler.execute(new SegmentWriteOperation(cancel) {
@@ -331,9 +276,9 @@ public class SegmentWriter {
private final Supplier<Boolean> cancel;
private SegmentBufferWriter writer;
- private Cache<String> stringCache;
- private Cache<Template> templateCache;
- private Cache<String> nodeCache;
+ private RecordCache<String> stringCache;
+ private RecordCache<Template> templateCache;
+ private NodeCache nodeCache;
protected SegmentWriteOperation(Supplier<Boolean> cancel) {
this.cancel = cancel;
@@ -350,9 +295,9 @@ public class SegmentWriter {
checkState(this.writer == null);
this.writer = writer;
int generation = writer.getGeneration();
- this.stringCache = SegmentWriter.this.stringCache.generation(generation);
- this.templateCache = SegmentWriter.this.templateCache.generation(generation);
- this.nodeCache = SegmentWriter.this.nodeCache.generation(generation);
+ this.stringCache = cacheManager.getStringCache(generation);
+ this.templateCache = cacheManager.getTemplateCache(generation);
+ this.nodeCache = cacheManager.getNodeCache(generation);
return this;
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java?rev=1744356&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriterCacheManager.java Wed May 18 07:53:59 2016
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.collect.Maps.newConcurrentMap;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// FIXME OAK-4277: Finalise de-duplication caches
+// implement configuration, monitoring and management
+// add unit tests
+// document, nullability
+public class WriterCacheManager {
+ private static final Logger LOG = LoggerFactory.getLogger(WriterCacheManager.class);
+
+ private static final int STRING_RECORDS_CACHE_SIZE = Integer.getInteger(
+ "oak.segment.writer.stringsCacheSize", 15000);
+
+ /**
+ * Cache of recently stored string records, used to avoid storing duplicates
+ * of frequently occurring data.
+ */
+ private final Generation<RecordCache<String>> stringCaches =
+ new Generation<>(STRING_RECORDS_CACHE_SIZE <= 0
+ ? RecordCache.<String>empty()
+ : RecordCache.<String>factory(STRING_RECORDS_CACHE_SIZE));
+
+ private static final int TPL_RECORDS_CACHE_SIZE = Integer.getInteger(
+ "oak.segment.writer.templatesCacheSize", 3000);
+
+ /**
+ * Cache of recently stored template records, used to avoid storing
+ * duplicates of frequently occurring data.
+ */
+ private final Generation<RecordCache<Template>> templateCaches =
+ new Generation<>(TPL_RECORDS_CACHE_SIZE <= 0
+ ? RecordCache.<Template>empty()
+ : RecordCache.<Template>factory(TPL_RECORDS_CACHE_SIZE));
+
+ private final Generation<NodeCache> nodeCaches =
+ new Generation<>(NodeCache.factory(1000000, 20));
+
+ private static class Generation<T> {
+ private final ConcurrentMap<Integer, Supplier<T>> generations = newConcurrentMap();
+ private final Supplier<T> cacheFactory;
+
+ Generation(Supplier<T> cacheFactory) {
+ this.cacheFactory = cacheFactory;
+ }
+
+ T getGeneration(final int generation) {
+ // Preemptive check to limit the number of wasted (Memoizing)Supplier instances
+ if (!generations.containsKey(generation)) {
+ generations.putIfAbsent(generation, memoize(cacheFactory));
+ }
+ return generations.get(generation).get();
+ }
+
+ void evictGenerations(Predicate<Integer> evict) {
+ Iterator<Integer> it = generations.keySet().iterator();
+ while (it.hasNext()) {
+ if (evict.apply(it.next())) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ // FIXME OAK-4277 replace with GC monitor (improved with notification of failed and interrupted compaction)
+ public void evictCaches(Predicate<Integer> evict) {
+ stringCaches.evictGenerations(evict);
+ templateCaches.evictGenerations(evict);
+ nodeCaches.evictGenerations(evict);
+ }
+
+ public RecordCache<String> getStringCache(int generation) {
+ return stringCaches.getGeneration(generation);
+ }
+
+ public RecordCache<Template> getTemplateCache(int generation) {
+ return templateCaches.getGeneration(generation);
+ }
+
+ public NodeCache getNodeCache(int generation) {
+ return nodeCaches.getGeneration(generation);
+ }
+
+}
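To make the generation handling above concrete, a small sketch (again not part of the commit): caches are created lazily per gc generation and memoized, so repeated lookups for the same generation return the same instance.

    static void generationSketch() {
        WriterCacheManager manager = new WriterCacheManager();

        NodeCache gen5  = manager.getNodeCache(5);
        NodeCache again = manager.getNodeCache(5);   // memoized: same instance as gen5
        NodeCache gen6  = manager.getNodeCache(6);   // a separate cache for the next generation

        assert gen5 == again;
        assert gen5 != gen6;
    }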
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed May 18 07:53:59 2016
@@ -69,8 +69,6 @@ import com.google.common.base.Supplier;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
-import org.apache.jackrabbit.oak.segment.RecordCache;
-import org.apache.jackrabbit.oak.segment.RecordCache.DeduplicationCache;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Segment;
import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
@@ -82,7 +80,6 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentVersion;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
@@ -1061,22 +1058,6 @@ public class FileStore implements Segmen
gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
Stopwatch watch = Stopwatch.createStarted();
- // FIXME OAK-4277: Finalise de-duplication caches
- // Make the capacity and initial depth of the deduplication cache configurable
- final DeduplicationCache<String> nodeCache = new DeduplicationCache<String>(1000000, 20);
-
- // FIXME OAK-4279: Rework offline compaction
- // This way of compacting has no progress logging
- final int gcGeneration = tracker.getGcGeneration() + 1;
- SegmentWriter writer = new SegmentWriter(this, version,
- new SegmentBufferWriter(this, version, "c", gcGeneration),
- new RecordCache<String>() {
- @Override
- protected Cache<String> getCache(int generation) {
- return nodeCache;
- }
- });
-
SegmentNodeState before = getHead();
long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
.getChildNodeCount(Long.MAX_VALUE);
@@ -1088,8 +1069,11 @@ public class FileStore implements Segmen
GC_COUNT, existing);
}
+ final int newGeneration = tracker.getGcGeneration() + 1;
+ SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
+ this, version, "c", newGeneration);
Supplier<Boolean> cancel = newCancelCompactionCondition();
- SegmentNodeState after = compact(writer, before, cancel);
+ SegmentNodeState after = compact(bufferWriter, before, cancel);
if (after == null) {
gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
return false;
@@ -1101,15 +1085,14 @@ public class FileStore implements Segmen
try {
int cycles = 0;
boolean success = false;
- while (cycles++ < gcOptions.getRetryCount()
- && !(success = setHead(before, after))) {
+ while (cycles++ < gcOptions.getRetryCount() && !(success = setHead(before, after))) {
// Some other concurrent changes have been made.
// Rebase (and compact) those changes on top of the
// compacted state before retrying to set the head.
gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
"Compacting these commits. Cycle {}", GC_COUNT, cycles);
SegmentNodeState head = getHead();
- after = compact(writer, head, cancel);
+ after = compact(bufferWriter, head, cancel);
if (after == null) {
gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
return false;
@@ -1119,21 +1102,13 @@ public class FileStore implements Segmen
GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
before = head;
}
- if (success) {
- tracker.getWriter().addCachedNodes(gcGeneration, nodeCache);
- // FIXME OAK-4285: Align cleanup of segment id tables with the new cleanup strategy
- // ith clean brutal we need to remove those ids that have been cleaned
- // i.e. those whose segment was from an old generation
- tracker.clearSegmentIdTables(Predicates.<SegmentId>alwaysFalse());
- // FIXME OAK-4283: Align GCMonitor API with implementation
- // Refactor GCMonitor: there is no more compaction map stats
- gcMonitor.compacted(new long[]{}, new long[]{}, new long[]{});
- } else {
+ if (!success) {
gcMonitor.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
GC_COUNT, cycles - 1);
if (gcOptions.getForceAfterFail()) {
gcMonitor.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
- if (!forceCompact(writer, cancel)) {
+ success = forceCompact(bufferWriter, cancel);
+ if (!success) {
gcMonitor.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
"Most likely compaction didn't get exclusive access to the store or was " +
"prematurely cancelled. Cleaning up.",
@@ -1141,17 +1116,44 @@ public class FileStore implements Segmen
cleanup(new Predicate<Integer>() {
@Override
public boolean apply(Integer generation) {
- return generation == gcGeneration;
+ return generation == newGeneration;
}
});
- return false;
}
}
}
- gcMonitor.info("TarMK GC #{}: compaction completed in {} ({} ms), after {} cycles",
- GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
- return true;
+ if (success) {
+ tracker.getWriter().evictCaches(new Predicate<Integer>() {
+ @Override
+ public boolean apply(Integer generation) {
+ return generation < newGeneration;
+ }
+ });
+
+ // FIXME OAK-4285: Align cleanup of segment id tables with the new cleanup strategy
+ // With clean brutal we need to remove those ids that have been cleaned
+ // i.e. those whose segment was from an old generation
+ tracker.clearSegmentIdTables(Predicates.<SegmentId>alwaysFalse());
+
+ // FIXME OAK-4283: Align GCMonitor API with implementation
+ // Refactor GCMonitor: there is no more compaction map stats
+ gcMonitor.compacted(new long[]{}, new long[]{}, new long[]{});
+
+ gcMonitor.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
+ return true;
+ } else {
+ tracker.getWriter().evictCaches(new Predicate<Integer>() {
+ @Override
+ public boolean apply(Integer generation) {
+ return generation == newGeneration;
+ }
+ });
+ gcMonitor.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles - 1);
+ return false;
+ }
} catch (InterruptedException e) {
gcMonitor.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
currentThread().interrupt();
@@ -1162,22 +1164,22 @@ public class FileStore implements Segmen
}
}
- private static SegmentNodeState compact(SegmentWriter writer, NodeState node,
- Supplier<Boolean> cancel)
+ private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState node,
+ Supplier<Boolean> cancel)
throws IOException {
- SegmentNodeState compacted = writer.writeNode(node, cancel);
+ SegmentNodeState compacted = tracker.getWriter().writeNode(node, bufferWriter, cancel);
if (compacted != null) {
- writer.flush();
+ bufferWriter.flush();
}
return compacted;
}
- private boolean forceCompact(SegmentWriter writer, Supplier<Boolean> cancel)
+ private boolean forceCompact(SegmentBufferWriter bufferWriter, Supplier<Boolean> cancel)
throws InterruptedException, IOException {
if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) {
try {
SegmentNodeState head = getHead();
- SegmentNodeState after = compact(writer, head, cancel);
+ SegmentNodeState after = compact(bufferWriter, head, cancel);
if (after == null) {
gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
return false;
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java?rev=1744356&r1=1744355&r2=1744356&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java Wed May 18 07:53:59 2016
@@ -30,6 +30,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.segment.ListRecord.LEVEL_SIZE;
import static org.apache.jackrabbit.oak.segment.Segment.readString;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -428,7 +429,8 @@ public class RecordTest {
@Test
public void testCancel() throws IOException {
NodeBuilder builder = EMPTY_NODE.builder();
- NodeState state = writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(true));
+ SegmentBufferWriter bufferWriter = new SegmentBufferWriter(store, LATEST_VERSION, "test");
+ NodeState state = writer.writeNode(builder.getNodeState(), bufferWriter, Suppliers.ofInstance(true));
assertNull(state);
}