You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2020/10/14 08:47:55 UTC

svn commit: r1882480 [1/2] - in /jackrabbit/oak/trunk/oak-store-document/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/util/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: stefanegli
Date: Wed Oct 14 08:47:54 2020
New Revision: 1882480

URL: http://svn.apache.org/viewvc?rev=1882480&view=rev
Log:
OAK-9176 : fix sweep to not miss _bc entries plus introducing sweep2, a one-time half-CPU-using repository traversal that fixes missing _bc entries that the original sweep was missing: A repository that was ever running with Oak older than 1.8 and that upgraded to a version without this OAK-9176 fix might have missing _bc entries - sweep2 will fix that. Sweep2 can be disabled if it causes problems: via a System Property -Doak.documentMK.disableSweep2=true

Added:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Test.java   (with props)
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/Sweep2TestHelper.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java
    jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
    jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/V18BranchCommitTest.java

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Oct 14 08:47:54 2020
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -203,6 +204,10 @@ public final class DocumentNodeStore
     private final int createOrUpdateBatchSize = SystemPropertySupplier.create("oak.documentMK.createOrUpdateBatchSize", 1000)
             .loggingTo(LOG).get();
 
+    public static final String SYS_PROP_DISABLE_SWEEP2 = "oak.documentMK.disableSweep2";
+    private boolean disableSweep2 = SystemPropertySupplier.create(SYS_PROP_DISABLE_SWEEP2, Boolean.FALSE).loggingTo(LOG)
+            .get();
+
     /**
      * The document store without potentially lease checking wrapper.
      */
