You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2016/10/28 08:43:01 UTC

svn commit: r1766970 - in /jackrabbit/oak/branches/1.4/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/ main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/ test/java/org/apache/jackrabbit/oak/p...

Author: tomekr
Date: Fri Oct 28 08:43:01 2016
New Revision: 1766970

URL: http://svn.apache.org/viewvc?rev=1766970&view=rev
Log:
OAK-4882: Bottleneck in the asynchronous persistent cache

Added:
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
    jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
Removed:
    jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java
Modified:
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
    jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
    jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java

Added: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java?rev=1766970&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java (added)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java Fri Oct 28 08:43:01 2016
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.collect.Maps.newConcurrentMap;
+
+public class CacheMetadata<K> {
+
+    private final ConcurrentMap<K, MetadataEntry> metadataMap = newConcurrentMap();
+
+    private boolean enabled = true;
+
+    boolean isEnabled() {
+        return enabled;
+    }
+
+    void disable() {
+        this.enabled = false;
+    }
+
+    void put(K key) {
+        if (!enabled) {
+            return;
+        }
+        getOrCreate(key, false);
+    }
+
+    void putFromPersistenceAndIncrement(K key) {
+        if (!enabled) {
+            return;
+        }
+        getOrCreate(key, true).incrementCount();
+    }
+
+    void increment(K key) {
+        if (!enabled) {
+            return;
+        }
+        getOrCreate(key, false).incrementCount();
+    }
+
+    MetadataEntry remove(Object key) {
+        if (!enabled) {
+            return null;
+        }
+        return metadataMap.remove(key);
+    }
+
+    void putAll(Iterable<?> keys) {
+        if (!enabled) {
+            return;
+        }
+        for (Object k : keys) {
+            getOrCreate((K) k, false);
+        }
+    }
+
+    void incrementAll(Iterable<?> keys) {
+        if (!enabled) {
+            return;
+        }
+        for (Object k : keys) {
+            getOrCreate((K) k, false).incrementCount();
+        }
+    }
+
+    void removeAll(Iterable<?> keys) {
+        if (!enabled) {
+            return;
+        }
+        for (Object k : keys) {
+            metadataMap.remove(k);
+        }
+    }
+
+    void clear() {
+        if (!enabled) {
+            return;
+        }
+        metadataMap.clear();
+    }
+
+    private MetadataEntry getOrCreate(K key, boolean readFromPersistentCache) {
+        if (!enabled) {
+            return null;
+        }
+        MetadataEntry metadata = metadataMap.get(key);
+        if (metadata == null) {
+            MetadataEntry newEntry = new MetadataEntry(readFromPersistentCache);
+            MetadataEntry oldEntry = metadataMap.putIfAbsent(key, newEntry);
+            metadata = oldEntry == null ? newEntry : oldEntry;
+        }
+        return metadata;
+    }
+
+
+    static class MetadataEntry {
+
+        private final AtomicLong accessCount = new AtomicLong();
+
+        private final boolean readFromPersistentCache;
+
+        private MetadataEntry(boolean readFromPersistentCache) {
+            this.readFromPersistentCache = readFromPersistentCache;
+        }
+
+        void incrementCount() {
+            accessCount.incrementAndGet();
+        }
+
+        long getAccessCount() {
+            return accessCount.get();
+        }
+
+        boolean isReadFromPersistentCache() {
+            return readFromPersistentCache;
+        }
+    }
+
+}

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java Fri Oct 28 08:43:01 2016
@@ -56,12 +56,6 @@ class NodeCache<K, V> implements Cache<K
 
     private static final Set<RemovalCause> EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
 
-    /**
-     * Whether to use the queue to put items into cache. Default: false (cache
-     * will be updated synchronously).
-     */
-    private static final boolean ASYNC_CACHE = Boolean.getBoolean("oak.cache.asynchronous");
-
     private final PersistentCache cache;
     private final PersistentCacheStats stats;
     private final Cache<K, V> memCache;
@@ -69,7 +63,8 @@ class NodeCache<K, V> implements Cache<K
     private final CacheType type;
     private final DataType keyType;
     private final DataType valueType;
