You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2020/08/04 11:07:07 UTC

svn commit: r1880565 - in /jackrabbit/oak/trunk/oak-store-document/src: main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java

Author: stefanegli
Date: Tue Aug  4 11:07:07 2020
New Revision: 1880565

URL: http://svn.apache.org/viewvc?rev=1880565&view=rev
Log:
OAK-9149 : reverting svn rev 1880432

Modified:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1880565&r1=1880564&r2=1880565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Aug  4 11:07:07 2020
@@ -48,7 +48,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -127,7 +126,6 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -324,8 +322,6 @@ public final class DocumentNodeStore
      */
     private Thread backgroundUpdateThread;
 
-    BackgroundUpdateOperation backgroundUpdateOperation;
-
     /**
      * Monitor object to synchronize background writes.
      */
@@ -691,7 +687,7 @@ public final class DocumentNodeStore
                 "DocumentNodeStore background read thread " + threadNamePostfix);
         backgroundReadThread.setDaemon(true);
         backgroundUpdateThread = new Thread(
-                backgroundUpdateOperation = new BackgroundUpdateOperation(this, isDisposed),
+                new BackgroundUpdateOperation(this, isDisposed),
                 "DocumentNodeStore background update thread " + threadNamePostfix);
         backgroundUpdateThread.setDaemon(true);
         backgroundSweepThread = new Thread(
@@ -2382,10 +2378,7 @@ public final class DocumentNodeStore
 
     private void backgroundSplit() {
         Set<Path> invalidatedPaths = new HashSet<>();
-        Set<Path> pathsToinvalidate = new HashSet<>();
         RevisionVector head = getHeadRevision();
-        List<UpdateOp> splitOps = new LinkedList<>();
-        List<String> removeCandiates = new LinkedList<>();
         for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
             String id = it.next();
             NodeDocument doc = store.find(Collection.NODES, id);
@@ -2399,65 +2392,43 @@ public final class DocumentNodeStore
                 // already has a pending _lastRev update or an invalidation
                 // entry was already added in this backgroundSplit() call
                 if (unsavedLastRevisions.get(path) == null
-                        && !invalidatedPaths.contains(path)) {
-                    pathsToinvalidate.add(path);
+                        && invalidatedPaths.add(path)) {
+                    // create journal entry for cache invalidation
+                    JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+                    entry.modified(path);
+                    Revision r = newRevision().asBranchRevision();
+                    UpdateOp journalOp = entry.asUpdateOp(r);
+                    if (store.create(JOURNAL, singletonList(journalOp))) {
+                        changes.invalidate(singletonList(r));
+                        LOG.debug("Journal entry {} created for split of document {}",
+                                journalOp.getId(), path);
+                    } else {
+                        String msg = "Unable to create journal entry " +
+                                journalOp.getId() + " for document invalidation. " +
+                                "Will be retried with next background split " +
+                                "operation.";
+                        throw new DocumentStoreException(msg);
+                    }
+                }
+                // apply the split operations
+                NodeDocument before = null;
+                if (!op.isNew() ||
+                        !store.create(Collection.NODES, Collections.singletonList(op))) {
+                    before = store.createOrUpdate(Collection.NODES, op);
                 }
-                splitOps.add(op);
-            }
-            removeCandiates.add(id);
-            if (splitOps.size() >= getCreateOrUpdateBatchSize()) {
-                batchSplit(splitOps, pathsToinvalidate);
-                splitOps.clear();
-                invalidatedPaths.addAll(pathsToinvalidate);
-                pathsToinvalidate.clear();
-                splitCandidates.keySet().removeAll(removeCandiates);
-                removeCandiates.clear();
-            }
-        }
-
-        if (splitOps.size() > 0) {
-            batchSplit(splitOps, pathsToinvalidate);
-            splitCandidates.keySet().removeAll(removeCandiates);
-        }
-    }
-
-    private void batchSplit(List<UpdateOp> splitOps, Set<Path> pathsToinvalidate) {
-        if (!pathsToinvalidate.isEmpty()) {
-            // create journal entry for cache invalidation
-            JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
-            entry.modified(pathsToinvalidate);
-            Revision r = newRevision().asBranchRevision();
-            UpdateOp journalOp = entry.asUpdateOp(r);
-            if (store.create(JOURNAL, singletonList(journalOp))) {
-                changes.invalidate(singletonList(r));
-                LOG.debug("Journal entry {} created for split of document(s) {}",
-                        journalOp.getId(), pathsToinvalidate);
-            } else {
-                String msg = "Unable to create journal entry " +
-                        journalOp.getId() + " for document invalidation. " +
-                        "Will be retried with next background split " +
-                        "operation.";
-                throw new DocumentStoreException(msg);
-            }
-        }
-        // apply the split operations
-        List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES, splitOps);
-        if (LOG.isDebugEnabled()) {
-            // this is rather expensive - but given we were doing log.debug before
-            // the batchSplit mechanism, so this somewhat negates the batch improvement indeed
-            for (int i = 0; i < splitOps.size(); i++) {
-                UpdateOp op = splitOps.get(i);
-                NodeDocument before = beforeList.size() > i ? beforeList.get(i) : null;
                 if (before != null) {
-                    NodeDocument after = store.find(Collection.NODES, op.getId());
-                    if (after != null) {
-                        LOG.debug("Split operation on {}. Size before: {}, after: {}",
-                                op.getId(), before.getMemory(), after.getMemory());
+                    if (LOG.isDebugEnabled()) {
+                        NodeDocument after = store.find(Collection.NODES, op.getId());
+                        if (after != null) {
+                            LOG.debug("Split operation on {}. Size before: {}, after: {}",
+                                    id, before.getMemory(), after.getMemory());
+                        }
                     }
                 } else {
                     LOG.debug("Split operation created {}", op.getId());
                 }
             }
+            it.remove();
         }
     }
 