@@ -350,6 +355,12 @@ public final class DocumentNodeStore
     private Thread backgroundSweepThread;
 
     /**
+     * Extra, one-off sweep2 background task for fixing OAK-9176, i.e. missing _bc
+     * on parents and root.
+     */
+    private Thread backgroundSweep2Thread;
+
+    /**
      * The sweep revision vector. Revisions for trunk commits older than this
      * can safely be considered committed without looking up the commit value
      * on the commit root document.
@@ -713,6 +724,56 @@ public final class DocumentNodeStore
             // triggering sweep
             runBackgroundUpdateOperations();
 
+            // check if we need a sweep2 *before* doing a backgroundSweep.
+            // this enables us to detect a direct Oak <= 1.6 upgrade situation,
+            // where a sweep2 is *not* needed.
+
+            // there are 3 different cases with sweep[1]/sweep2:
+            // 1) Oak <= 1.6 direct upgrade:
+            //    -> no sweep2 is needed as a sweep1 is needed anyway and sweep2
+            //       from now on happens as part of it (with the OAK-9176 fix)
+            // 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
+            //    -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
+            //       never ran <= 1.6)
+            // 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
+            //    -> A (full) sweep2 is needed. This is the main case of OAK-9176.
+
+            // In case 3 there is a valid, recent "_sweepRev" - and
+            // we can go ahead and do a "quick" backgroundSweep() here
+            // before continuing, to unblock the startup.
+            // After that, an async/background task is started for sweep2.
+
+            // which of cases 1-3 we have is determined via 'acquireSweep2LockIfNecessary'
+            // and recorded in the settings collection.
+
+            // except for this case detection (which acquires a "sweep2 lock" if needed)
+            // we can otherwise continue normally. That means, a sweep1 can
+            // be considered as usual.
+            // Note that for case 3, doing this normal sweep1 can now also
+            // fix some "_bc" - which before OAK-9176 were missing
+            // and which sweep2 would separately fix as well - but this is not a problem.
+
+            // Note that by setting the SYS_PROP_DISABLE_SWEEP2 system property
+            // the sweep2 is bypassed and the sweep2 status is explicitly stored as "swept".
+            final long sweep2Lock;
+            if (disableSweep2) {
+                try {
+                    final Sweep2StatusDocument sweep2Status = Sweep2StatusDocument.readFrom(store);
+                    if (sweep2Status == null || !sweep2Status.isSwept()) {
+                        // setting the disableSweep2 flag stores this in the repository
+                        Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId);
+                    }
+                } catch(Exception e) {
+                    LOG.warn("<init> sweep2 is diabled as instructed by system property ("
+                            + SYS_PROP_DISABLE_SWEEP2 + "=true) - however, got an Exception"
+                                    + " while storing sweep2 status in the settings collection: " + e, e);
+                }
+                sweep2Lock = -1;
+            } else {
+                // So: unless sweep2 is disabled: acquire the sweep2 lock if a sweep2 is (maybe) necessary
+                sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
+            }
+
             // perform an initial document sweep if needed
             // this may be long running if there is no sweep revision
             // for this clusterId (upgrade from Oak <= 1.6).
@@ -721,6 +782,15 @@ public final class DocumentNodeStore
 
             backgroundUpdateThread.start();
             backgroundSweepThread.start();
+
+            if (sweep2Lock >= 0) {
+                // sweep2 is necessary - so start a sweep2 background task
+                backgroundSweep2Thread = new Thread(
+                        new BackgroundSweep2Operation(this, isDisposed, sweep2Lock),
+                        "DocumentNodeStore background sweep2 thread " + threadNamePostfix);
+                backgroundSweep2Thread.setDaemon(true);
+                backgroundSweep2Thread.start();
+            }
         }
 
         persistentCache = builder.getPersistentCache();
@@ -757,7 +827,8 @@ public final class DocumentNodeStore
 
         Utils.joinQuietly(backgroundReadThread,
                 backgroundUpdateThread,
-                backgroundSweepThread);
+                backgroundSweepThread,
+                backgroundSweep2Thread);
 
         DocumentStoreException ex = null;
 
@@ -2496,6 +2567,158 @@ public final class DocumentNodeStore
 
     }
 
+    /**
+     * Executes the sweep2 (from within a background thread)
+     * @param sweep2Lock the lock value originally acquired
+     * @return true if sweep2 is done or no longer needed, false otherwise (in which case it should be retried)
+     * @throws DocumentStoreException
+     */
+    boolean backgroundSweep2(long sweep2Lock) throws DocumentStoreException {
+        if (sweep2Lock == 0) {
+            sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
+            if (sweep2Lock == 0) {
+                // still not well defined, retry in a minute (done in BackgroundSweep2Operation)
+                return false;
+            }
+            if (sweep2Lock == -1) {
+                // then we're done
+                return true;
+            }
+        }
+        // sweep2Lock > 0, the local instance holds the lock
+        Sweep2StatusDocument statusDoc = Sweep2StatusDocument.readFrom(store);
+        if (statusDoc != null /*should never be null as we hold the lock, but let's play safe anyway .. */
+                && statusDoc.isChecking()) {
+            // very likely the 'isSweep2Necessary' check has not been done yet, let's do it now
+            LOG.info("backgroundSweep2: checking whether sweep2 is necessary...");
+            if (!Sweep2Helper.isSweep2Necessary(store)) {
+                LOG.info("backgroundSweep2: sweep2 check determined a sweep2 is NOT necessary. Marking sweep2 status as 'swept'.");
+                if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
+                    LOG.error("backgroundSweep2 : failed to update the sweep2 status to 'swept'. Aborting sweep2 for now.");
+                }
+                return true;
+            }
+            LOG.info("backgroundSweep2: sweep2 check determined a sweep2 IS necessary. Marking sweep2 status as 'sweeping'.");
+        }
+        // update the lock status one more time to ensure no check can conclude sweep2 is not necessary anymore
+        sweep2Lock = Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId,
+                true /* this true here is what's relevant: locks-in the 'sweeping' status */);
+        if (sweep2Lock == 0) {
+            // something came in between, retry later
+            LOG.info("backgroundSweep2: could not update the sweep2 lock to sweeping, someone got in between. Retry later.");
+            return false;
+        } else if (sweep2Lock == -1) {
+            // odd, someone else concluded we're done
+            LOG.info("backgroundSweep2: meanwhile, someone else concluded sweep2 is done.");
+            return true;
+        }
+
+        // compile the list of clusterIds for which sweep2 should be done.
+        // in an ideal situation sweep(1) would have been done for all clusterIds,
+        // and in that case "_sweepRev" will contain all clusterIds ever used.
+        // However, it is a supported situation that sweep(1) was not run for all clusterIds,
+        // and in this case sweep2 must not be done for all either, but only for those clusterIds.
+        List<Integer> includedClusterIds = new LinkedList<>();
+        for (Revision aSweepRev : sweepRevisions) {
+            includedClusterIds.add(aSweepRev.getClusterId());
+        }
+
+        // at this point we did properly acquire a lock and can go ahead doing sweep2
+        LOG.info("backgroundSweep2: starting sweep2 (includedClusterIds={})", includedClusterIds);
+        int num = forceBackgroundSweep2(includedClusterIds);
+        LOG.info("backgroundSweep2: finished sweep2, num swept=" + num);
+
+        // release the lock.
+        // Note that in theory someone else could have released our lock, or that
+        // the sweep2 status was deleted - that actually doesn't matter:
+        // we just went through a full, successful sweep2 and we want to record it
+        // that way - irrespective of any interference with the status
+        // -> hence the 'force' aspect of releasing here
+        if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
+            LOG.error("backgroundSweep2 : sweep2 finished but we failed to update the sweep2 status accordingly");
+        }
+        return true;
+    }
+
+    /**
+     * Executes a sweep2 either only for the provided clusterIds or, otherwise, for all clusterIds
+     * (which case applies depends on the status of sweep(1) in the repository).
+     * @param includedClusterIds restrict sweep2 to only these clusterIds - or do it for
+     * all clusterIds if this list is empty or null.
+     * @return number of documents swept
+     * @throws DocumentStoreException
+     */
+    int forceBackgroundSweep2(List<Integer> includedClusterIds) throws DocumentStoreException {
+        final RevisionVector emptySweepRevision = new RevisionVector();
+        CommitValueResolver cvr = new CachingCommitValueResolver(
+                0 /* disable caching for sweep2 as caching has a risk of propagating wrong values */,
+                () -> emptySweepRevision);
+        MissingBcSweeper2 sweeper = new MissingBcSweeper2(this, cvr, includedClusterIds, isDisposed);
+        LOG.info("Starting document sweep2. Head: {}, starting at 0", getHeadRevision());
+        Iterable<NodeDocument> docs = lastRevSeeker.getCandidates(0);
+        try {
+            final AtomicInteger numUpdates = new AtomicInteger();
+
+            sweeper.sweep2(docs, new NodeDocumentSweepListener() {
+                @Override
+                public void sweepUpdate(final Map<Path, UpdateOp> updates)
+                        throws DocumentStoreException {
+                    // create a synthetic commit. this commit does not have any
+                    // changes, we just use it to create a journal entry for
+                    // cache invalidation and apply the sweep updates
+                    backgroundOperationLock.readLock().lock();
+                    try {
+                        boolean success = false;
+                        Revision r = commitQueue.createRevision();
+                        try {
+                            commitQueue.done(r, new CommitQueue.Callback() {
+                                @Override
+                                public void headOfQueue(@NotNull Revision revision) {
+                                    writeUpdates(updates, revision);
+                                }
+                            });
+                            success = true;
+                        } finally {
+                            if (!success && commitQueue.contains(r)) {
+                                commitQueue.canceled(r);
+                            }
+                        }
+                    } finally {
+                        backgroundOperationLock.readLock().unlock();
+                    }
+                }
+
+                private void writeUpdates(Map<Path, UpdateOp> updates,
+                                          Revision revision)
+                        throws DocumentStoreException {
+                    // create journal entry
+                    JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+                    entry.modified(updates.keySet());
+                    Revision r = newRevision().asBranchRevision();
+                    if (!store.create(JOURNAL, singletonList(entry.asUpdateOp(r)))) {
+                        String msg = "Unable to create journal entry for " +
+                                "document invalidation. Will be retried with " +
+                                "next background sweep2 operation.";
+                        throw new DocumentStoreException(msg);
+                    }
+                    changes.invalidate(Collections.singleton(r));
+                    unsavedLastRevisions.put(ROOT, revision);
+                    RevisionVector newHead = getHeadRevision().update(revision);
+                    setRoot(newHead);
+                    commitQueue.headRevisionChanged();
+
+                    store.createOrUpdate(NODES, Lists.newArrayList(updates.values()));
+                    numUpdates.addAndGet(updates.size());
+                    LOG.debug("Background sweep2 updated {}", updates.keySet());
+                }
+            });
+
+            return numUpdates.get();
+        } finally {
+            Utils.closeIfCloseable(docs);
+        }
+    }
+
     private int backgroundSweep() throws DocumentStoreException {
         // decide if it needs to run
         Revision head = getHeadRevision().getRevision(clusterId);
@@ -3296,6 +3519,50 @@ public final class DocumentNodeStore
         }
     }
 
