You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2016/05/04 14:21:38 UTC
svn commit: r1742290 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Author: reschke
Date: Wed May 4 14:21:38 2016
New Revision: 1742290
URL: http://svn.apache.org/viewvc?rev=1742290&view=rev
Log:
OAK-4320: use the cache tracker in the RDB Document Store
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1742290&r1=1742289&r2=1742290&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Wed May 4 14:21:38 2016
@@ -76,6 +76,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
@@ -89,9 +90,6 @@ import com.google.common.collect.Immutab
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnel;
-import com.google.common.hash.PrimitiveSink;
/**
* Implementation of {@link DocumentStore} for relational databases.
@@ -665,6 +663,15 @@ public class RDBDocumentStore implements
}
}
+ private <T extends Document> T getIfCached(Collection<T> collection, String id, long modCount) {
+ T doc = getIfCached(collection, id);
+ if (doc != null && doc.getModCount() == modCount) {
+ return doc;
+ } else {
+ return null;
+ }
+ }
+
@Override
public Iterable<CacheStats> getCacheStats() {
return nodesCache.getCacheStats();
@@ -1322,7 +1329,6 @@ public class RDBDocumentStore implements
for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
- Set<QueryContext> seenQueryContext = Collections.emptySet();
Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
if (collection == Collection.NODES) {
@@ -1330,17 +1336,7 @@ public class RDBDocumentStore implements
cachedDocs = new HashMap<String, NodeDocument>();
for (String key : chunkedIds) {
cachedDocs.put(key, nodesCache.getIfPresent(key));
- }
-
- // keep concurrently running queries from updating
- // the cache entry for this key
- seenQueryContext = new HashSet<QueryContext>();
- for (QueryContext qc : qmap.values()) {
- qc.addKeys(chunkedIds);
- seenQueryContext.add(qc);
- }
- for (String id : chunkedIds) {
- nodesCache.invalidate(id);
+ nodesCache.invalidate(key);
}
}
@@ -1363,13 +1359,6 @@ public class RDBDocumentStore implements
}
if (success) {
if (collection == Collection.NODES) {
- // keep concurrently running queries from updating
- // the cache entry for this key
- for (QueryContext qc : qmap.values()) {
- if (!seenQueryContext.contains(qc)) {
- qc.addKeys(chunkedIds);
- }
- }
for (String id : chunkedIds) {
nodesCache.invalidate(id);
}
@@ -1391,72 +1380,6 @@ public class RDBDocumentStore implements
}
}
- /**
- * Class used to track which documents may have been updated since the start
- * of the query and thus may not put into the cache.
- */
- private class QueryContext {
-
- private static final double FPP = 0.01d;
- private static final int ENTRIES_SCOPED = 1000;
- private static final int ENTRIES_OPEN = 10000;
-
- private final String fromKey, toKey;
- private volatile BloomFilter<String> filter = null;
-
- private BloomFilter<String> getFilter() {
- if (filter == null) {
- synchronized (this) {
- if (filter == null) {
- filter = BloomFilter.create(new Funnel<String>() {
- private static final long serialVersionUID = -7114267990225941161L;
-
- @Override
- public void funnel(String from, PrimitiveSink into) {
- into.putUnencodedChars(from);
- }
- }, toKey.equals(NodeDocument.MAX_ID_VALUE) ? ENTRIES_OPEN : ENTRIES_SCOPED, FPP);
- }
- }
- }
- return filter;
- }
-
- public QueryContext(String fromKey, String toKey) {
- this.fromKey = fromKey;
- this.toKey = toKey;
- }
-
- public void addKey(String key) {
- if (fromKey.compareTo(key) < 0 && toKey.compareTo(key) > 0) {
- getFilter().put(key);
- }
- }
-
- public void addKeys(List<String> keys) {
- for (String key: keys) {
- addKey(key);
- }
- }
-
- public boolean mayUpdate(String key) {
- return filter == null ? true : !getFilter().mightContain(key);
- }
-
- synchronized public void dispose() {
- if (LOG.isDebugEnabled()) {
- if (filter != null) {
- LOG.debug("Disposing QueryContext for range " + fromKey + "..." + toKey + " - filter fpp was: "
- + filter.expectedFpp());
- } else {
- LOG.debug("Disposing QueryContext for range " + fromKey + "..." + toKey + " - no filter was needed");
- }
- }
- }
- }
-
- private Map<Thread, QueryContext> qmap = new ConcurrentHashMap<Thread, QueryContext>();
-
private <T extends Document> List<T> internalQuery(Collection<T> collection, String fromKey, String toKey,
List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) {
Connection connection = null;
@@ -1472,13 +1395,12 @@ public class RDBDocumentStore implements
final Stopwatch watch = startWatch();
int resultSize = 0;
+ CacheChangesTracker tracker = null;
try {
- long now = System.currentTimeMillis();
- QueryContext qp = null;
if (collection == Collection.NODES) {
- qp = new QueryContext(fromKey, toKey);
- qmap.put(Thread.currentThread(), qp);
+ tracker = nodesCache.registerTracker(fromKey, toKey);
}
+ long now = System.currentTimeMillis();
connection = this.ch.getROConnection();
String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ? null : fromKey;
String to = collection == Collection.NODES && NodeDocument.MAX_ID_VALUE.equals(toKey) ? null : toKey;
@@ -1488,20 +1410,31 @@ public class RDBDocumentStore implements
int size = dbresult.size();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
- RDBRow row = dbresult.set(i, null); // free RDBRow ASAP
- T doc = runThroughCache(collection, row, now, qp);
+ // free RDBRow as early as possible
+ RDBRow row = dbresult.set(i, null);
+ T doc = getIfCached(collection, row.getId(), row.getModcount());
+ if (doc == null) {
+ // parse DB contents into document if and only if it's not
+ // already in the cache
+ doc = convertFromDBObject(collection, row);
+ } else {
+ // otherwise mark it as fresh
+ ((NodeDocument) doc).markUpToDate(now);
+ }
result.add(doc);
}
- resultSize = result.size();
- if (qp != null) {
- qp.dispose();
+ if (collection == Collection.NODES) {
+ nodesCache.putNonConflictingDocs(tracker, castAsNodeDocumentList(result));
}
+ resultSize = result.size();
return result;
} catch (Exception ex) {
LOG.error("SQL exception on query", ex);
throw new DocumentStoreException(ex);
} finally {
- qmap.remove(Thread.currentThread());
+ if (tracker != null) {
+ tracker.close();
+ }
this.ch.closeConnection(connection);
stats.doneQuery(watch.elapsed(TimeUnit.NANOSECONDS), collection, fromKey, toKey,
!conditions.isEmpty(), resultSize, -1, false);
@@ -1830,6 +1763,11 @@ public class RDBDocumentStore implements
return (T) doc;
}
+ @SuppressWarnings("unchecked")
+ private static <T extends Document> List<NodeDocument> castAsNodeDocumentList(List<T> list) {
+ return (List<NodeDocument>) list;
+ }
+
private NodeDocumentCache nodesCache;
private NodeDocumentLocks locks;
@@ -1864,43 +1802,6 @@ public class RDBDocumentStore implements
return ser.fromRow(collection, row);
}
- private <T extends Document> T runThroughCache(Collection<T> collection, RDBRow row, long now, QueryContext qp) {
-
- if (collection != Collection.NODES) {
- // not in the cache anyway
- return convertFromDBObject(collection, row);
- }
-
- String id = row.getId();
- NodeDocument inCache = nodesCache.getIfPresent(id);
- Long modCount = row.getModcount();
-
- // do not overwrite document in cache if the
- // existing one in the cache is newer
- if (inCache != null && inCache != NodeDocument.NULL) {
- // check mod count
- Long cachedModCount = inCache.getModCount();
- if (cachedModCount == null) {
- throw new IllegalStateException("Missing " + Document.MOD_COUNT);
- }
- if (modCount <= cachedModCount) {
- // we can use the cached document
- inCache.markUpToDate(now);
- return castAsT(inCache);
- }
- }
-
- NodeDocument fresh = (NodeDocument) convertFromDBObject(collection, row);
- fresh.seal();
-
- if (!qp.mayUpdate(id)) {
- return castAsT(fresh);
- }
-
- nodesCache.putIfNewer(fresh);
- return castAsT(fresh);
- }
-
private static boolean hasChangesToCollisions(UpdateOp update) {
if (!USECMODCOUNT) {
return false;