Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2022/09/15 09:22:20 UTC
[jackrabbit-oak] branch 1.22 updated: OAK-9535: Support recovery of large branch merge
This is an automated email from the ASF dual-hosted git repository.
mreutegg pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/1.22 by this push:
new eb9014c9fb OAK-9535: Support recovery of large branch merge
eb9014c9fb is described below
commit eb9014c9fbe27bfa2f25d9852b63d0b67f23b070
Author: Marcel Reutegger <ma...@gmail.com>
AuthorDate: Mon Aug 30 16:42:21 2021 +0200
OAK-9535: Support recovery of large branch merge
Squashed commits by Stefan Egli and Marcel Reutegger
---
.../oak/plugins/document/LastRevRecoveryAgent.java | 49 ++++-
.../plugins/document/LargeMergeRecoveryTest.java | 238 +++++++++++++++++++++
.../oak/plugins/document/RecoveryTest.java | 83 ++++++-
3 files changed, 363 insertions(+), 7 deletions(-)
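
In essence, the patch bounds the size of the recovery journal entry: instead of accumulating every recovered path into a single JournalEntry (which, after a very large branch merge, can exceed MongoDB's 16 MB document size limit), the recovery agent now periodically estimates the entry's serialized size and, once it grows past roughly 1 MB, persists it as a pseudo branch-commit journal entry and starts a fresh one. The final journal entry then references the flushed entries via their branch-commit revisions. A condensed sketch of that pattern follows; the JournalEntry, UpdateOp and DocumentStore calls are the ones visible in the diff below, but the loop body is simplified and is not the patch's exact control flow:

    JournalEntry changes = JOURNAL.newDocument(store);
    List<Revision> pseudoBcRevs = new ArrayList<>();
    int nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
    for (NodeDocument doc : suspects) {
        // ... record recovered paths in 'changes' ...
        if (changes.getNumChangedNodes() >= nextFlushCheckCount) {
            // serializing the UpdateOp is expensive, so the size is only
            // sampled every few thousand changes instead of on every path
            Revision rev = Revision.newRevision(clusterId).asBranchRevision();
            UpdateOp op = changes.asUpdateOp(rev);
            if (op.toString().length() >= PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES
                    && store.create(JOURNAL, singletonList(op))) {
                pseudoBcRevs.add(rev);                 // remember the flushed entry
                changes = JOURNAL.newDocument(store);  // start a fresh entry
            }
            nextFlushCheckCount = changes.getNumChangedNodes()
                    + PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
        }
    }
    // the final entry references all flushed pseudo branch-commit entries
    changes.branchCommit(pseudoBcRevs);
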
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
index 1f42e76067..e1f920f5e8 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
@@ -78,6 +78,14 @@ public class LastRevRecoveryAgent {
private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
+ // OAK-9535 : create (flush) a pseudo branch commit journal entry as soon as
+ // the (approximate) updateOp size of the recovery journal entry grows above 1 MB
+ // (1 MB is well within the 16 MB limit, leaving headroom for the 'approximate' nature of the size calculation)
+ private static final int PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES = 1 * 1024 * 1024;
+
+ // OAK-9535 : recalculate the journal entry size every 4096 elements
+ private static final int PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT = 4096;
+
public LastRevRecoveryAgent(DocumentStore store,
RevisionContext revisionContext,
MissingLastRevSeeker seeker,
@@ -300,7 +308,7 @@ public class LastRevRecoveryAgent {
//Map of known last rev of checked paths
Map<Path, Revision> knownLastRevOrModification = MapFactory.getInstance().create();
- final JournalEntry changes = JOURNAL.newDocument(store);
+ JournalEntry changes = JOURNAL.newDocument(store);
Clock clock = revisionContext.getClock();
@@ -309,6 +317,8 @@ public class LastRevRecoveryAgent {
long startOfScan = clock.getTime();
long lastLog = startOfScan;
+ final List<Revision> pseudoBcRevs = new ArrayList<>();
+ int nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
for (NodeDocument doc : suspects) {
totalCount++;
lastCount++;
@@ -364,7 +374,41 @@ public class LastRevRecoveryAgent {
unsavedParents.put(path, lastRevForParents);
}
}
+ // avoid recalculating the size of the updateOp for every single path,
+ // but also avoid checking so rarely that we only notice after hitting the 16 MB limit
+ if (changes.getNumChangedNodes() >= nextFlushCheckCount) {
+ final Revision pseudoBcRev = Revision.newRevision(clusterId).asBranchRevision();
+ final UpdateOp pseudoBcUpdateOp = changes.asUpdateOp(pseudoBcRev);
+ final int approxPseudoBcUpdateOpSize = pseudoBcUpdateOp.toString().length();
+ if (approxPseudoBcUpdateOpSize >= PSEUDO_BRANCH_COMMIT_UPDATE_OP_THRESHOLD_BYTES) {
+ // flush the (pseudo) journal entry
+ // regarding 'pseudo' : this journal entry, while being a branch commit,
+ // does not correspond to an actual branch commit that happened before the crash.
+ // in theory we might be able to reconstruct the very original branch commits,
+ // but that's a tedious job, and we were not doing that prior to OAK-9535 either.
+ // hence the optimization built in here is that we create a journal entry
+ // of type 'branch commit', but with a revision that is different from
+ // what originally happened. Thanks to the fact that the JournalEntry just
+ // contains a list of branch commit journal ids, that should work fine.
+ if (store.create(JOURNAL, singletonList(pseudoBcUpdateOp))) {
+ log.info("recover : created intermediate pseudo-bc journal entry with rev {} and approx size {} bytes.",
+ pseudoBcRev, approxPseudoBcUpdateOpSize);
+ pseudoBcRevs.add(pseudoBcRev);
+ changes = JOURNAL.newDocument(store);
+ nextFlushCheckCount = PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
+ } else {
+ log.warn("recover : could not create intermediate pseudo-bc journal entry with rev {}",
+ pseudoBcRev);
+ // retry a little later: schedule the next size check half an interval from now
+ nextFlushCheckCount = changes.getNumChangedNodes() + (PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT / 2);
+ }
+ } else {
+ nextFlushCheckCount = changes.getNumChangedNodes() + PSEUDO_BRANCH_COMMIT_FLUSH_CHECK_COUNT;
+ }
+ }
}
+ // propagate the pseudoBcRevs to the changes
+ changes.branchCommit(pseudoBcRevs);
for (Path parentPath : unsavedParents.getPaths()) {
Revision calcLastRev = unsavedParents.get(parentPath);
@@ -432,6 +476,7 @@ public class LastRevRecoveryAgent {
// thus it doesn't matter, where exactly the check is done
// as to whether the recovered lastRev has already been
// written to the journal.
+ final JournalEntry finalChanges = changes;
unsaved.persist(store, new Supplier<Revision>() {
@Override
public Revision get() {
@@ -465,7 +510,7 @@ public class LastRevRecoveryAgent {
}
// otherwise store a new journal entry now
- if (store.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)))) {
+ if (store.create(JOURNAL, singletonList(finalChanges.asUpdateOp(lastRootRev)))) {
log.info("Recovery created journal entry {}", id);
} else {
log.warn("Unable to create journal entry {} (already exists).", id);
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LargeMergeRecoveryTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LargeMergeRecoveryTest.java
new file mode 100644
index 0000000000..5f6611e0e8
--- /dev/null
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/LargeMergeRecoveryTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.disposeQuietly;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class LargeMergeRecoveryTest extends AbstractTwoNodeTest {
+
+ public LargeMergeRecoveryTest(DocumentStoreFixture fixture) {
+ super(fixture);
+ }
+
+ private static NodeDocument getDocument(DocumentNodeStore nodeStore,
+ String path) {
+ return nodeStore.getDocumentStore().find(NODES, getIdFromPath(path));
+ }
+
+ @Parameterized.Parameters(name = "{0}")
+ public static java.util.Collection<Object[]> fixtures() throws IOException {
+ List<Object[]> fixtures = Lists.newArrayList();
+ // disabling MemoryFixture, as that runs into an OutOfMemoryError
+// fixtures.add(new Object[] {new DocumentStoreFixture.MemoryFixture()});
+
+ DocumentStoreFixture rdb = new DocumentStoreFixture.RDBFixture("RDB-H2(file)", "jdbc:h2:file:./target/ds-test", "sa", "");
+ if (rdb.isAvailable()) {
+ fixtures.add(new Object[] { rdb });
+ }
+
+ DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+ if (mongo.isAvailable()) {
+ fixtures.add(new Object[] { mongo });
+ }
+ return fixtures;
+ }
+
+ /**
+ * Does one large branch merge and two small normal commits, all of which require recovery.
+ * Reproduces OAK-9535.
+ */
+ @Test
+ @Ignore(value = "slow test, we have one that reproduces OAK-9535 enabled, so this one is not ran by default")
+ public void testMixedLargeBranchMergeRecovery() throws Exception {
+ doTestMixedLargeBranchMergeRecovery(DocumentNodeStoreBuilder.DEFAULT_UPDATE_LIMIT);
+ }
+
+ /**
+ * Does one large branch merge and two small normal commits, all of which require recovery.
+ * Depending on the actual update.limit it might or might not reproduce OAK-9535
+ * (the update limit must be 100'000 to reproduce the bug, but that
+ * results in a long test duration).
+ */
+ @Test
+ public void testMixedSmallBranchMergeRecovery() throws Exception {
+ doTestMixedLargeBranchMergeRecovery(DocumentNodeStoreBuilder.UPDATE_LIMIT);
+ }
+
+ void doTestMixedLargeBranchMergeRecovery(int updateLimit) throws Exception {
+ // 1. Create base structure /x/y
+ NodeBuilder b1 = ds1.getRoot().builder();
+ b1.child("x").child("y");
+ ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ ds1.runBackgroundOperations();
+ ds2.runBackgroundOperations();
+
+ // 2. create a branch in C2
+ final String childPrefix = "childWithMediumLengthJavaContentRepositoryNodeNameInUTFdashEight-";
+ NodeBuilder b2 = ds2.getRoot().builder();
+ NodeBuilder test = b2.child("x").child("y");
+ System.out.println(
+ "Creating large branch merge, can take over a minute... (limit = " + updateLimit + ")");
+
+ for (int i = 0; i < updateLimit * 3; i++) {
+ test.child(childPrefix + i);
+ }
+
+ assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+ // before merging, make another merge on C2 - non-conflicting
+ NodeBuilder b22 = ds2.getRoot().builder();
+ b22.child("a").child("b1");
+ ds2.merge(b22, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ // and another one:
+ b22 = ds2.getRoot().builder();
+ b22.child("a").child("b2");
+ ds2.merge(b22, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+ ds1.runBackgroundOperations();
+ assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+ ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ long leaseTime = ds1.getClusterInfo().getLeaseTime();
+ ds1.runBackgroundOperations();
+
+ clock.waitUntil(clock.getTime() + leaseTime + 10);
+
+ // Renew the lease for C1
+ ds1.getClusterInfo().renewLease();
+
+ assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
+
+ Iterable<Integer> cids = ds1.getLastRevRecoveryAgent()
+ .getRecoveryCandidateNodes();
+ assertEquals(1, Iterables.size(cids));
+ assertEquals(c2Id, Iterables.get(cids, 0).intValue());
+
+ assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+ assertFalse(ds1.getRoot().getChildNode("a").hasChildNode("b1"));
+ assertFalse(ds1.getRoot().getChildNode("a").hasChildNode("b2"));
+
+ System.out.println("RECOVER...");
+ ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
+ System.out.println("RECOVER DONE");
+
+ ds1.runBackgroundOperations();
+
+ assertTrue(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+ assertTrue(ds1.getRoot().getChildNode("a").hasChildNode("b1"));
+ assertTrue(ds1.getRoot().getChildNode("a").hasChildNode("b2"));
+
+ // dispose ds2 quietly because it may now throw an exception
+ disposeQuietly(ds2);
+ }
+
+ /**
+ * Does one large branch commit that requires recovery.
+ * Reproduces OAK-9535.
+ */
+ @Test
+ @Ignore(value = "ignoring for now as it causes OutOfMemoryError on travis")
+ public void testOneLargeBranchMergeRecovery() throws Exception {
+ doTestOneLargeBranchMergeRecovery(DocumentNodeStoreBuilder.DEFAULT_UPDATE_LIMIT);
+ }
+
+ /**
+ * Does one large branch commit that requires recovery.
+ * Depending on the actual update.limit it might or might not reproduce OAK-9535
+ * (with the default likely set to 100 for tests, this is a quick version that exercises just the logic).
+ */
+ @Test
+ public void testOneSmallBranchMergeRecovery() throws Exception {
+ doTestOneLargeBranchMergeRecovery(DocumentNodeStoreBuilder.UPDATE_LIMIT);
+ }
+
+ void doTestOneLargeBranchMergeRecovery(int updateLimit) throws Exception {
+ // 1. Create base structure /x/y
+ NodeBuilder b1 = ds1.getRoot().builder();
+ b1.child("x").child("y");
+ ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ ds1.runBackgroundOperations();
+ ds2.runBackgroundOperations();
+
+ final String childPrefix = "childWithMediumLengthJavaContentRepositoryNodeNameInUTFdashEight-";
+
+ // 2. create a branch in C2
+ NodeBuilder b2 = ds2.getRoot().builder();
+ NodeBuilder test = b2.child("x").child("y");
+ System.out.println("Creating large branch merge, can take over a minute... (limit = " + updateLimit + ")");
+ for (int i = 0; i < updateLimit * 3; i++) {
+ test.child(childPrefix + i);
+ }
+ System.out.println("Done. Merging...");
+
+ ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ System.out.println("Merged.");
+
+ Revision zlastRev2 = ds2.getHeadRevision().getRevision(ds2.getClusterId());
+
+ long leaseTime = ds1.getClusterInfo().getLeaseTime();
+ ds1.runBackgroundOperations();
+
+ clock.waitUntil(clock.getTime() + leaseTime + 10);
+
+ // Renew the lease for C1
+ ds1.getClusterInfo().renewLease();
+
+ assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
+
+ Iterable<Integer> cids = ds1.getLastRevRecoveryAgent()
+ .getRecoveryCandidateNodes();
+ assertEquals(1, Iterables.size(cids));
+ assertEquals(c2Id, Iterables.get(cids, 0).intValue());
+
+ assertFalse(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+ System.out.println("RECOVER...");
+ ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
+ System.out.println("RECOVER DONE");
+
+ assertEquals(zlastRev2, getDocument(ds1, "/x/y").getLastRev().get(c2Id));
+ assertEquals(zlastRev2, getDocument(ds1, "/x").getLastRev().get(c2Id));
+ assertEquals(zlastRev2, getDocument(ds1, "/").getLastRev().get(c2Id));
+
+ ds1.runBackgroundOperations();
+ assertTrue(ds1.getRoot().getChildNode("x").getChildNode("y").hasChildNode(childPrefix + "0"));
+
+ // dispose ds2 quietly because it may now throw an exception
+ disposeQuietly(ds2);
+ }
+}
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java
index a32687d94d..09b3348fb8 100644
--- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RecoveryTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture.MemoryFixture;
import org.apache.jackrabbit.oak.plugins.migration.NodeStateTestUtils;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -28,30 +30,39 @@ import org.junit.Test;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.disposeQuietly;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.persistToBranch;
import static org.apache.jackrabbit.oak.plugins.migration.NodeStateTestUtils.assertExists;
import static org.apache.jackrabbit.oak.plugins.migration.NodeStateTestUtils.assertMissing;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
public class RecoveryTest extends AbstractTwoNodeTest {
private FailingDocumentStore fds1;
+ private CountingDocumentStore cds2;
+
public RecoveryTest(DocumentStoreFixture fixture) {
super(fixture);
}
@Override
protected DocumentStore customize(DocumentStore store) {
- // wrap the first store with a FailingDocumentStore
- FailingDocumentStore fds = new FailingDocumentStore(store);
+ DocumentStore ds;
if (fds1 == null) {
- fds1 = fds;
+ // wrap the first store with a FailingDocumentStore
+ fds1 = new FailingDocumentStore(store);
+ ds = fds1;
+ } else {
+ cds2 = new CountingDocumentStore(store);
+ ds = cds2;
}
- return fds;
+ return ds;
}
@Test
@@ -157,6 +168,68 @@ public class RecoveryTest extends AbstractTwoNodeTest {
assertThat(diff.deleted, containsInAnyOrder("/parent/test/c1"));
}
+ @Test
+ public void recoverLargeBranch() throws Exception {
+ // only run this on memory fixture, others take too long
+ assumeTrue(fixture instanceof MemoryFixture);
+
+ String nodePrefix = "long-node-name-with-many-characters-to-increase-the-size-of-the-journal";
+ NodeBuilder builder = ds1.getRoot().builder();
+ int numNodes = 0;
+ for (int i = 0; i < 100; i++) {
+ NodeBuilder child = builder.child(nodePrefix + i);
+ numNodes++;
+ for (int j = 0; j < 350; j++) {
+ child.child(nodePrefix + j);
+ if (numNodes++ % 100 == 0) {
+ // create branch commit every 100 nodes
+ persistToBranch(builder);
+ }
+ }
+ }
+ merge(ds1, builder);
+
+ // simulate crash of ds1
+ fds1.fail().after(1).eternally();
+ disposeQuietly(ds1);
+
+ // nodes must not be visible yet
+ assertFalse(ds2.getRoot().hasChildNode(nodePrefix + 0));
+
+ waitOneMinute();
+ ds2.runBackgroundOperations();
+ ds2.renewClusterIdLease();
+
+ waitOneMinute();
+ ds2.runBackgroundOperations();
+ ds2.renewClusterIdLease();
+
+ NodeState root1 = ds2.getRoot();
+
+ // clusterId 1 lease expired
+ assertTrue(ds2.getLastRevRecoveryAgent().isRecoveryNeeded());
+ cds2.resetCounters();
+ int numDocs = ds2.getLastRevRecoveryAgent().recover(1);
+ assertThat(numDocs, equalTo(101));
+ // recovery must create three journal entries
+ assertThat(cds2.getNumCreateOrUpdateCalls(Collection.JOURNAL), equalTo(3));
+
+ ds2.runBackgroundOperations();
+ // check some random nodes are present after recovery
+ Random rand = new Random();
+ for (int i = 0; i < 100; i++) {
+ NodeState node = ds2.getRoot()
+ .getChildNode(nodePrefix + rand.nextInt(100))
+ .getChildNode(nodePrefix + rand.nextInt(350));
+ assertTrue(node.exists());
+ }
+
+ NodeState root2 = ds2.getRoot();
+ TrackingDiff diff = new TrackingDiff();
+ root2.compareAgainstBaseState(root1, diff);
+ assertThat(diff.added.size(), equalTo(35100));
+ }
+
private void waitOneMinute() throws Exception {
clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
}
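
For context, the new tests all drive recovery the same way. A condensed sketch of that common flow, using only calls that appear in the diffs above (ds1, ds2 and clock come from AbstractTwoNodeTest and are assumed here; ds2 plays the crashed cluster node):

    // let the lease of the crashed cluster node (ds2) expire
    long leaseTime = ds1.getClusterInfo().getLeaseTime();
    clock.waitUntil(clock.getTime() + leaseTime + 10);
    ds1.getClusterInfo().renewLease();  // keep ds1 itself alive

    // ds1 detects the expired lease and recovers on behalf of ds2
    assertTrue(ds1.getLastRevRecoveryAgent().isRecoveryNeeded());
    Iterable<Integer> cids = ds1.getLastRevRecoveryAgent().getRecoveryCandidateNodes();
    ds1.getLastRevRecoveryAgent().recover(Iterables.get(cids, 0));
    ds1.runBackgroundOperations();      // make the recovered changes visible

The expectations in recoverLargeBranch likewise follow from its setup: 100 top-level nodes with 350 children each yield 100 * 350 + 100 = 35,100 added nodes, and the three asserted journal writes are presumably two pseudo branch-commit flushes plus the final recovery entry.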