You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2017/03/23 11:04:43 UTC
svn commit: r1788218 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: mreutegg
Date: Thu Mar 23 11:04:43 2017
New Revision: 1788218
URL: http://svn.apache.org/viewvc?rev=1788218&view=rev
Log:
OAK-3712: Clean up old and uncommitted changes
Introduce NodeDocumentSweeper
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweepListener.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeperTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1788218&r1=1788217&r2=1788218&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Thu Mar 23 11:04:43 2017
@@ -91,7 +91,7 @@ public class MissingLastRevSeeker {
}
/**
- * Get the candidates with modified time after the specified
+ * Get the candidates with modified time greater than or equal the specified
* {@code startTime}.
*
* @param startTime the start time.
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweepListener.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweepListener.java?rev=1788218&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweepListener.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweepListener.java Thu Mar 23 11:04:43 2017
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Map;
+
+/**
+ * Receives callbacks from the {@link NodeDocumentSweeper} on what updates
+ * are required for the sweep ({@link #sweepUpdate(Map)} and required
+ * invalidation of documents.
+ */
+interface NodeDocumentSweepListener {
+
+ /**
+ * Called for a batch of sweep updates that should be performed.
+ *
+ * @param updates the update operations. The keys in the map are the paths
+ * of the documents to update.
+ * @throws DocumentStoreException if the operation fails.
+ */
+ void sweepUpdate(Map<String, UpdateOp> updates) throws DocumentStoreException;
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweepListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java?rev=1788218&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java Thu Mar 23 11:04:43 2017
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.partition;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.isDeletedEntry;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeCommitRoot;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.removeRevision;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setDeletedOnce;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.PROPERTY_OR_DELETED;
+
+/**
+ * The {@code NodeDocumentSweeper} is responsible for removing uncommitted
+ * changes from {@code NodeDocument}s for a given clusterId. The sweeper scans
+ * through documents modified after a given timestamp. This timestamp is derived
+ * from the sweep revisions available on the root document
+ * ({@link NodeDocument#getSweepRevisions()}). If no sweep revision is present
+ * for the current clusterId, the sweeper scans through the entire nodes
+ * collection. This is usually a one-time operation when the repository is
+ * upgraded from a version older than 1.8.
+ * <p>
+ * The sweeper can read from an eventually consistent store and does not require
+ * that it sees the most up-to-date state of the store. That is, running it off
+ * a MongoDB Secondary is fine and even recommended.
+ * <p>
+ * This class is not thread-safe.
+ */
+abstract class NodeDocumentSweeper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodeDocumentSweeper.class);
+
+ private static final int INVALIDATE_BATCH_SIZE = 100;
+
+ private final RevisionContext context;
+
+ private final int clusterId;
+
+ private Revision headRevision;
+
+ /**
+ * Creates a new sweeper for the given context.
+ *
+ * @param context the revision context.
+ *
+ */
+ NodeDocumentSweeper(RevisionContext context) {
+ this.context = checkNotNull(context);
+ this.clusterId = context.getClusterId();
+ }
+
+ /**
+ * Must be implemented by sub-class and return the current root document
+ * in the nodes collection. An implementation is permitted to return a root
+ * document that is not up-to-date, as long as it is consistency with
+ * {@link #getDocuments(long)}.
+ *
+ * @return the root document.
+ * @throws DocumentStoreException if an exception occurs reading the
+ * document.
+ */
+ @Nonnull
+ protected abstract NodeDocument getRoot() throws DocumentStoreException;
+
+ /**
+ * Returns all documents that have a {@link NodeDocument#MODIFIED_IN_SECS}
+ * field equal or greater than {@code modifiedInSecs}. An implementation is
+ * permitted to return documents that are not up-to-date, as long as they
+ * are consistent with {@link #getRoot()}.
+ * <p>
+ * See also {@link NodeDocument#getModifiedInSecs(long)}.
+ *
+ * @param modifiedInSecs a timestamp in seconds.
+ * @return matching documents.
+ * @throws DocumentStoreException if an exception occurs reading documents.
+ */
+ @Nonnull
+ protected abstract Iterable<NodeDocument> getDocuments(long modifiedInSecs)
+ throws DocumentStoreException;
+
+ /**
+ * Performs a sweep and reports the required updates to the given sweep
+ * listener. The returned revision is the new sweep revision for the
+ * clusterId associated with the revision context used to create this
+ * sweeper. The caller is responsible for storing the returned sweep
+ * revision on the root document. This method returns {@code null} if no
+ * update was needed or possible.
+ *
+ * @param listener the listener to receive required sweep update operations.
+ * @return the new sweep revision or {@code null} if no updates were done.
+ * @throws DocumentStoreException if reading from the store or writing to
+ * the store failed.
+ */
+ @CheckForNull
+ Revision sweep(NodeDocumentSweepListener listener)
+ throws DocumentStoreException {
+ return performSweep(listener);
+ }
+
+ //----------------------------< internal >----------------------------------
+
+ @CheckForNull
+ private Revision performSweep(NodeDocumentSweepListener listener)
+ throws DocumentStoreException {
+ NodeDocument rootDoc = getRoot();
+ RevisionVector head = getHeadRevision(rootDoc);
+ headRevision = head.getRevision(clusterId);
+ if (headRevision == null) {
+ LOG.warn("Head revision {} does not have an entry for " +
+ "clusterId {}. Sweeping of documents is skipped.",
+ head, clusterId);
+ return null;
+ }
+ RevisionVector sweepRevs = rootDoc.getSweepRevisions();
+ Revision lastSweepHead = sweepRevs.getRevision(clusterId);
+ if (lastSweepHead == null) {
+ // sweep all
+ lastSweepHead = new Revision(0, 0, clusterId);
+ }
+ // only sweep documents when the _modified time changed
+ long lastSweepTick = getModifiedInSecs(lastSweepHead.getTimestamp());
+ long currentTick = getModifiedInSecs(context.getClock().getTime());
+ if (lastSweepTick == currentTick) {
+ return null;
+ }
+
+ LOG.debug("Starting document sweep. Head: {}, starting at {}",
+ headRevision, lastSweepHead);
+
+ CloseableIterable<Map.Entry<String, UpdateOp>> ops = sweepOperations(lastSweepTick);
+ try {
+ for (List<Map.Entry<String, UpdateOp>> batch : partition(ops, INVALIDATE_BATCH_SIZE)) {
+ Map<String, UpdateOp> updates = newHashMap();
+ for (Map.Entry<String, UpdateOp> entry : batch) {
+ updates.put(entry.getKey(), entry.getValue());
+ }
+ listener.sweepUpdate(updates);
+ }
+ } finally {
+ try {
+ ops.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close sweep operations", e);
+ }
+ }
+ LOG.debug("Document sweep finished");
+ return headRevision;
+ }
+
+ private CloseableIterable<Map.Entry<String, UpdateOp>> sweepOperations(long modifiedInSecs) {
+ final Iterable<NodeDocument> docs = getDocuments(modifiedInSecs);
+ return CloseableIterable.wrap(filter(transform(docs,
+ new Function<NodeDocument, Map.Entry<String, UpdateOp>>() {
+ @Override
+ public Map.Entry<String, UpdateOp> apply(NodeDocument doc) {
+ return immutableEntry(doc.getPath(), sweepOne(doc));
+ }
+ }), new Predicate<Map.Entry<String, UpdateOp>>() {
+ @Override
+ public boolean apply(Map.Entry<String, UpdateOp> input) {
+ return input.getValue() != null;
+ }
+ }), new Closeable() {
+ @Override
+ public void close() throws IOException {
+ Utils.closeIfCloseable(docs);
+ }
+ });
+ }
+
+ @Nonnull
+ private RevisionVector getHeadRevision(NodeDocument rootDoc) {
+ return new RevisionVector(rootDoc.getLastRev().values());
+ }
+
+ private UpdateOp sweepOne(NodeDocument doc) throws DocumentStoreException {
+ UpdateOp op = createUpdateOp(doc);
+ for (String property : filter(doc.keySet(), PROPERTY_OR_DELETED)) {
+ Map<Revision, String> valueMap = doc.getLocalMap(property);
+ for (Map.Entry<Revision, String> entry : valueMap.entrySet()) {
+ Revision rev = entry.getKey();
+ // only consider change for this cluster node
+ if (rev.getClusterId() != clusterId) {
+ continue;
+ }
+ Revision cRev = getCommitRevision(doc, rev);
+ if (cRev == null) {
+ uncommitted(doc, property, rev, op);
+ } else if (cRev.equals(rev)) {
+ committed(property, rev, op);
+ } else {
+ committedBranch(property, rev, cRev, op);
+ }
+ }
+ }
+ return op.hasChanges() ? op : null;
+ }
+
+ private void uncommitted(NodeDocument doc,
+ String property,
+ Revision rev,
+ UpdateOp op) {
+ if (headRevision.compareRevisionTime(rev) < 0) {
+ // ignore changes that happen after the
+ // head we are currently looking at
+ return;
+ }
+ if (isV18BranchCommit(rev, doc)) {
+ // this is a not yet merged branch commit
+ // -> do nothing
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unmerged branch commit on {}, {} @ {}",
+ op.getId(), property, rev);
+ }
+ } else {
+ // this may be a not yet merged branch commit, but since it
+ // wasn't created by this Oak version, it must be a left over
+ // from an old branch which cannot be merged anyway.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Uncommitted change on {}, {} @ {}",
+ op.getId(), property, rev);
+ }
+ op.removeMapEntry(property, rev);
+ if (doc.getLocalCommitRoot().containsKey(rev)) {
+ removeCommitRoot(op, rev);
+ } else {
+ removeRevision(op, rev);
+ }
+ // set _deletedOnce if uncommitted change is a failed create
+ // node operation and doc does not have _deletedOnce yet
+ if (isDeletedEntry(property)
+ && !doc.wasDeletedOnce()
+ && "false".equals(doc.getLocalDeleted().get(rev))) {
+ setDeletedOnce(op);
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the given revision is marked as a branch commit
+ * on the document. This method only checks local branch commit information
+ * available on the document ({@link NodeDocument#getLocalBranchCommits()}).
+ * If the given revision is related to a branch commit that was created
+ * prior to Oak 1.8, the method will return {@code false}.
+ *
+ * @param rev a revision.
+ * @param doc the document to check.
+ * @return {@code true} if the revision is marked as a branch commit;
+ * {@code false} otherwise.
+ */
+ private boolean isV18BranchCommit(Revision rev, NodeDocument doc) {
+ return doc.getLocalBranchCommits().contains(rev);
+ }
+
+ private void committed(String property,
+ Revision rev,
+ UpdateOp op) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committed change on {}, {} @ {}",
+ op.getId(), property, rev);
+ }
+ }
+
+ private void committedBranch(String property,
+ Revision rev,
+ Revision cRev,
+ UpdateOp op) {
+ boolean newerThanHead = cRev.compareRevisionTime(headRevision) > 0;
+ if (LOG.isDebugEnabled()) {
+ String msg = newerThanHead ? " (newer than head)" : "";
+ LOG.debug("Committed branch change on {}, {} @ {}/{}{}",
+ op.getId(), property, rev, cRev, msg);
+ }
+ }
+
+ private static UpdateOp createUpdateOp(NodeDocument doc) {
+ return new UpdateOp(doc.getId(), false);
+ }
+
+ @CheckForNull
+ private Revision getCommitRevision(final NodeDocument doc,
+ final Revision rev)
+ throws DocumentStoreException {
+ String cv = context.getCommitValue(rev, doc);
+ if (cv == null) {
+ return null;
+ }
+ return Utils.resolveCommitRevision(rev, cv);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeperTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeperTest.java?rev=1788218&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeperTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeperTest.java Thu Mar 23 11:04:43 2017
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COMMIT_ROOT;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setCommitRoot;
+import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.setModified;
+import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
+import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation.Type.REMOVE_MAP_ENTRY;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class NodeDocumentSweeperTest {
+
+ @Rule
+ public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+ private Clock clock;
+ private DocumentNodeStore ns;
+ private DocumentMK mk;
+ private DocumentStore store;
+ private MissingLastRevSeeker seeker;
+
+ @Before
+ public void before() throws Exception {
+ clock = new Clock.Virtual();
+ clock.waitUntil(System.currentTimeMillis());
+ DocumentMK.Builder builder = builderProvider.newBuilder();
+ builder.clock(clock).setAsyncDelay(0);
+ Revision.setClock(clock);
+ mk = builder.open();
+ ns = builder.getNodeStore();
+ store = ns.getDocumentStore();
+ seeker = builder.createMissingLastRevSeeker();
+ }
+
+ @After
+ public void after() {
+ Revision.resetClockToDefault();
+ }
+
+ @Test
+ public void sweepUncommittedBeforeHead() throws Exception {
+ Revision uncommitted = ns.newRevision();
+ NodeBuilder b = ns.getRoot().builder();
+ b.child("test");
+ merge(ns, b);
+ ns.runBackgroundUpdateOperations();
+
+ UpdateOp op = new UpdateOp(getIdFromPath("/test"), false);
+ op.setMapEntry("foo", uncommitted, "value");
+ setCommitRoot(op, uncommitted, 0);
+ setModified(op, uncommitted);
+ assertNotNull(store.findAndUpdate(NODES, op));
+
+ List<UpdateOp> ops = Lists.newArrayList();
+ Revision nextSweepStart = sweep(ops);
+
+ assertEquals(ns.getHeadRevision().getRevision(ns.getClusterId()), nextSweepStart);
+ assertEquals(1, ops.size());
+ op = ops.get(0);
+ Map<Key, Operation> changes = op.getChanges();
+ assertEquals(2, changes.size());
+ Operation o = changes.get(new Key(COMMIT_ROOT, uncommitted));
+ assertNotNull(o);
+ assertEquals(REMOVE_MAP_ENTRY, o.type);
+ o = changes.get(new Key("foo", uncommitted));
+ assertNotNull(o);
+ assertEquals(REMOVE_MAP_ENTRY, o.type);
+ }
+
+ @Test
+ public void sweepUncommittedAfterHead() throws Exception {
+ NodeBuilder b = ns.getRoot().builder();
+ b.child("test");
+ merge(ns, b);
+ ns.runBackgroundUpdateOperations();
+
+ Revision uncommitted = ns.newRevision();
+ UpdateOp op = new UpdateOp(getIdFromPath("/test"), false);
+ op.setMapEntry("foo", uncommitted, "value");
+ setCommitRoot(op, uncommitted, 0);
+ setModified(op, uncommitted);
+ assertNotNull(store.findAndUpdate(NODES, op));
+
+ List<UpdateOp> ops = Lists.newArrayList();
+ Revision nextSweepStart = sweep(ops);
+
+ assertEquals(ns.getHeadRevision().getRevision(ns.getClusterId()), nextSweepStart);
+ assertEquals(0, ops.size());
+ }
+
+ @Test
+ public void sweepUnmergedBranchCommit() throws Exception {
+ NodeBuilder b = ns.getRoot().builder();
+ b.child("test");
+ merge(ns, b);
+
+ String branchRev = mk.branch(null);
+ mk.commit("/test", "^\"foo\":\"value\"", branchRev, null);
+
+ // force a new head revision newer than branch commit
+ b = ns.getRoot().builder();
+ b.child("bar");
+ merge(ns, b);
+ ns.runBackgroundUpdateOperations();
+
+ List<UpdateOp> ops = Lists.newArrayList();
+ Revision nextSweepStart = sweep(ops);
+
+ // must not touch branch
+ assertEquals(ns.getHeadRevision().getRevision(ns.getClusterId()), nextSweepStart);
+ assertEquals(0, ops.size());
+ }
+
+ @Test
+ public void sweepMergedBranch() throws Exception {
+ String branchRev = mk.branch(null);
+ branchRev = mk.commit("/", "+\"foo\":{}", branchRev, null);
+ branchRev = mk.commit("/", "+\"bar\":{}", branchRev, null);
+ branchRev = mk.commit("/", "+\"baz\":{}", branchRev, null);
+ mk.merge(branchRev, null);
+ ns.runBackgroundUpdateOperations();
+
+ List<UpdateOp> ops = Lists.newArrayList();
+ Revision nextSweepStart = sweep(ops);
+
+ assertEquals(ns.getHeadRevision().getRevision(ns.getClusterId()), nextSweepStart);
+ assertEquals(0, ops.size());
+ }
+
+ private Revision sweep(final List<UpdateOp> ops) throws Exception {
+ // sweeper only runs once every 5 seconds
+ // make sure it does run
+ clock.waitUntil(clock.getTime() + NodeDocument.MODIFIED_IN_SECS_RESOLUTION * 1000);
+ // now perform the sweep
+ NodeDocumentSweeper sweeper = new NodeDocumentSweeper(ns) {
+ @Nonnull
+ @Override
+ protected NodeDocument getRoot() throws DocumentStoreException {
+ return getRootDocument(store);
+ }
+
+ @Nonnull
+ @Override
+ protected Iterable<NodeDocument> getDocuments(long modifiedInSecs)
+ throws DocumentStoreException {
+ return seeker.getCandidates(TimeUnit.SECONDS.toMillis(modifiedInSecs));
+ }
+ };
+ return sweeper.sweep(new NodeDocumentSweepListener() {
+ @Override
+ public void sweepUpdate(Map<String, UpdateOp> updates)
+ throws DocumentStoreException {
+ ops.addAll(updates.values());
+ }
+ });
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentSweeperTest.java
------------------------------------------------------------------------------
svn:eol-style = native