Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2022/09/15 09:22:20 UTC

[jackrabbit-oak] branch 1.22 updated: OAK-9535: Support recovery of large branch merge

This is an automated email from the ASF dual-hosted git repository.

mreutegg pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/1.22 by this push:
     new eb9014c9fb OAK-9535: Support recovery of large branch merge
     new dfcc6f80be Merge pull request #701 from rishabhdaim/OAK-9933
eb9014c9fb is described below

commit eb9014c9fbe27bfa2f25d9852b63d0b67f23b070
Author: Marcel Reutegger <ma...@gmail.com>
AuthorDate: Mon Aug 30 16:42:21 2021 +0200

    OAK-9535: Support recovery of large branch merge
    
    Squashed commits by Stefan Egli and Marcel Reutegger
---
 .../oak/plugins/document/LastRevRecoveryAgent.java |  49 ++++-
 .../plugins/document/LargeMergeRecoveryTest.java   | 238 +++++++++++++++++++++
 .../oak/plugins/document/RecoveryTest.java         |  83 ++++++-
 3 files changed, 363 insertions(+), 7 deletions(-)

diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
index 1f42e76067..e1f920f5e8 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
@@ -78,6 +78,14 @@ public class LastRevRecoveryAgent {
 
     private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
 
+    // OAK-9535 : create (flush) a pseudo branch commit journal entry as soon as
+    // we see the (approximate) updateOp size of the recovery journal entry grow above 1 MB
+    // (1 MB being well within the 16 MB limit, to account for the 'approximate' nature of the size estimate)
+    private static final int PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES = 1 * 1024 * 1024;
+
+    // OAK-9535 : recalculate the journal entry size every 4096 elements
+    private static final int PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT = 4096;
+
     public LastRevRecoveryAgent(DocumentStore store,
                                 RevisionContext revisionContext,
                                 MissingLastRevSeeker seeker,
@@ -300,7 +308,7 @@ public class LastRevRecoveryAgent {
 
         //Map of known last rev of checked paths
         Map<Path, Revision> knownLastRevOrModification = MapFactory.getInstance().create();
-        final JournalEntry changes = JOURNAL.newDocument(store);
+        JournalEntry changes = JOURNAL.newDocument(store);
 
         Clock clock = revisionContext.getClock();
 
@@ -309,6 +317,8 @@ public class LastRevRecoveryAgent {
         long startOfScan = clock.getTime();
         long lastLog = startOfScan;
 
+        final List<Revision> pseudoBcRevs = new ArrayList<>();
+        int nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
         for (NodeDocument doc : suspects) {
             totalCount++;
             lastCount++;
@@ -364,7 +374,41 @@ public class LastRevRecoveryAgent {
                     unsavedParents.put(path, lastRevForParents);
                 }
             }
+            // avoid recalculating the size of the updateOp for every single path,
+            // but also avoid checking so rarely that we only notice once past the 16 MB limit
+            if (changes.getNumChangedNodes() >= nextFlushCheckCount) {
+                final Revision pseudoBcRev = Revision.newRevision(clusterId).asBranchRevision();
+                final UpdateOp pseudoBcUpdateOp = changes.asUpdateOp(pseudoBcRev);
+                final int approxPseudoBcUpdateOpSize = pseudoBcUpdateOp.toString().length();
+                if (approxPseudoBcUpdateOpSize >= PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES) {
+                    // flush the (pseudo) journal entry
+                    // regarding 'pseudo': this journal entry, while being a branch commit,
+                    // does not correspond to an actual branch commit that happened before the crash.
+                    // in theory we might be able to reconstruct the very original branch commits,
+                    // but that is a tedious job, and it was not done prior to OAK-9535 either.
+                    // hence the optimization built in here is to create a journal entry
+                    // of type 'branch commit', but with a revision that is different from
+                    // what originally happened. Thanks to the fact that the JournalEntry just
+                    // contains a list of branch commit journal ids, this works fine.
+                    if (store.create(JOURNAL, singletonList(pseudoBcUpdateOp))) {
+                        log.info("recover : created intermediate pseudo-bc journal entry with rev {} and approx size {} bytes.",
+                                pseudoBcRev, approxPseudoBcUpdateOpSize);
+                        pseudoBcRevs.add(pseudoBcRev);
+                        changes = JOURNAL.newDocument(store);
+                        nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
+                    } else {
+                        log.warn("recover : could not create intermediate pseudo-bc journal entry with rev {}",
+                                pseudoBcRev);
+                        // retry a little later, hence schedule the next size check half an interval from now
+                        nextFlushCheckCount = changes.getNumChangedNodes() + (PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT / 2);
+                    }
+                } else {
+                    nextFlushCheckCount = changes.getNumChangedNodes() + PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
+                }
+            }
         }
+        // propagate the pseudoBcRevs to the changes
+        changes.branchCommit(pseudoBcRevs);
 
         for (Path parentPath : unsavedParents.getPaths()) {
             Revision calcLastRev = unsavedParents.get(parentPath);
@@ -432,6 +476,7 @@ public class LastRevRecoveryAgent {
             // thus it doesn't matter, where exactly the check is done
             // as to whether the recovered lastRev has already been
             // written to the journal.
+            final JournalEntry finalChanges = changes;
             unsaved.persist(store, new Supplier<Revision>() {
                 @Override
                 public Revision get() {
@@ -465,7 +510,7 @@ public class LastRevRecoveryAgent {
                     }
 
                     // otherwise store a new journal entry now
-                    if (store.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)))) {
+                    if (store.create(JOURNAL, singletonList(finalChanges.asUpdateOp(lastRootRev)))) {
                         log.info("Recovery created journal entry {}", id);
                     } else {
                         log.warn("Unable to create journal entry {} (already exists).", id);
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LargeMergeRecoveryTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LargeMergeRecoveryTest.java
new file mode 100644
index 0000000000..5f6611e0e8
--- /dev/null
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LargeMergeRecoveryTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.disposeQuietly;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class LargeMergeRecoveryTest extends AbstractTwoNodeTest {
+
+    public LargeMergeRecoveryTest(DocumentStoreFixture fixture) {
+        super(fixture);
+    }
+
+    private static NodeDocument getDocument(DocumentNodeStore nodeStore,
+                                            String path) {
+        return nodeStore.getDocumentStore().find(NODES, getIdFromPath(path));
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static java.util.Collection<Object[]> fixtures() throws IOException {
+        List<Object[]> fixtures = Lists.newArrayList();
+        // disabling MemoryFixture, as that runs into an OutOfMemoryError
+//        fixtures.add(new Object[] {new DocumentStoreFixture.MemoryFixture()});
+
+        DocumentStoreFixture rdb = new DocumentStoreFixture.RDBFixture("RDB-H2(file)", "jdbc:h2:file:./target/ds-test", "sa", "");
+        if (rdb.isAvailable()) {
+            fixtures.add(new Object[] { rdb });
+        }
+
+        DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+        if (mongo.isAvailable()) {
+            fixtures.add(new Object[] { mongo });
+        }
+        return fixtures;
+    }
+
+    /**
+     * Does 1 large branch merge and 2 minor normal commits, all of which require recovery.
+     * Reproduces OAK-9535
+     */
+    @Test
+    @Ignore(value = "slow test, we have one that reproduces OAK-9535 enabled, so this one is not ran by default")
+    public void testMixedLargeBranchMergeRecovery() throws Exception {
+        doTestMixedLargeBranchMergeRecovery(DocumentNodeStoreBuilder.DEFAULT_UPDATE_LIMIT);
+    }
+
+    /**
+     * Does 1 large branch merge and 2 minor normal commits, all of which require recovery.
+     * Depending on the actual update.limit it might or might not reproduce OAK-9535
+     * (the update limit must be 100'000 in order to reproduce the bug - but that
+     * results in a long test duration)
+     */
+    @Test
+    public void testMixedSmallBranchMergeRecovery() throws Exception {
+        doTestMixedLargeBranchMergeRecovery(DocumentNodeStoreBuilder.UPDATE_LIMIT);
+    }
+
+    void doTestMixedLargeBranchMergeRecovery(int updateLimit) throws Exception {
+        // 1. Create base structure /x/y
+        NodeBuilder b1 = ds1.getRoot().builder();
+        b1.child("x").child("y");
+        ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        ds1.runBackgroundOperations();
+        ds2.runBackgroundOperations();
+
+        // 2. create a branch in C2
+        final String childPrefix = "childWithMediumLengthJavaContentRepositoryNodeNameInUTFdashEight-";
+        NodeBuilder b2 = ds2.getRoot().builder();
+        NodeBuilder test = b2.child("x").child("y");
+        System.out.println(
+                "Creating large branch merge, can take over a minute... (limit = " + updateLimit + ")");
+
+        for (int i = 0; i < updateLimit * 3; i++) {
+            test.child(childPrefix + i);
+        }
+
+        assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+        // before merging, make another merge on C2 - non-conflicting
+        NodeBuilder b22 = ds2.getRoot().builder();
+        b22.child("a").child("b1");
+        ds2.merge(b22, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        // and another one:
+        b22 = ds2.getRoot().builder();
+        b22.child("a").child("b2");
+        ds2.merge(b22, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+        ds1.runBackgroundOperations();
+        assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+        ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        long leaseTime = ds1.getClusterInfo().getLeaseTime();
+        ds1.runBackgroundOperations();
+
+        clock.waitUntil(clock.getTime() + leaseTime + 10);
+
+        // Renew the lease for C1
+        ds1.getClusterInfo().renewLease();
+
+        assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
+
+        Iterable<Integer> cids = ds1.getLastRevRecoveryAgent()
+                .getRecoveryCandidateNodes();
+        assertEquals(1, Iterables.size(cids));
+        assertEquals(c2Id, Iterables.get(cids, 0).intValue());
+
+        assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+        assertFalse(ds1.getRoot().getChildNode("a").hasChildNode("b1"));
+        assertFalse(ds1.getRoot().getChildNode("a").hasChildNode("b2"));
+
+        System.out.println("RECOVER...");
+        ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
+        System.out.println("RECOVER DONE");
+
+        ds1.runBackgroundOperations();
+
+        assertTrue(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+        assertTrue(ds1.getRoot().getChildNode("a").hasChildNode("b1"));
+        assertTrue(ds1.getRoot().getChildNode("a").hasChildNode("b2"));
+
+        // dispose ds2 quietly because it may now throw an exception
+        disposeQuietly(ds2);
+    }
+
+    /**
+     * Does 1 large branch commit that requires recovery
+     * Reproduces OAK-9535
+     */
+    @Test
+    @Ignore(value = "ignoring for now as it causes OutOfMemoryError on travis")
+    public void testOneLargeBranchMergeRecovery() throws Exception {
+        doTestOneLargeBranchMergeRecovery(DocumentNodeStoreBuilder.DEFAULT_UPDATE_LIMIT);
+    }
+
+    /**
+     * Does 1 large branch commit that requires recovery
+     * Depending on the actual update.limit it might or might not reproduce OAK-9535
+     * (with the default likely set to 100 for tests, this is a quick version that exercises just the logic)
+     */
+    @Test
+    public void testOneSmallBranchMergeRecovery() throws Exception {
+        doTestOneLargeBranchMergeRecovery(DocumentNodeStoreBuilder.UPDATE_LIMIT);
+    }
+
+    void doTestOneLargeBranchMergeRecovery(int updateLimit) throws Exception {
+        // 1. Create base structure /x/y
+        NodeBuilder b1 = ds1.getRoot().builder();
+        b1.child("x").child("y");
+        ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        ds1.runBackgroundOperations();
+        ds2.runBackgroundOperations();
+
+        final String childPrefix = "childWithMediumLengthJavaContentRepositoryNodeNameInUTFdashEight-";
+
+        // 2. create a branch in C2
+        NodeBuilder b2 = ds2.getRoot().builder();
+        NodeBuilder test = b2.child("x").child("y");
+        System.out.println("Creating large branch merge, can take over a minute... (limit = " + updateLimit + ")");
+        for (int i = 0; i < updateLimit * 3; i++) {
+            test.child(childPrefix + i);
+        }
+        System.out.println("Done. Merging...");
+
+        ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        System.out.println("Merged.");
+
+        Revision zlastRev2 = ds2.getHeadRevision().getRevision(ds2.getClusterId());
+
+        long leaseTime = ds1.getClusterInfo().getLeaseTime();
+        ds1.runBackgroundOperations();
+
+        clock.waitUntil(clock.getTime() + leaseTime + 10);
+
+        // Renew the lease for C1
+        ds1.getClusterInfo().renewLease();
+
+        assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
+
+        Iterable<Integer> cids = ds1.getLastRevRecoveryAgent()
+                .getRecoveryCandidateNodes();
+        assertEquals(1, Iterables.size(cids));
+        assertEquals(c2Id, Iterables.get(cids, 0).intValue());
+
+        assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+        System.out.println("RECOVER...");
+        ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
+        System.out.println("RECOVER DONE");
+
+        assertEquals(zlastRev2, getDocument(ds1, "/x/y").getLastRev().get(c2Id));
+        assertEquals(zlastRev2, getDocument(ds1, "/x").getLastRev().get(c2Id));
+        assertEquals(zlastRev2, getDocument(ds1, "/").getLastRev().get(c2Id));
+
+        ds1.runBackgroundOperations();
+        assertTrue(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+        // dispose ds2 quietly because it may now throw an exception
+        disposeQuietly(ds2);
+    }
+}
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java
index a32687d94d..09b3348fb8 100644
--- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture.MemoryFixture;
 import org.apache.jackrabbit.oak.plugins.migration.NodeStateTestUtils;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -28,30 +30,39 @@ import org.junit.Test;
 
 import static org.apache.jackrabbit.oak.plugins.document.TestUtils.disposeQuietly;
 import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.persistToBranch;
 import static org.apache.jackrabbit.oak.plugins.migration.NodeStateTestUtils.assertExists;
 import static org.apache.jackrabbit.oak.plugins.migration.NodeStateTestUtils.assertMissing;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 public class RecoveryTest extends AbstractTwoNodeTest {
 
     private FailingDocumentStore fds1;
 
+    private CountingDocumentStore cds2;
+
     public RecoveryTest(DocumentStoreFixture fixture) {
         super(fixture);
     }
 
     @Override
     protected DocumentStore customize(DocumentStore store) {
-        // wrap the first store with a FailingDocumentStore
-        FailingDocumentStore fds = new FailingDocumentStore(store);
+        DocumentStore ds;
         if (fds1 == null) {
-            fds1 = fds;
+            // wrap the first store with a FailingDocumentStore
+            fds1 = new FailingDocumentStore(store);
+            ds = fds1;
+        } else {
+            cds2 = new CountingDocumentStore(store);
+            ds = cds2;
         }
-        return fds;
+        return ds;
     }
 
     @Test
@@ -157,6 +168,68 @@ public class RecoveryTest extends AbstractTwoNodeTest {
         assertThat(diff.deleted, containsInAnyOrder("/parent/test/c1"));
     }
 
+    @Test
+    public void recoverLargeBranch() throws Exception {
+        // only run this on memory fixture, others take too long
+        assumeTrue(fixture instanceof MemoryFixture);
+
+        String nodePrefix = "long-node-name-with-many-characters-to-increase-the-size-of-the-journal";
+        NodeBuilder builder = ds1.getRoot().builder();
+        int numNodes = 0;
+        for (int i = 0; i < 100; i++) {
+            NodeBuilder child = builder.child(nodePrefix + i);
+            numNodes++;
+            for (int j = 0; j < 350; j++) {
+                child.child(nodePrefix + j);
+                if (numNodes++ % 100 == 0) {
+                    // create branch commit every 100 nodes
+                    persistToBranch(builder);
+                }
+            }
+        }
+        merge(ds1, builder);
+
+        // simulate crash of ds1
+        fds1.fail().after(1).eternally();
+        disposeQuietly(ds1);
+
+        // nodes must not be visible yet
+        assertFalse(ds2.getRoot().hasChildNode(nodePrefix + 0));
+
+        waitOneMinute();
+        ds2.runBackgroundOperations();
+        ds2.renewClusterIdLease();
+
+        waitOneMinute();
+        ds2.runBackgroundOperations();
+        ds2.renewClusterIdLease();
+
+        NodeState root1 = ds2.getRoot();
+
+        // clusterId 1 lease expired
+        assertTrue(ds2.getLastRevRecoveryAgent().isRecoveryNeeded());
+        cds2.resetCounters();
+        int numDocs = ds2.getLastRevRecoveryAgent().recover(1);
+        assertThat(numDocs, equalTo(101));
+        // recovery must create three journal entries (two intermediate pseudo-bc entries plus the final one)
+        assertThat(cds2.getNumCreateOrUpdateCalls(Collection.JOURNAL), equalTo(3));
+
+        ds2.runBackgroundOperations();
+        // check some random nodes are present after recovery
+        Random rand = new Random();
+        for (int i = 0; i < 100; i++) {
+            NodeState node = ds2.getRoot()
+                    .getChildNode(nodePrefix + rand.nextInt(100))
+                    .getChildNode(nodePrefix + rand.nextInt(350));
+            assertTrue(node.exists());
+        }
+
+        NodeState root2 = ds2.getRoot();
+        TrackingDiff diff = new TrackingDiff();
+        root2.compareAgainstBaseState(root1, diff);
+        assertThat(diff.added.size(), equalTo(35100));
+    }
+
     private void waitOneMinute() throws Exception {
         clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
     }
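A back-of-envelope check of why recovery of the 35,100 nodes above can expect exactly
three journal entries (a sketch; the per-path byte estimate is an assumption for
illustration, not a number from the commit):

    public class JournalEntryCountEstimate {
        public static void main(String[] args) {
            int paths = 100 * 351;        // 100 subtrees x (1 parent + 350 children) = 35,100 paths
            int approxBytesPerPath = 80;  // assumption: ~72-char node name prefix plus index, serialized
            int approxTotalBytes = paths * approxBytesPerPath;          // ~2.8 MB
            int intermediateFlushes = approxTotalBytes / (1024 * 1024); // 2 at the 1 MB threshold
            int journalEntries = intermediateFlushes + 1;               // plus the final entry = 3
            System.out.println(journalEntries);
        }
    }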