+    /**
+     * Background sweep2 operation (also see OAK-9176 for details/context).
+     */
+    private static class BackgroundSweep2Operation extends NodeStoreTask {
+
+        private final long sweep2Lock;
+        private int retryCount = 0;
+
+        BackgroundSweep2Operation(DocumentNodeStore nodeStore,
+                                 AtomicBoolean isDisposed,
+                                 long sweep2Lock) {
+            // default asyncDelay is 1000ms == 1sec
+            // the sweep2 is fine to run every 60sec by default as it is not time critical
+            // to achieve this we're doing a Math.min(60sec, 60 * getAsyncDelay())
+            super(nodeStore, isDisposed,
+                    Suppliers.ofInstance(Math.min(60000, 60 * nodeStore.getAsyncDelay())));
+            if (sweep2Lock < 0) {
+                throw new IllegalArgumentException("sweep2Lock must not be negative");
+            }
+            this.sweep2Lock = sweep2Lock;
+        }
+
+        @Override
+        protected void execute(@NotNull DocumentNodeStore nodeStore) {
+            if (retryCount > 0) {
+                LOG.info("BackgroundSweep2Operation.execute: retrying sweep2. retryCount=" + retryCount);
+            }
+            if (nodeStore.backgroundSweep2(sweep2Lock)) {
+                done();
+            }
+            // log the fact that we noticed an ongoing sweep2 - is only logged once every 5min
+            // until either the other instance finishes or crashes, at which point we or another
+            // instance will pick up
+            if (++retryCount % 5 == 0) {
+                LOG.info("BackgroundSweep2Operation.execute: another instance is currently (still) executing sweep2. "
+                        + "Waiting for its outcome. retryCount=" + retryCount);
+            }
+        }
+
+        private void done() {
+            ref.clear();
+        }
+    }
+
     private static class BackgroundLeaseUpdate extends NodeStoreTask {
 
         /** OAK-4859 : log if time between two renewClusterIdLease calls is too long **/

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java?rev=1882480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java Wed Oct 14 08:47:54 2020
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.partition;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.COMMITROOT_OR_REVISIONS;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+
+/**
+ * The {@code MissingBcSweeper2} is used for the so-called sweep2, which is
+ * a repository traversal updating documents that have missing branch commit ("_bc") 
+ * properties (details see OAK-9176).
+ * This class is similar to NodeDocumentSweeper as it is based on the same principles,
+ * with a few notable exceptions (like it only looks at _commitRoot and _revisions etc).
+ * And due to these exceptions the class is forked rather than modified/subclassed
+ * (also to enable later refactoring of the NodeDocumentSweeper itself).
+ * <p>
+ * This class is not thread-safe.
+ */
+final class MissingBcSweeper2 {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MissingBcSweeper2.class);
+
+    private static final int YIELD_SIZE = 500;
+
+    private static final int INVALIDATE_BATCH_SIZE = 100;
+
+    private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
+
+    private final RevisionContext context;
+
+    private final CommitValueResolver commitValueResolver;
+
+    private final int executingClusterId;
+
+    private final List<Integer> includedClusterIds;
+
+    private final RevisionVector headRevision;
+
+    private final AtomicBoolean isDisposed;
+
+    private long totalCount;
+    private long lastCount;
+    private long startOfScan;
+    private long lastLog;
+
+    /**
+     * Creates a new sweeper v2 for the given context.
+     *
+     * @param context the revision context.
+     */
+    MissingBcSweeper2(RevisionContext context,
+                    CommitValueResolver commitValueResolver,
+                    List<Integer> includedClusterIds,
+                    AtomicBoolean isDisposed) {
+        this.context = checkNotNull(context);
+        this.commitValueResolver = checkNotNull(commitValueResolver);
+        this.executingClusterId = context.getClusterId();
+        this.includedClusterIds = includedClusterIds == null ? new LinkedList<>() : Collections.unmodifiableList(includedClusterIds);
+        this.headRevision= context.getHeadRevision();
+        this.isDisposed = isDisposed;
+    }
+
+    /**
+     * Performs a sweep2 and reports the required updates to the given sweep
+     * listener.
+     *
+     * @param documents the documents to sweep
+     * @param listener the listener to receive required sweep update operations.
+     * @throws DocumentStoreException if reading from the store or writing to
+     *          the store failed.
+     */
+    void sweep2(@NotNull Iterable<NodeDocument> documents,
+                   @NotNull NodeDocumentSweepListener listener)
+            throws DocumentStoreException {
+        performSweep2(documents, checkNotNull(listener));
+    }
+
+    //----------------------------< internal >----------------------------------
+
+    private void performSweep2(Iterable<NodeDocument> documents,
+                                  NodeDocumentSweepListener listener)
+            throws DocumentStoreException {
+        totalCount = 0;
+        lastCount = 0;
+        startOfScan = context.getClock().getTime();
+        lastLog = startOfScan;
+
+        Iterable<Map.Entry<Path, UpdateOp>> ops = sweepOperations(documents);
+        for (List<Map.Entry<Path, UpdateOp>> batch : partition(ops, INVALIDATE_BATCH_SIZE)) {
+            Map<Path, UpdateOp> updates = newHashMap();
+            for (Map.Entry<Path, UpdateOp> entry : batch) {
+                updates.put(entry.getKey(), entry.getValue());
+            }
+            listener.sweepUpdate(updates);
+            if (isDisposed.get()) {
+                throw new DocumentStoreException("sweep2 interrupted by shutdown");
+            }
+        }
+        LOG.debug("Document sweep2 finished");
+    }
+
+    private Iterable<Map.Entry<Path, UpdateOp>> sweepOperations(
+            final Iterable<NodeDocument> docs) {
+        return filter(transform(docs,
+                new Function<NodeDocument, Map.Entry<Path, UpdateOp>>() {
+
+            int yieldCnt = 0;
+            long lastYield = context.getClock().getTime();
+
+            @Override
+            public Map.Entry<Path, UpdateOp> apply(NodeDocument doc) {
+                if (++yieldCnt >= YIELD_SIZE) {
+                    try {
+                        final long now = context.getClock().getTime();
+                        final long timeSinceLastYield = now - lastYield;
+                        // wait the same amount of time that passed since last yield
+                        // that corresponds to roughly 50% throttle (ignoring the min 1ms sleep)
+                        final long waitUntil = now + Math.max(1, timeSinceLastYield);
+                        context.getClock().waitUntil(waitUntil);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                    lastYield = context.getClock().getTime();
+                    yieldCnt = 0;
+                }
+                return immutableEntry(doc.getPath(), sweepOne(doc));
+            }
+        }), new Predicate<Map.Entry<Path, UpdateOp>>() {
+            @Override
+            public boolean apply(Map.Entry<Path, UpdateOp> input) {
+                return input.getValue() != null;
+            }
+        });
+    }
+
+    private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
+        UpdateOp op = null;
+        // the blueprint NodeDocumentSweeper.sweepOne goes through
+        // PROPERTY_OR_DELETED_OR_COMMITROOT here, as that's the new full sweep1.
+        // the sweep2 though, only goes through COMMITROOT_OR_REVISIONS,
+        // as that's what was left out by the original sweep1:
+        // - COMMITROOT : for new child (parent)
+        // - REVISIONS : for commit roots (root for branch commits)
+        for (String property : filter(doc.keySet(), COMMITROOT_OR_REVISIONS)) {
+            Map<Revision, String> valueMap = doc.getLocalMap(property);
+            for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+                Revision rev = entry.getKey();
+
+                if (!includedClusterIds.isEmpty() && !includedClusterIds.contains(rev.getClusterId())) {
+                    // sweep2 is only done for those clusterIds that went through sweep1 (before 1.8).
+                    // includedClusterIds contains the latter list of clusterIds.
+                    // For testing, the code also supports going through all clusterIds.
+                    // (in this case the includedClusterIds list is empty).
+                    // Note that sweep2 is never triggered if "_sweepRev" is empty,
+                    // so the empty case here never applies for production.
+                    continue;
+                }
+
+                Revision cRev = getCommitRevision(doc, rev);
+                if (cRev == null) {
+                    // originally the uncommitted case
+                    // however, sweep2 doesn't touch anything uncommitted
+                } else if (cRev.equals(rev)) {
+                    // originally the committed case
+                    // however, that was only logging and sweep2 is not about that,
+                    // hence nothing to be done here
+                } else {
+                    // this is what sweep2 is about : adding _bc for _commitRoot and _revisions
+                    if (op == null) {
+                        op = createUpdateOp(doc);
+                    }
+                    committedBranch(doc, property, rev, cRev, op);
+                }
+            }
+        }
+
+        totalCount++;
+        lastCount++;
+        long now = context.getClock().getTime();
+        long lastElapsed = now - lastLog;
+
+        if (lastElapsed >= LOGINTERVALMS) {
+            TimeDurationFormatter df = TimeDurationFormatter.forLogging();
+
+            long totalElapsed = now - startOfScan;
+            long totalRateMin = (totalCount * TimeUnit.MINUTES.toMillis(1)) / totalElapsed;
+            long lastRateMin = (lastCount * TimeUnit.MINUTES.toMillis(1)) / lastElapsed;
+
+            String restrictionMsg;
+            if (includedClusterIds.isEmpty()) {
+                restrictionMsg = "unrestricted, ie for all clusterIds";
+            } else {
+                restrictionMsg = "restricted to clusterIds " + includedClusterIds;
+            }
+            String message = String.format(
+                    "Sweep2 executed by cluster node [%d] (%s): %d nodes scanned in %s (~%d/m) - last interval %d nodes in %s (~%d/m)",
+                    executingClusterId, restrictionMsg, totalCount, df.format(totalElapsed, TimeUnit.MILLISECONDS), totalRateMin, lastCount,
+                    df.format(lastElapsed, TimeUnit.MILLISECONDS), lastRateMin);
+
+            LOG.info(message);
+            lastLog = now;
+            lastCount = 0;
+        }
+
+        return op == null ? null : op.hasChanges() ? op : null;
+    }
+
+    /**
+     * Returns {@code true} if the given revision is marked as a branch commit
+     * on the document. This method only checks local branch commit information
+     * available on the document ({@link NodeDocument#getLocalBranchCommits()}).
+     * If the given revision is related to a branch commit that was created
+     * prior to Oak 1.8, the method will return {@code false}.
+     *
+     * @param rev a revision.
+     * @param doc the document to check.
+     * @return {@code true} if the revision is marked as a branch commit;
+     *          {@code false} otherwise.
+     */
+    private boolean isV18BranchCommit(Revision rev, NodeDocument doc) {
+        return doc.getLocalBranchCommits().contains(rev);
+    }
+
+    private void committedBranch(NodeDocument doc,
+                                 String property,
+                                 Revision rev,
+                                 Revision cRev,
+                                 UpdateOp op) {
+        boolean newerThanHead = headRevision.isRevisionNewer(cRev);
+        if (LOG.isDebugEnabled()) {
+            String msg = newerThanHead ? " (newer than head)" : "";
+            LOG.debug("Committed branch change on {}, {} @ {}/{}{}",
+                    op.getId(), property, rev, cRev, msg);
+        }
+        if (!isV18BranchCommit(rev, doc)) {
+            NodeDocument.setBranchCommit(op, rev);
+        }
+    }
+
+    private static UpdateOp createUpdateOp(NodeDocument doc) {
+        return new UpdateOp(doc.getId(), false);
+    }
+
+    private String getCommitValue(@NotNull Revision changeRevision,
+            @NotNull NodeDocument doc) {
+        return commitValueResolver.resolve(changeRevision, doc);
+    }
+
+    @Nullable
+    private Revision getCommitRevision(final NodeDocument doc,
+                                       final Revision rev)
+            throws DocumentStoreException {
+        String cv = /*context.*/getCommitValue(rev, doc);
+        if (cv == null) {
+            return null;
+        }
+        return Utils.resolveCommitRevision(rev, cv);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Wed Oct 14 08:47:54 2020
@@ -231,7 +231,7 @@ public final class NodeDocument extends
      * and all previous non-branch revisions by this cluster node can be
      * considered committed.
      */
-    private static final String SWEEP_REV = "_sweepRev";
+    static final String SWEEP_REV = "_sweepRev";
 
     //~----------------------------< Split Document Types >
 

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java Wed Oct 14 08:47:54 2020
@@ -40,7 +40,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeCommitRoot;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeRevision;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setDeletedOnce;
-import static org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS;
 
 /**
  * The {@code NodeDocumentSweeper} is responsible for removing uncommitted
@@ -56,6 +56,9 @@ final class NodeDocumentSweeper {
 
     private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
 
+    /** Holds the Predicate actually used in sweepOne. It is modifiable FOR TESTING PURPOSES ONLY. */
+    static Predicate<String> SWEEP_ONE_PREDICATE = PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS;
+
     private final RevisionContext context;
 
     private final int clusterId;
@@ -178,7 +181,12 @@ final class NodeDocumentSweeper {
 
     private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
         UpdateOp op = createUpdateOp(doc);
-        for (String property : filter(doc.keySet(), PROPERTY_OR_DELETED)) {
+        // go through the keys matching PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS (the default SWEEP_ONE_PREDICATE), where:
+        // - PROPERTY : for content changes
+        // - DELETED : for new node (this)
+        // - COMMITROOT : for new child (parent)
+        // - REVISIONS : for commit roots (root for branch commits)
+        for (String property : filter(doc.keySet(), SWEEP_ONE_PREDICATE)) {
             Map<Revision, String> valueMap = doc.getLocalMap(property);
             for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
                 Revision rev = entry.getKey();

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java?rev=1882480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java Wed Oct 14 08:47:54 2020
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for sweep2 functionality introduced with OAK-9176.
+ * Kept separate from DocumentNodeStore to limit its size.
+ */
+public class Sweep2Helper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Sweep2Helper.class);
+
+    static boolean isSweep2Necessary(DocumentStore store) {
+        NodeDocument rootNodeDoc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
+        if (rootNodeDoc == null) {
+            // that'd be very weird
+            LOG.warn("isSweep2Necessary : cannot get root node - assuming no sweep2 needed");
+            return false;
+        }
+    
+        if (rootNodeDoc.get("_sweepRev") == null) {
+            // this indicates a pre 1.8 repository upgrade (case 1).
+            // no sweep2 is needed as it is embedded in the normal sweep[1].
+            return false;
+        }
+    
+        // in this case we have a post (>=) 1.8 repository
+        // which might or might not have previously been a pre (<) 1.8
+        // and we need to distinguish those 2 cases - which, to repeat, are:
+        // 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
+        //    -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
+        //       never ran <= 1.6)
+        // 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
+        //    -> A (full) sweep2 is needed. This is the main case of OAK-9176.
+        Map<Revision, String> bcValueMap = rootNodeDoc.getValueMap("_bc");
+        Map<Revision, String> valueMap = rootNodeDoc.getValueMap(NodeDocument.REVISIONS);
+        for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+            Revision rev = entry.getKey();
+    
+            // consider all clusterIds..
+            String cv = entry.getValue();
+            if (cv == null) {
+                // skip
+                continue;
+            }
+            Revision cRev = Utils.resolveCommitRevision(rev, cv);
+            if (cRev.equals(rev)) {
+                // fine
+                continue;
+            } else {
+                // then rev should be in the branch commit list
+                if (bcValueMap.containsKey(rev)) {
+                    // all good
+                    continue;
+                }
+                // otherwise the "_bc" does *not* contain a branch commit
+                // which we suspect it should contain.
+                // that is an indicator of requiring a sweep2
+                // it might not, however, be sufficient, ie it might (really?)
+                // be that it got garbage collected away - in which
+                // case we'd be doing an unnecessary sweep2.
+                // so this is case 3)
+                return true;
+            }
+        }
+    
+        // this is case 2
+        return false;
+    }
+
+    /**
+     * Acquires a cluster singleton lock for doing a sweep2 unless a sweep2 was already done.
+     * <p>
+     * 'If necessary' refers to the sweep2 status in the settings collection - no further
+     * check is done based on content in the nodes collection in this method.
+     * @return <ul>
+     * <li>
+     * &gt;0: the lock was successfully acquired and a sweep2 must now be done
+     * by the local instance. The returned value represents a simple lock value
+     * which needs to be provided for a successful unlock later on.
+     * </li>
+     * <li>
+     * 0: a sweep2 might be necessary, but cannot be done at this point. The caller should retry later.
+     * This can happen when another instance is busy doing a sweep2 (and we
+     * monitor that other instance until it is done) - or because no discovery-lite
+     * status is available yet (so we don't know if the current owner of the sweep2 status
+     * crashed or not and how the local instance fits into the picture)
+     * </li>
+     * <li>
+     * -1: no sweep2 is necessary
+     * </li>
+     * </ul>
+     */
+    static long acquireSweep2LockIfNecessary(DocumentStore store, int clusterId) {
+        Sweep2StatusDocument status = Sweep2StatusDocument.readFrom(store);
+        if (status != null && status.isSwept()) {
+            // nothing left to do.
+            // this should be the most frequent case.
+            return -1;
+        }
+    
+        if (status == null) {
+            return Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId, false);
+        }
+        // otherwise a status could have been set by ourselves or by another instance
+    
+        int lockClusterId = status.getLockClusterId();
+        if (lockClusterId == clusterId) {
+            // the local instance was the originator of the sweeping lock, but likely crashed
+            // hence we need to redo the work from scratch as we can't know if we finished it properly
+            LOG.info("acquireSweep2LockIfNecessary : sweep2 status was sweeping, locked by own instance ({}). "
+                    + "Another sweep2 is required.",
+                    clusterId);
+            return status.getLockValue();
+        }
+    
+        // another instance marked as sweeping - check to see if it is still active or it might have crashed
+        if (ClusterNodeInfoDocument.all(store).stream()
+                .anyMatch(info -> info.getClusterId() == lockClusterId && info.isActive())) {
+            // then another instance is busy sweep2-ing, which is fine.
+            // but we should continue monitoring until that other instance is done
+            LOG.debug("acquireSweep2LockIfNecessary : another instance (id {}) is (still) busy doing a sweep2.",
+                    lockClusterId);
+            return 0;
+        }
+    
+        // otherwise the lockClusterId is no longer active, so we
+        // try to overwrite/reacquire the lock for us
+        return Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId, false);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java?rev=1882480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java Wed Oct 14 08:47:54 2020
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the sweep2 status as recorded in the settings collection.
+ */
+public class Sweep2StatusDocument {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Sweep2StatusDocument.class);
+
+    static final String SWEEP2_STATUS_ID = "sweep2Status";
+
+    private static final String STATUS_PROPERTY = "status";
+    private static final String STATUS_VALUE_CHECKING = "checking";
+    private static final String STATUS_VALUE_SWEEPING = "sweeping";
+    private static final String STATUS_VALUE_SWEPT = "swept";
+
+    private static final String LOCK_PROPERTY = "lock";
+    private static final String MOD_COUNT_PROPERTY = "_modCount";
+
+    private static final String SWEPT_BY_PROPERTY = "sweptBy";
+
+    public static Sweep2StatusDocument readFrom(DocumentStore documentStore) {
+        Document doc = documentStore.find(Collection.SETTINGS, SWEEP2_STATUS_ID,
+                -1 /* -1; avoid caching */);
+        if (doc == null) {
+            return null;
+        } else {
+            return new Sweep2StatusDocument(doc);
+        }
+    }
+
+    /**
+     * Acquires the sweep2 lock.
+     * @param documentStore
+     * @param clusterId
+     * @param forceSweepingStatus if false uses the default way to set the sweep2-status,
+     * if set to true (force-)sets sweep2-status to 'sweeping'.
+     * the latter can be used if the caller knows a sweeping is necessary and wants to skip the 'checking' phase explicitly
+     * @return <ul>
+     * <li>
+     * -1 if a sweep2 is definitely not necessary (already swept)
+     * </li>
+     * <li>
+     * 0 if the lock could not be acquired (another instance got in between)</li>
+     * <li>
+     * &gt; 0 if a lock was acquired (in which case this returned value is the lock value, which is always &gt; 0)
+     * </li>
+     * </ul>
+     */
+    public static long acquireOrUpdateSweep2Lock(DocumentStore documentStore,
+            int clusterId, boolean forceSweepingStatus) {
+        Document existingStatusDoc = documentStore.find(Collection.SETTINGS, SWEEP2_STATUS_ID,
+                -1 /* -1; avoid caching */);
+        UpdateOp updateOp = new UpdateOp(SWEEP2_STATUS_ID, true);
+        updateOp.set(LOCK_PROPERTY, clusterId);
+        ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+        updateOps.add(updateOp);
+        if (existingStatusDoc == null) {
+            updateOp.setNew(true);
+            // the first lock is always created in 'checking' state by default
+            // (unless the caller knows otherwise, which would be odd)
+            if (!forceSweepingStatus) {
+                updateOp.set(STATUS_PROPERTY, STATUS_VALUE_CHECKING);
+            } else {
+                // this is unusual - let's log for debugging
+                LOG.warn("acquireOrUpdateSweep2Lock: forced new sweep2 lock directly to state sweeping");
+                updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEEPING);
+            }
+            updateOp.set(MOD_COUNT_PROPERTY, 1L);
+            if (!documentStore.create(Collection.SETTINGS, updateOps)) {
+                LOG.info("acquireOrUpdateSweep2Lock: another instance just acquired the (new) sweep2 lock a few moments ago.");
+                return 0;
+            } else {
+                LOG.info("acquireOrUpdateSweep2Lock: sweep2 status set to "
+                        + (forceSweepingStatus ? "sweeping" : "checking") + ", locked for clusterId=" + clusterId);
+                return 1;
+            }
+        } else {
+            final Sweep2StatusDocument existingStatus = new Sweep2StatusDocument(existingStatusDoc);
+            if (existingStatus.isSwept()) {
+                // not needed => -1
+                return -1;
+            }
+            if (existingStatus.getLockClusterId() == clusterId) {
+                // the local instance already has the lock - let's check if the phase is correct
+                if (!forceSweepingStatus || existingStatus.isSweeping()) {
+                    // then either the caller wants to go ahead with the default
+                    // or with SWEEPING - which is already the case.
+                    // In both these cases the lock is just fine, nothing to fiddle.
+                    return existingStatus.getLockValue();
+                }
+            }
+            // otherwise we need to adjust the lock - either for phase or lock
+            updateOp.setNew(false);
+            if (forceSweepingStatus) {
+                updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEEPING);
+            }
+            updateOp.equals(MOD_COUNT_PROPERTY, existingStatusDoc.getModCount());
+            final long newModCount = existingStatusDoc.getModCount() + 1;
+            updateOp.set(MOD_COUNT_PROPERTY, newModCount);
+            if (documentStore.findAndUpdate(Collection.SETTINGS, updateOp) == null) {
+                LOG.info("acquireOrUpdateSweep2Lock: another instance just acquired the (expired) sweep2 lock a few moments ago");
+                return 0;
+            } else {
+                if (forceSweepingStatus) {
+                    LOG.info("acquireOrUpdateSweep2Lock: sweep2 status set to sweeping, relocked for clusterId=" + clusterId);
+                } else {
+                    LOG.info("acquireOrUpdateSweep2Lock: sweep2 status unchanged (is "
+                            + existingStatusDoc.get(STATUS_PROPERTY) + "), relocked for clusterId=" + clusterId);
+                }
+                return newModCount;
+            }
+        }
+    }
+
+    /**
+     * Releases the sweep2 lock and records the sweep2 as successfully done.
+     * Note that the clusterId is only used for recording purposes - this method
+     * makes no checks on the current owner of the lock
+     * @param documentStore
+     * @param clusterId
+     * @return true if the sweep2 status is now marked swept(2) - false if that failed
+     * (in the latter case the caller can consider retrying the acquire/sweep2/release sequence)
+     */
+    public static boolean forceReleaseSweep2LockAndMarkSwept(DocumentStore documentStore, int clusterId) {
+        Document existing = documentStore.find(Collection.SETTINGS, SWEEP2_STATUS_ID,
+                -1 /* -1; avoid caching */);
+
+        if (existing == null) {
+            // we directly mark the sweep2 as done if no sweep2 is even necessary.
+            // so it is legal that we have no existingSweep2Doc yet
+            // lock is ignored when there was no sweep2Status yet
+            UpdateOp updateOp = new UpdateOp(SWEEP2_STATUS_ID, true);
+            updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEPT);
+            updateOp.setNew(true);
+            updateOp.set(MOD_COUNT_PROPERTY, 1L);
+            updateOp.set(SWEPT_BY_PROPERTY, clusterId);
+            ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            updateOps.add(updateOp);
+            if (!documentStore.create(Collection.SETTINGS, updateOps)) {
+                LOG.info("forceReleaseSweep2LockAndMarkSwept: another instance just wanted to mark sweep2 as done a few moments ago too.");
+                return false;
+            } else {
+                LOG.info("forceReleaseSweep2LockAndMarkSwept: sweep2 status set to swept by clusterId=" + clusterId);
+                return true;
+            }
+        } else {
+            // there was a lock (probably) - at least there was a sweep2 status
+            // we don't care about what that status was, we only
+            // (force) mark it as done.
+            // unless it was already marked as swept - in that case we leave it as is.
+            if (new Sweep2StatusDocument(existing).isSwept()) {
+                LOG.info("forceReleaseSweep2LockAndMarkSwept: sweep2 status was already marked swept previously");
+                return true;
+            }
+            UpdateOp updateOp = new UpdateOp(SWEEP2_STATUS_ID, false);
+            updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEPT);
+            updateOp.set(MOD_COUNT_PROPERTY, existing.getModCount() + 1);
+            updateOp.set(SWEPT_BY_PROPERTY, clusterId);
+            if (existing.keySet().contains(LOCK_PROPERTY)) {
+                updateOp.remove(LOCK_PROPERTY);
+            }
+            if (documentStore.findAndUpdate(Collection.SETTINGS, updateOp) == null) {
+                LOG.info("forceReleaseSweep2LockAndMarkSwept: another instance just wanted to mark sweep2 as done a few moments ago too.");
+                Sweep2StatusDocument status = readFrom(documentStore);
+                if (status == null) {
+                    LOG.warn("forceReleaseSweep2LockAndMarkSwept: no existing sweep2 status after updating failed");
+                    return false;
+                } else {
+                    // so, someone else force-marked as swept in between, if that succeeded
+                    // the status is now swept - in that case we consider the job done anyway
+                    return status.isSwept();
+                }
+            } else {
+                LOG.info("forceReleaseSweep2LockAndMarkSwept: sweep2 status set to swept by clusterId=" + clusterId);
+                return true;
+            }
+        }
+    }
+
+    private final Document doc;
+
+    private Sweep2StatusDocument(Document doc) {
+        this.doc = doc;
+    }
+
+    public boolean isSwept() {
+        return STATUS_VALUE_SWEPT.equals(doc.get(STATUS_PROPERTY));
+    }
+
+    public boolean isSweeping() {
+        return STATUS_VALUE_SWEEPING.equals(doc.get(STATUS_PROPERTY));
+    }
+
+    public boolean isChecking() {
+        return STATUS_VALUE_CHECKING.equals(doc.get(STATUS_PROPERTY));
+    }
+
+    public int getLockClusterId() {
+        return Integer.parseInt(String.valueOf(doc.get(LOCK_PROPERTY)));
+    }
+
+    public Integer getSweptById() {
+        Object value = doc.get(SWEPT_BY_PROPERTY);
+        if (value == null) {
+            return null;
+        }
+        return Integer.parseInt(String.valueOf(value));
+    }
+
+    public long getLockValue() {
+        return doc.getModCount();
+    }
+
+    @Override
+    public String toString() {
+        return "Sweep2StatusDocument(status=" + doc.get(STATUS_PROPERTY) + ",lockClusterId=" + getLockClusterId() + ",lockValue=" + getLockValue()+")";
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Oct 14 08:47:54 2020
@@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.transform;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isCommitRootEntry;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isRevisionsEntry;
 
 /**
  * Utility methods.
@@ -106,6 +108,26 @@ public class Utils {
         }
     };
 
+    /**
+     * A predicate for property, _deleted, _commitRoot or _revisions names.
+     */
+    public static final Predicate<String> PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS = new Predicate<String>() {
+        @Override
+        public boolean apply(@Nullable String input) {
+            return Utils.isPropertyName(input) || isDeletedEntry(input) || isCommitRootEntry(input) || isRevisionsEntry(input);
+        }
+    };
+
+    /**
+     * A predicate for _commitRoot and _revisions names.
+     */
+    public static final Predicate<String> COMMITROOT_OR_REVISIONS = new Predicate<String>() {
+        @Override
+        public boolean apply(@Nullable String input) {
+            return isCommitRootEntry(input) || isRevisionsEntry(input);
+        }
+    };
+
     public static int pathDepth(String path) {
         if (path.equals("/")) {
             return 0;
@@ -983,7 +1005,9 @@ public class Utils {
     public static void joinQuietly(Thread... threads) {
         for (Thread t : threads) {
             try {
-                t.join();
+                if (t != null) {
+                    t.join();
+                }
             } catch (InterruptedException e) {
                 // ignore
             }

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java Wed Oct 14 08:47:54 2020
@@ -68,6 +68,7 @@ public class BranchStateTest {
         }
         assertNotNull(store.find(Collection.NODES, testId));
 
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
     }
 
     private static final class FailingHook implements CommitHook {

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java Wed Oct 14 08:47:54 2020
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.document.Branch.BranchCommit;
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -31,13 +32,16 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.junit.Rule;
 import org.junit.Test;
 
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.TestUtils.asDocumentState;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
 import static org.apache.jackrabbit.oak.plugins.document.TestUtils.persistToBranch;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -88,6 +92,135 @@ public class BranchTest {
     }
 
     @Test
+    public void rootBranchCommitChildTest() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(store).build();
+
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("a");
+        persistToBranch(builder);
+
+        merge(ns, builder);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void childBranchCommitChildTest() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(store).build();
+
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("a");
+        merge(ns, builder);
+
+        builder = ns.getRoot().builder();
+        builder.child("a").child("b");
+        persistToBranch(builder);
+
+        merge(ns, builder);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void manyBranchCommitsDepthTest() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(store).build();
+
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("a");
+        persistToBranch(builder);
+
+        builder.getChildNode("a").child("b");
+        persistToBranch(builder);
+
+        builder.getChildNode("a").getChildNode("b").child("c");
+        persistToBranch(builder);
+
+        builder.getChildNode("a").getChildNode("b").getChildNode("c").child("d");
+        persistToBranch(builder);
+
+        builder.getChildNode("a").getChildNode("b").getChildNode("c").getChildNode("d").child("e");
+        persistToBranch(builder);
+
+        merge(ns, builder);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void manyBranchCommitsWidthTest() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(store).build();
+
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("a");
+        persistToBranch(builder);
+
+        builder.child("b");
+        persistToBranch(builder);
+
+        builder.child("c");
+        persistToBranch(builder);
+
+        builder.child("d");
+        persistToBranch(builder);
+
+        builder.child("e");
+        persistToBranch(builder);
+
+        merge(ns, builder);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void mixedPre18BranchTest() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(store).build();
+        // step 1 : create an initial structure /a/b/c
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("a").child("b").child("c").setProperty("commit", "1");
+        merge(ns, builder);
+
+        // step 2 : create a branch commit incl. with a child under an existing node, ie /a/b/c/d..
+        builder = ns.getRoot().builder();
+        builder.child("a").setProperty("commit", "2");
+        builder.child("a").child("b").child("c").child("d").child("e").setProperty("commit", "2");
+        persistToBranch(builder);
+        merge(ns, builder);
+
+        // step 3 : verify that "_bc" are set correctly : on all except /a/b (where nothing was changed)
+        assertNotNull(store.find(NODES, Utils.getIdFromPath("/")).get("_bc"));
+        assertNotNull(store.find(NODES, Utils.getIdFromPath("/a")).get("_bc"));
+        assertNull(store.find(NODES, Utils.getIdFromPath("/a/b")).get("_bc"));
+        assertNotNull(store.find(NODES, Utils.getIdFromPath("/a/b/c")).get("_bc"));
+        assertNotNull(store.find(NODES, Utils.getIdFromPath("/a/b/c/d")).get("_bc"));
+        assertNotNull(store.find(NODES, Utils.getIdFromPath("/a/b/c/d/e")).get("_bc"));
+
+        // step 4 : verify the "_bc" are set correctly by backgroundSweep()/forceBackgroundSweep()
+
+        // step 4b: /a/b/c did not get a "_bc" because it only contained "_commitRoot" but no other changes, and
+        //          https://github.com/apache/jackrabbit-oak/blob/d35346d4d446908c7019e931cb54d88824c1a637/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java#L179
+        //          only went through documents that had changes in properties matching PROPERTY_OR_DELETED
+
+        // step 4c: / also failed with a similar reason as the above, except the root
+        //          only has changes in "_revisions" (not "_commitRoot")
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
     public void orphanedBranchTest() {
         MemoryDocumentStore store = new MemoryDocumentStore();
         DocumentNodeStore ns = builderProvider.newBuilder()
@@ -101,6 +234,8 @@ public class BranchTest {
         ns = builderProvider.newBuilder()
                 .setDocumentStore(store).build();
         assertFalse(ns.getRoot().hasProperty("p"));
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
     }
 
     @Test
@@ -136,6 +271,8 @@ public class BranchTest {
         DocumentNodeState state = root.getNodeAtRevision(ns, br, null);
         assertNotNull(state);
         assertEquals("a", state.getString("p"));
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
     }
 
     private void assertModifiedPaths(Iterable<Path> actual, String... expected) {

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java Wed Oct 14 08:47:54 2020
@@ -105,7 +105,7 @@ public class DocumentNodeStoreSweepIT ex
     }
 
 
-    private static boolean isClean(DocumentNodeStore ns, String path) {
+    static boolean isClean(DocumentNodeStore ns, String path) {
         // use find that also reads from the cache
         NodeDocument doc = ns.getDocumentStore().find(NODES, Utils.getIdFromPath(path));
         for (Revision c : doc.getAllChanges()) {
@@ -144,7 +144,7 @@ public class DocumentNodeStoreSweepIT ex
         store.fail().never();
     }
 
-    private String createUncommittedChanges(DocumentNodeStore ns,
+    static String createUncommittedChanges(DocumentNodeStore ns,
                                           FailingDocumentStore store) throws Exception {
         ns.setMaxBackOffMillis(0);
         NodeBuilder builder = ns.getRoot().builder();

Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java Wed Oct 14 08:47:54 2020
@@ -28,6 +28,8 @@ import org.apache.jackrabbit.oak.plugins
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 
+import java.util.LinkedList;
+
 /**
  * Wraps a document store and can be instructed to fail operations.
  */
@@ -43,6 +45,8 @@ class FailingDocumentStore extends Docum
 
     private Type exceptionType = Type.GENERIC;
 
+    private List<Collection<? extends Document>> collectionIncludeList;
+
     class Fail {
 
         private Fail() {
@@ -65,6 +69,7 @@ class FailingDocumentStore extends Docum
             numFailures.set(0);
             failAfter.set(Long.MAX_VALUE);
             exceptionType = Type.GENERIC;
+            collectionIncludeList = null;
         }
 
         void once() {
@@ -79,6 +84,14 @@ class FailingDocumentStore extends Docum
             p = probability;
             return this;
         }
+
+        Fail on(Collection<? extends Document> collectionInclude) { // restrict failures to the given collection; repeated calls add further collections
+            if (collectionIncludeList == null) { // null means no restriction yet: maybeFail then considers every collection
+                collectionIncludeList = new LinkedList<>();
+            }
+            collectionIncludeList.add(collectionInclude);
+            return this; // returned so that further Fail settings can be chained
+        }
     }
 
     FailingDocumentStore(DocumentStore store, long seed) {
@@ -101,7 +114,7 @@ class FailingDocumentStore extends Docum
     @Override
     public <T extends Document> void remove(Collection<T> collection,
                                             String key) {
-        maybeFail();
+        maybeFail(collection);
         super.remove(collection, key);
     }
 
@@ -120,7 +133,7 @@ class FailingDocumentStore extends Docum
         int num = 0;
         // remove individually
         for (Map.Entry<String, Long> rm : toRemove.entrySet()) {
-            maybeFail();
+            maybeFail(collection);
             num += super.remove(collection, singletonMap(rm.getKey(), rm.getValue()));
         }
         return num;
@@ -132,7 +145,7 @@ class FailingDocumentStore extends Docum
                                            long startValue,
                                            long endValue)
             throws DocumentStoreException {
-        maybeFail();
+        maybeFail(collection);
         return super.remove(collection, indexedProperty, startValue, endValue);
     }
 
@@ -141,7 +154,7 @@ class FailingDocumentStore extends Docum
                                                List<UpdateOp> updateOps) {
         // create individually
         for (UpdateOp op : updateOps) {
-            maybeFail();
+            maybeFail(collection);
             if (!super.create(collection, singletonList(op))) {
                 return false;
             }
@@ -152,7 +165,7 @@ class FailingDocumentStore extends Docum
     @Override
     public <T extends Document> T createOrUpdate(Collection<T> collection,
                                                  UpdateOp update) {
-        maybeFail();
+        maybeFail(collection);
         return super.createOrUpdate(collection, update);
     }
 
@@ -170,12 +183,13 @@ class FailingDocumentStore extends Docum
     @Override
     public <T extends Document> T findAndUpdate(Collection<T> collection,
                                                 UpdateOp update) {
-        maybeFail();
+        maybeFail(collection);
         return super.findAndUpdate(collection, update);
     }
 
-    private void maybeFail() {
-        if (random.nextFloat() < p || failAfter.getAndDecrement() <= 0) {
+    private <T extends Document> void maybeFail(Collection<T> collection) {
+        if ((collectionIncludeList == null || collectionIncludeList.contains(collection)) &&
+                (random.nextFloat() < p || failAfter.getAndDecrement() <= 0)) {
             if (numFailures.getAndDecrement() > 0) {
                 throw new DocumentStoreException("write operation failed", null, exceptionType);
             }