You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by st...@apache.org on 2020/10/14 08:47:55 UTC
svn commit: r1882480 [1/2] - in /jackrabbit/oak/trunk/oak-store-document/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/util/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: stefanegli
Date: Wed Oct 14 08:47:54 2020
New Revision: 1882480
URL: http://svn.apache.org/viewvc?rev=1882480&view=rev
Log:
OAK-9176 : fix sweep to not miss _bc entries, and introduce sweep2: a one-time repository traversal (throttled to use roughly half the CPU) that adds the _bc entries the original sweep missed. A repository that ever ran with Oak older than 1.8 and was later upgraded to a version without this OAK-9176 fix might have missing _bc entries - sweep2 fixes that. If sweep2 causes problems, it can be disabled via the system property -Doak.documentMK.disableSweep2=true
Added:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Test.java (with props)
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/Sweep2TestHelper.java (with props)
Modified:
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java
jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/V18BranchCommitTest.java
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Oct 14 08:47:54 2020
@@ -48,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -203,6 +204,10 @@ public final class DocumentNodeStore
private final int createOrUpdateBatchSize = SystemPropertySupplier.create("oak.documentMK.createOrUpdateBatchSize", 1000)
.loggingTo(LOG).get();
+ /**
+ * Name of the system property which, when set to {@code true}, disables
+ * sweep2 (OAK-9176): instead of running it, the sweep2 status is stored
+ * as 'swept' in the settings collection.
+ */
+ public static final String SYS_PROP_DISABLE_SWEEP2 = "oak.documentMK.disableSweep2";
+ /** Value of {@link #SYS_PROP_DISABLE_SWEEP2}, read when this instance is created; defaults to {@code false}. */
+ private boolean disableSweep2 = SystemPropertySupplier.create(SYS_PROP_DISABLE_SWEEP2, Boolean.FALSE).loggingTo(LOG)
+ .get();
+
/**
* The document store without potentially lease checking wrapper.
*/
@@ -350,6 +355,12 @@ public final class DocumentNodeStore
private Thread backgroundSweepThread;
/**
+ * Extra, one-off sweep2 background task for fixing OAK-9176 ie missing _bc
+ * on parents and root.
+ */
+ private Thread backgroundSweep2Thread;
+
+ /**
* The sweep revision vector. Revisions for trunk commits older than this
* can safely be considered committed without looking up the commit value
* on the commit root document.
@@ -713,6 +724,56 @@ public final class DocumentNodeStore
// triggering sweep
runBackgroundUpdateOperations();
+ // check if we need a sweep2 *before* doing a backgroundSweep.
+ // this enables us to detect a direct Oak <= 1.6 upgrade situation,
+ // where a sweep2 is *not* needed.
+
+ // there are 3 different cases with sweep[1]/sweep2:
+ // 1) Oak <= 1.6 direct upgrade:
+ // -> no sweep2 is needed as a sweep1 is needed anyway and sweep2
+ // from now on happens as part of it (with the OAK-9176 fix)
+ // 2) Oak >= 1.8 which never did an Oak <= 1.6 upgrade:
+ // -> no sweep2 is needed as OAK-9176 doesn't apply (the repository
+ // never ran <= 1.6)
+ // 3) Oak >= 1.8 which was previously doing an Oak <= 1.6 upgrade:
+ // -> A (full) sweep2 is needed. This is the main case of OAK-9176.
+
+ // In case 3 there is a valid, recent "_sweepRev" - and
+ // we can go ahead and do a "quick" backgroundSweep() here
+ // before continuing, to unblock the startup.
+ // After that, an async/background task is started for sweep2.
+
+ // which of cases 1-3 we have is determined via 'sweep2LockIfNecessary'
+ // and recorded in the settings collection.
+
+ // except for this case detection (which acquires a "sweep2 lock" if needed)
+ // we can otherwise continue normally. That means, a sweep1 can
+ // be considered as usual.
+ // Note that for case 3, doing this normal sweep1 can now also
+ // fix some "_bc" - which before OAK-9176 were missing
+ // and which sweep2 would separately fix as well - but this is not a problem.
+
+ // Note that by setting the SYS_PROP_DISABLE_SWEEP2 system property
+ // the sweep2 is bypassed and the sweep2 status is explicitly stored as "swept".
+ final long sweep2Lock;
+ if (disableSweep2) {
+ try {
+ final Sweep2StatusDocument sweep2Status = Sweep2StatusDocument.readFrom(store);
+ if (sweep2Status == null || !sweep2Status.isSwept()) {
+ // setting the disableSweep2 flag stores this in the repository
+ Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId);
+ }
+ } catch(Exception e) {
+ LOG.warn("<init> sweep2 is diabled as instructed by system property ("
+ + SYS_PROP_DISABLE_SWEEP2 + "=true) - however, got an Exception"
+ + " while storing sweep2 status in the settings collection: " + e, e);
+ }
+ sweep2Lock = -1;
+ } else {
+ // So: unless sweep2 is disabled: acquire sweep2 if one is (maybe) necessary
+ sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
+ }
+
// perform an initial document sweep if needed
// this may be long running if there is no sweep revision
// for this clusterId (upgrade from Oak <= 1.6).
@@ -721,6 +782,15 @@ public final class DocumentNodeStore
backgroundUpdateThread.start();
backgroundSweepThread.start();
+
+ if (sweep2Lock >= 0) {
+ // sweep2 is necessary - so start a sweep2 background task
+ backgroundSweep2Thread = new Thread(
+ new BackgroundSweep2Operation(this, isDisposed, sweep2Lock),
+ "DocumentNodeStore background sweep2 thread " + threadNamePostfix);
+ backgroundSweep2Thread.setDaemon(true);
+ backgroundSweep2Thread.start();
+ }
}
persistentCache = builder.getPersistentCache();
@@ -757,7 +827,8 @@ public final class DocumentNodeStore
Utils.joinQuietly(backgroundReadThread,
backgroundUpdateThread,
- backgroundSweepThread);
+ backgroundSweepThread,
+ backgroundSweep2Thread);
DocumentStoreException ex = null;
@@ -2496,6 +2567,158 @@ public final class DocumentNodeStore
}
+    /**
+     * Executes the sweep2 (from within a background thread).
+     * <p>
+     * If no lock is held yet ({@code sweep2Lock == 0}) it is acquired first.
+     * While the sweep2 status is still 'checking', it is determined whether a
+     * sweep2 is necessary at all - if not, the status is marked as 'swept'.
+     * Otherwise the lock is moved to 'sweeping', sweep2 is executed for the
+     * clusterIds contained in the current sweep revisions and the status is
+     * finally marked as 'swept'.
+     *
+     * @param sweep2Lock the lock value originally acquired, or 0 if the lock
+     *          still has to be acquired
+     * @return true if sweep2 is done or no longer needed, false otherwise
+     *          (in which case it should be retried)
+     * @throws DocumentStoreException if reading from or writing to the
+     *          document store fails during the sweep2
+     */
+    boolean backgroundSweep2(long sweep2Lock) throws DocumentStoreException {
+        if (sweep2Lock == 0) {
+            sweep2Lock = Sweep2Helper.acquireSweep2LockIfNecessary(store, clusterId);
+            if (sweep2Lock == 0) {
+                // still not well defined, retry in a minute (done in BackgroundSweep2Operation)
+                return false;
+            }
+            if (sweep2Lock == -1) {
+                // then we're done
+                return true;
+            }
+        }
+        // sweep2Lock > 0, the local instance holds the lock
+        Sweep2StatusDocument statusDoc = Sweep2StatusDocument.readFrom(store);
+        if (statusDoc != null /*should never be null as we hold the lock, but let's play safe anyway .. */
+                && statusDoc.isChecking()) {
+            // most likely the 'isSweep2Necessary' check has not been done yet, so do it now
+            LOG.info("backgroundSweep2: checking whether sweep2 is necessary...");
+            if (!Sweep2Helper.isSweep2Necessary(store)) {
+                LOG.info("backgroundSweep2: sweep2 check determined a sweep2 is NOT necessary. Marking sweep2 status as 'swept'.");
+                if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
+                    LOG.error("backgroundSweep2 : failed to update the sweep2 status to 'swept'. Aborting sweep2 for now.");
+                }
+                return true;
+            }
+            LOG.info("backgroundSweep2: sweep2 check determined a sweep2 IS necessary. Marking sweep2 status as 'sweeping'.");
+        }
+        // update the lock status one more time to ensure no check can conclude sweep2 is not necessary anymore
+        sweep2Lock = Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId,
+                true /* this true here is what's relevant: locks-in the 'sweeping' status */);
+        if (sweep2Lock == 0) {
+            // something came in between, retry later
+            LOG.info("backgroundSweep2: could not update the sweep2 lock to sweeping, someone got in between. Retry later.");
+            return false;
+        } else if (sweep2Lock == -1) {
+            // odd, someone else concluded we're done
+            LOG.info("backgroundSweep2: meanwhile, someone else concluded sweep2 is done.");
+            return true;
+        }
+
+        // compile the list of clusterIds for which sweep2 should be done.
+        // in an ideal situation sweep(1) would have been done for all clusterIds,
+        // and in that case "_sweepRev" will contain all clusterIds ever used.
+        // However, it is a supported situation that sweep(1) was not run for all clusterIds.
+        // In that case sweep2 must not be done for all clusterIds either, but only for
+        // those contained in "_sweepRev".
+        List<Integer> includedClusterIds = new LinkedList<>();
+        for (Revision aSweepRev : sweepRevisions) {
+            includedClusterIds.add(aSweepRev.getClusterId());
+        }
+
+        // at this point we did properly acquire a lock and can go ahead doing sweep2
+        LOG.info("backgroundSweep2: starting sweep2 (includedClusterIds={})", includedClusterIds);
+        int num = forceBackgroundSweep2(includedClusterIds);
+        LOG.info("backgroundSweep2: finished sweep2, num swept={}", num);
+
+        // release the lock.
+        // Note that in theory someone else could have released our lock, or that
+        // the sweep2 status was deleted - that actually doesn't matter:
+        // we just went through a full, successful sweep2 and we want to record it
+        // that way - irrespective of any interference with the status
+        // -> hence the 'force' aspect of releasing here
+        if (!Sweep2StatusDocument.forceReleaseSweep2LockAndMarkSwept(store, clusterId)) {
+            LOG.error("backgroundSweep2 : sweep2 finished but we failed to update the sweep2 status accordingly");
+        }
+        return true;
+    }
+
+ /**
+ * Executes a sweep2 either only for the provided or for all clusterIds otherwise
+ * (which case applies depends on the status of sweep(1) in the repository).
+ * <p>
+ * Traverses the candidate documents returned by {@code lastRevSeeker.getCandidates(0)}
+ * and hands them to a {@link MissingBcSweeper2}. Each batch of required updates
+ * is applied via a synthetic commit: a journal entry for cache invalidation is
+ * created, the head revision is advanced, and then the updates are written to
+ * the NODES collection.
+ * @param includedClusterIds restrict sweep2 to only these clusterIds - or do it for
+ * all clusterIds if this list is empty or null.
+ * @return number of documents swept (i.e. the number of document updates written)
+ * @throws DocumentStoreException if reading or writing the store fails, if the
+ * journal entry for a batch cannot be created, or if the sweeper aborts
+ * because this store is being disposed
+ */
+ int forceBackgroundSweep2(List<Integer> includedClusterIds) throws DocumentStoreException {
+ // an empty sweep revision vector: presumably no revision can then be considered
+ // committed based on a sweep revision, so commit values are always resolved
+ // from the documents themselves (TODO confirm against CachingCommitValueResolver)
+ final RevisionVector emptySweepRevision = new RevisionVector();
+ CommitValueResolver cvr = new CachingCommitValueResolver(
+ 0 /* disable caching for sweep2 as caching has a risk of propagating wrong values */,
+ () -> emptySweepRevision);
+ MissingBcSweeper2 sweeper = new MissingBcSweeper2(this, cvr, includedClusterIds, isDisposed);
+ LOG.info("Starting document sweep2. Head: {}, starting at 0", getHeadRevision());
+ // NOTE(review): starting at 0 presumably makes this a full traversal of all documents
+ Iterable<NodeDocument> docs = lastRevSeeker.getCandidates(0);
+ try {
+ // counts the documents written across all batches
+ final AtomicInteger numUpdates = new AtomicInteger();
+
+ sweeper.sweep2(docs, new NodeDocumentSweepListener() {
+ @Override
+ public void sweepUpdate(final Map<Path, UpdateOp> updates)
+ throws DocumentStoreException {
+ // create a synthetic commit. this commit does not have any
+ // changes, we just use it to create a journal entry for
+ // cache invalidation and apply the sweep updates
+ backgroundOperationLock.readLock().lock();
+ try {
+ boolean success = false;
+ Revision r = commitQueue.createRevision();
+ try {
+ // writeUpdates() runs once r is at the head of the commit queue
+ commitQueue.done(r, new CommitQueue.Callback() {
+ @Override
+ public void headOfQueue(@NotNull Revision revision) {
+ writeUpdates(updates, revision);
+ }
+ });
+ success = true;
+ } finally {
+ // on failure, remove the revision from the queue again
+ if (!success && commitQueue.contains(r)) {
+ commitQueue.canceled(r);
+ }
+ }
+ } finally {
+ backgroundOperationLock.readLock().unlock();
+ }
+ }
+
+ // writes the journal entry, advances the head and then applies the updates
+ private void writeUpdates(Map<Path, UpdateOp> updates,
+ Revision revision)
+ throws DocumentStoreException {
+ // create journal entry
+ JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+ entry.modified(updates.keySet());
+ Revision r = newRevision().asBranchRevision();
+ if (!store.create(JOURNAL, singletonList(entry.asUpdateOp(r)))) {
+ String msg = "Unable to create journal entry for " +
+ "document invalidation. Will be retried with " +
+ "next background sweep2 operation.";
+ throw new DocumentStoreException(msg);
+ }
+ changes.invalidate(Collections.singleton(r));
+ // record the root as modified by the synthetic commit revision
+ unsavedLastRevisions.put(ROOT, revision);
+ RevisionVector newHead = getHeadRevision().update(revision);
+ setRoot(newHead);
+ commitQueue.headRevisionChanged();
+
+ store.createOrUpdate(NODES, Lists.newArrayList(updates.values()));
+ numUpdates.addAndGet(updates.size());
+ LOG.debug("Background sweep2 updated {}", updates.keySet());
+ }
+ });
+
+ return numUpdates.get();
+ } finally {
+ // the candidates iterable may hold an open cursor
+ Utils.closeIfCloseable(docs);
+ }
+ }
+
private int backgroundSweep() throws DocumentStoreException {
// decide if it needs to run
Revision head = getHeadRevision().getRevision(clusterId);
@@ -3296,6 +3519,50 @@ public final class DocumentNodeStore
}
}
+    /**
+     * Background sweep2 operation (also see OAK-9176 for details/context).
+     * <p>
+     * Repeatedly calls {@link DocumentNodeStore#backgroundSweep2(long)} until it
+     * reports that sweep2 is done or no longer needed, and then stops itself.
+     */
+    private static class BackgroundSweep2Operation extends NodeStoreTask {
+
+        private final long sweep2Lock;
+        private int retryCount = 0;
+
+        /**
+         * @param nodeStore the node store to run sweep2 on
+         * @param isDisposed the node store's dispose flag
+         * @param sweep2Lock the lock value acquired so far, must not be negative
+         * @throws IllegalArgumentException if {@code sweep2Lock} is negative
+         */
+        BackgroundSweep2Operation(DocumentNodeStore nodeStore,
+                                  AtomicBoolean isDisposed,
+                                  long sweep2Lock) {
+            // default asyncDelay is 1000ms == 1sec
+            // the sweep2 is fine to run every 60sec by default as it is not time critical
+            // to achieve this we're doing a Math.min(60sec, 60 * getAsyncDelay())
+            super(nodeStore, isDisposed,
+                    Suppliers.ofInstance(Math.min(60000, 60 * nodeStore.getAsyncDelay())));
+            if (sweep2Lock < 0) {
+                throw new IllegalArgumentException("sweep2Lock must not be negative");
+            }
+            this.sweep2Lock = sweep2Lock;
+        }
+
+        @Override
+        protected void execute(@NotNull DocumentNodeStore nodeStore) {
+            if (retryCount > 0) {
+                LOG.info("BackgroundSweep2Operation.execute: retrying sweep2. retryCount={}", retryCount);
+            }
+            if (nodeStore.backgroundSweep2(sweep2Lock)) {
+                // sweep2 is done or no longer needed: stop this task. Return right
+                // away so we don't wrongly report another instance as still sweeping.
+                done();
+                return;
+            }
+            // log the fact that we noticed an ongoing sweep2 - is only logged once every 5min
+            // until either the other instance finishes or crashes, at which point we or another
+            // instance will pick up
+            if (++retryCount % 5 == 0) {
+                LOG.info("BackgroundSweep2Operation.execute: another instance is currently (still) executing sweep2. "
+                        + "Waiting for its outcome. retryCount={}", retryCount);
+            }
+        }
+
+        /** Stops this task by clearing the reference to the node store. */
+        private void done() {
+            ref.clear();
+        }
+    }
+
private static class BackgroundLeaseUpdate extends NodeStoreTask {
/** OAK-4859 : log if time between two renewClusterIdLease calls is too long **/
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java?rev=1882480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java Wed Oct 14 08:47:54 2020
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.partition;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.COMMITROOT_OR_REVISIONS;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+
+/**
+ * The {@code MissingBcSweeper2} is used for the so-called sweep2, which is
+ * a repository traversal updating documents that have missing branch commit ("_bc")
+ * properties (for details see OAK-9176).
+ * This class is similar to NodeDocumentSweeper as it is based on the same principles,
+ * with a few notable exceptions (e.g. it only looks at _commitRoot and _revisions).
+ * Due to these exceptions the class is forked rather than modified/subclassed
+ * (also to enable later refactoring of the NodeDocumentSweeper itself).
+ * <p>
+ * This class is not thread-safe.
+ */
+final class MissingBcSweeper2 {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MissingBcSweeper2.class);
+
+    /** Number of documents processed between two throttling pauses / dispose checks. */
+    private static final int YIELD_SIZE = 500;
+
+    /** Maximum number of update operations handed to the listener at once. */
+    private static final int INVALIDATE_BATCH_SIZE = 100;
+
+    /** Minimum interval between two progress log messages. */
+    private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
+
+    private final RevisionContext context;
+
+    private final CommitValueResolver commitValueResolver;
+
+    /** clusterId of the instance executing this sweep2 (used for logging only). */
+    private final int executingClusterId;
+
+    /** clusterIds sweep2 is restricted to; empty means all clusterIds. Immutable. */
+    private final List<Integer> includedClusterIds;
+
+    /** head revision at construction time (used for debug logging only). */
+    private final RevisionVector headRevision;
+
+    /** when set, the sweep2 is aborted with a DocumentStoreException. */
+    private final AtomicBoolean isDisposed;
+
+    // progress counters, reset at the start of each performSweep2()
+    private long totalCount;
+    private long lastCount;
+    private long startOfScan;
+    private long lastLog;
+
+    /**
+     * Creates a new sweeper v2 for the given context.
+     *
+     * @param context the revision context.
+     * @param commitValueResolver resolves the commit value of a change revision.
+     * @param includedClusterIds the clusterIds to restrict sweep2 to; if
+     *          {@code null} or empty, sweep2 is done for all clusterIds.
+     *          The list is copied, later changes by the caller have no effect.
+     * @param isDisposed flag which aborts the sweep2 when set; {@code null}
+     *          is treated as 'never disposed'.
+     */
+    MissingBcSweeper2(RevisionContext context,
+                      CommitValueResolver commitValueResolver,
+                      List<Integer> includedClusterIds,
+                      AtomicBoolean isDisposed) {
+        this.context = checkNotNull(context);
+        this.commitValueResolver = checkNotNull(commitValueResolver);
+        this.executingClusterId = context.getClusterId();
+        // always an immutable snapshot, regardless of whether null was passed
+        this.includedClusterIds = includedClusterIds == null
+                ? Collections.<Integer>emptyList()
+                : Collections.unmodifiableList(new LinkedList<Integer>(includedClusterIds));
+        this.headRevision = context.getHeadRevision();
+        this.isDisposed = isDisposed == null ? new AtomicBoolean() : isDisposed;
+    }
+
+    /**
+     * Performs a sweep2 and reports the required updates to the given sweep
+     * listener.
+     *
+     * @param documents the documents to sweep
+     * @param listener the listener to receive required sweep update operations.
+     * @throws DocumentStoreException if reading from the store or writing to
+     *          the store failed, or if the sweep2 was interrupted by a
+     *          shutdown (the dispose flag was set).
+     */
+    void sweep2(@NotNull Iterable<NodeDocument> documents,
+                @NotNull NodeDocumentSweepListener listener)
+            throws DocumentStoreException {
+        performSweep2(documents, checkNotNull(listener));
+    }
+
+    //----------------------------< internal >----------------------------------
+
+    /**
+     * Resets the progress counters and hands the required update operations
+     * to the listener in batches of at most {@link #INVALIDATE_BATCH_SIZE}.
+     * The dispose flag is checked after each batch.
+     */
+    private void performSweep2(Iterable<NodeDocument> documents,
+                               NodeDocumentSweepListener listener)
+            throws DocumentStoreException {
+        totalCount = 0;
+        lastCount = 0;
+        startOfScan = context.getClock().getTime();
+        lastLog = startOfScan;
+
+        Iterable<Map.Entry<Path, UpdateOp>> ops = sweepOperations(documents);
+        for (List<Map.Entry<Path, UpdateOp>> batch : partition(ops, INVALIDATE_BATCH_SIZE)) {
+            Map<Path, UpdateOp> updates = newHashMap();
+            for (Map.Entry<Path, UpdateOp> entry : batch) {
+                updates.put(entry.getKey(), entry.getValue());
+            }
+            listener.sweepUpdate(updates);
+            if (isDisposed.get()) {
+                throw new DocumentStoreException("sweep2 interrupted by shutdown");
+            }
+        }
+        LOG.debug("Document sweep2 finished");
+    }
+
+    /**
+     * Lazily maps each document to its (path, sweep2 update) pair and drops
+     * the documents which need no update. Every {@link #YIELD_SIZE} documents
+     * the traversal checks the dispose flag and then pauses for about as long
+     * as processing took since the last pause (roughly 50% throttling).
+     */
+    private Iterable<Map.Entry<Path, UpdateOp>> sweepOperations(
+            final Iterable<NodeDocument> docs) {
+        return filter(transform(docs,
+                new Function<NodeDocument, Map.Entry<Path, UpdateOp>>() {
+
+            int yieldCnt = 0;
+            long lastYield = context.getClock().getTime();
+
+            @Override
+            public Map.Entry<Path, UpdateOp> apply(NodeDocument doc) {
+                if (++yieldCnt >= YIELD_SIZE) {
+                    // documents without required updates never reach the per-batch
+                    // dispose check in performSweep2(), so a long traversal finding
+                    // few missing _bc would otherwise ignore a shutdown - check here too
+                    if (isDisposed.get()) {
+                        throw new DocumentStoreException("sweep2 interrupted by shutdown");
+                    }
+                    try {
+                        final long now = context.getClock().getTime();
+                        final long timeSinceLastYield = now - lastYield;
+                        // wait the same amount of time that passed since last yield
+                        // that corresponds to roughly 50% throttle (ignoring the min 1ms sleep)
+                        final long waitUntil = now + Math.max(1, timeSinceLastYield);
+                        context.getClock().waitUntil(waitUntil);
+                    } catch (InterruptedException e) {
+                        // ignore: throttling is best-effort, the sweep continues
+                    }
+                    lastYield = context.getClock().getTime();
+                    yieldCnt = 0;
+                }
+                return immutableEntry(doc.getPath(), sweepOne(doc));
+            }
+        }), new Predicate<Map.Entry<Path, UpdateOp>>() {
+            @Override
+            public boolean apply(Map.Entry<Path, UpdateOp> input) {
+                return input.getValue() != null;
+            }
+        });
+    }
+
+    /**
+     * Computes the sweep2 update for a single document. For every _commitRoot
+     * and _revisions entry of an included clusterId whose commit revision
+     * differs from the change revision (i.e. a branch commit), a "_bc" entry
+     * is added unless the document already has one for that revision.
+     *
+     * @param doc the document to inspect.
+     * @return the update operation, or {@code null} if nothing needs to change.
+     */
+    private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
+        UpdateOp op = null;
+        // the blueprint NodeDocumentSweeper.sweepOne goes through
+        // PROPERTY_OR_DELETED_OR_COMMITROOT here, as that's the new full sweep1.
+        // the sweep2 though, only goes through COMMITROOT_OR_REVISIONS,
+        // as that's what was left out by the original sweep1:
+        // - COMMITROOT : for new child (parent)
+        // - REVISIONS : for commit roots (root for branch commits)
+        for (String property : filter(doc.keySet(), COMMITROOT_OR_REVISIONS)) {
+            Map<Revision, String> valueMap = doc.getLocalMap(property);
+            for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+                Revision rev = entry.getKey();
+
+                if (!includedClusterIds.isEmpty() && !includedClusterIds.contains(rev.getClusterId())) {
+                    // sweep2 is only done for those clusterIds that went through sweep1 (before 1.8).
+                    // includedClusterIds contains the latter list of clusterIds.
+                    // For testing, the code also supports going through all clusterIds
+                    // (in this case the includedClusterIds list is empty).
+                    // Note that sweep2 is never triggered if "_sweepRev" is empty,
+                    // so the empty case here never applies for production.
+                    continue;
+                }
+
+                Revision cRev = getCommitRevision(doc, rev);
+                if (cRev == null || cRev.equals(rev)) {
+                    // uncommitted (cRev == null) or a plain, non-branch commit:
+                    // sweep2 leaves both untouched
+                    continue;
+                }
+                // this is what sweep2 is about : adding _bc for _commitRoot and _revisions
+                if (op == null) {
+                    op = createUpdateOp(doc);
+                }
+                committedBranch(doc, property, rev, cRev, op);
+            }
+        }
+
+        logProgress();
+
+        // op may exist without changes when all branch commits already had a _bc
+        return (op != null && op.hasChanges()) ? op : null;
+    }
+
+    /**
+     * Counts the document just processed and, at most once per
+     * {@link #LOGINTERVALMS}, logs the overall and the recent scan rate.
+     */
+    private void logProgress() {
+        totalCount++;
+        lastCount++;
+        long now = context.getClock().getTime();
+        long lastElapsed = now - lastLog;
+
+        if (lastElapsed < LOGINTERVALMS) {
+            return;
+        }
+        TimeDurationFormatter df = TimeDurationFormatter.forLogging();
+
+        // totalElapsed >= lastElapsed >= LOGINTERVALMS here, so no division by zero
+        long totalElapsed = now - startOfScan;
+        long totalRateMin = (totalCount * TimeUnit.MINUTES.toMillis(1)) / totalElapsed;
+        long lastRateMin = (lastCount * TimeUnit.MINUTES.toMillis(1)) / lastElapsed;
+
+        String restrictionMsg = includedClusterIds.isEmpty()
+                ? "unrestricted, ie for all clusterIds"
+                : "restricted to clusterIds " + includedClusterIds;
+        String message = String.format(
+                "Sweep2 executed by cluster node [%d] (%s): %d nodes scanned in %s (~%d/m) - last interval %d nodes in %s (~%d/m)",
+                executingClusterId, restrictionMsg, totalCount, df.format(totalElapsed, TimeUnit.MILLISECONDS), totalRateMin, lastCount,
+                df.format(lastElapsed, TimeUnit.MILLISECONDS), lastRateMin);
+
+        LOG.info(message);
+        lastLog = now;
+        lastCount = 0;
+    }
+
+    /**
+     * Returns {@code true} if the given revision is marked as a branch commit
+     * on the document. This method only checks local branch commit information
+     * available on the document ({@link NodeDocument#getLocalBranchCommits()}).
+     * If the given revision is related to a branch commit that was created
+     * prior to Oak 1.8, the method will return {@code false}.
+     *
+     * @param rev a revision.
+     * @param doc the document to check.
+     * @return {@code true} if the revision is marked as a branch commit;
+     *          {@code false} otherwise.
+     */
+    private boolean isV18BranchCommit(Revision rev, NodeDocument doc) {
+        return doc.getLocalBranchCommits().contains(rev);
+    }
+
+    /**
+     * Adds a "_bc" entry for {@code rev} to {@code op} unless the document
+     * already marks {@code rev} as a branch commit.
+     */
+    private void committedBranch(NodeDocument doc,
+                                 String property,
+                                 Revision rev,
+                                 Revision cRev,
+                                 UpdateOp op) {
+        if (LOG.isDebugEnabled()) {
+            // only computed when needed: headRevision is used for logging only
+            String msg = headRevision.isRevisionNewer(cRev) ? " (newer than head)" : "";
+            LOG.debug("Committed branch change on {}, {} @ {}/{}{}",
+                    op.getId(), property, rev, cRev, msg);
+        }
+        if (!isV18BranchCommit(rev, doc)) {
+            NodeDocument.setBranchCommit(op, rev);
+        }
+    }
+
+    private static UpdateOp createUpdateOp(NodeDocument doc) {
+        return new UpdateOp(doc.getId(), false);
+    }
+
+    private String getCommitValue(@NotNull Revision changeRevision,
+                                  @NotNull NodeDocument doc) {
+        return commitValueResolver.resolve(changeRevision, doc);
+    }
+
+    /**
+     * Resolves the revision under which the change {@code rev} on {@code doc}
+     * was committed.
+     *
+     * @return the commit revision, or {@code null} if no commit value exists
+     *          (i.e. the change is not committed).
+     */
+    @Nullable
+    private Revision getCommitRevision(final NodeDocument doc,
+                                       final Revision rev)
+            throws DocumentStoreException {
+        String cv = getCommitValue(rev, doc);
+        if (cv == null) {
+            return null;
+        }
+        return Utils.resolveCommitRevision(rev, cv);
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingBcSweeper2.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Wed Oct 14 08:47:54 2020
@@ -231,7 +231,7 @@ public final class NodeDocument extends
* and all previous non-branch revisions by this cluster node can be
* considered committed.
*/
- private static final String SWEEP_REV = "_sweepRev";
+ static final String SWEEP_REV = "_sweepRev";
//~----------------------------< Split Document Types >
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java Wed Oct 14 08:47:54 2020
@@ -40,7 +40,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeCommitRoot;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeRevision;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setDeletedOnce;
-import static org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS;
/**
* The {@code NodeDocumentSweeper} is responsible for removing uncommitted
@@ -56,6 +56,9 @@ final class NodeDocumentSweeper {
private static final long LOGINTERVALMS = TimeUnit.MINUTES.toMillis(1);
+ /**
+ * holds the Predicate actually used in sweepOne to select the document properties
+ * that are swept. Defaults to PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS.
+ * This is modifiable ONLY FOR TESTING PURPOSE.
+ * NOTE(review): this is a mutable, non-volatile static shared by all sweeper
+ * instances - tests that change it should restore the default afterwards.
+ */
+ static Predicate<String> SWEEP_ONE_PREDICATE = PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS;
+
private final RevisionContext context;
private final int clusterId;
@@ -178,7 +181,12 @@ final class NodeDocumentSweeper {
private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
UpdateOp op = createUpdateOp(doc);
- for (String property : filter(doc.keySet(), PROPERTY_OR_DELETED)) {
+ // go through PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS, whereas :
+ // - PROPERTY : for content changes
+ // - DELETED : for new node (this)
+ // - COMMITROOT : for new child (parent)
+ // - REVISIONS : for commit roots (root for branch commits)
+ for (String property : filter(doc.keySet(), SWEEP_ONE_PREDICATE)) {
Map<Revision, String> valueMap = doc.getLocalMap(property);
for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
Revision rev = entry.getKey();
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java?rev=1882480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java Wed Oct 14 08:47:54 2020
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for sweep2 functionality introduced with OAK-9176.
+ * Kept separate from DocumentNodeStore to limit its size.
+ */
+public class Sweep2Helper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Sweep2Helper.class);
+
+    /**
+     * Determines, based on the root document of the nodes collection, whether a
+     * sweep2 is necessary. Three cases are distinguished:
+     * <ol>
+     * <li>no {@code _sweepRev} on the root document: a pre 1.8 repository upgrade.
+     * No sweep2 is needed, as it is embedded in the normal sweep.</li>
+     * <li>Oak &gt;= 1.8 that never ran an Oak &lt;= 1.6 version before: every branch
+     * commit listed in the root's {@code _revisions} has a matching {@code _bc}
+     * entry. No sweep2 is needed as OAK-9176 does not apply.</li>
+     * <li>Oak &gt;= 1.8 that previously ran an Oak &lt;= 1.6 version: at least one branch
+     * commit in the root's {@code _revisions} lacks a {@code _bc} entry. A (full)
+     * sweep2 is needed - this is the main case of OAK-9176. Note that the entry might
+     * also have been garbage collected, in which case the sweep2 is unnecessary.</li>
+     * </ol>
+     *
+     * @param store the document store to read the root document from
+     * @return {@code true} for case 3, {@code false} otherwise (including when the
+     *         root document cannot be read)
+     */
+    static boolean isSweep2Necessary(DocumentStore store) {
+        NodeDocument rootNodeDoc = store.find(Collection.NODES, Utils.getIdFromPath("/"));
+        if (rootNodeDoc == null) {
+            // that'd be very weird
+            LOG.warn("isSweep2Necessary : cannot get root node - assuming no sweep2 needed");
+            return false;
+        }
+
+        if (rootNodeDoc.get("_sweepRev") == null) {
+            // case 1: no sweep2 needed, the normal sweep takes care of it
+            return false;
+        }
+
+        // a post (>=) 1.8 repository which might or might not have been a pre (<) 1.8
+        // one before: distinguish case 2 from case 3 by looking for branch commits
+        // in "_revisions" that are missing in "_bc" (considering all clusterIds)
+        Map<Revision, String> bcValueMap = rootNodeDoc.getValueMap("_bc");
+        Map<Revision, String> valueMap = rootNodeDoc.getValueMap(NodeDocument.REVISIONS);
+        for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+            String cv = entry.getValue();
+            if (cv == null) {
+                continue;
+            }
+            Revision rev = entry.getKey();
+            Revision cRev = Utils.resolveCommitRevision(rev, cv);
+            // a commit value resolving to another revision marks rev as a branch commit,
+            // which should then be listed in "_bc"
+            if (!cRev.equals(rev) && !bcValueMap.containsKey(rev)) {
+                // case 3
+                return true;
+            }
+        }
+
+        // case 2
+        return false;
+    }
+
+    /**
+     * Acquires the cluster singleton lock for doing a sweep2, unless a sweep2 was already done.
+     * <p>
+     * 'If necessary' refers to the sweep2 status in the settings collection only - no
+     * check is done based on content in the nodes collection in this method.
+     *
+     * @param store the document store holding the sweep2 status
+     * @param clusterId the clusterId of the local instance
+     * @return <ul>
+     * <li>
+     * a value &gt; 0: the lock was successfully acquired (or is still held by the local
+     * instance) and a sweep2 must now be done by the local instance. The returned value
+     * is the lock value, i.e. the modCount of the sweep2 status document.
+     * </li>
+     * <li>
+     * 0: a sweep2 might be necessary, but cannot be done at this point. A later retry
+     * should be done. This happens when another active instance holds the lock (and is
+     * presumably busy doing a sweep2), or when another instance acquired the lock concurrently.
+     * </li>
+     * <li>
+     * -1: no sweep2 is necessary (the status is already swept)
+     * </li>
+     * </ul>
+     */
+    static long acquireSweep2LockIfNecessary(DocumentStore store, int clusterId) {
+        Sweep2StatusDocument status = Sweep2StatusDocument.readFrom(store);
+        if (status == null) {
+            // no status yet - creating it acquires the lock
+            return Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId, false);
+        }
+        if (status.isSwept()) {
+            // nothing left to do - this should be the most frequent case.
+            return -1;
+        }
+
+        // otherwise a status could have been set by ourselves or by another instance
+        int lockClusterId = status.getLockClusterId();
+        if (lockClusterId == clusterId) {
+            // the local instance was the originator of the sweeping lock, but likely crashed
+            // hence we need to redo the work from scratch as we can't know if we finished it properly
+            LOG.info("acquireSweep2LockIfNecessary : sweep2 status was sweeping, locked by own instance ({}). "
+                    + "Another sweep2 is required.",
+                    clusterId);
+            return status.getLockValue();
+        }
+
+        // another instance holds the lock - check whether it is still active or might have crashed
+        boolean lockOwnerActive = ClusterNodeInfoDocument.all(store).stream()
+                .anyMatch(info -> info.getClusterId() == lockClusterId && info.isActive());
+        if (lockOwnerActive) {
+            // another instance is busy sweep2-ing, which is fine - but keep
+            // monitoring until that other instance is done
+            LOG.debug("acquireSweep2LockIfNecessary : another instance (id {}) is (still) busy doing a sweep2.",
+                    lockClusterId);
+            return 0;
+        }
+
+        // the lock owner is no longer active, so try to overwrite/reacquire the lock for us
+        return Sweep2StatusDocument.acquireOrUpdateSweep2Lock(store, clusterId, false);
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2Helper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java?rev=1882480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java (added)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java Wed Oct 14 08:47:54 2020
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the sweep2 status as recorded in the settings collection.
+ * <p>
+ * The status document (id {@code sweep2Status}) carries a {@code status} of
+ * {@code checking}, {@code sweeping} or {@code swept}. Until it is swept, the
+ * {@code lock} property holds the clusterId owning the sweep2 lock and the
+ * document's {@code _modCount} serves as lock value.
+ */
+public class Sweep2StatusDocument {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Sweep2StatusDocument.class);
+
+    static final String SWEEP2_STATUS_ID = "sweep2Status";
+
+    private static final String STATUS_PROPERTY = "status";
+    private static final String STATUS_VALUE_CHECKING = "checking";
+    private static final String STATUS_VALUE_SWEEPING = "sweeping";
+    private static final String STATUS_VALUE_SWEPT = "swept";
+
+    private static final String LOCK_PROPERTY = "lock";
+    private static final String MOD_COUNT_PROPERTY = "_modCount";
+
+    private static final String SWEPT_BY_PROPERTY = "sweptBy";
+
+    /**
+     * Reads the sweep2 status from the settings collection, bypassing the cache.
+     *
+     * @param documentStore the store to read from
+     * @return the sweep2 status, or {@code null} if there is no status document yet
+     */
+    public static Sweep2StatusDocument readFrom(DocumentStore documentStore) {
+        Document doc = documentStore.find(Collection.SETTINGS, SWEEP2_STATUS_ID,
+                -1 /* -1; avoid caching */);
+        return doc == null ? null : new Sweep2StatusDocument(doc);
+    }
+
+    /**
+     * Acquires the sweep2 lock for {@code clusterId}, creating the status document if
+     * necessary. A lock held by another clusterId is taken over: this method does
+     * not check whether that other owner is still active. Updates of an existing
+     * status document are conditional on its {@code _modCount}, so a concurrent
+     * modification makes this call return 0.
+     *
+     * @param documentStore the store holding the settings collection
+     * @param clusterId the clusterId to record as lock owner
+     * @param forceSweepingStatus if false uses the default way to set the sweep2-status,
+     * if set to true (force-)sets sweep2-status to 'sweeping'.
+     * the latter can be used if the caller knows a sweeping is necessary and wants to skip the 'checking' phase explicitly
+     * @return <ul>
+     * <li>
+     * -1 if a sweep2 is definitely not necessary (already swept)
+     * </li>
+     * <li>
+     * 0 if the lock could not be acquired (another instance got in between)
+     * </li>
+     * <li>
+     * a value &gt; 0 if a lock was acquired (in which case this returned value is the lock value, which is always &gt; 0)
+     * </li>
+     * </ul>
+     */
+    public static long acquireOrUpdateSweep2Lock(DocumentStore documentStore,
+            int clusterId, boolean forceSweepingStatus) {
+        Document existingStatusDoc = documentStore.find(Collection.SETTINGS, SWEEP2_STATUS_ID,
+                -1 /* -1; avoid caching */);
+        UpdateOp updateOp = new UpdateOp(SWEEP2_STATUS_ID, true);
+        updateOp.set(LOCK_PROPERTY, clusterId);
+        if (existingStatusDoc == null) {
+            updateOp.setNew(true);
+            // the first lock is always created in 'checking' state by default
+            // (unless the caller knows otherwise, which would be odd)
+            if (!forceSweepingStatus) {
+                updateOp.set(STATUS_PROPERTY, STATUS_VALUE_CHECKING);
+            } else {
+                // this is unusual - let's log for debugging
+                LOG.warn("acquireOrUpdateSweep2Lock: forced new sweep2 lock directly to state sweeping");
+                updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEEPING);
+            }
+            updateOp.set(MOD_COUNT_PROPERTY, 1L);
+            ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            updateOps.add(updateOp);
+            if (!documentStore.create(Collection.SETTINGS, updateOps)) {
+                LOG.info("acquireOrUpdateSweep2Lock: another instance just acquired the (new) sweep2 lock a few moments ago.");
+                return 0;
+            }
+            LOG.info("acquireOrUpdateSweep2Lock: sweep2 status set to {}, locked for clusterId={}",
+                    forceSweepingStatus ? STATUS_VALUE_SWEEPING : STATUS_VALUE_CHECKING, clusterId);
+            return 1;
+        }
+
+        final Sweep2StatusDocument existingStatus = new Sweep2StatusDocument(existingStatusDoc);
+        if (existingStatus.isSwept()) {
+            // not needed => -1
+            return -1;
+        }
+        if (existingStatus.getLockClusterId() == clusterId
+                && (!forceSweepingStatus || existingStatus.isSweeping())) {
+            // the local instance already has the lock, and either the caller wants to go
+            // ahead with the default or with SWEEPING - which is already the case.
+            // In both these cases the lock is just fine, nothing to fiddle.
+            return existingStatus.getLockValue();
+        }
+        // otherwise the lock needs adjusting - either for phase or for owner
+        updateOp.setNew(false);
+        if (forceSweepingStatus) {
+            updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEEPING);
+        }
+        // conditional update: fails if the status document was modified in between
+        updateOp.equals(MOD_COUNT_PROPERTY, existingStatusDoc.getModCount());
+        final long newModCount = existingStatusDoc.getModCount() + 1;
+        updateOp.set(MOD_COUNT_PROPERTY, newModCount);
+        if (documentStore.findAndUpdate(Collection.SETTINGS, updateOp) == null) {
+            LOG.info("acquireOrUpdateSweep2Lock: another instance just acquired the (expired) sweep2 lock a few moments ago");
+            return 0;
+        }
+        if (forceSweepingStatus) {
+            LOG.info("acquireOrUpdateSweep2Lock: sweep2 status set to sweeping, relocked for clusterId={}", clusterId);
+        } else {
+            LOG.info("acquireOrUpdateSweep2Lock: sweep2 status unchanged (is {}), relocked for clusterId={}",
+                    existingStatusDoc.get(STATUS_PROPERTY), clusterId);
+        }
+        return newModCount;
+    }
+
+    /**
+     * Releases the sweep2 lock and records that sweep2 was done successfully.
+     * Note that the clusterId is only for recording purpose - this method
+     * makes no checks on the current owner of the lock.
+     *
+     * @param documentStore the store holding the settings collection
+     * @param clusterId the clusterId to record in the {@code sweptBy} property
+     * @return true if the sweep2 status is now marked swept(2) - false if that failed
+     * (in the latter case the caller can consider retrying the acquire/sweep2/release sequence)
+     */
+    public static boolean forceReleaseSweep2LockAndMarkSwept(DocumentStore documentStore, int clusterId) {
+        Document existing = documentStore.find(Collection.SETTINGS, SWEEP2_STATUS_ID,
+                -1 /* -1; avoid caching */);
+
+        if (existing == null) {
+            // we directly mark the sweep2 as done if no sweep2 is even necessary.
+            // so it is legal that we have no existingSweep2Doc yet
+            // lock is ignored when there was no sweep2Status yet
+            UpdateOp updateOp = new UpdateOp(SWEEP2_STATUS_ID, true);
+            updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEPT);
+            updateOp.setNew(true);
+            updateOp.set(MOD_COUNT_PROPERTY, 1L);
+            updateOp.set(SWEPT_BY_PROPERTY, clusterId);
+            ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            updateOps.add(updateOp);
+            if (!documentStore.create(Collection.SETTINGS, updateOps)) {
+                LOG.info("forceReleaseSweep2LockAndMarkSwept: another instance just wanted to mark sweep2 as done a few moments ago too.");
+                return false;
+            }
+            LOG.info("forceReleaseSweep2LockAndMarkSwept: sweep2 status set to swept by clusterId={}", clusterId);
+            return true;
+        }
+
+        // there was a lock (probably) - at least there was a sweep2 status.
+        // we don't care about what that status was, we only (force) mark it as done -
+        // unless it was already marked as swept, in which case we leave it as is.
+        if (new Sweep2StatusDocument(existing).isSwept()) {
+            LOG.info("forceReleaseSweep2LockAndMarkSwept: sweep2 status was already marked swept previously");
+            return true;
+        }
+        UpdateOp updateOp = new UpdateOp(SWEEP2_STATUS_ID, false);
+        updateOp.set(STATUS_PROPERTY, STATUS_VALUE_SWEPT);
+        updateOp.set(MOD_COUNT_PROPERTY, existing.getModCount() + 1);
+        updateOp.set(SWEPT_BY_PROPERTY, clusterId);
+        if (existing.keySet().contains(LOCK_PROPERTY)) {
+            updateOp.remove(LOCK_PROPERTY);
+        }
+        if (documentStore.findAndUpdate(Collection.SETTINGS, updateOp) == null) {
+            LOG.info("forceReleaseSweep2LockAndMarkSwept: another instance just wanted to mark sweep2 as done a few moments ago too.");
+            Sweep2StatusDocument status = readFrom(documentStore);
+            if (status == null) {
+                LOG.warn("forceReleaseSweep2LockAndMarkSwept: no existing sweep2 status after updating failed");
+                return false;
+            }
+            // so, someone else force-marked as swept in between, if that succeeded
+            // the status is now swept - in that case we consider the job done anyway
+            return status.isSwept();
+        }
+        LOG.info("forceReleaseSweep2LockAndMarkSwept: sweep2 status set to swept by clusterId={}", clusterId);
+        return true;
+    }
+
+    private final Document doc;
+
+    private Sweep2StatusDocument(Document doc) {
+        this.doc = doc;
+    }
+
+    /** Returns whether the status is 'swept', i.e. no (further) sweep2 is necessary. */
+    public boolean isSwept() {
+        return STATUS_VALUE_SWEPT.equals(doc.get(STATUS_PROPERTY));
+    }
+
+    /** Returns whether the status is 'sweeping'. */
+    public boolean isSweeping() {
+        return STATUS_VALUE_SWEEPING.equals(doc.get(STATUS_PROPERTY));
+    }
+
+    /** Returns whether the status is 'checking'. */
+    public boolean isChecking() {
+        return STATUS_VALUE_CHECKING.equals(doc.get(STATUS_PROPERTY));
+    }
+
+    /**
+     * Returns the clusterId currently holding the sweep2 lock.
+     *
+     * @throws NumberFormatException if no lock is recorded, e.g. once swept
+     */
+    public int getLockClusterId() {
+        return Integer.parseInt(String.valueOf(doc.get(LOCK_PROPERTY)));
+    }
+
+    /**
+     * Returns the clusterId recorded when sweep2 was marked swept,
+     * or {@code null} if none is recorded.
+     */
+    public Integer getSweptById() {
+        Object value = doc.get(SWEPT_BY_PROPERTY);
+        if (value == null) {
+            return null;
+        }
+        return Integer.parseInt(String.valueOf(value));
+    }
+
+    /** Returns the lock value, i.e. the {@code _modCount} of the status document. */
+    public long getLockValue() {
+        return doc.getModCount();
+    }
+
+    @Override
+    public String toString() {
+        // use the raw lock property: it is absent once swept (or when marked swept
+        // without prior locking), and getLockClusterId() would then throw
+        return "Sweep2StatusDocument(status=" + doc.get(STATUS_PROPERTY) + ",lockClusterId="
+                + doc.get(LOCK_PROPERTY) + ",lockValue=" + doc.getModCount() + ")";
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Sweep2StatusDocument.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Oct 14 08:47:54 2020
@@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.transform;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isCommitRootEntry;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isRevisionsEntry;
/**
* Utility methods.
@@ -106,6 +108,26 @@ public class Utils {
}
};
+    /**
+     * Matches property names as well as the {@code _deleted}, {@code _commitRoot}
+     * and {@code _revisions} names.
+     */
+    public static final Predicate<String> PROPERTY_OR_DELETED_OR_COMMITROOT_OR_REVISIONS =
+            input -> Utils.isPropertyName(input)
+                    || isDeletedEntry(input)
+                    || isCommitRootEntry(input)
+                    || isRevisionsEntry(input);
+
+    /**
+     * Matches only the {@code _commitRoot} and {@code _revisions} names.
+     */
+    public static final Predicate<String> COMMITROOT_OR_REVISIONS =
+            input -> isCommitRootEntry(input) || isRevisionsEntry(input);
+
public static int pathDepth(String path) {
if (path.equals("/")) {
return 0;
@@ -983,7 +1005,9 @@ public class Utils {
public static void joinQuietly(Thread... threads) {
for (Thread t : threads) {
try {
- t.join();
+ if (t != null) {
+ t.join();
+ }
} catch (InterruptedException e) {
// ignore
}
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchStateTest.java Wed Oct 14 08:47:54 2020
@@ -68,6 +68,7 @@ public class BranchStateTest {
}
assertNotNull(store.find(Collection.NODES, testId));
+ Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
}
private static final class FailingHook implements CommitHook {
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BranchTest.java Wed Oct 14 08:47:54 2020
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.plugins.document.Branch.BranchCommit;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -31,13 +32,16 @@ import org.apache.jackrabbit.oak.spi.sta
import org.junit.Rule;
import org.junit.Test;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.asDocumentState;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.persistToBranch;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -88,6 +92,135 @@ public class BranchTest {
}
@Test
+    public void rootBranchCommitChildTest() throws Exception {
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(new MemoryDocumentStore())
+                .build();
+
+        // create /a through a branch that is persisted before the merge
+        NodeBuilder rootBuilder = ns.getRoot().builder();
+        rootBuilder.child("a");
+        persistToBranch(rootBuilder);
+        merge(ns, rootBuilder);
+
+        // run the pre-1.8 upgrade simulations against the resulting repository
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void childBranchCommitChildTest() throws Exception {
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(new MemoryDocumentStore())
+                .build();
+
+        // regular commit creating /a
+        NodeBuilder first = ns.getRoot().builder();
+        first.child("a");
+        merge(ns, first);
+
+        // persisted branch adding /a/b below the existing /a, then merged
+        NodeBuilder second = ns.getRoot().builder();
+        second.child("a").child("b");
+        persistToBranch(second);
+        merge(ns, second);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void manyBranchCommitsDepthTest() throws Exception {
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(new MemoryDocumentStore())
+                .build();
+
+        // create /a/b/c/d/e one level at a time, persisting the branch after each level
+        String[] names = {"a", "b", "c", "d", "e"};
+        NodeBuilder builder = ns.getRoot().builder();
+        for (int depth = 0; depth < names.length; depth++) {
+            NodeBuilder parent = builder;
+            for (int i = 0; i < depth; i++) {
+                parent = parent.getChildNode(names[i]);
+            }
+            parent.child(names[depth]);
+            persistToBranch(builder);
+        }
+        merge(ns, builder);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void manyBranchCommitsWidthTest() throws Exception {
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(new MemoryDocumentStore())
+                .build();
+
+        // create the siblings /a ... /e, persisting the branch after each one
+        NodeBuilder builder = ns.getRoot().builder();
+        for (String name : new String[] {"a", "b", "c", "d", "e"}) {
+            builder.child(name);
+            persistToBranch(builder);
+        }
+        merge(ns, builder);
+
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+    @Test
+    public void mixedPre18BranchTest() throws Exception {
+        MemoryDocumentStore store = new MemoryDocumentStore();
+        DocumentNodeStore ns = builderProvider.newBuilder()
+                .setClusterId(1)
+                .setDocumentStore(store).build();
+
+        // step 1: regular commit creating the initial structure /a/b/c
+        NodeBuilder initial = ns.getRoot().builder();
+        initial.child("a").child("b").child("c").setProperty("commit", "1");
+        merge(ns, initial);
+
+        // step 2: branch commit that changes /a and adds /a/b/c/d/e below the
+        // existing /a/b/c (leaving /a/b itself unchanged)
+        NodeBuilder branched = ns.getRoot().builder();
+        branched.child("a").setProperty("commit", "2");
+        branched.child("a").child("b").child("c").child("d").child("e").setProperty("commit", "2");
+        persistToBranch(branched);
+        merge(ns, branched);
+
+        // step 3: every document touched by the branch commit has a "_bc" entry,
+        // except /a/b where nothing was changed
+        for (String path : new String[] {"/", "/a", "/a/b/c", "/a/b/c/d", "/a/b/c/d/e"}) {
+            assertNotNull(store.find(NODES, Utils.getIdFromPath(path)).get("_bc"));
+        }
+        assertNull(store.find(NODES, Utils.getIdFromPath("/a/b")).get("_bc"));
+
+        // step 4: verify that the (background) sweep also sets the "_bc" entries
+        // correctly, using the pre-1.8 upgrade simulations of Sweep2TestHelper.
+        // Before OAK-9176 the sweeper only looked at keys matching
+        // PROPERTY_OR_DELETED and therefore missed:
+        // - /a/b/c : its only change is a "_commitRoot" entry (for the new child /a/b/c/d)
+        // - /      : its only change is a "_revisions" entry
+        Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
+    }
+
+ @Test
public void orphanedBranchTest() {
MemoryDocumentStore store = new MemoryDocumentStore();
DocumentNodeStore ns = builderProvider.newBuilder()
@@ -101,6 +234,8 @@ public class BranchTest {
ns = builderProvider.newBuilder()
.setDocumentStore(store).build();
assertFalse(ns.getRoot().hasProperty("p"));
+
+ Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
}
@Test
@@ -136,6 +271,8 @@ public class BranchTest {
DocumentNodeState state = root.getNodeAtRevision(ns, br, null);
assertNotNull(state);
assertEquals("a", state.getString("p"));
+
+ Sweep2TestHelper.testPre18UpgradeSimulations(ns, builderProvider);
}
private void assertModifiedPaths(Iterable<Path> actual, String... expected) {
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java Wed Oct 14 08:47:54 2020
@@ -105,7 +105,7 @@ public class DocumentNodeStoreSweepIT ex
}
- private static boolean isClean(DocumentNodeStore ns, String path) {
+ static boolean isClean(DocumentNodeStore ns, String path) {
// use find that also reads from the cache
NodeDocument doc = ns.getDocumentStore().find(NODES, Utils.getIdFromPath(path));
for (Revision c : doc.getAllChanges()) {
@@ -144,7 +144,7 @@ public class DocumentNodeStoreSweepIT ex
store.fail().never();
}
- private String createUncommittedChanges(DocumentNodeStore ns,
+ static String createUncommittedChanges(DocumentNodeStore ns,
FailingDocumentStore store) throws Exception {
ns.setMaxBackOffMillis(0);
NodeBuilder builder = ns.getRoot().builder();
Modified: jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java?rev=1882480&r1=1882479&r2=1882480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/FailingDocumentStore.java Wed Oct 14 08:47:54 2020
@@ -28,6 +28,8 @@ import org.apache.jackrabbit.oak.plugins
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
+import java.util.LinkedList;
+
/**
* Wraps a document store and can be instructed to fail operations.
*/
@@ -43,6 +45,8 @@ class FailingDocumentStore extends Docum
private Type exceptionType = Type.GENERIC;
+ private List<Collection<? extends Document>> collectionIncludeList; // null: failures are not restricted to particular collections
+
class Fail {
private Fail() {
@@ -65,6 +69,7 @@ class FailingDocumentStore extends Docum
numFailures.set(0);
failAfter.set(Long.MAX_VALUE);
exceptionType = Type.GENERIC;
+ collectionIncludeList = null;
}
void once() {
@@ -79,6 +84,14 @@ class FailingDocumentStore extends Docum
p = probability;
return this;
}
+
+        /**
+         * Restricts the injected failures to {@code collectionInclude}. Can be called
+         * several times to include more collections; while no collection is included,
+         * failures are not restricted to particular collections.
+         */
+        Fail on(Collection<? extends Document> collectionInclude) {
+            List<Collection<? extends Document>> includes =
+                    collectionIncludeList != null ? collectionIncludeList : new LinkedList<>();
+            includes.add(collectionInclude);
+            collectionIncludeList = includes;
+            return this;
+        }
}
FailingDocumentStore(DocumentStore store, long seed) {
@@ -101,7 +114,7 @@ class FailingDocumentStore extends Docum
@Override
public <T extends Document> void remove(Collection<T> collection,
String key) {
- maybeFail();
+ maybeFail(collection); // may throw a DocumentStoreException, depending on the configured failures for this collection
super.remove(collection, key);
}
@@ -120,7 +133,7 @@ class FailingDocumentStore extends Docum
int num = 0;
// remove individually
for (Map.Entry<String, Long> rm : toRemove.entrySet()) {
- maybeFail();
+ maybeFail(collection);
num += super.remove(collection, singletonMap(rm.getKey(), rm.getValue()));
}
return num;
@@ -132,7 +145,7 @@ class FailingDocumentStore extends Docum
long startValue,
long endValue)
throws DocumentStoreException {
- maybeFail();
+ maybeFail(collection);
return super.remove(collection, indexedProperty, startValue, endValue);
}
@@ -141,7 +154,7 @@ class FailingDocumentStore extends Docum
List<UpdateOp> updateOps) {
// create individually
for (UpdateOp op : updateOps) {
- maybeFail();
+ maybeFail(collection);
if (!super.create(collection, singletonList(op))) {
return false;
}
@@ -152,7 +165,7 @@ class FailingDocumentStore extends Docum
@Override
public <T extends Document> T createOrUpdate(Collection<T> collection,
UpdateOp update) {
- maybeFail();
+ maybeFail(collection); // may throw a DocumentStoreException, depending on the configured failures for this collection
return super.createOrUpdate(collection, update);
}
@@ -170,12 +183,13 @@ class FailingDocumentStore extends Docum
@Override
public <T extends Document> T findAndUpdate(Collection<T> collection,
UpdateOp update) {
- maybeFail();
+ maybeFail(collection); // may throw a DocumentStoreException, depending on the configured failures for this collection
return super.findAndUpdate(collection, update);
}
- private void maybeFail() {
- if (random.nextFloat() < p || failAfter.getAndDecrement() <= 0) {
+ private <T extends Document> void maybeFail(Collection<T> collection) {
+ if ((collectionIncludeList == null || collectionIncludeList.contains(collection)) &&
+ (random.nextFloat() < p || failAfter.getAndDecrement() <= 0)) {
if (numFailures.getAndDecrement() > 0) {
throw new DocumentStoreException("write operation failed", null, exceptionType);
}