@@ -3189,23 +3160,6 @@ public final class DocumentNodeStore
         }
     }
 
-    /**
-     * FOR TESTING ONLY :
-     * stops the backgroundUpdateThread (by overwriting its
-     * isDisposed flag) and optionally waits for the thread to
-     * terminate.
-     * @param timeoutMillis optional amount of millis to wait for the thread to terminate at max
-     * @return true if thread is no longer running
-     */
-    @TestOnly
-    boolean stopBackgroundUpdateThread(long timeoutMillis) throws InterruptedException {
-        backgroundUpdateOperation.forceStop();
-        if (timeoutMillis > 0) {
-            backgroundUpdateThread.join(timeoutMillis);
-        }
-        return !backgroundUpdateThread.isAlive();
-    }
-
     public DocumentNodeStoreMBean getMBean() {
         return mbean;
     }
@@ -3222,7 +3176,7 @@ public final class DocumentNodeStore
 
     private static abstract class NodeStoreTask implements Runnable {
         final WeakReference<DocumentNodeStore> ref;
-        private AtomicBoolean isDisposed;
+        private final AtomicBoolean isDisposed;
         private final Supplier<Integer> delaySupplier;
         private boolean failing;
 
@@ -3248,10 +3202,6 @@ public final class DocumentNodeStore
             this(nodeStore, isDisposed, null);
         }
 
-        void forceStop() {
-            isDisposed = new AtomicBoolean(true);
-        }
-
         protected abstract void execute(@NotNull DocumentNodeStore nodeStore);
 
         @Override

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java?rev=1880565&r1=1880564&r2=1880565&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java Tue Aug  4 11:07:07 2020
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
@@ -36,7 +35,6 @@ import org.apache.jackrabbit.oak.api.Pro
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
-import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -47,11 +45,7 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.jetbrains.annotations.NotNull;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -87,118 +81,6 @@ import static org.junit.Assert.fail;
  */
 public class DocumentSplitTest extends BaseDocumentMKTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DocumentSplitTest.class);
