Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/01/28 09:20:05 UTC
svn commit: r1727263 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
Author: mreutegg
Date: Thu Jan 28 08:20:05 2016
New Revision: 1727263
URL: http://svn.apache.org/viewvc?rev=1727263&view=rev
Log:
OAK-3559: Bulk document updates in MongoDocumentStore
Apply Tomek Rekawek's patch with minor modifications.
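For context: before this change, the batch createOrUpdate() simply looped over the single-document variant (see the removed lines in the diff below); it now issues real MongoDB bulk writes. A minimal usage sketch of the unchanged public API, assuming a MongoDocumentStore instance named "store" (the ids and the property name are illustrative, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.jackrabbit.oak.plugins.document.*;
    import org.apache.jackrabbit.oak.plugins.document.util.Utils;

    List<UpdateOp> ops = new ArrayList<UpdateOp>();
    for (int i = 0; i < 100; i++) {
        String id = Utils.getIdFromPath("/node-" + i);
        UpdateOp op = new UpdateOp(id, true); // true: the document may be created
        op.set(Document.ID, id);
        op.set("prop", "value");
        ops.add(op);
    }
    // one entry per op, in order: the previous document, or null if the op created it
    List<NodeDocument> oldDocs = store.createOrUpdate(Collection.NODES, ops);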
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1727263&r1=1727262&r2=1727263&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Thu Jan 28 08:20:05 2016
@@ -21,10 +21,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -39,6 +43,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.mongodb.MongoClientURI;
@@ -72,8 +77,14 @@ import org.apache.jackrabbit.oak.util.Pe
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
+import com.mongodb.BulkWriteError;
+import com.mongodb.BulkWriteException;
+import com.mongodb.BulkWriteOperation;
+import com.mongodb.BulkWriteResult;
+import com.mongodb.BulkWriteUpsert;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
@@ -88,7 +99,9 @@ import static com.google.common.base.Pre
import static com.google.common.base.Predicates.not;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.Maps.filterValues;
+import static com.google.common.collect.Sets.difference;
/**
* A document store that uses MongoDB as the backend.
@@ -165,6 +178,14 @@ public class MongoDocumentStore implemen
private long maxLockedQueryTimeMS =
Long.getLong("oak.mongo.maxLockedQueryTimeMS", TimeUnit.SECONDS.toMillis(3));
+ /**
+ * The number of documents to put into one bulk update.
+ * <p>
+ * Default is 30.
+ */
+ private int bulkSize =
+ Integer.getInteger("oak.mongo.bulkSize", 30);
+
private String lastReadWriteMode;
private final Map<String, String> metadata;
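Since the field is initialized via Integer.getInteger, the batch size can be tuned without a code change through a JVM system property, for example (the value 50 is only an example):

    java -Doak.mongo.bulkSize=50 ...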
@@ -713,7 +734,7 @@ public class MongoDocumentStore implemen
DBCollection dbCollection = getDBCollection(collection);
// make sure we don't modify the original updateOp
updateOp = updateOp.copy();
- DBObject update = createUpdate(updateOp);
+ DBObject update = createUpdate(updateOp, false);
Lock lock = null;
if (collection == Collection.NODES) {
@@ -804,13 +825,221 @@ public class MongoDocumentStore implemen
return doc;
}
+ /**
+ * Tries to apply all the {@link UpdateOp}s with as few MongoDB requests as
+ * possible. The return value is the list of the old documents (before
+ * applying changes). The mechanism is as follows:
+ *
+ * <ol>
+ * <li>For each UpdateOp try to read the assigned document from the cache.
+ * Add it to {@code oldDocs}.</li>
+ * <li>Prepare a list of all UpdateOps that don't have their documents yet
+ * and read them in one find() call. Add the results to {@code oldDocs}.</li>
+ * <li>Prepare a bulk update. For each remaining UpdateOp add the following
+ * operation:
+ * <ul>
+ * <li>Find the document with the same id and the same mod_count as in
+ * {@code oldDocs}.</li>
+ * <li>Apply the changes from the UpdateOp.</li>
+ * </ul>
+ * </li>
+ * <li>Execute the bulk update.</li>
+ * </ol>
+ *
+ * If some other process modifies the target documents between points 2 and
+ * 3, their mod_count will have increased as well and the bulk update will
+ * fail for the concurrently modified docs. The method then removes the
+ * failed documents from {@code oldDocs} and restarts the process from
+ * point 2. It stops after the 3rd iteration.
+ */
+ @SuppressWarnings("unchecked")
+ @CheckForNull
@Override
- public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
- List<T> result = new ArrayList<T>(updateOps.size());
- for (UpdateOp update : updateOps) {
- result.add(createOrUpdate(collection, update));
+ public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
+ List<UpdateOp> updateOps) {
+ log("createOrUpdate", updateOps);
+
+ Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+ List<UpdateOp> duplicates = new ArrayList<UpdateOp>();
+ Map<UpdateOp, T> results = new LinkedHashMap<UpdateOp, T>();
+
+ final long start = PERFLOG.start();
+ try {
+ for (UpdateOp updateOp : updateOps) {
+ UpdateUtils.assertUnconditional(updateOp);
+ UpdateOp clone = updateOp.copy();
+ if (operationsToCover.containsKey(updateOp.getId())) {
+ duplicates.add(clone);
+ } else {
+ operationsToCover.put(updateOp.getId(), clone);
+ }
+ results.put(clone, null);
+ }
+
+ Map<String, T> oldDocs = new HashMap<String, T>();
+ if (collection == Collection.NODES) {
+ oldDocs.putAll((Map<String, T>) getCachedNodes(operationsToCover.keySet()));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ if (operationsToCover.size() <= 2) {
+ // the bulkUpdate() method invokes MongoDB twice, so sending just two
+ // updates in bulk mode wouldn't result in any performance gain
+ break;
+ }
+ for (List<UpdateOp> partition : Lists.partition(Lists.newArrayList(operationsToCover.values()), bulkSize)) {
+ Map<UpdateOp, T> successfulUpdates = bulkUpdate(collection, partition, oldDocs);
+ results.putAll(successfulUpdates);
+ operationsToCover.values().removeAll(successfulUpdates.keySet());
+ }
+ }
+
+ // if there are some changes left, we'll apply them one after another
+ Iterator<UpdateOp> it = Iterators.concat(operationsToCover.values().iterator(), duplicates.iterator());
+ while (it.hasNext()) {
+ UpdateOp op = it.next();
+ it.remove();
+ T oldDoc = createOrUpdate(collection, op);
+ if (oldDoc != null) {
+ results.put(op, oldDoc);
+ }
+ }
+ } finally {
+ PERFLOG.end(start, 1, "createOrUpdate {}", updateOps);
}
- return result;
+ List<T> resultList = new ArrayList<T>(results.values());
+ log("createOrUpdate returns", resultList);
+ return resultList;
+ }
+
+ private Map<String, NodeDocument> getCachedNodes(Set<String> keys) {
+ Map<String, NodeDocument> nodes = new HashMap<String, NodeDocument>();
+ for (String key : keys) {
+ NodeDocument cached = nodesCache.getIfPresent(key);
+ if (cached != null && cached != NodeDocument.NULL) {
+ nodes.put(key, cached);
+ }
+ }
+ return nodes;
+ }
+
+ private <T extends Document> Map<UpdateOp, T> bulkUpdate(Collection<T> collection,
+ List<UpdateOp> updateOperations,
+ Map<String, T> oldDocs) {
+ Map<String, UpdateOp> bulkOperations = createMap(updateOperations);
+ Set<String> lackingDocs = difference(bulkOperations.keySet(), oldDocs.keySet());
+ oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+ Lock lock = null;
+ if (collection == Collection.NODES) {
+ lock = nodeLocks.acquire(bulkOperations.keySet());
+ }
+
+ try {
+ BulkUpdateResult bulkResult = sendBulkUpdate(collection, bulkOperations.values(), oldDocs);
+
+ if (collection == Collection.NODES) {
+ for (UpdateOp op : filterKeys(bulkOperations, in(bulkResult.upserts)).values()) {
+ NodeDocument doc = Collection.NODES.newDocument(this);
+ UpdateUtils.applyChanges(doc, op);
+ nodesCache.put(doc);
+ }
+
+ for (String key : difference(bulkOperations.keySet(), bulkResult.failedUpdates)) {
+ T oldDoc = oldDocs.get(key);
+ if (oldDoc != null) {
+ NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, bulkOperations.get(key));
+ nodesCache.put(newDoc);
+ oldDoc.seal();
+ }
+ }
+ }
+ oldDocs.keySet().removeAll(bulkResult.failedUpdates);
+
+ Map<UpdateOp, T> result = new HashMap<UpdateOp, T>();
+ for (Entry<String, UpdateOp> entry : bulkOperations.entrySet()) {
+ if (bulkResult.failedUpdates.contains(entry.getKey())) {
+ continue;
+ } else if (bulkResult.upserts.contains(entry.getKey())) {
+ result.put(entry.getValue(), null);
+ } else {
+ result.put(entry.getValue(), oldDocs.get(entry.getKey()));
+ }
+ }
+ return result;
+ } finally {
+ if (lock != null) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static Map<String, UpdateOp> createMap(List<UpdateOp> updateOps) {
+ return Maps.uniqueIndex(updateOps, new Function<UpdateOp, String>() {
+ @Override
+ public String apply(UpdateOp input) {
+ return input.getId();
+ }
+ });
+ }
+
+ private <T extends Document> Map<String, T> findDocuments(Collection<T> collection, Set<String> keys) {
+ Map<String, T> docs = new HashMap<String, T>();
+ if (!keys.isEmpty()) {
+ DBObject[] conditions = new DBObject[keys.size()];
+ int i = 0;
+ for (String key : keys) {
+ conditions[i++] = getByKeyQuery(key).get();
+ }
+
+ QueryBuilder builder = new QueryBuilder();
+ builder.or(conditions);
+ DBCursor cursor = getDBCollection(collection).find(builder.get());
+ while (cursor.hasNext()) {
+ T foundDoc = convertFromDBObject(collection, cursor.next());
+ docs.put(foundDoc.getId(), foundDoc);
+ }
+ }
+ return docs;
+ }
+
+ private <T extends Document> BulkUpdateResult sendBulkUpdate(Collection<T> collection,
+ java.util.Collection<UpdateOp> updateOps, Map<String, T> oldDocs) {
+ DBCollection dbCollection = getDBCollection(collection);
+ BulkWriteOperation bulk = dbCollection.initializeUnorderedBulkOperation();
+ String[] bulkIds = new String[updateOps.size()];
+ int i = 0;
+ for (UpdateOp updateOp : updateOps) {
+ String id = updateOp.getId();
+ QueryBuilder query = createQueryForUpdate(id, updateOp.getConditions());
+ T oldDoc = oldDocs.get(id);
+ DBObject update;
+ if (oldDoc == null) {
+ query.not().exists(Document.MOD_COUNT);
+ update = createUpdate(updateOp, true);
+ } else {
+ query.and(Document.MOD_COUNT).is(oldDoc.getModCount());
+ update = createUpdate(updateOp, false);
+ }
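+ // if the document exists but the mod count predicate above does not match
+ // (concurrent modification), the upsert attempts an insert that fails with
+ // a duplicate key error on _id; the error index is mapped back via bulkIds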
+ bulk.find(query.get()).upsert().updateOne(update);
+ bulkIds[i++] = id;
+ }
+
+ BulkWriteResult bulkResult;
+ Set<String> failedUpdates = new HashSet<String>();
+ Set<String> upserts = new HashSet<String>();
+ try {
+ bulkResult = bulk.execute();
+ } catch (BulkWriteException e) {
+ bulkResult = e.getWriteResult();
+ for (BulkWriteError err : e.getWriteErrors()) {
+ failedUpdates.add(bulkIds[err.getIndex()]);
+ }
+ }
+ for (BulkWriteUpsert upsert : bulkResult.getUpserts()) {
+ upserts.add(bulkIds[upsert.getIndex()]);
+ }
+ return new BulkUpdateResult(failedUpdates, upserts);
}
@Override
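Concretely, each entry that sendBulkUpdate() queues for a document found in step 2 is a conditional upsert. In MongoDB terms it looks roughly like the following (the id, property, and counter values are illustrative; Document.MOD_COUNT corresponds to the _modCount field):

    // query: matches only if nobody modified the document since it was read
    { "_id" : "1:/node-0", "_modCount" : 7 }
    // update: the object produced by createUpdate(updateOp, false),
    // which also increments the mod count
    { "$set" : { "prop" : "value" }, "$inc" : { "_modCount" : 1 } }

If a concurrent writer has bumped _modCount, the query no longer matches; because of upsert(), MongoDB then attempts an insert, which fails with a duplicate key error on _id, and the id ends up in failedUpdates for the next round (or for the serial fallback after the third pass).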
@@ -910,7 +1139,7 @@ public class MongoDocumentStore implemen
QueryBuilder query = QueryBuilder.start(Document.ID).in(keys);
// make sure we don't modify the original updateOp
updateOp = updateOp.copy();
- DBObject update = createUpdate(updateOp);
+ DBObject update = createUpdate(updateOp, false);
final Stopwatch watch = startWatch();
try {
Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
@@ -1185,10 +1414,11 @@ public class MongoDocumentStore implemen
* Creates a MongoDB update object from the given UpdateOp.
*
* @param updateOp the update op.
+ * @param includeId whether to include the set operation on the id field
* @return the DBObject.
*/
@Nonnull
- private static DBObject createUpdate(UpdateOp updateOp) {
+ private static DBObject createUpdate(UpdateOp updateOp, boolean includeId) {
BasicDBObject setUpdates = new BasicDBObject();
BasicDBObject maxUpdates = new BasicDBObject();
BasicDBObject incUpdates = new BasicDBObject();
@@ -1200,7 +1430,7 @@ public class MongoDocumentStore implemen
// other updates
for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) {
Key k = entry.getKey();
- if (k.getName().equals(Document.ID)) {
+ if (!includeId && k.getName().equals(Document.ID)) {
// avoid exception "Mod on _id not allowed"
continue;
}
@@ -1344,6 +1574,18 @@ public class MongoDocumentStore implemen
return diff;
}
+ private static class BulkUpdateResult {
+
+ private final Set<String> failedUpdates;
+
+ private final Set<String> upserts;
+
+ private BulkUpdateResult(Set<String> failedUpdates, Set<String> upserts) {
+ this.failedUpdates = failedUpdates;
+ this.upserts = upserts;
+ }
+ }
+
private static class InvalidationResult implements CacheInvalidationStats {
int invalidationCount;
int upToDateCount;
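The failure handling in sendBulkUpdate() relies on the MongoDB 2.x Java driver reporting a per-operation index for both write errors and upserts. A self-contained sketch of that pattern, independent of Oak (database, collection, field names, and values are made up):

    import java.util.HashSet;
    import java.util.Set;
    import com.mongodb.BasicDBObject;
    import com.mongodb.BulkWriteError;
    import com.mongodb.BulkWriteException;
    import com.mongodb.BulkWriteOperation;
    import com.mongodb.BulkWriteResult;
    import com.mongodb.BulkWriteUpsert;
    import com.mongodb.DBCollection;
    import com.mongodb.MongoClient;

    public class BulkIndexMapping {
        public static void main(String[] args) throws Exception {
            DBCollection col = new MongoClient().getDB("test").getCollection("docs");
            BulkWriteOperation bulk = col.initializeUnorderedBulkOperation();
            String[] ids = { "a", "b", "c" };
            for (String id : ids) {
                // conditional upsert: only applies if _modCount is still 1
                bulk.find(new BasicDBObject("_id", id).append("_modCount", 1L))
                        .upsert()
                        .updateOne(new BasicDBObject("$inc",
                                new BasicDBObject("_modCount", 1L)));
            }
            Set<String> failed = new HashSet<String>();
            Set<String> upserted = new HashSet<String>();
            BulkWriteResult result;
            try {
                result = bulk.execute();
            } catch (BulkWriteException e) {
                result = e.getWriteResult();
                for (BulkWriteError err : e.getWriteErrors()) {
                    // getIndex() refers to the position of the find() call above
                    failed.add(ids[err.getIndex()]);
                }
            }
            for (BulkWriteUpsert upsert : result.getUpserts()) {
                upserted.add(ids[upsert.getIndex()]);
            }
            System.out.println("failed: " + failed + ", upserted: " + upserted);
        }
    }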
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java?rev=1727263&r1=1727262&r2=1727263&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java Thu Jan 28 08:20:05 2016
@@ -16,16 +16,20 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import static java.util.Collections.synchronizedList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -210,6 +214,131 @@ public class MultiDocumentStoreTest exte
}
@Test
+ public void concurrentBatchUpdate() throws Exception {
+ assumeTrue(dsf == DocumentStoreFixture.MONGO);
+ final CountDownLatch ready = new CountDownLatch(2);
+ final CountDownLatch go = new CountDownLatch(1);
+ final List<String> ids = Lists.newArrayList();
+ for (int i = 0; i < 100; i++) {
+ ids.add(Utils.getIdFromPath("/node-" + i));
+ }
+ removeMe.addAll(ids);
+ final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+ final Map<String, NodeDocument> result1 = Maps.newHashMap();
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ List<UpdateOp> ops = Lists.newArrayList();
+ for (String id : ids) {
+ UpdateOp op = new UpdateOp(id, true);
+ op.set(Document.ID, id);
+ op.set("_t1", "value");
+ ops.add(op);
+ }
+ Collections.shuffle(ops);
+ ready.countDown();
+ go.await();
+ List<NodeDocument> docs = ds1.createOrUpdate(Collection.NODES, ops);
+ for (int i = 0; i < ops.size(); i++) {
+ UpdateOp op = ops.get(i);
+ result1.put(op.getId(), docs.get(i));
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ final Map<String, NodeDocument> result2 = Maps.newHashMap();
+ Thread t2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ List<UpdateOp> ops = Lists.newArrayList();
+ for (String id : ids) {
+ UpdateOp op = new UpdateOp(id, true);
+ op.set(Document.ID, id);
+ op.set("_t2", "value");
+ ops.add(op);
+ }
+ Collections.shuffle(ops);
+ ready.countDown();
+ go.await();
+ List<NodeDocument> docs = ds2.createOrUpdate(Collection.NODES, ops);
+ for (int i = 0; i < ops.size(); i++) {
+ UpdateOp op = ops.get(i);
+ result2.put(op.getId(), docs.get(i));
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ t1.start();
+ t2.start();
+ ready.await();
+ go.countDown();
+ t1.join();
+ t2.join();
+ for (Exception e : exceptions) {
+ fail(e.toString());
+ }
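+ // createOrUpdate() returns null for a document the call itself created;
+ // for every id, exactly one of the two threads must have created the
+ // document and the other must have seen it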
+ for (String id : ids) {
+ NodeDocument d1 = result1.get(id);
+ NodeDocument d2 = result2.get(id);
+ if (d1 != null) {
+ assertNull(d2);
+ } else {
+ assertNotNull(d2);
+ }
+ }
+ }
+
+ @Test
+ public void batchUpdateCachedDocument() throws Exception {
+ String id = Utils.getIdFromPath("/foo");
+ removeMe.add(id);
+
+ UpdateOp op = new UpdateOp(id, true);
+ op.set(Document.ID, id);
+ op.set("_ds1", 1);
+ assertNull(ds1.createOrUpdate(Collection.NODES, op));
+
+ // force ds2 to populate the cache with doc
+ assertNotNull(ds2.find(Collection.NODES, id));
+
+ // modify doc via ds1
+ op = new UpdateOp(id, false);
+ op.set("_ds1", 2);
+ assertNotNull(ds1.createOrUpdate(Collection.NODES, op));
+
+ // modify doc via ds2 with batch createOrUpdate
+ op = new UpdateOp(id, false);
+ op.set("_ds2", 1);
+ List<UpdateOp> ops = Lists.newArrayList();
+ ops.add(op);
+ for (int i = 0; i < 10; i++) {
+ // add more ops to make sure a batch
+ // update call is triggered
+ String docId = Utils.getIdFromPath("/node-" + i);
+ UpdateOp update = new UpdateOp(docId, true);
+ update.set(Document.ID, docId);
+ update.set("_ds2", 1);
+ removeMe.add(docId);
+ ops.add(update);
+ }
+ List<NodeDocument> old = ds2.createOrUpdate(Collection.NODES, ops);
+ assertEquals(11, old.size());
+ assertNotNull(old.get(0));
+ assertEquals(2L, old.get(0).get("_ds1"));
+
+ NodeDocument foo = ds2.find(Collection.NODES, id);
+ assertNotNull(foo);
+ assertEquals(2L, foo.get("_ds1"));
+ assertEquals(1L, foo.get("_ds2"));
+ }
+
+ @Test
public void testChangeVisibility() {
String id = this.getClass().getName() + ".testChangeVisibility";