You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2017/03/22 09:12:52 UTC

svn commit: r1788073 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/document/util/ test/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/ja...

Author: mreutegg
Date: Wed Mar 22 09:12:51 2017
New Revision: 1788073

URL: http://svn.apache.org/viewvc?rev=1788073&view=rev
Log:
OAK-5964: Invalidate documents through journal

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Mar 22 09:12:51 2017
@@ -31,9 +31,9 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.MANY_CHILDREN_THRESHOLD;
-import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.alignWithExternalRevisions;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.pathToId;
 import static org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
@@ -44,7 +44,6 @@ import java.io.InputStream;
 import java.lang.ref.WeakReference;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -83,7 +82,6 @@ import com.google.common.util.concurrent
 import org.apache.jackrabbit.api.stats.TimeSeries;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
-import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.core.SimpleCommitContext;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
@@ -101,7 +99,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.commons.json.JsopStream;
 import org.apache.jackrabbit.oak.commons.json.JsopWriter;
-import org.apache.jackrabbit.oak.commons.sort.StringSort;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -305,6 +302,9 @@ public final class DocumentNodeStore
      */
     private final Object backgroundReadMonitor = new Object();
 
+    /**
+     * Background thread performing updates of _lastRev entries.
+     */
     private Thread backgroundUpdateThread;
 
     /**
@@ -2003,7 +2003,7 @@ public final class DocumentNodeStore
      *          cluster node.
      */
     @Nonnull
