You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/10/27 14:38:36 UTC

svn commit: r1766838 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: mreutegg
Date: Thu Oct 27 14:38:36 2016
New Revision: 1766838

URL: http://svn.apache.org/viewvc?rev=1766838&view=rev
Log:
OAK-3018: Use batch-update in backgroundWrite

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java?rev=1766838&r1=1766837&r2=1766838&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java Thu Oct 27 14:38:36 2016
@@ -26,6 +26,7 @@ class BackgroundWriteStats {
     long lock;
     long write;
     long num;
+    long calls;
     long totalWriteTime;
 
     @Override
@@ -35,6 +36,7 @@ class BackgroundWriteStats {
                 ", lock:" + lock +
                 ", write:" + write +
                 ", num:" + num +
+                ", calls:" + calls +
                 ", totalWriteTime:" + totalWriteTime;
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1766838&r1=1766837&r2=1766838&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Thu Oct 27 14:38:36 2016
@@ -16,7 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -27,6 +26,7 @@ import java.util.concurrent.locks.Lock;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.util.MapFactory;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.stats.Clock;
@@ -36,11 +36,12 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.PeekingIterator;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.commons.PathUtils.ROOT_PATH;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 
 /**
@@ -54,7 +55,7 @@ class UnsavedModifications {
     /**
      * The maximum number of document to update at once in a multi update.
      */
-    static final int BACKGROUND_MULTI_UPDATE_LIMIT = 10000;
+    static final int BACKGROUND_MULTI_UPDATE_LIMIT = 100;
 
     private final ConcurrentMap<String, Revision> map = MapFactory.getInstance().create();
 
@@ -166,62 +167,46 @@ class UnsavedModifications {
             lock.unlock();
         }
         stats.num = pending.size();
-        UpdateOp updateOp = null;
-        Revision lastRev = null;
-        PeekingIterator<String> paths = Iterators.peekingIterator(
-                pending.keySet().iterator());
-        int i = 0;
-        ArrayList<String> pathList = new ArrayList<String>();
-        while (paths.hasNext()) {
-            String p = paths.peek();
-            Revision r = pending.get(p);
-
-            int size = pathList.size();
-            if (updateOp == null) {
-                // create UpdateOp
-                Commit commit = new Commit(store, r, null);
-                updateOp = commit.getUpdateOperationForNode(p);
-                NodeDocument.setLastRev(updateOp, r);
-                lastRev = r;
-                pathList.add(p);
-                paths.next();
-                i++;
-            } else if (r.equals(lastRev)) {
-                // use multi update when possible
-                pathList.add(p);
-                paths.next();
-                i++;
+        List<UpdateOp> updates = Lists.newArrayList();
+        Map<String, Revision> pathToRevision = Maps.newHashMap();
+        for (Iterable<Map.Entry<String, Revision>> batch : Iterables.partition(
+                pending.entrySet(), BACKGROUND_MULTI_UPDATE_LIMIT)) {
+            for (Map.Entry<String, Revision> entry : batch) {
+                String p = entry.getKey();
+                Revision r = entry.getValue();
+                if (PathUtils.denotesRoot(entry.getKey())) {
+                    // update root individually at the end
+                    continue;
+                }
+                updates.add(newUpdateOp(store, p, r));
+                pathToRevision.put(p, r);
             }
-            // call update if any of the following is true:
-            // - this is the second-to-last or last path (update last path, the
-            //   root document, individually)
-            // - revision is not equal to last revision (size of ids didn't change)
-            // - the update limit is reached
-            if (i + 2 > pending.size()
-                    || size == pathList.size()
-                    || pathList.size() >= BACKGROUND_MULTI_UPDATE_LIMIT) {
-                List<String> ids = new ArrayList<String>();
-                for (String path : pathList) {
-                    ids.add(Utils.getIdFromPath(path));
-                }
-                store.getDocumentStore().update(NODES, ids, updateOp);
-                LOG.debug("Updated _lastRev to {} on {}", lastRev, ids);
-                for (String path : pathList) {
-                    map.remove(path, lastRev);
-                }
-                pathList.clear();
-                updateOp = null;
-                lastRev = null;
+            if (!updates.isEmpty()) {
+                store.getDocumentStore().createOrUpdate(NODES, updates);
+                stats.calls++;
+                for (Map.Entry<String, Revision> entry : pathToRevision.entrySet()) {
+                    map.remove(entry.getKey(), entry.getValue());
+                    LOG.debug("Updated _lastRev to {} on {}", entry.getValue(), entry.getKey());
+                }
+                // clean up for next batch
+                updates.clear();
+                pathToRevision.clear();
             }
         }
-        Revision writtenRootRev = pending.get("/");
-        if (writtenRootRev != null) {
-            int cid = writtenRootRev.getClusterId();
-            if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid)) != null) {
+        // finally update remaining root document
+        Revision rootRev = pending.get(ROOT_PATH);
+        if (rootRev != null) {
+            store.getDocumentStore().findAndUpdate(NODES, newUpdateOp(store, ROOT_PATH, rootRev));
+            stats.calls++;
+            map.remove(ROOT_PATH, rootRev);
+            LOG.debug("Updated _lastRev to {} on {}", rootRev, ROOT_PATH);
+
+            int cid = rootRev.getClusterId();
+            if (store.getDocumentStore().find(CLUSTER_NODES, String.valueOf(cid)) != null) {
                 UpdateOp update = new UpdateOp(String.valueOf(cid), false);
                 update.equals(Document.ID, null, String.valueOf(cid));
-                update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString());
-                store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+                update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, rootRev.toString());
+                store.getDocumentStore().findAndUpdate(CLUSTER_NODES, update);
             }
         }
 
@@ -234,9 +219,17 @@ class UnsavedModifications {
         return map.toString();
     }
 
+    private static UpdateOp newUpdateOp(DocumentNodeStore store,
+                                        String path, Revision r) {
+        Commit commit = new Commit(store, r, null);
+        UpdateOp updateOp = commit.getUpdateOperationForNode(path);
+        NodeDocument.setLastRev(updateOp, r);
+        return updateOp;
+    }
+
     private Revision getMostRecentRevision() {
         // use revision of root document
-        Revision rev = map.get("/");
+        Revision rev = map.get(ROOT_PATH);
         // otherwise find most recent
         if (rev == null) {
             for (Revision r : map.values()) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java?rev=1766838&r1=1766837&r2=1766838&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteTest.java Thu Oct 27 14:38:36 2016
@@ -19,9 +19,13 @@ package org.apache.jackrabbit.oak.plugin
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.junit.Test;
 
+import static com.google.common.collect.Iterables.all;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.IS_LAST_REV_UPDATE;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -60,11 +64,12 @@ public class BackgroundWriteTest {
     private static final class TestStore extends MemoryDocumentStore {
 
         @Override
-        public <T extends Document> void update(Collection<T> collection,
-                                                List<String> keys,
-                                                UpdateOp updateOp) {
-            assertTrue(keys.size() <= UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT);
-            super.update(collection, keys, updateOp);
+        public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
+                                                           List<UpdateOp> updateOps) {
+            if (all(updateOps, IS_LAST_REV_UPDATE)) {
+                assertTrue(updateOps.size() <= UnsavedModifications.BACKGROUND_MULTI_UPDATE_LIMIT);
+            }
+            return super.createOrUpdate(collection, updateOps);
         }
     }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1766838&r1=1766837&r2=1766838&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Thu Oct 27 14:38:36 2016
@@ -1982,20 +1982,16 @@ public class DocumentNodeStoreTest {
         final AtomicBoolean throttleUpdates = new AtomicBoolean(true);
         MemoryDocumentStore docStore = new MemoryDocumentStore() {
             @Override
-            public <T extends Document> void update(Collection<T> collection,
-                                                    List<String> keys,
-                                                    UpdateOp updateOp) {
-                
-                if ( throttleUpdates.get() ) {
-                    for (String k : keys) {
-                        try {
-                            updates.put(k);
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        }
+            public <T extends Document> T createOrUpdate(Collection<T> collection,
+                                                         UpdateOp update) {
+                if (throttleUpdates.get() && TestUtils.isLastRevUpdate(update)) {
+                    try {
+                        updates.put(update.getId());
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
                     }
                 }
-                super.update(collection, keys, updateOp);
+                return super.createOrUpdate(collection, update);
             }
         };
         final DocumentNodeStore store = builderProvider.newBuilder()