You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2016/05/04 14:21:38 UTC

svn commit: r1742290 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java

Author: reschke
Date: Wed May  4 14:21:38 2016
New Revision: 1742290

URL: http://svn.apache.org/viewvc?rev=1742290&view=rev
Log:
OAK-4320: use the cache tracker in the RDB Document Store

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1742290&r1=1742289&r2=1742290&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Wed May  4 14:21:38 2016
@@ -76,6 +76,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
 import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
@@ -89,9 +90,6 @@ import com.google.common.collect.Immutab
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnel;
-import com.google.common.hash.PrimitiveSink;
 
 /**
  * Implementation of {@link DocumentStore} for relational databases.
@@ -665,6 +663,15 @@ public class RDBDocumentStore implements
         }
     }
 
+    private <T extends Document> T getIfCached(Collection<T> collection, String id, long modCount) {
+        T doc = getIfCached(collection, id);
+        if (doc != null && doc.getModCount() == modCount) {
+            return doc;
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public Iterable<CacheStats> getCacheStats() {
         return nodesCache.getCacheStats();
@@ -1322,7 +1329,6 @@ public class RDBDocumentStore implements
 
             for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
 
-                Set<QueryContext> seenQueryContext = Collections.emptySet();
                 Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
 
                 if (collection == Collection.NODES) {
@@ -1330,17 +1336,7 @@ public class RDBDocumentStore implements
                     cachedDocs = new HashMap<String, NodeDocument>();
                     for (String key : chunkedIds) {
                         cachedDocs.put(key, nodesCache.getIfPresent(key));
-                    }
-
-                    // keep concurrently running queries from updating
-                    // the cache entry for this key
-                    seenQueryContext = new HashSet<QueryContext>();
-                    for (QueryContext qc : qmap.values()) {
-                        qc.addKeys(chunkedIds);
-                        seenQueryContext.add(qc);
-                    }
-                    for (String id : chunkedIds) {
-                        nodesCache.invalidate(id);
+                        nodesCache.invalidate(key);
                     }
                 }
 
@@ -1363,13 +1359,6 @@ public class RDBDocumentStore implements
                 }
                 if (success) {
                     if (collection == Collection.NODES) {
-                        // keep concurrently running queries from updating
-                        // the cache entry for this key
-                        for (QueryContext qc : qmap.values()) {
-                            if (!seenQueryContext.contains(qc)) {
-                                qc.addKeys(chunkedIds);
-                            }
-                        }
                         for (String id : chunkedIds) {
                             nodesCache.invalidate(id);
                         }
@@ -1391,72 +1380,6 @@ public class RDBDocumentStore implements
         }
     }
 
-    /**
-     * Class used to track which documents may have been updated since the start
-     * of the query and thus may not put into the cache.
-     */
-    private class QueryContext {
-
-        private static final double FPP = 0.01d;
-        private static final int ENTRIES_SCOPED = 1000;
-        private static final int ENTRIES_OPEN = 10000;
-
-        private final String fromKey, toKey;
-        private volatile BloomFilter<String> filter = null;
-
-        private BloomFilter<String> getFilter() {
-            if (filter == null) {
-                synchronized (this) {
-                    if (filter == null) {
-                        filter = BloomFilter.create(new Funnel<String>() {
-                            private static final long serialVersionUID = -7114267990225941161L;
-
-                            @Override
-                            public void funnel(String from, PrimitiveSink into) {
-                                into.putUnencodedChars(from);
-                            }
-                        }, toKey.equals(NodeDocument.MAX_ID_VALUE) ? ENTRIES_OPEN : ENTRIES_SCOPED, FPP);
-                    }
-                }
-            }
-            return filter;
-        }
-
-        public QueryContext(String fromKey, String toKey) {
-            this.fromKey = fromKey;
-            this.toKey = toKey;
-        }
-
-        public void addKey(String key) {
-            if (fromKey.compareTo(key) < 0 && toKey.compareTo(key) > 0) {
-                getFilter().put(key);
-            }
-        }
-
-        public void addKeys(List<String> keys) {
-            for (String key: keys) {
-                addKey(key);
-            }
-        }
-
-        public boolean mayUpdate(String key) {
-            return filter == null ? true : !getFilter().mightContain(key);
-        }
-
-        synchronized public void dispose() {
-            if (LOG.isDebugEnabled()) {
-                if (filter != null) {
-                    LOG.debug("Disposing QueryContext for range " + fromKey + "..." + toKey + " - filter fpp was: "
-                            + filter.expectedFpp());
-                } else {
-                    LOG.debug("Disposing QueryContext for range " + fromKey + "..." + toKey + " - no filter was needed");
-                }
-            }
-        }
-    }
-
-    private Map<Thread, QueryContext> qmap = new ConcurrentHashMap<Thread, QueryContext>();
-
     private <T extends Document> List<T> internalQuery(Collection<T> collection, String fromKey, String toKey,
             List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) {
         Connection connection = null;
@@ -1472,13 +1395,12 @@ public class RDBDocumentStore implements
 
         final Stopwatch watch = startWatch();
         int resultSize = 0;
+        CacheChangesTracker tracker = null;
         try {
-            long now = System.currentTimeMillis();
-            QueryContext qp = null;
             if (collection == Collection.NODES) {
-                qp = new QueryContext(fromKey, toKey);
-                qmap.put(Thread.currentThread(), qp);
+                tracker = nodesCache.registerTracker(fromKey, toKey);
             }
+            long now = System.currentTimeMillis();
             connection = this.ch.getROConnection();
             String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey;
             String to = collection == Collection.NODES && NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey;
@@ -1488,20 +1410,31 @@ public class RDBDocumentStore implements
             int size = dbresult.size();
             List<T> result = new ArrayList<T>(size);
             for (int i = 0; i < size; i++) {
-                RDBRow row = dbresult.set(i, null); // free RDBRow ASAP
-                T doc = runThroughCache(collection, row, now, qp);
+                // free RDBRow as early as possible
+                RDBRow row = dbresult.set(i, null);
+                T doc = getIfCached(collection, row.getId(), row.getModcount());
+                if (doc == null) {
+                    // parse DB contents into document if and only if it's not
+                    // already in the cache
+                    doc = convertFromDBObject(collection, row);
+                } else {
+                    // otherwise mark it as fresh
+                    ((NodeDocument) doc).markUpToDate(now);
+                }
                 result.add(doc);
             }
-            resultSize = result.size();
-            if (qp != null) {
-                qp.dispose();
+            if (collection == Collection.NODES) {
+                nodesCache.putNonConflictingDocs(tracker, castAsNodeDocumentList(result));
             }
+            resultSize = result.size();
             return result;
         } catch (Exception ex) {
             LOG.error("SQL exception on query", ex);
             throw new DocumentStoreException(ex);
         } finally {
-            qmap.remove(Thread.currentThread());
+            if (tracker != null) {
+                tracker.close();
+            }
             this.ch.closeConnection(connection);
             stats.doneQuery(watch.elapsed(TimeUnit.NANOSECONDS), collection, fromKey, toKey,
                     !conditions.isEmpty(), resultSize, -1, false);
@@ -1830,6 +1763,11 @@ public class RDBDocumentStore implements
         return (T) doc;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T extends Document> List<NodeDocument> castAsNodeDocumentList(List<T> list) {
+        return (List<NodeDocument>) list;
+    }
+
     private NodeDocumentCache nodesCache;
 
     private NodeDocumentLocks locks;
@@ -1864,43 +1802,6 @@ public class RDBDocumentStore implements
         return ser.fromRow(collection, row);
     }
 
-    private <T extends Document> T runThroughCache(Collection<T> collection, RDBRow row, long now, QueryContext qp) {
-
-        if (collection != Collection.NODES) {
-            // not in the cache anyway
-            return convertFromDBObject(collection, row);
-        }
-
-        String id = row.getId();
-        NodeDocument inCache = nodesCache.getIfPresent(id);
-        Long modCount = row.getModcount();
-
-        // do not overwrite document in cache if the
-        // existing one in the cache is newer
-        if (inCache != null && inCache != NodeDocument.NULL) {
-            // check mod count
-            Long cachedModCount = inCache.getModCount();
-            if (cachedModCount == null) {
-                throw new IllegalStateException("Missing " + Document.MOD_COUNT);
-            }
-            if (modCount <= cachedModCount) {
-                // we can use the cached document
-                inCache.markUpToDate(now);
-                return castAsT(inCache);
-            }
-        }
-
-        NodeDocument fresh = (NodeDocument) convertFromDBObject(collection, row);
-        fresh.seal();
-
-        if (!qp.mayUpdate(id)) {
-            return castAsT(fresh);
-        }
-
-        nodesCache.putIfNewer(fresh);
-        return castAsT(fresh);
-    }
-
     private static boolean hasChangesToCollisions(UpdateOp update) {
         if (!USECMODCOUNT) {
             return false;