-    RevisionVector getMinExternalRevisions() {
+    private RevisionVector getMinExternalRevisions() {
         return new RevisionVector(transform(filter(clusterNodes.values(),
                 new Predicate<ClusterNodeInfoDocument>() {
                     @Override
@@ -2023,75 +2023,21 @@ public final class DocumentNodeStore
      * Perform a background read and make external changes visible.
      */
     private BackgroundReadStats backgroundRead() {
-        BackgroundReadStats stats = new BackgroundReadStats();
-        long time = clock.getTime();
-        String id = Utils.getIdFromPath("/");
-        NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
-        if (doc == null) {
-            return stats;
-        }
-        alignWithExternalRevisions(doc);
-
-        StringSort externalSort = JournalEntry.newSorter();
-
-        Map<Integer, Revision> lastRevMap = doc.getLastRev();
-        try {
-            ChangeSetBuilder changeSetBuilder = newChangeSetBuilder();
-            JournalPropertyHandler journalPropertyHandler = journalPropertyHandlerFactory.newHandler();
-            RevisionVector headRevision = getHeadRevision();
-            Set<Revision> externalChanges = Sets.newHashSet();
-            for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
-                int machineId = e.getKey();
-                if (machineId == clusterId) {
-                    // ignore own lastRev
-                    continue;
-                }
-                Revision r = e.getValue();
-                Revision last = headRevision.getRevision(machineId);
-                if (last == null) {
-                    // make sure we see all changes when a cluster node joins
-                    last = new Revision(0, 0, machineId);
-                }
-                if (r.compareRevisionTime(last) > 0) {
-                    // OAK-2345
-                    // only consider as external change if
-                    // the revision changed for the machineId
-                    externalChanges.add(r);
-                    // collect external changes
-                    if (externalSort != null) {
-                        // add changes for this particular clusterId to the externalSort
-                        try {
-                            fillExternalChanges(externalSort, PathUtils.ROOT_PATH, last, r, store, changeSetBuilder, journalPropertyHandler);
-                        } catch (Exception e1) { // OAK-5601 : catch any Exception, not only IOException
-                            LOG.error("backgroundRead: Exception while reading external changes from journal: " + e1, e1);
-                            IOUtils.closeQuietly(externalSort);
-                            externalSort = null;
-                        }
-                    }
-                }
+        return new ExternalChange(this) {
+            @Override
+            void invalidateCache(@Nonnull Iterable<String> paths) {
+                stats.cacheStats = store.invalidateCache(pathToId(paths));
             }
 
-            stats.readHead = clock.getTime() - time;
-            time = clock.getTime();
-
-            if (!externalChanges.isEmpty()) {
-                // invalidate caches
-                if (externalSort == null) {
-                    // if no externalSort available, then invalidate the classic way: everything
-                    stats.cacheStats = store.invalidateCache();
-                } else {
-                    try {
-                        externalSort.sort();
-                        stats.numExternalChanges = externalSort.getSize();
-                        stats.cacheStats = store.invalidateCache(pathToId(externalSort));
-                    } catch (Exception ioe) {
-                        LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
-                        stats.cacheStats = store.invalidateCache();
-                    }
-                }
-                stats.cacheInvalidationTime = clock.getTime() - time;
-                time = clock.getTime();
+            @Override
+            void invalidateCache() {
+                stats.cacheStats = store.invalidateCache();
+            }
 
+            @Override
+            void updateHead(@Nonnull Set<Revision> externalChanges,
+                            @Nullable Iterable<String> changedPaths) {
+                long time = clock.getTime();
                 // make sure no local commit is in progress
                 backgroundOperationLock.writeLock().lock();
                 try {
@@ -2105,32 +2051,29 @@ public final class DocumentNodeStore
                     setRoot(newHead);
                     commitQueue.headRevisionChanged();
                     time = clock.getTime();
-                    if (externalSort != null) {
+                    if (changedPaths != null) {
                         // then there were external changes and reading them
                         // was successful -> apply them to the diff cache
                         try {
-                            JournalEntry.applyTo(externalSort, diffCache,
+                            JournalEntry.applyTo(changedPaths, diffCache,
                                     PathUtils.ROOT_PATH, oldHead, newHead);
                         } catch (Exception e1) {
-                            LOG.error("backgroundRead: Exception while processing external changes from journal: {}", e1, e1);
+                            LOG.error("backgroundRead: Exception while processing external changes from journal: " + e1, e1);
                         }
                     }
                     stats.populateDiffCache = clock.getTime() - time;
                     time = clock.getTime();
 
-                    ChangeSet changeSet = changeSetBuilder.build();
+                    ChangeSet changeSet = getChangeSetBuilder().build();
                     LOG.debug("Dispatching external change with ChangeSet {}", changeSet);
-                    dispatcher.contentChanged(getRoot().fromExternalChange(), newCommitInfo(changeSet, journalPropertyHandler));
+                    dispatcher.contentChanged(getRoot().fromExternalChange(),
+                            newCommitInfo(changeSet, getJournalPropertyHandler()));
                 } finally {
                     backgroundOperationLock.writeLock().unlock();
                 }
                 stats.dispatchChanges = clock.getTime() - time;
             }
-        } finally {
-            IOUtils.closeQuietly(externalSort);
-        }
-
-        return stats;
+        }.process();
     }
 
     private static CommitInfo newCommitInfo(@Nonnull ChangeSet changeSet, JournalPropertyHandler journalPropertyHandler) {
@@ -2330,52 +2273,15 @@ public final class DocumentNodeStore
     private void initializeRootState(NodeDocument rootDoc) {
         checkState(root == null);
 
-        alignWithExternalRevisions(rootDoc);
-        RevisionVector headRevision = new RevisionVector(
-                rootDoc.getLastRev().values()).update(newRevision());
-        setRoot(headRevision);
-    }
-
-    /**
-     * Makes sure the current time is after the most recent external revision
-     * timestamp in the _lastRev map of the given root document. If necessary
-     * the current thread waits until {@link #clock} is after the external
-     * revision timestamp.
-     *
-     * @param rootDoc the root document.
-     */
-    private void alignWithExternalRevisions(@Nonnull NodeDocument rootDoc) {
-        Map<Integer, Revision> lastRevMap = checkNotNull(rootDoc).getLastRev();
         try {
-            long externalTime = Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
-            long localTime = clock.getTime();
-            if (localTime < externalTime) {
-                LOG.warn("Detected clock differences. Local time is '{}', " +
-                                "while most recent external time is '{}'. " +
-                                "Current _lastRev entries: {}",
-                        new Date(localTime), new Date(externalTime), lastRevMap.values());
-                double delay = ((double) externalTime - localTime) / 1000d;
-                String fmt = "Background read will be delayed by %.1f seconds. " +
-                        "Please check system time on cluster nodes.";
-                String msg = String.format(fmt, delay);
-                LOG.warn(msg);
-                while (localTime + 60000 < externalTime) {
-                    clock.waitUntil(localTime + 60000);
-                    localTime = clock.getTime();
-                    delay = ((double) externalTime - localTime) / 1000d;
-                    LOG.warn(String.format(fmt, delay));
-                }
-                clock.waitUntil(externalTime + 1);
-            } else if (localTime == externalTime) {
-                // make sure local time is past external time
-                // but only log at debug
-                LOG.debug("Local and external time are equal. Waiting until local" +
-                        "time is more recent than external reported time.");
-                clock.waitUntil(externalTime + 1);
-            }
+            alignWithExternalRevisions(rootDoc, clock, clusterId);
         } catch (InterruptedException e) {
-            throw new RuntimeException("Background read interrupted", e);
+            throw new DocumentStoreException("Interrupted while aligning with " +
+                    "external revision: " + rootDoc.getLastRev());
         }
+        RevisionVector headRevision = new RevisionVector(
+                rootDoc.getLastRev().values()).update(newRevision());
+        setRoot(headRevision);
     }
 
     @Nonnull

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java?rev=1788073&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java Wed Mar 22 09:12:51 2017
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSetBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.newSorter;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.alignWithExternalRevisions;
+
+/**
+ * Utility class to pull in external changes in the DocumentNodeStore and
+ * process journal entries.
+ */
+abstract class ExternalChange {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExternalChange.class);
+
+    private final DocumentNodeStore store;
+
+    protected final BackgroundReadStats stats;
+
+    private final ChangeSetBuilder changeSetBuilder;
+
+    private final JournalPropertyHandler journalPropertyHandler;
+
+    ExternalChange(DocumentNodeStore store) {
+        this.store = store;
+        this.stats = new BackgroundReadStats();
+        this.changeSetBuilder = new ChangeSetBuilder(
+                store.getChangeSetMaxItems(), store.getChangeSetMaxDepth());
+        this.journalPropertyHandler = store.getJournalPropertyHandlerFactory().newHandler();
+    }
+
+    /**
+     * Called when when cache entries related to nodes with the given paths
+     * must be invalidated.
+     *
+     * @param paths the paths of affected nodes.
+     */
+    abstract void invalidateCache(@Nonnull Iterable<String> paths);
+
+    /**
+     * Called when all cache entries must be invalidated.
+     */
+    abstract void invalidateCache();
+
+    /**
+     * Called when the current head should be updated with revisions of external
+     * changes.
+     *
+     * @param externalChanges the head revision of other cluster nodes that
+     *                        changed and should now be considered visible.
+     * @param changedPaths paths of nodes that are affected by those external
+     *                     changes.
+     */
+    abstract void updateHead(@Nonnull Set<Revision> externalChanges,
+                             @Nullable Iterable<String> changedPaths);
+
+    /**
+     * Processes external changes if there are any.
+     *
+     * @return statistics about the background read operation.
+     */
+    BackgroundReadStats process() {
+        Clock clock = store.getClock();
+        int clusterId = store.getClusterId();
+        long time = clock.getTime();
+        String id = Utils.getIdFromPath("/");
+        NodeDocument doc = store.getDocumentStore().find(NODES, id, store.getAsyncDelay());
+        if (doc == null) {
+            return stats;
+        }
+        try {
+            alignWithExternalRevisions(doc, clock, clusterId);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Background read interrupted", e);
+        }
+
+        StringSort externalSort = newSorter();
+        StringSort invalidate = newSorter();
+
+        Map<Integer, Revision> lastRevMap = doc.getLastRev();
+        try {
+            RevisionVector headRevision = store.getHeadRevision();
+            Set<Revision> externalChanges = newHashSet();
+            for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
+                int machineId = e.getKey();
+                if (machineId == clusterId) {
+                    // ignore own lastRev
+                    continue;
+                }
+                Revision r = e.getValue();
+                Revision last = headRevision.getRevision(machineId);
+                if (last == null) {
+                    // make sure we see all changes when a cluster node joins
+                    last = new Revision(0, 0, machineId);
+                }
+                if (r.compareRevisionTime(last) > 0) {
+                    // OAK-2345
+                    // only consider as external change if
+                    // the revision changed for the machineId
+                    externalChanges.add(r);
+                    // collect external changes
+                    if (externalSort != null) {
+                        // add changes for this particular clusterId to the externalSort
+                        try {
+                            fillExternalChanges(externalSort, invalidate,
+                                    PathUtils.ROOT_PATH, last, r,
+                                    store.getDocumentStore(),
+                                    changeSetBuilder, journalPropertyHandler);
+                        } catch (Exception e1) {
+                            LOG.error("backgroundRead: Exception while reading external changes from journal: " + e1, e1);
+                            closeQuietly(externalSort);
+                            closeQuietly(invalidate);
+                            externalSort = null;
+                            invalidate = null;
+                        }
+                    }
+                }
+            }
+
+            stats.readHead = clock.getTime() - time;
+            time = clock.getTime();
+
+            // invalidate cache
+            if (cacheInvalidationNeeded(externalSort, invalidate)) {
+                // invalidate caches
+                if (externalSort == null) {
+                    // if no externalSort available, then invalidate everything
+                    invalidateCache();
+                } else {
+                    stats.numExternalChanges = externalSort.getSize();
+                    try {
+                        sortAndInvalidate(externalSort);
+                        sortAndInvalidate(invalidate);
+                    } catch (Exception ioe) {
+                        LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+                        invalidateCache();
+                    }
+                }
+                stats.cacheInvalidationTime = clock.getTime() - time;
+            }
+
+            // update head
+            if (!externalChanges.isEmpty()) {
+                updateHead(externalChanges, externalSort);
+            }
+        } finally {
+            closeQuietly(externalSort);
+            closeQuietly(invalidate);
+        }
+
+        return stats;
+    }
+
+    ChangeSetBuilder getChangeSetBuilder() {
+        return changeSetBuilder;
+    }
+
+    JournalPropertyHandler getJournalPropertyHandler() {
+        return journalPropertyHandler;
+    }
+
+    //-------------------------< internal >-------------------------------------
+
+    private boolean cacheInvalidationNeeded(StringSort externalSort,
+                                            StringSort invalidate) {
+        return externalSort == null || invalidate == null
+                || !externalSort.isEmpty() || !invalidate.isEmpty();
+    }
+
+    private void sortAndInvalidate(StringSort paths) throws IOException {
+        if (paths.isEmpty()) {
+            return;
+        }
+        paths.sort();
+        invalidateCache(paths);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java Wed Mar 22 09:12:51 2017
@@ -83,6 +83,7 @@ public final class FormatVersion impleme
      * Changes introduced with this version:
      * <ul>
      *     <li>SplitDocType.DEFAULT_NO_BRANCH (OAK-5869)</li>
+     *     <li>journal entries with invalidate-only changes (OAK-5964)</li>
      * </ul>
      */
     static final FormatVersion V1_8 = new FormatVersion(1, 8, 0);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java Wed Mar 22 09:12:51 2017
@@ -159,7 +159,13 @@ class JournalDiffLoader implements DiffC
                 // use revision with a timestamp of zero
                 from = new Revision(0, 0, to.getClusterId());
             }
-            stats.numJournalEntries += fillExternalChanges(changes, path, from, to, ns.getDocumentStore(), null, null);
+            StringSort invalidateOnly = JournalEntry.newSorter();
+            try {
+                stats.numJournalEntries += fillExternalChanges(changes, invalidateOnly,
+                        path, from, to, ns.getDocumentStore(), null, null);
+            } finally {
+                invalidateOnly.close();
+            }
         }
         // do we need to include changes from pending local changes?
         if (!max.isRevisionNewer(localLastRev)

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Wed Mar 22 09:12:51 2017
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.commons.PathUtils.ROOT_PATH;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 
@@ -59,7 +60,8 @@ public final class JournalEntry extends
     /**
      * The revision format for external changes:
      * &lt;clusterId>-&lt;timestamp>-&lt;counter>. The string is prefixed with
-     * "b" if it denotes a branch revision.
+     * "b" if it denotes a branch revision or id of an {@link #INVALIDATE_ONLY}
+     * entry.
      */
     private static final String REVISION_FORMAT = "%d-%0" +
             Long.toHexString(Long.MAX_VALUE).length() + "x-%0" +
@@ -71,6 +73,8 @@ public final class JournalEntry extends
 
     static final String BRANCH_COMMITS = "_bc";
 
+    private static final String INVALIDATE_ONLY = "_inv";
+
     public static final String MODIFIED = "_modified";
 
     private static final int READ_CHUNK_SIZE = 100;
@@ -123,7 +127,7 @@ public final class JournalEntry extends
         });
     }
 
-    static void applyTo(@Nonnull StringSort externalSort,
+    static void applyTo(@Nonnull Iterable<String> changedPaths,
                         @Nonnull DiffCache diffCache,
                         @Nonnull String path,
                         @Nonnull RevisionVector from,
@@ -134,7 +138,7 @@ public final class JournalEntry extends
 
         final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
 
-        final Iterator<String> it = externalSort.getIds();
+        final Iterator<String> it = changedPaths.iterator();
         if (!it.hasNext()) {
             // nothing at all? that's quite unusual..
 
@@ -218,20 +222,25 @@ public final class JournalEntry extends
      * {@code to} revision, this method will fill external changes from the
      * next higher journal entry that contains the revision.
      *
-     * @param sorter the StringSort to which all externally changed paths
+     * @param externalChanges the StringSort to which all externally changed paths
      *               between the provided revisions will be added
+     * @param invalidate the StringSort to which paths of documents will be
+     *               added that must be invalidated if cached.
      * @param from   the lower bound of the revision range (exclusive).
      * @param to     the upper bound of the revision range (inclusive).
      * @param store  the document store to query.
      * @return the number of journal entries read from the store.
-     * @throws IOException
+     * @throws IOException if adding external changes to the {@code StringSort}
+     *          instances fails with an exception.
      */
-    static int fillExternalChanges(@Nonnull StringSort sorter,
+    static int fillExternalChanges(@Nonnull StringSort externalChanges,
+                                   @Nonnull StringSort invalidate,
                                    @Nonnull Revision from,
                                    @Nonnull Revision to,
                                    @Nonnull DocumentStore store)
             throws IOException {
-        return fillExternalChanges(sorter, PathUtils.ROOT_PATH, from, to, store, null, null);
+        return fillExternalChanges(externalChanges, invalidate, ROOT_PATH,
+                from, to, store, null, null);
     }
 
     /**
@@ -243,8 +252,10 @@ public final class JournalEntry extends
      * defines the scope of the external changes that should be read and filled
      * into the {@code sorter}.
      *
-     * @param sorter the StringSort to which all externally changed paths
+     * @param externalChanges the StringSort to which all externally changed paths
      *               between the provided revisions will be added
+     * @param invalidate the StringSort to which paths of documents will be
+     *               added that must be invalidated if cached.
      * @param path   a path that defines the scope of the changes to read.
      * @param from   the lower bound of the revision range (exclusive).
      * @param to     the upper bound of the revision range (inclusive).
@@ -254,9 +265,11 @@ public final class JournalEntry extends
      * @param journalPropertyHandler a nullable JournalPropertyHandler to read
      *                               stored journal properties for builders from JournalPropertyService
      * @return the number of journal entries read from the store.
-     * @throws IOException
+     * @throws IOException if adding external changes to the {@code StringSort}
+     *          instances fails with an exception.
      */
-    static int fillExternalChanges(@Nonnull StringSort sorter,
+    static int fillExternalChanges(@Nonnull StringSort externalChanges,
+                                   @Nonnull StringSort invalidate,
                                    @Nonnull String path,
                                    @Nonnull Revision from,
                                    @Nonnull Revision to,
@@ -300,7 +313,8 @@ public final class JournalEntry extends
             }
 
             for (JournalEntry d : partialResult) {
-                fillFromJournalEntry(sorter, path, changeSetBuilder, journalPropertyHandler, d);
+                fillFromJournalEntry(externalChanges, invalidate, path,
+                        changeSetBuilder, journalPropertyHandler, d);
             }
             if (partialResult.size() < READ_CHUNK_SIZE) {
                 break;
@@ -317,18 +331,22 @@ public final class JournalEntry extends
                 || (lastEntry != null && !lastEntry.getId().equals(inclusiveToId))) {
             String maxId = asId(new Revision(Long.MAX_VALUE, 0, to.getClusterId()));
             for (JournalEntry d : store.query(JOURNAL, inclusiveToId, maxId, 1)) {
-                fillFromJournalEntry(sorter, path, changeSetBuilder, journalPropertyHandler, d);
+                fillFromJournalEntry(externalChanges, invalidate, path,
+                        changeSetBuilder, journalPropertyHandler, d);
                 numEntries++;
             }
         }
         return numEntries;
     }
 
-    private static void fillFromJournalEntry(@Nonnull StringSort sorter, @Nonnull String path,
+    private static void fillFromJournalEntry(@Nonnull StringSort externalChanges,
+                                             @Nonnull StringSort invalidate,
+                                             @Nonnull String path,
                                              @Nullable ChangeSetBuilder changeSetBuilder,
                                              @Nullable JournalPropertyHandler journalPropertyHandler,
                                              JournalEntry d) throws IOException {
-        d.addTo(sorter, path);
+        d.addTo(externalChanges, path);
+        d.addInvalidateOnlyTo(invalidate);
         if (changeSetBuilder != null) {
             d.addTo(changeSetBuilder);
         }
@@ -430,10 +448,34 @@ public final class JournalEntry extends
         if (bc != null) {
             op.set(BRANCH_COMMITS, bc);
         }
+        String inv = (String) get(INVALIDATE_ONLY);
+        if (inv != null) {
+            op.set(INVALIDATE_ONLY, inv);
+        }
         return op;
     }
 
     /**
+     * Add the given {@code revisions} to the list of referenced journal entries
+     * that contain paths of documents that must be invalidated.
+     *
+     * @param revisions the references to invalidate-only journal entries.
+     */
+    void invalidate(@Nonnull Iterable<Revision> revisions) {
+        String value = (String) get(INVALIDATE_ONLY);
+        if (value == null) {
+            value = "";
+        }
+        for (Revision r : revisions) {
+            if (value.length() > 0) {
+                value += ",";
+            }
+            value += asId(r.asBranchRevision());
+        }
+        put(INVALIDATE_ONLY, value);
+    }
+
+    /**
      * Add changed paths in this journal entry that are in the scope of
      * {@code path} to {@code sort}.
      *
@@ -468,8 +510,52 @@ public final class JournalEntry extends
      */
     @Nonnull
     Iterable<JournalEntry> getBranchCommits() {
+        return getLinkedEntries(BRANCH_COMMITS);
+    }
+
+    /**
+     * @return number of changed nodes being tracked by this journal entry.
+     */
+    int getNumChangedNodes() {
+        return numChangedNodes;
+    }
+
+    /**
+     * @return if this entry has some changes to be pushed
+     */
+    boolean hasChanges() {
+        return numChangedNodes > 0 || hasBranchCommits;
+    }
+
+    //-----------------------------< internal >---------------------------------
+
+    private void addInvalidateOnlyTo(final StringSort sort) throws IOException {
+        TraversingVisitor v = new TraversingVisitor() {
+
+            @Override
+            public void node(TreeNode node, String path) throws IOException {
+                sort.add(path);
+            }
+        };
+        for (JournalEntry e : getInvalidateOnly()) {
+            e.getChanges().accept(v, "/");
+        }
+    }
+
+    /**
+     * Returns the invalidate-only entries that are related to this journal
+     * entry.
+     *
+     * @return the invalidate-only entries.
+     */
+    @Nonnull
+    private Iterable<JournalEntry> getInvalidateOnly() {
+        return getLinkedEntries(INVALIDATE_ONLY);
+    }
+
+    private Iterable<JournalEntry> getLinkedEntries(final String name) {
         final List<String> ids = Lists.newArrayList();
-        String bc = (String) get(BRANCH_COMMITS);
+        String bc = (String) get(name);
         if (bc != null) {
             for (String id : bc.split(",")) {
                 if (id.length() != 0) {
@@ -493,7 +579,7 @@ public final class JournalEntry extends
                         JournalEntry d = store.find(JOURNAL, id);
                         if (d == null) {
                             throw new IllegalStateException(
-                                    "Missing external change for branch revision: " + id);
+                                    "Missing " + name + " entry for revision: " + id);
                         }
                         return d;
                     }
@@ -502,22 +588,6 @@ public final class JournalEntry extends
         };
     }
 
-    /**
-     * @return number of changed nodes being tracked by this journal entry.
-     */
-    int getNumChangedNodes() {
-        return numChangedNodes;
-    }
-
-    /**
-     * @return if this entry has some changes to be pushed
-     */
-    boolean hasChanges() {
-        return numChangedNodes > 0 || hasBranchCommits;
-    }
-
-    //-----------------------------< internal >---------------------------------
-
     private static String getChanges(TreeNode node) {
         JsopBuilder builder = new JsopBuilder();
         for (String name : node.keySet()) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Mar 22 09:12:51 2017
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.sql.Timestamp;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -814,4 +816,50 @@ public class Utils {
             }
         };
     }
+
+    /**
+     * Makes sure the current time is after the most recent external revision
+     * timestamp in the _lastRev map of the given root document. If necessary
+     * the current thread waits until {@code clock} is after the external
+     * revision timestamp.
+     *
+     * @param rootDoc the root document.
+     * @param clock the clock.
+     * @param clusterId the local clusterId.
+     * @throws InterruptedException if the current thread is interrupted while
+     *          waiting. The interrupted status on the current thread is cleared
+     *          when this exception is thrown.
+     */
+    public static void alignWithExternalRevisions(@Nonnull NodeDocument rootDoc,
+                                                  @Nonnull Clock clock,
+                                                  int clusterId)
+            throws InterruptedException {
+        Map<Integer, Revision> lastRevMap = checkNotNull(rootDoc).getLastRev();
+        long externalTime = Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
+        long localTime = clock.getTime();
+        if (localTime < externalTime) {
+            LOG.warn("Detected clock differences. Local time is '{}', " +
+                            "while most recent external time is '{}'. " +
+                            "Current _lastRev entries: {}",
+                    new Date(localTime), new Date(externalTime), lastRevMap.values());
+            double delay = ((double) externalTime - localTime) / 1000d;
+            String fmt = "Background read will be delayed by %.1f seconds. " +
+                    "Please check system time on cluster nodes.";
+            String msg = String.format(fmt, delay);
+            LOG.warn(msg);
+            while (localTime + 60000 < externalTime) {
+                clock.waitUntil(localTime + 60000);
+                localTime = clock.getTime();
+                delay = ((double) externalTime - localTime) / 1000d;
+                LOG.warn(String.format(fmt, delay));
+            }
+            clock.waitUntil(externalTime + 1);
+        } else if (localTime == externalTime) {
+            // make sure local time is past external time
+            // but only log at debug
+            LOG.debug("Local and external time are equal. Waiting until local" +
+                    "time is more recent than external reported time.");
+            clock.waitUntil(externalTime + 1);
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Wed Mar 22 09:12:51 2017
@@ -34,6 +34,8 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.junit.Test;
 
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -146,17 +148,61 @@ public class JournalEntryTest {
         assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
 
         StringSort sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r2, r3, store);
+        StringSort inv = JournalEntry.newSorter();
+        JournalEntry.fillExternalChanges(sort, inv, r2, r3, store);
         assertEquals(0, sort.getSize());
+        assertEquals(0, inv.getSize());
 
-        JournalEntry.fillExternalChanges(sort, r1, r2, store);
+        JournalEntry.fillExternalChanges(sort, inv, r1, r2, store);
         assertEquals(paths.size(), sort.getSize());
+        assertEquals(0, inv.getSize());
         sort.close();
 
         sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r1, r3, store);
+        JournalEntry.fillExternalChanges(sort, inv, r1, r3, store);
         assertEquals(paths.size(), sort.getSize());
+        assertEquals(0, inv.getSize());
         sort.close();
+        inv.close();
+    }
+
+    @Test
+    public void invalidateOnly() throws Exception {
+        DocumentStore store = new MemoryDocumentStore();
+        JournalEntry invalidateEntry = JOURNAL.newDocument(store);
+        Set<String> paths = Sets.newHashSet();
+        addRandomPaths(paths);
+        invalidateEntry.modified(paths);
+        Revision r1 = new Revision(1, 0, 1);
+        Revision r2 = new Revision(2, 0, 1);
+        Revision r3 = new Revision(3, 0, 1);
+        UpdateOp op = invalidateEntry.asUpdateOp(r1.asBranchRevision());
+        assertTrue(store.create(JOURNAL, singletonList(op)));
+
+        JournalEntry entry = JOURNAL.newDocument(store);
+        entry.invalidate(singleton(r1));
+        op = entry.asUpdateOp(r2);
+        assertTrue(store.create(JOURNAL, singletonList(op)));
+
+        StringSort sort = JournalEntry.newSorter();
+        StringSort inv = JournalEntry.newSorter();
+        JournalEntry.fillExternalChanges(sort, inv, r2, r3, store);
+        assertEquals(0, sort.getSize());
+        assertEquals(0, inv.getSize());
+
+        JournalEntry.fillExternalChanges(sort, inv, r1, r2, store);
+        assertEquals(1, sort.getSize());
+        assertEquals(paths.size(), inv.getSize());
+        inv.close();
+        sort.close();
+
+        sort = JournalEntry.newSorter();
+        inv = JournalEntry.newSorter();
+        JournalEntry.fillExternalChanges(sort, inv, r1, r3, store);
+        assertEquals(1, sort.getSize());
+        assertEquals(paths.size(), inv.getSize());
+        sort.close();
+        inv.close();
     }
 
     @Test
@@ -178,53 +224,43 @@ public class JournalEntryTest {
         op = entry.asUpdateOp(r4);
         assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
 
-        StringSort sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r1, r1, store);
+        StringSort sort = externalChanges(r1, r1, store);
         assertEquals(0, sort.getSize());
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r1, r2, store);
+        sort = externalChanges(r1, r2, store);
         assertEquals(Sets.newHashSet("/", "/foo"), Sets.newHashSet(sort));
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r1, r3, store);
+        sort = externalChanges(r1, r3, store);
         assertEquals(Sets.newHashSet("/", "/foo", "/bar"), Sets.newHashSet(sort));
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r1, r4, store);
+        sort = externalChanges(r1, r4, store);
         assertEquals(Sets.newHashSet("/", "/foo", "/bar"), Sets.newHashSet(sort));
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r2, r2, store);
+        sort = externalChanges(r2, r2, store);
         assertEquals(0, sort.getSize());
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r2, r3, store);
+        sort = externalChanges(r2, r3, store);
         assertEquals(Sets.newHashSet("/", "/bar"), Sets.newHashSet(sort));
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r2, r4, store);
+        sort = externalChanges(r2, r4, store);
         assertEquals(Sets.newHashSet("/", "/bar"), Sets.newHashSet(sort));
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r3, r3, store);
+        sort = externalChanges(r3, r3, store);
         assertEquals(0, sort.getSize());
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r3, r4, store);
+        sort = externalChanges(r3, r4, store);
         assertEquals(Sets.newHashSet("/", "/bar"), Sets.newHashSet(sort));
         sort.close();
 
-        sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, r4, r4, store);
+        sort = externalChanges(r4, r4, store);
         assertEquals(0, sort.getSize());
         sort.close();
     }