-
-    private String createOrUpdateBatchSize;
-    private boolean createOrUpdateBatchSizeIsNull;
-
-    @Before
-    public void backupProperty() {
-        createOrUpdateBatchSize = System.getProperty("oak.documentMK.createOrUpdateBatchSize");
-        if (createOrUpdateBatchSize == null) {
-            createOrUpdateBatchSizeIsNull = true;
-        }
-    }
-
-    @After
-    public void restoreProperty() {
-        if (createOrUpdateBatchSize != null) {
-            System.setProperty("oak.documentMK.createOrUpdateBatchSize", createOrUpdateBatchSize);
-        } else if (createOrUpdateBatchSizeIsNull) {
-            System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
-        }
-    }
-
-    @Test
-    public void largeBatchSplitTest() throws Exception {
-        for(int i=1; i<21; i+=5) {
-            batchSplitTest(1000, i * 1000);
-        }
-    }
-
-    @Test
-    public void mediumBatchSplitTest() throws Exception {
-        batchSplitTest(50, 1000);
-    }
-
-    @Test
-    public void smallBatchSplitTest() throws Exception {
-        batchSplitTest(1, 1000);
-    }
-
-    private void batchSplitTest(int batchSize, int splitDocCnt) throws Exception {
-        LOG.info("batchSplitTest: batchSize = " + batchSize+ ", splitDocCnt = " + splitDocCnt);
-        // this tests wants to use CountingDocumentStore - hence creating a fresh DocumentMk
-        // plus it wants to set the batchSize
-        if (mk != null) {
-            mk.dispose();
-            mk = null;
-        }
-
-        System.setProperty("oak.documentMK.createOrUpdateBatchSize", String.valueOf(batchSize));
-        DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
-        MemoryDocumentStore delegateStore = new MemoryDocumentStore();
-        CountingDocumentStore store = new CountingDocumentStore(delegateStore);
-        mkBuilder.setDocumentStore(store);
-        mk = mkBuilder.open();
-        DocumentNodeStore ns = mk.getNodeStore();
-        assertTrue(ns.stopBackgroundUpdateThread(10000));
-        assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
-
-        NodeBuilder builder = ns.getRoot().builder();
-        for(int child = 0; child < 100; child++) {
-            builder.child("testchild-" + child);
-        }
-        ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
-        for(int i=0; i<2; i++) {
-            builder = ns.getRoot().builder();
-            for(int child = 0; child < splitDocCnt; child++) {
-                PropertyState binary = binaryProperty("prop", randomBytes(5 * 1024));
-                builder.child("testchild-" + child).setProperty(binary);
-            }
-            ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-        }
-        store.resetCounters();
-        ns.runBackgroundUpdateOperations();
-        int createOrUpdates = store.getNumCreateOrUpdateCalls(NODES);
-        int minBatchSplitCalls = Math.max(1, splitDocCnt / Math.max(1, batchSize / 2));
-        int maxBatchSplitCalls = minBatchSplitCalls + Math.max(1, splitDocCnt % batchSize);
-        // backgroundWrite could issue another 2 writes, so:
-        maxBatchSplitCalls += 2;
-        assertTrue("batchSize = " + batchSize
-                + ", splitDocCnt = " + splitDocCnt
-                + ", minBatchSplitCalls=" + minBatchSplitCalls
-                + ", createOrUpdates=" + createOrUpdates,
-                minBatchSplitCalls <= createOrUpdates);
-        assertTrue("batchSize = " + batchSize
-                + ", splitDocCnt = " + splitDocCnt
-                + ", minBatchSplitCalls=" + minBatchSplitCalls
-                + ", maxBatchSplitCalls=" + maxBatchSplitCalls
-                + ", createOrUpdates="+createOrUpdates,
-                maxBatchSplitCalls >= createOrUpdates);
-        VersionGarbageCollector gc = ns.getVersionGarbageCollector();
-
-        int actualSplitDocGCCount = 0;
-        long timeout = ns.getClock().getTime() + 10000;
-        while(actualSplitDocGCCount != splitDocCnt && ns.getClock().getTime() < timeout) {
-            VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
-            actualSplitDocGCCount += stats.splitDocGCCount;
-            if (actualSplitDocGCCount != splitDocCnt) {
-                LOG.info("batchSplitTest: Expected " + splitDocCnt + ", actual " + actualSplitDocGCCount);
-                // advance time a bit to ensure gc does clean up the split docs
-                ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
-                ns.runBackgroundUpdateOperations();
-            }
-        }
-
-        // make sure those splitDocCnt split docs are deleted
-        assertEquals("gc not as expected: expected " + splitDocCnt
-                + ", got " + actualSplitDocGCCount, splitDocCnt, actualSplitDocGCCount);
-        mk.dispose();
-        mk = null;
-    }
-
     @Test
     public void splitRevisions() throws Exception {
         DocumentStore store = mk.getDocumentStore();