Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/01/28 09:20:05 UTC

svn commit: r1727263 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java

Author: mreutegg
Date: Thu Jan 28 08:20:05 2016
New Revision: 1727263

URL: http://svn.apache.org/viewvc?rev=1727263&view=rev
Log:
OAK-3559: Bulk document updates in MongoDocumentStore

Apply Tomek Rekawek's patch with minor modifications.
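
The patch batches individual UpdateOps into MongoDB bulk write calls and uses
the document's mod_count as an optimistic-concurrency check: each bulk entry
only matches its target document if the mod_count still has the value read
beforehand, so a concurrent change surfaces as a per-operation write error and
the affected ops are retried. Below is a minimal sketch of that pattern,
assuming the MongoDB 2.x Java driver used by Oak at the time; the helper name
bulkSet, the collection layout and the bare "_modCount" handling are
illustrative, not Oak API.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    import com.mongodb.BasicDBObject;
    import com.mongodb.BulkWriteError;
    import com.mongodb.BulkWriteException;
    import com.mongodb.BulkWriteOperation;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;

    public class BulkUpdateSketch {

        /**
         * Upserts one {name: value} change per id in a single unordered bulk
         * call. Returns the ids that failed because another process changed
         * (or concurrently created) the document; the caller would re-read
         * those and retry, which is what the patched createOrUpdate() does.
         */
        static Set<String> bulkSet(DBCollection nodes, List<String> ids,
                                   Map<String, Long> knownModCounts,
                                   String name, Object value) {
            BulkWriteOperation bulk = nodes.initializeUnorderedBulkOperation();
            String[] bulkIds = new String[ids.size()];
            int i = 0;
            for (String id : ids) {
                Long modCount = knownModCounts.get(id);
                BasicDBObject query = new BasicDBObject("_id", id);
                if (modCount == null) {
                    // document not seen before: match only if it does not
                    // exist yet; losing the race to another inserter turns
                    // into a duplicate-key write error on the upsert
                    query.append("_modCount", new BasicDBObject("$exists", false));
                } else {
                    // document seen before: match only if nobody changed it
                    // since we read it
                    query.append("_modCount", modCount);
                }
                DBObject update = new BasicDBObject()
                        .append("$set", new BasicDBObject(name, value))
                        .append("$inc", new BasicDBObject("_modCount", 1L));
                bulk.find(query).upsert().updateOne(update);
                bulkIds[i++] = id;
            }
            Set<String> failed = new HashSet<String>();
            try {
                bulk.execute();
            } catch (BulkWriteException e) {
                // each failed operation reports its position in the bulk;
                // map the index back to the document id it belongs to
                for (BulkWriteError err : e.getWriteErrors()) {
                    failed.add(bulkIds[err.getIndex()]);
                }
            }
            return failed;
        }
    }

Because the bulk is unordered, execution continues past individual failures;
the error indexes, not the exception itself, tell the caller which ids need
another round.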

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1727263&r1=1727262&r2=1727263&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Thu Jan 28 08:20:05 2016
@@ -21,10 +21,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -39,6 +43,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.mongodb.MongoClientURI;
@@ -72,8 +77,14 @@ import org.apache.jackrabbit.oak.util.Pe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
+import com.mongodb.BulkWriteError;
+import com.mongodb.BulkWriteException;
+import com.mongodb.BulkWriteOperation;
+import com.mongodb.BulkWriteResult;
+import com.mongodb.BulkWriteUpsert;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
 import com.mongodb.DBCursor;
@@ -88,7 +99,9 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Predicates.not;
 import static com.google.common.base.Predicates.notNull;
 import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Maps.filterKeys;
 import static com.google.common.collect.Maps.filterValues;
+import static com.google.common.collect.Sets.difference;
 
 /**
  * A document store that uses MongoDB as the backend.
@@ -165,6 +178,14 @@ public class MongoDocumentStore implemen
     private long maxLockedQueryTimeMS =
             Long.getLong("oak.mongo.maxLockedQueryTimeMS", TimeUnit.SECONDS.toMillis(3));
 
+    /**
+     * The number of documents to put into one bulk update.
+     * <p>
+     * Default is 30.
+     */
+    private int bulkSize =
+            Integer.getInteger("oak.mongo.bulkSize", 30);
+
     private String lastReadWriteMode;
 
     private final Map<String, String> metadata;
@@ -713,7 +734,7 @@ public class MongoDocumentStore implemen
         DBCollection dbCollection = getDBCollection(collection);
         // make sure we don't modify the original updateOp
         updateOp = updateOp.copy();
-        DBObject update = createUpdate(updateOp);
+        DBObject update = createUpdate(updateOp, false);
 
         Lock lock = null;
         if (collection == Collection.NODES) {
@@ -804,13 +825,221 @@ public class MongoDocumentStore implemen
         return doc;
     }
 
+    /**
+     * Try to apply all the {@link UpdateOp}s with as few MongoDB requests as
+     * possible. The return value is the list of the old documents (before
+     * applying changes). The mechanism is as follows:
+     *
+     * <ol>
+     * <li>For each UpdateOp try to read the assigned document from the cache.
+     *     Add them to {@code oldDocs}.</li>
+     * <li>Prepare a list of all UpdateOps that don't have their documents and
+     *     read them in one find() call. Add results to {@code oldDocs}.</li>
+     * <li>Prepare a bulk update. For each remaining UpdateOp add the following
+     *     operation:
+     *   <ul>
+     *   <li>Find document with the same id and the same mod_count as in the
+     *       {@code oldDocs}.</li>
+     *   <li>Apply the changes from the UpdateOp.</li>
+     *   </ul>
+     * </li>
+     * <li>Execute the bulk update.</li>
+     * </ol>
+     *
+     * If some other process modifies the target documents between points 2 and
+     * 4, the mod_count will be increased as well and the bulk update will fail
+     * for the concurrently modified docs. The method will then remove the
+     * failed documents from {@code oldDocs} and restart the process from
+     * point 2. It will stop after the 3rd iteration.
+     */
+    @SuppressWarnings("unchecked")
+    @CheckForNull
     @Override
-    public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
-        List<T> result = new ArrayList<T>(updateOps.size());
-        for (UpdateOp update : updateOps) {
-            result.add(createOrUpdate(collection, update));
+    public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
+                                                       List<UpdateOp> updateOps) {
+        log("createOrUpdate", updateOps);
+
+        Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+        List<UpdateOp> duplicates = new ArrayList<UpdateOp>();
+        Map<UpdateOp, T> results = new LinkedHashMap<UpdateOp, T>();
+
+        final long start = PERFLOG.start();
+        try {
+            for (UpdateOp updateOp : updateOps) {
+                UpdateUtils.assertUnconditional(updateOp);
+                UpdateOp clone = updateOp.copy();
+                if (operationsToCover.containsKey(updateOp.getId())) {
+                    duplicates.add(clone);
+                } else {
+                    operationsToCover.put(updateOp.getId(), clone);
+                }
+                results.put(clone, null);
+            }
+
+            Map<String, T> oldDocs = new HashMap<String, T>();
+            if (collection == Collection.NODES) {
+                oldDocs.putAll((Map<String, T>) getCachedNodes(operationsToCover.keySet()));
+            }
+
+            for (int i = 0; i < 3; i++) {
+                if (operationsToCover.size() <= 2) {
+                    // the bulkUpdate() method invokes Mongo twice, so sending two
+                    // or fewer updates in bulk mode wouldn't result in any performance gain
+                    break;
+                }
+                for (List<UpdateOp> partition : Lists.partition(Lists.newArrayList(operationsToCover.values()), bulkSize)) {
+                    Map<UpdateOp, T> successfulUpdates = bulkUpdate(collection, partition, oldDocs);
+                    results.putAll(successfulUpdates);
+                    operationsToCover.values().removeAll(successfulUpdates.keySet());
+                }
+            }
+
+            // if there are some changes left, we'll apply them one after another
+            Iterator<UpdateOp> it = Iterators.concat(operationsToCover.values().iterator(), duplicates.iterator());
+            while (it.hasNext()) {
+                UpdateOp op = it.next();
+                it.remove();
+                T oldDoc = createOrUpdate(collection, op);
+                if (oldDoc != null) {
+                    results.put(op, oldDoc);
+                }
+            }
+        } finally {
+            PERFLOG.end(start, 1, "createOrUpdate {}", updateOps);
         }
-        return result;
+        List<T> resultList = new ArrayList<T>(results.values());
+        log("createOrUpdate returns", resultList);
+        return resultList;
+    }
+
+    private Map<String, NodeDocument> getCachedNodes(Set<String> keys) {
+        Map<String, NodeDocument> nodes = new HashMap<String, NodeDocument>();
+        for (String key : keys) {
+            NodeDocument cached = nodesCache.getIfPresent(key);
+            if (cached != null && cached != NodeDocument.NULL) {
+                nodes.put(key, cached);
+            }
+        }
+        return nodes;
+    }
+
+    private <T extends Document> Map<UpdateOp, T> bulkUpdate(Collection<T> collection,
+                                                             List<UpdateOp> updateOperations,
+                                                             Map<String, T> oldDocs) {
+        Map<String, UpdateOp> bulkOperations = createMap(updateOperations);
+        Set<String> lackingDocs = difference(bulkOperations.keySet(), oldDocs.keySet());
+        oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+        Lock lock = null;
+        if (collection == Collection.NODES) {
+            lock = nodeLocks.acquire(bulkOperations.keySet());
+        }
+
+        try {
+            BulkUpdateResult bulkResult = sendBulkUpdate(collection, bulkOperations.values(), oldDocs);
+
+            if (collection == Collection.NODES) {
+                for (UpdateOp op : filterKeys(bulkOperations, in(bulkResult.upserts)).values()) {
+                    NodeDocument doc = Collection.NODES.newDocument(this);
+                    UpdateUtils.applyChanges(doc, op);
+                    nodesCache.put(doc);
+                }
+
+                for (String key : difference(bulkOperations.keySet(), bulkResult.failedUpdates)) {
+                    T oldDoc = oldDocs.get(key);
+                    if (oldDoc != null) {
+                        NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, bulkOperations.get(key));
+                        nodesCache.put(newDoc);
+                        oldDoc.seal();
+                    }
+                }
+            }
+            oldDocs.keySet().removeAll(bulkResult.failedUpdates);
+
+            Map<UpdateOp, T> result = new HashMap<UpdateOp, T>();
+            for (Entry<String, UpdateOp> entry : bulkOperations.entrySet()) {
+                if (bulkResult.failedUpdates.contains(entry.getKey())) {
+                    continue;
+                } else if (bulkResult.upserts.contains(entry.getKey())) {
+                    result.put(entry.getValue(), null);
+                } else {
+                    result.put(entry.getValue(), oldDocs.get(entry.getKey()));
+                }
+            }
+            return result;
+        } finally {
+            if (lock != null) {
+                lock.unlock();
+            }
+        }
+    }
+
+    private static Map<String, UpdateOp> createMap(List<UpdateOp> updateOps) {
+        return Maps.uniqueIndex(updateOps, new Function<UpdateOp, String>() {
+            @Override
+            public String apply(UpdateOp input) {
+                return input.getId();
+            }
+        });
+    }
+
+    private <T extends Document> Map<String, T> findDocuments(Collection<T> collection, Set<String> keys) {
+        Map<String, T> docs = new HashMap<String, T>();
+        if (!keys.isEmpty()) {
+            DBObject[] conditions = new DBObject[keys.size()];
+            int i = 0;
+            for (String key : keys) {
+                conditions[i++] = getByKeyQuery(key).get();
+            }
+
+            QueryBuilder builder = new QueryBuilder();
+            builder.or(conditions);
+            DBCursor cursor = getDBCollection(collection).find(builder.get());
+            while (cursor.hasNext()) {
+                T foundDoc = convertFromDBObject(collection, cursor.next());
+                docs.put(foundDoc.getId(), foundDoc);
+            }
+        }
+        return docs;
+    }
+
+    private <T extends Document> BulkUpdateResult sendBulkUpdate(Collection<T> collection,
+            java.util.Collection<UpdateOp> updateOps, Map<String, T> oldDocs) {
+        DBCollection dbCollection = getDBCollection(collection);
+        BulkWriteOperation bulk = dbCollection.initializeUnorderedBulkOperation();
+        String[] bulkIds = new String[updateOps.size()];
+        int i = 0;
+        for (UpdateOp updateOp : updateOps) {
+            String id = updateOp.getId();
+            QueryBuilder query = createQueryForUpdate(id, updateOp.getConditions());
+            T oldDoc = oldDocs.get(id);
+            DBObject update;
+            if (oldDoc == null) {
+                query.not().exists(Document.MOD_COUNT);
+                update = createUpdate(updateOp, true);
+            } else {
+                query.and(Document.MOD_COUNT).is(oldDoc.getModCount());
+                update = createUpdate(updateOp, false);
+            }
+            bulk.find(query.get()).upsert().updateOne(update);
+            bulkIds[i++] = id;
+        }
+
+        BulkWriteResult bulkResult;
+        Set<String> failedUpdates = new HashSet<String>();
+        Set<String> upserts = new HashSet<String>();
+        try {
+            bulkResult = bulk.execute();
+        } catch (BulkWriteException e) {
+            bulkResult = e.getWriteResult();
+            for (BulkWriteError err : e.getWriteErrors()) {
+                failedUpdates.add(bulkIds[err.getIndex()]);
+            }
+        }
+        for (BulkWriteUpsert upsert : bulkResult.getUpserts()) {
+            upserts.add(bulkIds[upsert.getIndex()]);
+        }
+        return new BulkUpdateResult(failedUpdates, upserts);
     }
 
     @Override
@@ -910,7 +1139,7 @@ public class MongoDocumentStore implemen
         QueryBuilder query = QueryBuilder.start(Document.ID).in(keys);
         // make sure we don't modify the original updateOp
         updateOp = updateOp.copy();
-        DBObject update = createUpdate(updateOp);
+        DBObject update = createUpdate(updateOp, false);
         final Stopwatch watch = startWatch();
         try {
             Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
@@ -1185,10 +1414,11 @@ public class MongoDocumentStore implemen
      * Creates a MongoDB update object from the given UpdateOp.
      *
      * @param updateOp the update op.
+     * @param includeId whether to include the set operation for {@code Document.ID}
      * @return the DBObject.
      */
     @Nonnull
-    private static DBObject createUpdate(UpdateOp updateOp) {
+    private static DBObject createUpdate(UpdateOp updateOp, boolean includeId) {
         BasicDBObject setUpdates = new BasicDBObject();
         BasicDBObject maxUpdates = new BasicDBObject();
         BasicDBObject incUpdates = new BasicDBObject();
@@ -1200,7 +1430,7 @@ public class MongoDocumentStore implemen
         // other updates
         for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) {
             Key k = entry.getKey();
-            if (k.getName().equals(Document.ID)) {
+            if (!includeId && k.getName().equals(Document.ID)) {
                 // avoid exception "Mod on _id not allowed"
                 continue;
             }
@@ -1344,6 +1574,18 @@ public class MongoDocumentStore implemen
         return diff;
     }
 
+    private static class BulkUpdateResult {
+
+        private final Set<String> failedUpdates;
+
+        private final Set<String> upserts;
+
+        private BulkUpdateResult(Set<String> failedUpdates, Set<String> upserts) {
+            this.failedUpdates = failedUpdates;
+            this.upserts = upserts;
+        }
+    }
+
     private static class InvalidationResult implements CacheInvalidationStats {
         int invalidationCount;
         int upToDateCount;

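For callers, the batched createOrUpdate() keeps a simple contract, which the
tests below exercise: the returned list is aligned with the list of passed ops
and holds the previous document for each op, or null where the document was
newly created. A minimal usage sketch follows; the DocumentStore ds and the
"prop" property are placeholders, and the internal partition size is governed
by the oak.mongo.bulkSize system property introduced above (default 30).

    import java.util.List;

    import com.google.common.collect.Lists;

    import org.apache.jackrabbit.oak.plugins.document.Collection;
    import org.apache.jackrabbit.oak.plugins.document.Document;
    import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
    import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
    import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
    import org.apache.jackrabbit.oak.plugins.document.util.Utils;

    public class BatchCallerSketch {

        static List<NodeDocument> setPropertyOnMany(DocumentStore ds) {
            List<UpdateOp> ops = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                String id = Utils.getIdFromPath("/node-" + i);
                UpdateOp op = new UpdateOp(id, true); // true: may create the doc
                op.set(Document.ID, id);
                op.set("prop", "value");
                ops.add(op);
            }
            // one store call instead of 100; the returned list is aligned
            // with ops, with null entries where no document existed before
            return ds.createOrUpdate(Collection.NODES, ops);
        }
    }
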
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java?rev=1727263&r1=1727262&r2=1727263&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java Thu Jan 28 08:20:05 2016
@@ -16,16 +16,20 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static java.util.Collections.synchronizedList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -210,6 +214,131 @@ public class MultiDocumentStoreTest exte
     }
 
     @Test
+    public void concurrentBatchUpdate() throws Exception {
+        assumeTrue(dsf == DocumentStoreFixture.MONGO);
+        final CountDownLatch ready = new CountDownLatch(2);
+        final CountDownLatch go = new CountDownLatch(1);
+        final List<String> ids = Lists.newArrayList();
+        for (int i = 0; i < 100; i++) {
+            ids.add(Utils.getIdFromPath("/node-" + i));
+        }
+        removeMe.addAll(ids);
+        final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
+        final Map<String, NodeDocument> result1 = Maps.newHashMap();
+        Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    List<UpdateOp> ops = Lists.newArrayList();
+                    for (String id : ids) {
+                        UpdateOp op = new UpdateOp(id, true);
+                        op.set(Document.ID, id);
+                        op.set("_t1", "value");
+                        ops.add(op);
+                    }
+                    Collections.shuffle(ops);
+                    ready.countDown();
+                    go.await();
+                    List<NodeDocument> docs = ds1.createOrUpdate(Collection.NODES, ops);
+                    for (int i = 0; i < ops.size(); i++) {
+                        UpdateOp op = ops.get(i);
+                        result1.put(op.getId(), docs.get(i));
+                    }
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+        final Map<String, NodeDocument> result2 = Maps.newHashMap();
+        Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    List<UpdateOp> ops = Lists.newArrayList();
+                    for (String id : ids) {
+                        UpdateOp op = new UpdateOp(id, true);
+                        op.set(Document.ID, id);
+                        op.set("_t2", "value");
+                        ops.add(op);
+                    }
+                    Collections.shuffle(ops);
+                    ready.countDown();
+                    go.await();
+                    List<NodeDocument> docs = ds2.createOrUpdate(Collection.NODES, ops);
+                    for (int i = 0; i < ops.size(); i++) {
+                        UpdateOp op = ops.get(i);
+                        result2.put(op.getId(), docs.get(i));
+                    }
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+        t1.start();
+        t2.start();
+        ready.await();
+        go.countDown();
+        t1.join();
+        t2.join();
+        for (Exception e : exceptions) {
+            fail(e.toString());
+        }
+        for (String id : ids) {
+            NodeDocument d1 = result1.get(id);
+            NodeDocument d2 = result2.get(id);
+            if (d1 != null) {
+                assertNull(d2);
+            } else {
+                assertNotNull(d2);
+            }
+        }
+    }
+
+    @Test
+    public void batchUpdateCachedDocument() throws Exception {
+        String id = Utils.getIdFromPath("/foo");
+        removeMe.add(id);
+
+        UpdateOp op = new UpdateOp(id, true);
+        op.set(Document.ID, id);
+        op.set("_ds1", 1);
+        assertNull(ds1.createOrUpdate(Collection.NODES, op));
+
+        // force ds2 to populate the cache with doc
+        assertNotNull(ds2.find(Collection.NODES, id));
+
+        // modify doc via ds1
+        op = new UpdateOp(id, false);
+        op.set("_ds1", 2);
+        assertNotNull(ds1.createOrUpdate(Collection.NODES, op));
+
+        // modify doc via ds2 with batch createOrUpdate
+        op = new UpdateOp(id, false);
+        op.set("_ds2", 1);
+        List<UpdateOp> ops = Lists.newArrayList();
+        ops.add(op);
+        for (int i = 0; i < 10; i++) {
+            // add more ops to make sure a batch
+            // update call is triggered
+            String docId = Utils.getIdFromPath("/node-" + i);
+            UpdateOp update = new UpdateOp(docId, true);
+            update.set(Document.ID, docId);
+            update.set("_ds2", 1);
+            removeMe.add(docId);
+            ops.add(update);
+        }
+        List<NodeDocument> old = ds2.createOrUpdate(Collection.NODES, ops);
+        assertEquals(11, old.size());
+        assertNotNull(old.get(0));
+        assertEquals(2L, old.get(0).get("_ds1"));
+
+        NodeDocument foo = ds2.find(Collection.NODES, id);
+        assertNotNull(foo);
+        assertEquals(2L, foo.get("_ds1"));
+        assertEquals(1L, foo.get("_ds2"));
+    }
+
+    @Test
     public void testChangeVisibility() {
         String id = this.getClass().getName() + ".testChangeVisibility";