@@ -249,9 +285,12 @@ public class JournalEntryTest {
         assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
 
         StringSort sort = JournalEntry.newSorter();
-        JournalEntry.fillExternalChanges(sort, "/foo", r1, r2, store, null, null);
+        StringSort inv = JournalEntry.newSorter();
+        JournalEntry.fillExternalChanges(sort, inv, "/foo", r1, r2, store, null, null);
         assertEquals(4, sort.getSize());
+        assertEquals(0, inv.getSize());
         sort.close();
+        inv.close();
     }
 
     @Test
@@ -432,4 +471,17 @@ public class JournalEntryTest {
             assertTrue(loaderCalled.get());
         }
     }
+
+    private static StringSort externalChanges(Revision from,
+                                              Revision to,
+                                              DocumentStore store)
+            throws IOException {
+        StringSort changes = JournalEntry.newSorter();
+        StringSort invalidate = JournalEntry.newSorter();
+        try {
+            JournalEntry.fillExternalChanges(changes, invalidate, from, to, store);
+        } finally {
+            invalidate.close();
+        } return changes;
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java Wed Mar 22 09:12:51 2017
@@ -26,18 +26,27 @@ import org.apache.jackrabbit.oak.api.Com
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -250,4 +259,29 @@ public class UtilsTest {
     public void getDepthFromIdIllegalArgumentException2() {
         Utils.getDepthFromId("42");
     }
+
+    @Test
+    public void alignWithExternalRevisions() throws Exception {
+        Clock c = new Clock.Virtual();
+        c.waitUntil(System.currentTimeMillis());
+        // past
+        Revision lastRev1 = new Revision(c.getTime() - 1000, 0, 1);
+        // future
+        Revision lastRev2 = new Revision(c.getTime() + 1000, 0, 2);
+
+        // create a root document
+        NodeDocument doc = new NodeDocument(new MemoryDocumentStore(), c.getTime());
+        UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), true);
+        NodeDocument.setLastRev(op, lastRev1);
+        NodeDocument.setLastRev(op, lastRev2);
+        UpdateUtils.applyChanges(doc, op);
+
+        // must not wait even if revision is in the future
+        Utils.alignWithExternalRevisions(doc, c, 2);
+        assertThat(c.getTime(), is(lessThan(lastRev2.getTimestamp())));
+
+        // must wait until after lastRev2 timestamp
+        Utils.alignWithExternalRevisions(doc, c, 1);
+        assertThat(c.getTime(), is(greaterThan(lastRev2.getTimestamp())));
+    }
 }