-    private final CacheWriteQueue<K, V> writerQueue;
+    private final CacheMetadata<K> memCacheMetadata;
+    CacheWriteQueue<K, V> writeQueue;
 
     NodeCache(
             PersistentCache cache,
@@ -86,12 +81,14 @@ class NodeCache<K, V> implements Cache<K
         map = new MultiGenerationMap<K, V>();
         keyType = new KeyDataType(type);
         valueType = new ValueDataType(docNodeStore, docStore, type);
-        if (ASYNC_CACHE) {
-            this.writerQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map);
-            LOG.info("The persistent cache writes will be asynchronous");
+        this.memCacheMetadata = new CacheMetadata<K>();
+        if (cache.isAsyncCache()) {
+            this.writeQueue = new CacheWriteQueue<K, V>(dispatcher, cache, map);
+            LOG.info("The persistent cache {} writes will be asynchronous", type);
         } else {
-            this.writerQueue = null;
-            LOG.info("The persistent cache writes will be synchronous");
+            this.writeQueue = null;
+            this.memCacheMetadata.disable();
+            LOG.info("The persistent cache {} writes will be synchronous", type);
         }
         this.stats = new PersistentCacheStats(type, statisticsProvider);
     }
@@ -121,9 +118,6 @@ class NodeCache<K, V> implements Cache<K
     }
     
     private V readIfPresent(K key) {
-        if (ASYNC_CACHE && writerQueue.waitsForInvalidation(key)) {
-            return null;
-        }
         cache.switchGenerationIfNeeded();
         TimerStats.Context ctx = stats.startReadTimer();
         V v = map.get(key);
@@ -169,6 +163,7 @@ class NodeCache<K, V> implements Cache<K
     public V getIfPresent(Object key) {
         V value = memCache.getIfPresent(key);
         if (value != null) {
+            memCacheMetadata.increment((K) key);
             return value;
         }
         stats.markRequest();
@@ -176,6 +171,7 @@ class NodeCache<K, V> implements Cache<K
         value = readIfPresent((K) key);
         if (value != null) {
             memCache.put((K) key, value);
+            memCacheMetadata.putFromPersistenceAndIncrement((K) key);
             stats.markHit();
         }
         return value;
@@ -196,8 +192,9 @@ class NodeCache<K, V> implements Cache<K
         TimerStats.Context ctx = stats.startLoaderTimer();
         try {
             value = memCache.get(key, valueLoader);
+            memCacheMetadata.increment(key);
             ctx.stop();
-            if (!ASYNC_CACHE) {
+            if (!cache.isAsyncCache()) {
                 write((K) key, value);
             }
             broadcast(key, value);
@@ -211,13 +208,16 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public ImmutableMap<K, V> getAllPresent(
             Iterable<?> keys) {
-        return memCache.getAllPresent(keys);
+        ImmutableMap<K, V> result = memCache.getAllPresent(keys);
+        memCacheMetadata.incrementAll(keys);
+        return result;
     }
 
     @Override
     public void put(K key, V value) {
         memCache.put(key, value);
-        if (!ASYNC_CACHE) {
+        memCacheMetadata.put(key);
+        if (!cache.isAsyncCache()) {
             write((K) key, value);
         }
         broadcast(key, value);
@@ -227,8 +227,9 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public void invalidate(Object key) {
         memCache.invalidate(key);
-        if (ASYNC_CACHE) {
-            writerQueue.addInvalidate(singleton((K) key));
+        memCacheMetadata.remove(key);
+        if (cache.isAsyncCache()) {
+            writeQueue.addInvalidate(singleton((K) key));
         } else {
             write((K) key, null);
         }
@@ -239,16 +240,19 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public void putAll(Map<? extends K, ? extends V> m) {
         memCache.putAll(m);
+        memCacheMetadata.putAll(m.keySet());
     }
 
     @Override
     public void invalidateAll(Iterable<?> keys) {
         memCache.invalidateAll(keys);
+        memCacheMetadata.removeAll(keys);
     }
 
     @Override
     public void invalidateAll() {
         memCache.invalidateAll();
+        memCacheMetadata.clear();
         map.clear();
         stats.markInvalidateAll();
     }
@@ -271,6 +275,7 @@ class NodeCache<K, V> implements Cache<K
     @Override
     public void cleanUp() {
         memCache.cleanUp();
+        memCacheMetadata.clear();
     }
 
     @Override
@@ -281,12 +286,14 @@ class NodeCache<K, V> implements Cache<K
         if (buff.get() == 0) {
             value = null;
             memCache.invalidate(key);
+            memCacheMetadata.remove(key);
         } else {
             value = (V) valueType.read(buff);
             memCache.put(key, value);
+            memCacheMetadata.put(key);
         }
         stats.markRecvBroadcast();
-        if (!ASYNC_CACHE) {
+        if (!cache.isAsyncCache()) {
             write(key, value);
         }
     }
@@ -296,9 +303,11 @@ class NodeCache<K, V> implements Cache<K
      */
     @Override
     public void evicted(K key, V value, RemovalCause cause) {
-        if (ASYNC_CACHE && EVICTION_CAUSES.contains(cause) && value != null) { 
+        if (cache.isAsyncCache() && EVICTION_CAUSES.contains(cause) && value != null) {
             // invalidations are handled separately
-            writerQueue.addPut(key, value);
+            if (qualifiesToPersist(memCacheMetadata.remove(key))) {
+                writeQueue.addPut(key, value);
+            }
 
             long memory = 0L;
             memory += (key == null ? 0L: keyType.getMemory(key));
@@ -308,8 +317,11 @@ class NodeCache<K, V> implements Cache<K
         }
     }
 
+    private boolean qualifiesToPersist(CacheMetadata.MetadataEntry metadata) {
+        return metadata == null || (!metadata.isReadFromPersistentCache() && metadata.getAccessCount() > 0);
+    }
+
     public PersistentCacheStats getPersistentCacheStats() {
         return stats;
     }
-
 }
\ No newline at end of file

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java Fri Oct 28 08:43:01 2016
@@ -53,7 +53,13 @@ import com.google.common.cache.Cache;
 public class PersistentCache implements Broadcaster.Listener {
     
     static final Logger LOG = LoggerFactory.getLogger(PersistentCache.class);
-   
+
+    /**
+     * Whether to use the queue to put items into cache. Default: false (cache
+     * will be updated synchronously).
+     */
+    private static final boolean ASYNC_CACHE = Boolean.getBoolean("oak.cache.asynchronous");
+
     private static final String FILE_PREFIX = "cache-";
     private static final String FILE_SUFFIX = ".data";
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -67,6 +73,7 @@ public class PersistentCache implements
     private boolean cacheDocChildren;
     private boolean compactOnClose;
     private boolean compress = true;
+    private boolean asyncCache = ASYNC_CACHE;
     private HashMap<CacheType, GenerationCache> caches = 
             new HashMap<CacheType, GenerationCache>();
     
@@ -140,6 +147,10 @@ public class PersistentCache implements
                 manualCommit = true;
             } else if (p.startsWith("broadcast=")) {
                 broadcast = p.split("=")[1];               
+            } else if (p.equals("+async")) {
+                asyncCache = true;
+            } else if (p.equals("-async")) {
+                asyncCache = false;
             }
         }
         this.directory = dir;
@@ -502,7 +513,11 @@ public class PersistentCache implements
     public int getExceptionCount() {
         return exceptionCount;
     }
-    
+
+    public boolean isAsyncCache() {
+        return asyncCache;
+    }
+
     void broadcast(CacheType type, Function<WriteBuffer, Void> writer) {
         Broadcaster b = broadcaster;
         if (b == null) {

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java Fri Oct 28 08:43:01 2016
@@ -29,23 +29,4 @@ interface CacheAction<K, V> {
      */
     void execute();
 
-    /**
-     * Cancel the action without executing it
-     */
-    void cancel();
-
-    /**
-     * Return the keys affected by this action
-     *
-     * @return keys affected by this action
-     */
-    Iterable<K> getAffectedKeys();
-
-    /**
-     * Return the owner of this action
-     *
-     * @return {@link CacheWriteQueue} executing this action
-     */
-    CacheWriteQueue<K, V> getOwner();
-
 }

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java Fri Oct 28 08:43:01 2016
@@ -16,13 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
 
-import static com.google.common.collect.Multimaps.index;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -30,13 +23,10 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
 /**
- * An asynchronous buffer of the CacheAction objects. The buffer removes
- * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than
- * {@link #MAX_SIZE}.
+ * An asynchronous buffer of the CacheAction objects. The buffer only accepts
+ * {@link #MAX_SIZE} number of elements. If the queue is already full, the new
+ * elements are dropped.
  */
 public class CacheActionDispatcher implements Runnable {
 
@@ -45,14 +35,9 @@ public class CacheActionDispatcher imple
     /**
      * What's the length of the queue.
      */
-    static final int MAX_SIZE = 1024;
-
-    /**
-     * How many actions remove once the queue is longer than {@link #MAX_SIZE}.
-     */
-    static final int ACTIONS_TO_REMOVE = 256;
+    static final int MAX_SIZE = 2048;
 
-    final BlockingQueue<CacheAction<?, ?>> queue = new ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE * 2);
+    final BlockingQueue<CacheAction<?, ?>> queue = new ArrayBlockingQueue<CacheAction<?, ?>>(MAX_SIZE);
 
     private volatile boolean isRunning = true;
 
@@ -68,7 +53,6 @@ public class CacheActionDispatcher imple
                 LOG.debug("Interrupted the queue.poll()", e);
             }
         }
-        applyInvalidateActions();
     }
 
     /**
@@ -79,91 +63,13 @@ public class CacheActionDispatcher imple
     }
 
     /**
-     * Adds the new action and cleans the queue if necessary.
+     * Tries to add new action.
      *
      * @param action to be added
      */
-    synchronized void add(CacheAction<?, ?> action) {
-        if (queue.size() >= MAX_SIZE) {
-            cleanTheQueue();
-        }
-        queue.offer(action);
-    }
-
-    /**
-     * Clean the queue and add a single invalidate action for all the removed entries. 
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private void cleanTheQueue() {
-        List<CacheAction> removed = removeOldest();
-        for (Entry<CacheWriteQueue, Collection<CacheAction>> e : groupByOwner(removed).entrySet()) {
-            CacheWriteQueue owner = e.getKey();
-            Collection<CacheAction> actions = e.getValue();
-            List<Object> affectedKeys = cancelAll(actions);
-            owner.addInvalidate(affectedKeys);
-        }
-    }
-
-    /**
-     * Remove {@link #ACTIONS_TO_REMOVE} oldest actions.
-     *
-     * @return A list of removed items.
-     */
-    @SuppressWarnings("rawtypes")
-    private List<CacheAction> removeOldest() {
-        List<CacheAction> removed = new ArrayList<CacheAction>();
-        while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) {
-            CacheAction toBeCanceled = queue.poll();
-            if (toBeCanceled == null) {
-                break;
-            } else {
-                removed.add(toBeCanceled);
-            }
+    void add(CacheAction<?, ?> action) {
+        if (!queue.offer(action)) {
+            LOG.trace("The queue is full, element {} has been rejected", action);
         }
-        return removed;
     }
-
-    /**
-     * Group passed actions by their owners.
-     *
-     * @param actions to be grouped
-     * @return map in which owner is the key and assigned action list is the value
-     */
-    @SuppressWarnings("rawtypes")
-    private static Map<CacheWriteQueue, Collection<CacheAction>> groupByOwner(List<CacheAction> actions) {
-        return index(actions, new Function<CacheAction, CacheWriteQueue>() {
-            @Override
-            public CacheWriteQueue apply(CacheAction input) {
-                return input.getOwner();
-            }
-        }).asMap();
-    }
-
-    /**
-     * Cancel all passed actions.
-     *
-     * @param actions to cancel
-     * @return list of affected keys
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private static List<Object> cancelAll(Collection<CacheAction> actions) {
-        List<Object> cancelledKeys = new ArrayList<Object>();
-        for (CacheAction action : actions) {
-            action.cancel();
-            Iterables.addAll(cancelledKeys, action.getAffectedKeys());
-        }
-        return cancelledKeys;
-    }
-
-    @SuppressWarnings("rawtypes")
-    private void applyInvalidateActions() {
-        CacheAction action;
-        do {
-            action = queue.poll();
-            if (action instanceof InvalidateCacheAction) {
-                action.execute();
-            }
-        } while (action != null);
-    }
-
 }
\ No newline at end of file

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java Fri Oct 28 08:43:01 2016
@@ -16,21 +16,10 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
+import java.util.Map;
 
-/**
- * A fronted for the {@link CacheActionDispatcher} creating actions and maintaining their state.
- *
- * @param <K> key type
- * @param <V> value type
- */
 public class CacheWriteQueue<K, V> {
 
     private final CacheActionDispatcher dispatcher;
@@ -39,65 +28,18 @@ public class CacheWriteQueue<K, V> {
 
     private final Map<K, V> map;
 
-    final Multiset<K> queuedKeys = HashMultiset.create();
-
-    final Set<K> waitsForInvalidation = new HashSet<K>();
-
     public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache cache, Map<K, V> map) {
         this.dispatcher = dispatcher;
         this.cache = cache;
         this.map = map;
     }
 
-    /**
-     * Add new invalidate action.
-     *
-     * @param keys to be invalidated
-     */
-    public void addInvalidate(Iterable<K> keys) {
-        synchronized(this) {
-            for (K key : keys) {
-                queuedKeys.add(key);
-                waitsForInvalidation.add(key);
-            }
-        }
-        dispatcher.add(new InvalidateCacheAction<K, V>(this, keys));
-    }
-
-    /**
-     * Add new put action
-     *
-     * @param key to be put to cache
-     * @param value to be put to cache
-     */
     public void addPut(K key, V value) {
-        synchronized(this) {
-            queuedKeys.add(key);
-            waitsForInvalidation.remove(key);
-        }
-        dispatcher.add(new PutToCacheAction<K, V>(this, key, value));
+        dispatcher.add(new PutToCacheAction<K, V>(key, value, this));
     }
 
-    /**
-     * Check if the last action added for this key was invalidate
-     *
-     * @param key to check 
-     * @return {@code true} if the last added action was invalidate
-     */
-    public synchronized boolean waitsForInvalidation(K key) {
-        return waitsForInvalidation.contains(key);
-    }
-
-    /**
-     * Remove the action state when it's finished or cancelled.
-     *
-     * @param key to be removed
-     */
-    synchronized void remove(K key) {
-        queuedKeys.remove(key);
-        if (!queuedKeys.contains(key)) {
-            waitsForInvalidation.remove(key);
-        }
+    public void addInvalidate(Iterable<K> keys) {
+        dispatcher.add(new InvalidateCacheAction<K, V>(keys, this));
     }
 
     PersistentCache getCache() {
@@ -107,4 +49,4 @@ public class CacheWriteQueue<K, V> {
     Map<K, V> getMap() {
         return map;
     }
-}
\ No newline at end of file
+}

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java Fri Oct 28 08:43:01 2016
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.Map;
 
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 
 /**
@@ -32,49 +33,26 @@ class InvalidateCacheAction<K, V> implem
 
     private final Map<K, V> map;
 
-    private final CacheWriteQueue<K, V> owner;
-
     private final Iterable<K> keys;
 
-    InvalidateCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, Iterable<K> keys) {
-        this.owner = cacheWriteQueue;
+    InvalidateCacheAction(Iterable<K> keys, CacheWriteQueue<K, V> queue) {
         this.keys = keys;
-        this.cache = cacheWriteQueue.getCache();
-        this.map = cacheWriteQueue.getMap();
+        this.cache = queue.getCache();
+        this.map = queue.getMap();
     }
 
     @Override
     public void execute() {
-        try {
-            if (map != null) {
-                for (K key : keys) {
-                    cache.switchGenerationIfNeeded();
-                    map.remove(key);
-                }
+        if (map != null) {
+            for (K key : keys) {
+                cache.switchGenerationIfNeeded();
+                map.remove(key);
             }
-        } finally {
-            decrement();
         }
     }
 
     @Override
-    public void cancel() {
-        decrement();
-    }
-
-    @Override
-    public CacheWriteQueue<K, V> getOwner() {
-        return owner;
-    }
-
-    @Override
-    public Iterable<K> getAffectedKeys() {
-        return keys;
-    }
-
-    private void decrement() {
-        for (K key : keys) {
-            owner.remove(key);
-        }
+    public String toString() {
+        return new StringBuilder("InvalidateCacheAction").append(Iterables.toString(keys)).toString();
     }
 }
\ No newline at end of file

Modified: jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java Fri Oct 28 08:43:01 2016
@@ -20,6 +20,7 @@ import static java.util.Collections.sing
 
 import java.util.Map;
 
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 
 /**
@@ -34,48 +35,27 @@ class PutToCacheAction<K, V> implements
 
     private final Map<K, V> map;
 
-    private final CacheWriteQueue<K, V> owner;
-
     private final K key;
 
     private final V value;
 
-    PutToCacheAction(CacheWriteQueue<K, V> cacheWriteQueue, K key, V value) {
-        this.owner = cacheWriteQueue;
+    PutToCacheAction(K key, V value, CacheWriteQueue<K, V> queue) {
         this.key = key;
         this.value = value;
-        this.cache = cacheWriteQueue.getCache();
-        this.map = cacheWriteQueue.getMap();
+        this.cache = queue.getCache();
+        this.map = queue.getMap();
     }
 
     @Override
     public void execute() {
-        try {
-            if (map != null) {
-                cache.switchGenerationIfNeeded();
-                map.put(key, value);
-            }
-        } finally {
-            decrement();
+        if (map != null) {
+            cache.switchGenerationIfNeeded();
+            map.put(key, value);
         }
     }
 
     @Override
-    public void cancel() {
-        decrement();
-    }
-
-    @Override
-    public CacheWriteQueue<K, V> getOwner() {
-        return owner;
-    }
-
-    @Override
-    public Iterable<K> getAffectedKeys() {
-        return singleton(key);
-    }
-
-    private void decrement() {
-        owner.remove(key);
+    public String toString() {
+        return new StringBuilder("PutToCacheAction[").append(key).append(']').toString();
     }
 }
\ No newline at end of file

Added: jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java?rev=1766970&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java (added)
+++ jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/AsyncQueueTest.java Fri Oct 28 08:43:01 2016
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.persistentCache;
+
+import com.google.common.cache.RemovalCause;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.plugins.document.PathRev;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
+import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+
+public class AsyncQueueTest {
+
+    private static final StringValue VAL = new StringValue("xyz");
+
+    private List<PathRev> putActions;
+
+    private List<PathRev> invalidateActions;
+
+    private NodeCache<PathRev, StringValue> nodeCache;
+
+    private int id;
+
+    @Before
+    public void setup() throws IOException {
+        FileUtils.deleteDirectory(new File("target/cacheTest"));
+        PersistentCache pCache = new PersistentCache("target/cacheTest,+async");
+        final AtomicReference<NodeCache<PathRev, StringValue>> nodeCacheRef = new AtomicReference<NodeCache<PathRev, StringValue>>();
+        CacheLIRS<PathRev, StringValue> cache = new CacheLIRS.Builder<PathRev, StringValue>().maximumSize(1).evictionCallback(new CacheLIRS.EvictionCallback<PathRev, StringValue>() {
+            @Override
+            public void evicted(@Nonnull PathRev key, @Nullable StringValue value, @Nonnull RemovalCause cause) {
+                if (nodeCacheRef.get() != null) {
+                    nodeCacheRef.get().evicted(key, value, cause);
+                }
+            }
+        }).build();
+        nodeCache = (NodeCache<PathRev, StringValue>) pCache.wrap(null, null, cache, CacheType.NODE);
+        nodeCacheRef.set(nodeCache);
+
+        CacheWriteQueueWrapper writeQueue = new CacheWriteQueueWrapper(nodeCache.writeQueue);
+        nodeCache.writeQueue = writeQueue;
+
+        this.putActions = writeQueue.putActions;
+        this.invalidateActions = writeQueue.invalidateActions;
+        this.id = 0;
+    }
+
+    @Test
+    public void unusedItemsShouldntBePersisted() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        flush();
+        assertEquals(emptyList(), putActions);
+    }
+
+    @Test
+    public void readItemsShouldntBePersistedAgain() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        nodeCache.getIfPresent(k);
+        flush();
+        assertEquals(asList(k), putActions);
+
+        putActions.clear();
+        nodeCache.getIfPresent(k); // k should be loaded from persisted cache
+        flush();
+        assertEquals(emptyList(), putActions); // k is not persisted again
+    }
+
+    @Test
+    public void usedItemsShouldBePersisted() {
+        PathRev k = generatePathRev();
+        nodeCache.put(k, VAL);
+        nodeCache.getIfPresent(k);
+        flush();
+        assertEquals(asList(k), putActions);
+    }
+
+    private PathRev generatePathRev() {
+        return new PathRev("/" + id++, new RevisionVector(new Revision(0, 0, 0)));
+    }
+
+    private void flush() {
+        for (int i = 0; i < 1024; i++) {
+            nodeCache.put(generatePathRev(), VAL); // cause eviction of k
+        }
+    }
+
+    private static class CacheWriteQueueWrapper extends CacheWriteQueue<PathRev, StringValue> {
+
+        private final CacheWriteQueue<PathRev, StringValue>  wrapped;
+
+        private final List<PathRev> putActions = newArrayList();
+
+        private final List<PathRev> invalidateActions = newArrayList();
+
+        public CacheWriteQueueWrapper(CacheWriteQueue<PathRev, StringValue>  wrapped) {
+            super(null, null, null);
+            this.wrapped = wrapped;
+        }
+
+        @Override
+        public void addPut(PathRev key, StringValue value) {
+            putActions.add(key);
+            wrapped.addPut(key, value);
+        }
+
+        public void addInvalidate(Iterable<PathRev> keys) {
+            invalidateActions.addAll(newArrayList(keys));
+            wrapped.addInvalidate(keys);
+        }
+    }
+
+}

Modified: jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java?rev=1766970&r1=1766969&r2=1766970&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java (original)
+++ jackrabbit/oak/branches/1.4/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java Fri Oct 28 08:43:01 2016
@@ -18,33 +18,21 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.persistentCache.async;
 
-import static com.google.common.collect.ImmutableSet.of;
-import static com.google.common.collect.Iterables.size;
 import static java.lang.String.valueOf;
 import static java.lang.System.currentTimeMillis;
 import static java.lang.Thread.sleep;
-import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE;
 import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
-import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -59,17 +47,8 @@ public class CacheActionDispatcherTest {
         for (int i = 0; i < MAX_SIZE + 10; i++) {
             dispatcher.add(createWriteAction(valueOf(i), queue));
         }
-        assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size());
-        assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString());
-
-        InvalidateCacheAction<?, ?> invalidateAction = null;
-        for (CacheAction<?, ?> action : dispatcher.queue) {
-            if (action instanceof InvalidateCacheAction) {
-                invalidateAction = (InvalidateCacheAction<?, ?>) action;
-            }
-        }
-        assertNotNull(invalidateAction);
-        assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys()));
+        assertEquals(MAX_SIZE, dispatcher.queue.size());
+        assertEquals("0", dispatcher.queue.peek().toString());
     }
 
     @Test
@@ -128,31 +107,6 @@ public class CacheActionDispatcherTest {
         assertFalse(queueThread.isAlive());
     }
 
-    @Test
-    public void testExecuteInvalidatesOnShutdown() throws InterruptedException {
-        Map<String, Object> cacheMap = new HashMap<String, Object>();
-        CacheActionDispatcher dispatcher = new CacheActionDispatcher();
-        CacheWriteQueue<String, Object> queue = new CacheWriteQueue<String, Object>(dispatcher,
-                Mockito.mock(PersistentCache.class), cacheMap);
-        Thread queueThread = new Thread(dispatcher);
-        queueThread.start();
-
-        cacheMap.put("2", new Object());
-        cacheMap.put("3", new Object());
-        cacheMap.put("4", new Object());
-        dispatcher.add(new DummyCacheWriteAction("1", queue, 100));
-        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("2")));
-        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("3")));
-        dispatcher.add(new InvalidateCacheAction<String, Object>(queue, Collections.singleton("4")));
-        Thread.sleep(10); // make sure the first action started
-
-        dispatcher.stop();
-        assertEquals(of("2", "3", "4"), cacheMap.keySet());
-
-        queueThread.join();
-        assertTrue(cacheMap.isEmpty());
-    }
-
     private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue<String, Object> queue) {
         return new DummyCacheWriteAction(id, queue);
     }
@@ -188,22 +142,9 @@ public class CacheActionDispatcherTest {
         }
 
         @Override
-        public void cancel() {
-        }
-
-        @Override
         public String toString() {
             return id;
         }
 
-        @Override
-        public Iterable<String> getAffectedKeys() {
-            return Collections.singleton(id);
-        }
-
-        @Override
-        public CacheWriteQueue<String, Object> getOwner() {
-            return queue;
-        }
     }
 }