You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2016/10/28 08:43:01 UTC
svn commit: r1766970 - in /jackrabbit/oak/branches/1.4/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/
main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/
test/java/org/apache/jackrabbit/oak/p...
Author: tomekr
Date: Fri Oct 28 08:43:01 2016
New Revision: 1766970
URL: http://svn.apache.org/viewvc?rev=1766970&view=rev
Log:
OAK-4882: Bottleneck in the asynchronous persistent cache
Added:
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
Removed:
jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Modified:
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
Added: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java?rev=1766970&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java (added)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java Fri Oct 28 08:43:01 2016
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+/** Keeps one {@link MetadataEntry} (access counter plus a read-from-persistent-cache flag) per key; every operation becomes a no-op after {@link #disable()}. */
+public class CacheMetadata<K> {
+ /** Metadata entries indexed by key. */
+ private final ConcurrentMap<K, MetadataEntry> metadataMap = newConcurrentMap();
+ /** While false, every method of this class returns early without touching the map. Plain (non-volatile) field, only ever switched off by disable(). */
+ private boolean enabled = true;
+ /** @return true until disable() has been called */
+ boolean isEnabled() {
+ return enabled;
+ }
+ /** Switches tracking off. Entries already in the map are left untouched, and this class offers no way to switch tracking back on. */
+ void disable() {
+ this.enabled = false;
+ }
+ /** Ensures an entry exists for the key without changing its access count; a newly created entry is not flagged as read from the persistent cache. */
+ void put(K key) {
+ if (!enabled) {
+ return;
+ }
+ getOrCreate(key, false);
+ }
+ /** Increments the key's access count; the entry is flagged as read from the persistent cache only if this call is the one that creates it. */
+ void putFromPersistenceAndIncrement(K key) {
+ if (!enabled) {
+ return;
+ }
+ getOrCreate(key, true).incrementCount();
+ }
+ /** Increments the key's access count, first creating an entry (not flagged as read from the persistent cache) if none exists. */
+ void increment(K key) {
+ if (!enabled) {
+ return;
+ }
+ getOrCreate(key, false).incrementCount();
+ }
+ /** Removes the key's entry and returns it; returns null if there was no entry or if tracking is disabled. */
+ MetadataEntry remove(Object key) {
+ if (!enabled) {
+ return null;
+ }
+ return metadataMap.remove(key);
+ }
+ /** Does for every given key what put(K) does for one; each key is cast to K without a check. */
+ void putAll(Iterable<?> keys) {
+ if (!enabled) {
+ return;
+ }
+ for (Object k : keys) {
+ getOrCreate((K) k, false);
+ }
+ }
+ /** Does for every given key what increment(K) does for one; each key is cast to K without a check. */
+ void incrementAll(Iterable<?> keys) {
+ if (!enabled) {
+ return;
+ }
+ for (Object k : keys) {
+ getOrCreate((K) k, false).incrementCount();
+ }
+ }
+ /** Removes the entries of all given keys; the removed entries are discarded. */
+ void removeAll(Iterable<?> keys) {
+ if (!enabled) {
+ return;
+ }
+ for (Object k : keys) {
+ metadataMap.remove(k);
+ }
+ }
+ /** Drops every entry. */
+ void clear() {
+ if (!enabled) {
+ return;
+ }
+ metadataMap.clear();
+ }
+ /** Returns the key's existing entry, or installs a new one with putIfAbsent so that concurrent callers end up sharing one entry; the flag is applied only to a new entry. Returns null when disabled. */
+ private MetadataEntry getOrCreate(K key, boolean readFromPersistentCache) {
+ if (!enabled) {
+ return null;
+ }
+ MetadataEntry metadata = metadataMap.get(key);
+ if (metadata == null) {
+ MetadataEntry newEntry = new MetadataEntry(readFromPersistentCache);
+ MetadataEntry oldEntry = metadataMap.putIfAbsent(key, newEntry);
+ metadata = oldEntry == null ? newEntry : oldEntry;
+ }
+ return metadata;
+ }
+
+ /** Per-key record: an atomic access counter plus a flag fixed at construction time. */
+ static class MetadataEntry {
+ /** Number of incrementCount() calls made on this entry so far; starts at zero. */
+ private final AtomicLong accessCount = new AtomicLong();
+ /** The value given to the constructor; never changes afterwards. */
+ private final boolean readFromPersistentCache;
+ /** Private constructor; within this class the only caller is CacheMetadata.getOrCreate. */
+ private MetadataEntry(boolean readFromPersistentCache) {
+ this.readFromPersistentCache = readFromPersistentCache;
+ }
+ /** Atomically adds one to the access count. */
+ void incrementCount() {
+ accessCount.incrementAndGet();
+ }
+ /** @return the current access count */
+ long getAccessCount() {
+ return accessCount.get();
+ }
+ /** @return the flag this entry was constructed with */
+ boolean isReadFromPersistentCache() {
+ return readFromPersistentCache;
+ }
+ }
+
+}
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java Fri Oct 28 08:43:01 2016
@@ -56,12 +56,6 @@ class NodeCache<K, V> implements Cache<K
private static final Set<RemovalCause> EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
- /**
- * Whether to use the queue to put items into cache. Default: false (cache
- * will be updated synchronously).
- */
- private static final boolean ASYNC_CACHE = Boolean.getBoolean("oak.cache.asynchronous");
-
private final PersistentCache cache;
private final PersistentCacheStats stats;
private final Cache<K, V> memCache;
@@ -69,7 +63,8 @@ class NodeCache<K, V> implements Cache<K
private final CacheType type;
private final DataType keyType;
private final DataType valueType;
- private final CacheWriteQueue<K, V> writerQueue;
+ private final CacheMetadata<K> memCacheMetadata;
+ CacheWriteQueue<K, V> writeQueue;
NodeCache(
PersistentCache cache,
@@ -86,12 +81,14 @@ class NodeCache<K, V> implements Cache<K
map = new MultiGenerationMap<K, V>();
keyType = new KeyDataType(type);
valueType = new ValueDataType(docNodeStore, docStore, type);
- if (ASYNC_CACHE) {
- this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map);
- LOG.info("The persistent cache writes will be asynchronous");
+ this.memCacheMetadata = new CacheMetadata<K>();
+ if (cache.isAsyncCache()) {
+ this.writeQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map);
+ LOG.info("The persistent cache {} writes will be asynchronous", type);
} else {
- this.writerQueue = null;
- LOG.info("The persistent cache writes will be synchronous");
+ this.writeQueue = null;
+ this.memCacheMetadata.disable();
+ LOG.info("The persistent cache {} writes will be synchronous", type);
}
this.stats = new PersistentCacheStats(type, statisticsProvider);
}
@@ -121,9 +118,6 @@ class NodeCache<K, V> implements Cache<K
}
private V readIfPresent(K key) {
- if (ASYNC_CACHE && writerQueue.waitsForInvalidation(key)) {
- return null;
- }
cache.switchGenerationIfNeeded();
TimerStats.Context ctx = stats.startReadTimer();
V v = map.get(key);
@@ -169,6 +163,7 @@ class NodeCache<K, V> implements Cache<K
public V getIfPresent(Object key) {
V value = memCache.getIfPresent(key);
if (value != null) {
+ memCacheMetadata.increment((K) key);
return value;
}
stats.markRequest();
@@ -176,6 +171,7 @@ class NodeCache<K, V> implements Cache<K
value = readIfPresent((K) key);
if (value != null) {
memCache.put((K) key, value);
+ memCacheMetadata.putFromPersistenceAndIncrement((K) key);
stats.markHit();
}
return value;
@@ -196,8 +192,9 @@ class NodeCache<K, V> implements Cache<K
TimerStats.Context ctx = stats.startLoaderTimer();
try {
value = memCache.get(key, valueLoader);
+ memCacheMetadata.increment(key);
ctx.stop();
- if (!ASYNC_CACHE) {
+ if (!cache.isAsyncCache()) {
write((K) key, value);
}
broadcast(key, value);
@@ -211,13 +208,16 @@ class NodeCache<K, V> implements Cache<K
@Override
public ImmutableMap<K, V> getAllPresent(
Iterable<?> keys) {
- return memCache.getAllPresent(keys);
+ ImmutableMap<K, V> result = memCache.getAllPresent(keys);
+ memCacheMetadata.incrementAll(keys);
+ return result;
}
@Override
public void put(K key, V value) {
memCache.put(key, value);
- if (!ASYNC_CACHE) {
+ memCacheMetadata.put(key);
+ if (!cache.isAsyncCache()) {
write((K) key, value);
}
broadcast(key, value);
@@ -227,8 +227,9 @@ class NodeCache<K, V> implements Cache<K
@Override
public void invalidate(Object key) {
memCache.invalidate(key);
- if (ASYNC_CACHE) {
- writerQueue.addInvalidate(singleton((K) key));
+ memCacheMetadata.remove(key);
+ if (cache.isAsyncCache()) {
+ writeQueue.addInvalidate(singleton((K) key));
} else {
write((K) key, null);
}
@@ -239,16 +240,19 @@ class NodeCache<K, V> implements Cache<K
@Override
public void putAll(Map<? extends K, ? extends V> m) {
memCache.putAll(m);
+ memCacheMetadata.putAll(m.keySet());
}
@Override
public void invalidateAll(Iterable<?> keys) {
memCache.invalidateAll(keys);
+ memCacheMetadata.removeAll(keys);
}
@Override
public void invalidateAll() {
memCache.invalidateAll();
+ memCacheMetadata.clear();
map.clear();
stats.markInvalidateAll();
}
@@ -271,6 +275,7 @@ class NodeCache<K, V> implements Cache<K
@Override
public void cleanUp() {
memCache.cleanUp();
+ memCacheMetadata.clear();
}
@Override
@@ -281,12 +286,14 @@ class NodeCache<K, V> implements Cache<K
if (buff.get() == 0) {
value = null;
memCache.invalidate(key);
+ memCacheMetadata.remove(key);
} else {
value = (V) valueType.read(buff);
memCache.put(key, value);
+ memCacheMetadata.put(key);
}
stats.markRecvBroadcast();
- if (!ASYNC_CACHE) {
+ if (!cache.isAsyncCache()) {
write(key, value);
}
}
@@ -296,9 +303,11 @@ class NodeCache<K, V> implements Cache<K
*/
@Override
public void evicted(K key, V value, RemovalCause cause) {
- if (ASYNC_CACHE && EVICTION_CAUSES.contains(cause) && value != null) {
+ if (cache.isAsyncCache() && EVICTION_CAUSES.contains(cause) && value != null) {
// invalidations are handled separately
- writerQueue.addPut(key, value);
+ if (qualifiesToPersist(memCacheMetadata.remove(key))) {
+ writeQueue.addPut(key, value);
+ }
long memory = 0L;
memory += (key == null ? 0L: keyType.getMemory(key));
@@ -308,8 +317,11 @@ class NodeCache<K, V> implements Cache<K
}
}
+ private boolean qualifiesToPersist(CacheMetadata.MetadataEntry metadata) {
+ return metadata == null || (!metadata.isReadFromPersistentCache() && metadata.getAccessCount() > 0);
+ }
+
public PersistentCacheStats getPersistentCacheStats() {
return stats;
}
-
}
\ No newline at end of file
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java Fri Oct 28 08:43:01 2016
@@ -53,7 +53,13 @@ import com.google.common.cache.Cache;
public class PersistentCache implements Broadcaster.Listener {
static final Logger LOG = LoggerFactory.getLogger(PersistentCache.class);
-
+
+ /**
+ * Whether to use the queue to put items into cache. Default: false (cache
+ * will be updated synchronously).
+ */
+ private static final boolean ASYNC_CACHE = Boolean.getBoolean("oak.cache.asynchronous");
+
private static final String FILE_PREFIX = "cache-";
private static final String FILE_SUFFIX = ".data";
private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -67,6 +73,7 @@ public class PersistentCache implements
private boolean cacheDocChildren;
private boolean compactOnClose;
private boolean compress = true;
+ private boolean asyncCache = ASYNC_CACHE;
private HashMap<CacheType, GenerationCache> caches =
new HashMap<CacheType, GenerationCache>();
@@ -140,6 +147,10 @@ public class PersistentCache implements
manualCommit = true;
} else if (p.startsWith("broadcast=")) {
broadcast = p.split("=")[1];
+ } else if (p.equals("+async")) {
+ asyncCache = true;
+ } else if (p.equals("-async")) {
+ asyncCache = false;
}
}
this.directory = dir;
@@ -502,7 +513,11 @@ public class PersistentCache implements
public int getExceptionCount() {
return exceptionCount;
}
-
+
+ public boolean isAsyncCache() {
+ return asyncCache;
+ }
+
void broadcast(CacheType type, Function<WriteBuffer, Void> writer) {
Broadcaster b = broadcaster;
if (b == null) {
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java Fri Oct 28 08:43:01 2016
@@ -29,23 +29,4 @@ interface CacheAction<K, V> {
*/
void execute();
- /**
- * Cancel the action without executing it
- */
- void cancel();
-
- /**
- * Return the keys affected by this action
- *
- * @return keys affected by this action
- */
- Iterable<K> getAffectedKeys();
-
- /**
- * Return the owner of this action
- *
- * @return {@link CacheWriteQueue} executing this action
- */
- CacheWriteQueue<K, V> getOwner();
-
}
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java Fri Oct 28 08:43:01 2016
@@ -16,13 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import static com.google.common.collect.Multimaps.index;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -30,13 +23,10 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
/**
- * An asynchronous buffer of the CacheAction objects. The buffer removes
- * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than
- * {@link #MAX_SIZE}.
+ * An asynchronous buffer of the CacheAction objects. The buffer only accepts
+ * {@link #MAX_SIZE} number of elements. If the queue is already full, the new
+ * elements are dropped.
*/
public class CacheActionDispatcher implements Runnable {
@@ -45,14 +35,9 @@ public class CacheActionDispatcher imple
/**
* What's the length of the queue.
*/
- static final int MAX_SIZE = 1024;
-
- /**
- * How many actions remove once the queue is longer than {@link #MAX_SIZE}.
- */
- static final int ACTIONS_TO_REMOVE = 256;
+ static final int MAX_SIZE = 2048;
- final BlockingQueue<CacheAction<?, ?>> queue = new ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2);
+ final BlockingQueue<CacheAction<?, ?>> queue = new ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE);
private volatile boolean isRunning = true;
@@ -68,7 +53,6 @@ public class CacheActionDispatcher imple
LOG.debug("Interrupted the queue.poll()", e);
}
}
- applyInvalidateActions();
}
/**
@@ -79,91 +63,13 @@ public class CacheActionDispatcher imple
}
/**
- * Adds the new action and cleans the queue if necessary.
+ * Tries to add new action.
*
* @param action to be added
*/
- synchronized void add(CacheAction<?, ?> action) {
- if (queue.size() >= MAX_SIZE) {
- cleanTheQueue();
- }
- queue.offer(action);
- }
-
- /**
- * Clean the queue and add a single invalidate action for all the removed entries.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void cleanTheQueue() {
- List<CacheAction> removed = removeOldest();
- for (Entry<CacheWriteQueue, Collection<CacheAction>> e : groupByOwner(removed).entrySet()) {
- CacheWriteQueue owner = e.getKey();
- Collection<CacheAction> actions = e.getValue();
- List<Object> affectedKeys = cancelAll(actions);
- owner.addInvalidate(affectedKeys);
- }
- }
-
- /**
- * Remove {@link #ACTIONS_TO_REMOVE} oldest actions.
- *
- * @return A list of removed items.
- */
- @SuppressWarnings("rawtypes")
- private List<CacheAction> removeOldest() {
- List<CacheAction> removed = new ArrayList<CacheAction>();
- while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) {
- CacheAction toBeCanceled = queue.poll();
- if (toBeCanceled == null) {
- break;
- } else {
- removed.add(toBeCanceled);
- }
+ void add(CacheAction<?, ?> action) {
+ if (!queue.offer(action)) {
+ LOG.trace("The queue is full, element {} has been rejected", action);
}
- return removed;
}
-
- /**
- * Group passed actions by their owners.
- *
- * @param actions to be grouped
- * @return map in which owner is the key and assigned action list is the value
- */
- @SuppressWarnings("rawtypes")
- private static Map<CacheWriteQueue, Collection<CacheAction>> groupByOwner(List<CacheAction> actions) {
- return index(actions, new Function<CacheAction, CacheWriteQueue>() {
- @Override
- public CacheWriteQueue apply(CacheAction input) {
- return input.getOwner();
- }
- }).asMap();
- }
-
- /**
- * Cancel all passed actions.
- *
- * @param actions to cancel
- * @return list of affected keys
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private static List<Object> cancelAll(Collection<CacheAction> actions) {
- List<Object> cancelledKeys = new ArrayList<Object>();
- for (CacheAction action : actions) {
- action.cancel();
- Iterables.addAll(cancelledKeys, action.getAffectedKeys());
- }
- return cancelledKeys;
- }
-
- @SuppressWarnings("rawtypes")
- private void applyInvalidateActions() {
- CacheAction action;
- do {
- action = queue.poll();
- if (action instanceof InvalidateCacheAction) {
- action.execute();
- }
- } while (action != null);
- }
-
}
\ No newline at end of file
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java Fri Oct 28 08:43:01 2016
@@ -16,21 +16,10 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
+import java.util.Map;
-/**
- * A fronted for the {@link CacheActionDispatcher} creating actions and maintaining their state.
- *
- * @param <K> key type
- * @param <V> value type
- */
public class CacheWriteQueue<K, V> {
private final CacheActionDispatcher dispatcher;
@@ -39,65 +28,18 @@ public class CacheWriteQueue<K, V> {
private final Map<K, V> map;
- final Multiset<K> queuedKeys = HashMultiset.create();
-
- final Set<K> waitsForInvalidation = new HashSet<K>();
-
public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache cache, Map<K, V> map) {
this.dispatcher = dispatcher;
this.cache = cache;
this.map = map;
}
- /**
- * Add new invalidate action.
- *
- * @param keys to be invalidated
- */
- public void addInvalidate(Iterable<K> keys) {
- synchronized(this) {
- for (K key : keys) {
- queuedKeys.add(key);
- waitsForInvalidation.add(key);
- }
- }
- dispatcher.add(new InvalidateCacheAction<K, V>(this, keys));
- }
-
- /**
- * Add new put action
- *
- * @param key to be put to cache
- * @param value to be put to cache
- */
public void addPut(K key, V value) {
- synchronized(this) {
- queuedKeys.add(key);
- waitsForInvalidation.remove(key);
- }
- dispatcher.add(new PutToCacheAction<K, V>(this, key, value));
+ dispatcher.add(new PutToCacheAction<K, V>(key, value, this));
}
- /**
- * Check if the last action added for this key was invalidate
- *
- * @param key to check
- * @return {@code true} if the last added action was invalidate
- */
- public synchronized boolean waitsForInvalidation(K key) {
- return waitsForInvalidation.contains(key);
- }
-
- /**
- * Remove the action state when it's finished or cancelled.
- *
- * @param key to be removed
- */
- synchronized void remove(K key) {
- queuedKeys.remove(key);
- if (!queuedKeys.contains(key)) {
- waitsForInvalidation.remove(key);
- }
+ public void addInvalidate(Iterable<K> keys) {
+ dispatcher.add(new InvalidateCacheAction<K, V>(keys, this));
}
PersistentCache getCache() {
@@ -107,4 +49,4 @@ public class CacheWriteQueue<K, V> {
Map<K, V> getMap() {
return map;
}
-}
\ No newline at end of file
+}
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java Fri Oct 28 08:43:01 2016
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import java.util.Map;
+import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
/**
@@ -32,49 +33,26 @@ class InvalidateCacheAction<K, V> implem
private final Map<K, V> map;
- private final CacheWriteQueue<K, V> owner;
-
private final Iterable<K> keys;
- InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K> keys) {
- this.owner = cacheWriteQueue;
+ InvalidateCacheAction(Iterable<K> keys, CacheWriteQueue<K, V> queue) {
this.keys = keys;
- this.cache = cacheWriteQueue.getCache();
- this.map = cacheWriteQueue.getMap();
+ this.cache = queue.getCache();
+ this.map = queue.getMap();
}
@Override
public void execute() {
- try {
- if (map != null) {
- for (K key : keys) {
- cache.switchGenerationIfNeeded();
- map.remove(key);
- }
+ if (map != null) {
+ for (K key : keys) {
+ cache.switchGenerationIfNeeded();
+ map.remove(key);
}
- } finally {
- decrement();
}
}
@Override
- public void cancel() {
- decrement();
- }
-
- @Override
- public CacheWriteQueue<K, V> getOwner() {
- return owner;
- }
-
- @Override
- public Iterable<K> getAffectedKeys() {
- return keys;
- }
-
- private void decrement() {
- for (K key : keys) {
- owner.remove(key);
- }
+ public String toString() {
+ return new StringBuilder("InvalidateCacheAction").append(Iterables.toString(keys)).toString();
}
}
\ No newline at end of file
Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java Fri Oct 28 08:43:01 2016
@@ -20,6 +20,7 @@ import static java.util.Collections.sing
import java.util.Map;
+import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
/**
@@ -34,48 +35,27 @@ class PutToCacheAction<K, V> implements
private final Map<K, V> map;
- private final CacheWriteQueue<K, V> owner;
-
private final K key;
private final V value;
- PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) {
- this.owner = cacheWriteQueue;
+ PutToCacheAction(K key, V value, CacheWriteQueue<K, V> queue) {
this.key = key;
this.value = value;
- this.cache = cacheWriteQueue.getCache();
- this.map = cacheWriteQueue.getMap();
+ this.cache = queue.getCache();
+ this.map = queue.getMap();
}
@Override
public void execute() {
- try {
- if (map != null) {
- cache.switchGenerationIfNeeded();
- map.put(key, value);
- }
- } finally {
- decrement();
+ if (map != null) {
+ cache.switchGenerationIfNeeded();
+ map.put(key, value);
}
}
@Override
- public void cancel() {
- decrement();
- }
-
- @Override
- public CacheWriteQueue<K, V> getOwner() {
- return owner;
- }
-
- @Override
- public Iterable<K> getAffectedKeys() {
- return singleton(key);
- }
-
- private void decrement() {
- owner.remove(key);
+ public String toString() {
+ return new StringBuilder("PutToCacheAction[").append(key).append(']').toString();
}
}
\ No newline at end of file
Added: jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java?rev=1766970&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java (added)
+++ jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java Fri Oct 28 08:43:01 2016
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import com.google.common.cache.RemovalCause;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.plugins.document.PathRev;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+
+/** Checks which keys {@link NodeCache} hands to its write queue when entries are evicted. */
+public class AsyncQueueTest {
+
+    private static final StringValue VAL = new StringValue("xyz");
+
+    // keys recorded by CacheWriteQueueWrapper, in the order they were added
+    private List<PathRev> putActions;
+    private List<PathRev> invalidateActions;
+
+    private NodeCache<PathRev, StringValue> nodeCache;
+    private int id; // suffix of the next generated path; keeps every generated key distinct
+
+    @Before
+    public void setup() throws IOException {
+        FileUtils.deleteDirectory(new File("target/cacheTest"));
+        PersistentCache pCache = new PersistentCache("target/cacheTest,+async");
+        // the callback is created before the NodeCache exists, so it reaches it through this reference
+        final AtomicReference<NodeCache<PathRev, StringValue>> nodeCacheRef = new AtomicReference<NodeCache<PathRev, StringValue>>();
+        CacheLIRS<PathRev, StringValue> cache = new CacheLIRS.Builder<PathRev, StringValue>().maximumSize(1).evictionCallback(new CacheLIRS.EvictionCallback<PathRev, StringValue>() {
+            @Override
+            public void evicted(@Nonnull PathRev key, @Nullable StringValue value, @Nonnull RemovalCause cause) {
+                if (nodeCacheRef.get() != null) {
+                    nodeCacheRef.get().evicted(key, value, cause);
+                }
+            }
+        }).build();
+        nodeCache = (NodeCache<PathRev, StringValue>) pCache.wrap(null, null, cache, CacheType.NODE);
+        nodeCacheRef.set(nodeCache);
+        // swap in a recording wrapper so the tests can observe what gets queued
+        CacheWriteQueueWrapper writeQueue = new CacheWriteQueueWrapper(nodeCache.writeQueue);
+        nodeCache.writeQueue = writeQueue;
+
+        this.putActions = writeQueue.putActions;
+        this.invalidateActions = writeQueue.invalidateActions;
+        this.id = 0;
+    }
+
+    @Test
+    public void unusedItemsShouldntBePersisted() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        flush();
+        assertEquals(emptyList(), putActions);
+    }
+
+    @Test
+    public void readItemsShouldntBePersistedAgain() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        nodeCache.getIfPresent(k);
+        flush();
+        assertEquals(asList(k), putActions);
+
+        putActions.clear();
+        nodeCache.getIfPresent(k); // k should be loaded from persisted cache
+        flush();
+        assertEquals(emptyList(), putActions); // k is not persisted again
+    }
+
+    @Test
+    public void usedItemsShouldBePersisted() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        nodeCache.getIfPresent(k);
+        flush();
+        assertEquals(asList(k), putActions);
+    }
+
+    private PathRev generatePathRev() {
+        return new PathRev("/" + id++, new RevisionVector(new Revision(0, 0, 0)));
+    }
+
+    private void flush() {
+        for (int i = 0; i < 1024; i++) {
+            nodeCache.put(generatePathRev(), VAL); // cause eviction of k
+        }
+    }
+
+    /** Records the keys passed to addPut/addInvalidate, then forwards the call to the wrapped queue. */
+    private static class CacheWriteQueueWrapper extends CacheWriteQueue<PathRev, StringValue> {
+
+        private final CacheWriteQueue<PathRev, StringValue> wrapped;
+        private final List<PathRev> putActions = newArrayList();
+        private final List<PathRev> invalidateActions = newArrayList();
+
+        public CacheWriteQueueWrapper(CacheWriteQueue<PathRev, StringValue> wrapped) {
+            super(null, null, null);
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public void addPut(PathRev key, StringValue value) {
+            putActions.add(key);
+            wrapped.addPut(key, value);
+        }
+
+        @Override
+        public void addInvalidate(Iterable<PathRev> keys) {
+            invalidateActions.addAll(newArrayList(keys));
+            wrapped.addInvalidate(keys);
+        }
+    }
+}
Modified: jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java Fri Oct 28 08:43:01 2016
@@ -18,33 +18,21 @@
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
-import static com.google.common.collect.ImmutableSet.of;
-import static com.google.common.collect.Iterables.size;
import static java.lang.String.valueOf;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE;
import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction;
import org.junit.Test;
import org.mockito.Mockito;
@@ -59,17 +47,8 @@ public class CacheActionDispatcherTest {
for (int i = 0; i < MAX_SIZE + 10; i++) {
dispatcher.add(createWriteAction(valueOf(i), queue));
}
- assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size());
- assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString());
-
- InvalidateCacheAction<?, ?> invalidateAction = null;
- for (CacheAction<?, ?> action : dispatcher.queue) {
- if (action instanceof InvalidateCacheAction) {
- invalidateAction = (InvalidateCacheAction<?, ?>) action;
- }
- }
- assertNotNull(invalidateAction);
- assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys()));
+ assertEquals(MAX_SIZE, dispatcher.queue.size());
+ assertEquals("0", dispatcher.queue.peek().toString());
}
@Test
@@ -128,31 +107,6 @@ public class CacheActionDispatcherTest {
assertFalse(queueThread.isAlive());
}
- @Test
- public void testExecuteInvalidatesOnShutdown() throws InterruptedException {
- Map<String, Object> cacheMap = new HashMap<String, Object>();
- CacheActionDispatcher dispatcher = new CacheActionDispatcher();
- CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String, Object>(dispatcher,
- Mockito.mock(PersistentCache.class), cacheMap);
- Thread queueThread = new Thread(dispatcher);
- queueThread.start();
-
- cacheMap.put("2", new Object());
- cacheMap.put("3", new Object());
- cacheMap.put("4", new Object());
- dispatcher.add(new DummyCacheWriteAction("1", queue, 100));
- dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("2")));
- dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("3")));
- dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("4")));
- Thread.sleep(10); // make sure the first action started
-
- dispatcher.stop();
- assertEquals(of("2", "3", "4"), cacheMap.keySet());
-
- queueThread.join();
- assertTrue(cacheMap.isEmpty());
- }
-
private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue<String, Object> queue) {
return new DummyCacheWriteAction(id, queue);
}
@@ -188,22 +142,9 @@ public class CacheActionDispatcherTest {
}
@Override
- public void cancel() {
- }
-
- @Override
public String toString() {
return id;
}
- @Override
- public Iterable<String> getAffectedKeys() {
- return Collections.singleton(id);
- }
-
- @Override
- public CacheWriteQueue<String, Object> getOwner() {
- return queue;
- }
}
}