You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/01/06 09:49:46 UTC
svn commit: r1723251 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/
main/java/org/apache/jackrabbit/oak/plugins/document/rdb/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: mreutegg
Date: Wed Jan 6 08:49:46 2016
New Revision: 1723251
URL: http://svn.apache.org/viewvc?rev=1723251&view=rev
Log:
OAK-3634: RDB/MongoDocumentStore may return stale documents
Implement fixes for RDB and MongoDocumentStore and enable tests
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1723251&r1=1723250&r2=1723251&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Wed Jan 6 08:49:46 2016
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -83,6 +82,8 @@ import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Predicates.notNull;
+import static com.google.common.collect.Maps.filterValues;
/**
* A document store that uses MongoDB as the backend.
@@ -284,22 +285,9 @@ public class MongoDocumentStore implemen
LOG.trace("invalidateCache: batch size: {} of total so far {}",
ids.size(), size);
}
-
- QueryBuilder query = QueryBuilder.start(Document.ID).in(ids);
- // Fetch only the modCount and id
- final BasicDBObject fields = new BasicDBObject(Document.ID, 1);
- fields.put(Document.MOD_COUNT, 1);
-
- DBCursor cursor = nodes.find(query.get(), fields);
- cursor.setReadPreference(ReadPreference.primary());
- result.queryCount++;
- Map<String, Number> modCounts = new HashMap<String, Number>();
- for (DBObject obj : cursor) {
- String id = (String) obj.get(Document.ID);
- Number modCount = (Number) obj.get(Document.MOD_COUNT);
- modCounts.put(id, modCount);
- }
+ Map<String, Number> modCounts = getModCounts(ids);
+ result.queryCount++;
int invalidated = nodesCache.invalidateOutdated(modCounts);
result.cacheEntriesProcessedCount += modCounts.size();
@@ -906,18 +894,26 @@ public class MongoDocumentStore implemen
try {
dbCollection.update(query.get(), update, false, true);
if (collection == Collection.NODES) {
+ Map<String, Number> modCounts = getModCounts(filterValues(cachedDocs, notNull()).keySet());
// update cache
for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) {
// the cachedDocs is not empty, so the collection = NODES
Lock lock = nodeLocks.acquire(entry.getKey());
try {
- if (entry.getValue() == null || entry.getValue() == NodeDocument.NULL) {
+ Number postUpdateModCount = modCounts.get(entry.getKey());
+ if (postUpdateModCount != null
+ && entry.getValue() != null
+ && entry.getValue() != NodeDocument.NULL
+ && (postUpdateModCount.longValue() - 1) == entry.getValue().getModCount()) {
+ // post update modCount is one higher than
+ // what we currently see in the cache. we can
+ // replace the cached document
+ NodeDocument newDoc = applyChanges(Collection.NODES, entry.getValue(), updateOp.shallowCopy(entry.getKey()));
+ nodesCache.replaceCachedDocument(entry.getValue(), newDoc);
+ } else {
// make sure concurrently loaded document is
// invalidated
nodesCache.invalidate(entry.getKey());
- } else {
- NodeDocument newDoc = applyChanges(Collection.NODES, entry.getValue(), updateOp.shallowCopy(entry.getKey()));
- nodesCache.replaceCachedDocument(entry.getValue(), newDoc);
}
} finally {
lock.unlock();
@@ -925,6 +921,11 @@ public class MongoDocumentStore implemen
}
}
} catch (MongoException e) {
+ // some documents may still have been updated
+ // invalidate all documents affected by this update call
+ for (String k : keys) {
+ nodesCache.invalidate(k);
+ }
throw DocumentStoreException.convert(e);
}
} finally {
@@ -932,6 +933,35 @@ public class MongoDocumentStore implemen
}
}
+ /**
+ * Returns the {@link Document#MOD_COUNT} value of the documents with the
+ * given {@code keys}. The returned map will only contain entries for
+ * existing documents.
+ *
+ * @param keys the keys of the documents.
+ * @return map with key to {@link Document#MOD_COUNT} value mapping.
+ * @throws MongoException if the call fails
+ */
+ @Nonnull
+ private Map<String, Number> getModCounts(Iterable<String> keys)
+ throws MongoException {
+ QueryBuilder query = QueryBuilder.start(Document.ID).in(keys);
+ // Fetch only the modCount and id
+ final BasicDBObject fields = new BasicDBObject(Document.ID, 1);
+ fields.put(Document.MOD_COUNT, 1);
+
+ DBCursor cursor = nodes.find(query.get(), fields);
+ cursor.setReadPreference(ReadPreference.primary());
+
+ Map<String, Number> modCounts = Maps.newHashMap();
+ for (DBObject obj : cursor) {
+ String id = (String) obj.get(Document.ID);
+ Number modCount = (Number) obj.get(Document.MOD_COUNT);
+ modCounts.put(id, modCount);
+ }
+ return modCounts;
+ }
+
DocumentReadPreference getReadPreference(int maxCacheAge){
if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) {
return DocumentReadPreference.PRIMARY;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1723251&r1=1723250&r2=1723251&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Wed Jan 6 08:49:46 2016
@@ -1261,6 +1261,9 @@ public class RDBDocumentStore implements
qc.addKeys(chunkedIds);
seenQueryContext.add(qc);
}
+ for (String id : chunkedIds) {
+ nodesCache.invalidate(id);
+ }
}
Connection connection = null;
@@ -1285,23 +1288,8 @@ public class RDBDocumentStore implements
qc.addKeys(chunkedIds);
}
}
- }
- for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) {
- T oldDoc = castAsT(entry.getValue());
- String id = entry.getKey();
- Lock lock = locks.acquire(id);
- try {
- if (oldDoc == null) {
- // make sure concurrently loaded document is
- // invalidated
- nodesCache.invalidate(id);
- } else {
- addUpdateCounters(update);
- T newDoc = createNewDocument(collection, oldDoc, update);
- nodesCache.replaceCachedDocument((NodeDocument) oldDoc, (NodeDocument) newDoc);
- }
- } finally {
- lock.unlock();
+ for (String id : chunkedIds) {
+ nodesCache.invalidate(id);
}
}
} else {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1723251&r1=1723250&r2=1723251&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Wed Jan 6 08:49:46 2016
@@ -414,6 +414,35 @@ public class BasicDocumentStoreTest exte
}
@Test
+ public void testModifyModified() {
+ // https://issues.apache.org/jira/browse/OAK-2940
+ String id = this.getClass().getName() + ".testModifyModified";
+ // create a test node
+ UpdateOp up = new UpdateOp(id, true);
+ up.set("_id", id);
+ up.set("_modified", 1000L);
+ boolean success = super.ds.create(Collection.NODES, Collections.singletonList(up));
+ assertTrue(success);
+ removeMe.add(id);
+
+ // update with "max" operation
+ up = new UpdateOp(id, false);
+ up.set("_id", id);
+ up.max("_modified", 2000L);
+ super.ds.update(Collection.NODES, Collections.singletonList(id), up);
+ NodeDocument nd = super.ds.find(Collection.NODES, id, 0);
+ assertEquals(((Number)nd.get("_modified")).longValue(), 2000L);
+
+ // update with "set" operation
+ up = new UpdateOp(id, false);
+ up.set("_id", id);
+ up.set("_modified", 1500L);
+ super.ds.update(Collection.NODES, Collections.singletonList(id), up);
+ nd = super.ds.find(Collection.NODES, id, 0);
+ assertEquals(((Number)nd.get("_modified")).longValue(), 1500L);
+ }
+
+ @Test
public void testInterestingStrings() {
// test case "gclef:\uD834\uDD1E" will fail on MySQL unless properly configured to use utf8mb4 charset
// Assume.assumeTrue(!(super.dsname.equals("RDB-MySQL")));
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java?rev=1723251&r1=1723250&r2=1723251&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java Wed Jan 6 08:49:46 2016
@@ -30,12 +30,16 @@ import java.util.Random;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
-import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MultiDocumentStoreTest extends AbstractMultiDocumentStoreTest {
+ private static final Logger LOG = LoggerFactory.getLogger(MultiDocumentStoreTest.class);
+
public MultiDocumentStoreTest(DocumentStoreFixture dsf) {
super(dsf);
}
@@ -52,7 +56,7 @@ public class MultiDocumentStoreTest exte
UpdateOp up = new UpdateOp(id, true);
up.set("_id", id);
- up.set("_foo", 0l);
+ up.set("_foo", 0);
assertTrue(super.ds1.create(Collection.NODES, Collections.singletonList(up)));
removeMe.add(id);
@@ -61,7 +65,7 @@ public class MultiDocumentStoreTest exte
for (int i = 0; i < increments; i++) {
up = new UpdateOp(id, true);
up.set("_id", id);
- up.increment("_foo", 1l);
+ up.increment("_foo", 1);
if (i % 2 == 0) {
super.ds1.update(Collection.NODES, Collections.singletonList(id), up);
}
@@ -76,82 +80,6 @@ public class MultiDocumentStoreTest exte
}
@Test
- public void testInterleavedUpdate2() {
- String id = this.getClass().getName() + ".testInterleavedUpdate2";
-
- // remove if present
- NodeDocument nd1 = super.ds1.find(Collection.NODES, id);
- if (nd1 != null) {
- super.ds1.remove(Collection.NODES, id);
- }
-
- UpdateOp up = new UpdateOp(id, true);
- up.set("_id", id);
- up.set("_modified", 1L);
- assertTrue(super.ds1.create(Collection.NODES, Collections.singletonList(up)));
- removeMe.add(id);
-
- nd1 = super.ds1.find(Collection.NODES, id, 0);
- Number n = nd1.getModCount();
- if (n != null) {
- // Document store uses modCount
- int n1 = n.intValue();
-
- // get the document into ds2's cache
- NodeDocument nd2 = super.ds2.find(Collection.NODES, id, 0);
- int n2 = nd2.getModCount().intValue();
- assertEquals(n1, n2);
-
- UpdateOp upds1 = new UpdateOp(id, true);
- upds1.set("_id", id);
- upds1.set("foo", "bar");
- upds1.set("_modified", 2L);
- super.ds1.update(Collection.NODES, Collections.singletonList(id), upds1);
- nd1 = super.ds1.find(Collection.NODES, id);
- int oldn1 = n1;
- n1 = nd1.getModCount().intValue();
- assertEquals(oldn1 + 1, n1);
- assertEquals("bar", nd1.get("foo"));
-
- // modify in DS2
- UpdateOp upds2 = new UpdateOp(id, true);
- upds2.set("_id", id);
- upds2.set("foo", "qux");
- upds2.set("_modified", 3L);
- super.ds2.update(Collection.NODES, Collections.singletonList(id), upds2);
- nd2 = super.ds2.find(Collection.NODES, id);
- n2 = nd2.getModCount().intValue();
- assertEquals(oldn1 + 1, n2);
- assertEquals("qux", nd2.get("foo"));
-
- // both stores are now at the same modCount with different contents
- upds1 = new UpdateOp(id, true);
- upds1.set("_id", id);
- upds1.set("foo", "barbar");
- upds1.max("_modified", 0L);
- NodeDocument prev = super.ds1.findAndUpdate(Collection.NODES, upds1);
- // prev document should contain mod from DS2
- assertEquals("qux", prev.get("foo"));
- assertEquals(oldn1 + 2, prev.getModCount().intValue());
- assertEquals(3L, prev.getModified().intValue());
-
- // the new document must not have a _modified time smaller than
- // before the update
- nd1 = super.ds1.find(Collection.NODES, id, 0);
- assertEquals(super.dsname + ": _modified value must never ever get smaller", 3L, nd1.getModified().intValue());
-
- // verify that _modified can indeed be *set* to a smaller value, see
- // https://issues.apache.org/jira/browse/OAK-2940
- upds1 = new UpdateOp(id, true);
- upds1.set("_id", id);
- upds1.set("_modified", 0L);
- super.ds1.findAndUpdate(Collection.NODES, upds1);
- nd1 = super.ds1.find(Collection.NODES, id, 0);
- assertEquals(super.dsname + ": _modified value must be set to 0", 0L, nd1.getModified().intValue());
- }
- }
-
- @Test
public void testInvalidateCache() {
// use a "proper" ID because otherwise Mongo's cache invalidation will fail
// see OAK-2588
@@ -281,7 +209,6 @@ public class MultiDocumentStoreTest exte
}
}
- @Ignore("OAK-3634")
@Test
public void testChangeVisibility() {
String id = this.getClass().getName() + ".testChangeVisibility";
@@ -319,7 +246,6 @@ public class MultiDocumentStoreTest exte
}
}
- @Ignore("OAK-3634")
@Test
public void concurrentUpdate() throws Exception {
String id = Utils.getIdFromPath("/foo");
@@ -422,14 +348,19 @@ public class MultiDocumentStoreTest exte
@Override
public void run() {
String p = Thread.currentThread().getName();
- try {
- for (int i = 0; i < 1000; i++) {
- UpdateOp op = new UpdateOp(id, false);
- op.set(p, counter++);
+ for (int i = 0; i < 1000; i++) {
+ UpdateOp op = new UpdateOp(id, false);
+ op.set(p, counter++);
+ try {
ds.update(Collection.NODES, Collections.singletonList(id), op);
+ } catch (Exception e) {
+ if (ds instanceof RDBDocumentStore
+ && e.getMessage().contains("race?")) {
+ LOG.warn(e.toString());
+ } else {
+ exceptions.add(e);
+ }
}
- } catch (Exception e) {
- exceptions.add(e);
}
}
}