You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2017/03/22 09:12:52 UTC
svn commit: r1788073 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/util/
test/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/ja...
Author: mreutegg
Date: Wed Mar 22 09:12:51 2017
New Revision: 1788073
URL: http://svn.apache.org/viewvc?rev=1788073&view=rev
Log:
OAK-5964: Invalidate documents through journal
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Mar 22 09:12:51 2017
@@ -31,9 +31,9 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.MANY_CHILDREN_THRESHOLD;
-import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.alignWithExternalRevisions;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.pathToId;
import static org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
@@ -44,7 +44,6 @@ import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.text.SimpleDateFormat;
import java.util.Collections;
-import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -83,7 +82,6 @@ import com.google.common.util.concurrent
import org.apache.jackrabbit.api.stats.TimeSeries;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
-import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.core.SimpleCommitContext;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
@@ -101,7 +99,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.commons.json.JsopStream;
import org.apache.jackrabbit.oak.commons.json.JsopWriter;
-import org.apache.jackrabbit.oak.commons.sort.StringSort;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -305,6 +302,9 @@ public final class DocumentNodeStore
*/
private final Object backgroundReadMonitor = new Object();
+ /**
+ * Background thread performing updates of _lastRev entries.
+ */
private Thread backgroundUpdateThread;
/**
@@ -2003,7 +2003,7 @@ public final class DocumentNodeStore
* cluster node.
*/
@Nonnull
- RevisionVector getMinExternalRevisions() {
+ private RevisionVector getMinExternalRevisions() {
return new RevisionVector(transform(filter(clusterNodes.values(),
new Predicate<ClusterNodeInfoDocument>() {
@Override
@@ -2023,75 +2023,21 @@ public final class DocumentNodeStore
* Perform a background read and make external changes visible.
*/
private BackgroundReadStats backgroundRead() {
- BackgroundReadStats stats = new BackgroundReadStats();
- long time = clock.getTime();
- String id = Utils.getIdFromPath("/");
- NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
- if (doc == null) {
- return stats;
- }
- alignWithExternalRevisions(doc);
-
- StringSort externalSort = JournalEntry.newSorter();
-
- Map<Integer, Revision> lastRevMap = doc.getLastRev();
- try {
- ChangeSetBuilder changeSetBuilder = newChangeSetBuilder();
- JournalPropertyHandler journalPropertyHandler = journalPropertyHandlerFactory.newHandler();
- RevisionVector headRevision = getHeadRevision();
- Set<Revision> externalChanges = Sets.newHashSet();
- for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
- int machineId = e.getKey();
- if (machineId == clusterId) {
- // ignore own lastRev
- continue;
- }
- Revision r = e.getValue();
- Revision last = headRevision.getRevision(machineId);
- if (last == null) {
- // make sure we see all changes when a cluster node joins
- last = new Revision(0, 0, machineId);
- }
- if (r.compareRevisionTime(last) > 0) {
- // OAK-2345
- // only consider as external change if
- // the revision changed for the machineId
- externalChanges.add(r);
- // collect external changes
- if (externalSort != null) {
- // add changes for this particular clusterId to the externalSort
- try {
- fillExternalChanges(externalSort, PathUtils.ROOT_PATH, last, r, store, changeSetBuilder, journalPropertyHandler);
- } catch (Exception e1) { // OAK-5601 : catch any Exception, not only IOException
- LOG.error("backgroundRead: Exception while reading external changes from journal: " + e1, e1);
- IOUtils.closeQuietly(externalSort);
- externalSort = null;
- }
- }
- }
+ return new ExternalChange(this) {
+ @Override
+ void invalidateCache(@Nonnull Iterable<String> paths) {
+ stats.cacheStats = store.invalidateCache(pathToId(paths));
}
- stats.readHead = clock.getTime() - time;
- time = clock.getTime();
-
- if (!externalChanges.isEmpty()) {
- // invalidate caches
- if (externalSort == null) {
- // if no externalSort available, then invalidate the classic way: everything
- stats.cacheStats = store.invalidateCache();
- } else {
- try {
- externalSort.sort();
- stats.numExternalChanges = externalSort.getSize();
- stats.cacheStats = store.invalidateCache(pathToId(externalSort));
- } catch (Exception ioe) {
- LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
- stats.cacheStats = store.invalidateCache();
- }
- }
- stats.cacheInvalidationTime = clock.getTime() - time;
- time = clock.getTime();
+ @Override
+ void invalidateCache() {
+ stats.cacheStats = store.invalidateCache();
+ }
+ @Override
+ void updateHead(@Nonnull Set<Revision> externalChanges,
+ @Nullable Iterable<String> changedPaths) {
+ long time = clock.getTime();
// make sure no local commit is in progress
backgroundOperationLock.writeLock().lock();
try {
@@ -2105,32 +2051,29 @@ public final class DocumentNodeStore
setRoot(newHead);
commitQueue.headRevisionChanged();
time = clock.getTime();
- if (externalSort != null) {
+ if (changedPaths != null) {
// then there were external changes and reading them
// was successful -> apply them to the diff cache
try {
- JournalEntry.applyTo(externalSort, diffCache,
+ JournalEntry.applyTo(changedPaths, diffCache,
PathUtils.ROOT_PATH, oldHead, newHead);
} catch (Exception e1) {
- LOG.error("backgroundRead: Exception while processing external changes from journal: {}", e1, e1);
+ LOG.error("backgroundRead: Exception while processing external changes from journal: " + e1, e1);
}
}
stats.populateDiffCache = clock.getTime() - time;
time = clock.getTime();
- ChangeSet changeSet = changeSetBuilder.build();
+ ChangeSet changeSet = getChangeSetBuilder().build();
LOG.debug("Dispatching external change with ChangeSet {}", changeSet);
- dispatcher.contentChanged(getRoot().fromExternalChange(), newCommitInfo(changeSet, journalPropertyHandler));
+ dispatcher.contentChanged(getRoot().fromExternalChange(),
+ newCommitInfo(changeSet, getJournalPropertyHandler()));
} finally {
backgroundOperationLock.writeLock().unlock();
}
stats.dispatchChanges = clock.getTime() - time;
}
- } finally {
- IOUtils.closeQuietly(externalSort);
- }
-
- return stats;
+ }.process();
}
private static CommitInfo newCommitInfo(@Nonnull ChangeSet changeSet, JournalPropertyHandler journalPropertyHandler) {
@@ -2330,52 +2273,15 @@ public final class DocumentNodeStore
private void initializeRootState(NodeDocument rootDoc) {
checkState(root == null);
- alignWithExternalRevisions(rootDoc);
- RevisionVector headRevision = new RevisionVector(
- rootDoc.getLastRev().values()).update(newRevision());
- setRoot(headRevision);
- }
-
- /**
- * Makes sure the current time is after the most recent external revision
- * timestamp in the _lastRev map of the given root document. If necessary
- * the current thread waits until {@link #clock} is after the external
- * revision timestamp.
- *
- * @param rootDoc the root document.
- */
- private void alignWithExternalRevisions(@Nonnull NodeDocument rootDoc) {
- Map<Integer, Revision> lastRevMap = checkNotNull(rootDoc).getLastRev();
try {
- long externalTime = Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
- long localTime = clock.getTime();
- if (localTime < externalTime) {
- LOG.warn("Detected clock differences. Local time is '{}', " +
- "while most recent external time is '{}'. " +
- "Current _lastRev entries: {}",
- new Date(localTime), new Date(externalTime), lastRevMap.values());
- double delay = ((double) externalTime - localTime) / 1000d;
- String fmt = "Background read will be delayed by %.1f seconds. " +
- "Please check system time on cluster nodes.";
- String msg = String.format(fmt, delay);
- LOG.warn(msg);
- while (localTime + 60000 < externalTime) {
- clock.waitUntil(localTime + 60000);
- localTime = clock.getTime();
- delay = ((double) externalTime - localTime) / 1000d;
- LOG.warn(String.format(fmt, delay));
- }
- clock.waitUntil(externalTime + 1);
- } else if (localTime == externalTime) {
- // make sure local time is past external time
- // but only log at debug
- LOG.debug("Local and external time are equal. Waiting until local" +
- "time is more recent than external reported time.");
- clock.waitUntil(externalTime + 1);
- }
+ alignWithExternalRevisions(rootDoc, clock, clusterId);
} catch (InterruptedException e) {
- throw new RuntimeException("Background read interrupted", e);
+ throw new DocumentStoreException("Interrupted while aligning with " +
+ "external revision: " + rootDoc.getLastRev());
}
+ RevisionVector headRevision = new RevisionVector(
+ rootDoc.getLastRev().values()).update(newRevision());
+ setRoot(headRevision);
}
@Nonnull
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java?rev=1788073&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java Wed Mar 22 09:12:51 2017
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSetBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.newSorter;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.alignWithExternalRevisions;
+
+/**
+ * Utility class to pull in external changes in the DocumentNodeStore and
+ * process journal entries.
+ */
+abstract class ExternalChange {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExternalChange.class);
+
+ private final DocumentNodeStore store;
+
+ protected final BackgroundReadStats stats;
+
+ private final ChangeSetBuilder changeSetBuilder;
+
+ private final JournalPropertyHandler journalPropertyHandler;
+
+ ExternalChange(DocumentNodeStore store) {
+ this.store = store;
+ this.stats = new BackgroundReadStats();
+ this.changeSetBuilder = new ChangeSetBuilder(
+ store.getChangeSetMaxItems(), store.getChangeSetMaxDepth());
+ this.journalPropertyHandler = store.getJournalPropertyHandlerFactory().newHandler();
+ }
+
+ /**
+ * Called when when cache entries related to nodes with the given paths
+ * must be invalidated.
+ *
+ * @param paths the paths of affected nodes.
+ */
+ abstract void invalidateCache(@Nonnull Iterable<String> paths);
+
+ /**
+ * Called when all cache entries must be invalidated.
+ */
+ abstract void invalidateCache();
+
+ /**
+ * Called when the current head should be updated with revisions of external
+ * changes.
+ *
+ * @param externalChanges the head revision of other cluster nodes that
+ * changed and should now be considered visible.
+ * @param changedPaths paths of nodes that are affected by those external
+ * changes.
+ */
+ abstract void updateHead(@Nonnull Set<Revision> externalChanges,
+ @Nullable Iterable<String> changedPaths);
+
+ /**
+ * Processes external changes if there are any.
+ *
+ * @return statistics about the background read operation.
+ */
+ BackgroundReadStats process() {
+ Clock clock = store.getClock();
+ int clusterId = store.getClusterId();
+ long time = clock.getTime();
+ String id = Utils.getIdFromPath("/");
+ NodeDocument doc = store.getDocumentStore().find(NODES, id, store.getAsyncDelay());
+ if (doc == null) {
+ return stats;
+ }
+ try {
+ alignWithExternalRevisions(doc, clock, clusterId);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Background read interrupted", e);
+ }
+
+ StringSort externalSort = newSorter();
+ StringSort invalidate = newSorter();
+
+ Map<Integer, Revision> lastRevMap = doc.getLastRev();
+ try {
+ RevisionVector headRevision = store.getHeadRevision();
+ Set<Revision> externalChanges = newHashSet();
+ for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
+ int machineId = e.getKey();
+ if (machineId == clusterId) {
+ // ignore own lastRev
+ continue;
+ }
+ Revision r = e.getValue();
+ Revision last = headRevision.getRevision(machineId);
+ if (last == null) {
+ // make sure we see all changes when a cluster node joins
+ last = new Revision(0, 0, machineId);
+ }
+ if (r.compareRevisionTime(last) > 0) {
+ // OAK-2345
+ // only consider as external change if
+ // the revision changed for the machineId
+ externalChanges.add(r);
+ // collect external changes
+ if (externalSort != null) {
+ // add changes for this particular clusterId to the externalSort
+ try {
+ fillExternalChanges(externalSort, invalidate,
+ PathUtils.ROOT_PATH, last, r,
+ store.getDocumentStore(),
+ changeSetBuilder, journalPropertyHandler);
+ } catch (Exception e1) {
+ LOG.error("backgroundRead: Exception while reading external changes from journal: " + e1, e1);
+ closeQuietly(externalSort);
+ closeQuietly(invalidate);
+ externalSort = null;
+ invalidate = null;
+ }
+ }
+ }
+ }
+
+ stats.readHead = clock.getTime() - time;
+ time = clock.getTime();
+
+ // invalidate cache
+ if (cacheInvalidationNeeded(externalSort, invalidate)) {
+ // invalidate caches
+ if (externalSort == null) {
+ // if no externalSort available, then invalidate everything
+ invalidateCache();
+ } else {
+ stats.numExternalChanges = externalSort.getSize();
+ try {
+ sortAndInvalidate(externalSort);
+ sortAndInvalidate(invalidate);
+ } catch (Exception ioe) {
+ LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+ invalidateCache();
+ }
+ }
+ stats.cacheInvalidationTime = clock.getTime() - time;
+ }
+
+ // update head
+ if (!externalChanges.isEmpty()) {
+ updateHead(externalChanges, externalSort);
+ }
+ } finally {
+ closeQuietly(externalSort);
+ closeQuietly(invalidate);
+ }
+
+ return stats;
+ }
+
+ ChangeSetBuilder getChangeSetBuilder() {
+ return changeSetBuilder;
+ }
+
+ JournalPropertyHandler getJournalPropertyHandler() {
+ return journalPropertyHandler;
+ }
+
+ //-------------------------< internal >-------------------------------------
+
+ private boolean cacheInvalidationNeeded(StringSort externalSort,
+ StringSort invalidate) {
+ return externalSort == null || invalidate == null
+ || !externalSort.isEmpty() || !invalidate.isEmpty();
+ }
+
+ private void sortAndInvalidate(StringSort paths) throws IOException {
+ if (paths.isEmpty()) {
+ return;
+ }
+ paths.sort();
+ invalidateCache(paths);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ExternalChange.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FormatVersion.java Wed Mar 22 09:12:51 2017
@@ -83,6 +83,7 @@ public final class FormatVersion impleme
* Changes introduced with this version:
* <ul>
* <li>SplitDocType.DEFAULT_NO_BRANCH (OAK-5869)</li>
+ * <li>journal entries with invalidate-only changes (OAK-5964)</li>
* </ul>
*/
static final FormatVersion V1_8 = new FormatVersion(1, 8, 0);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalDiffLoader.java Wed Mar 22 09:12:51 2017
@@ -159,7 +159,13 @@ class JournalDiffLoader implements DiffC
// use revision with a timestamp of zero
from = new Revision(0, 0, to.getClusterId());
}
- stats.numJournalEntries += fillExternalChanges(changes, path, from, to, ns.getDocumentStore(), null, null);
+ StringSort invalidateOnly = JournalEntry.newSorter();
+ try {
+ stats.numJournalEntries += fillExternalChanges(changes, invalidateOnly,
+ path, from, to, ns.getDocumentStore(), null, null);
+ } finally {
+ invalidateOnly.close();
+ }
}
// do we need to include changes from pending local changes?
if (!max.isRevisionNewer(localLastRev)
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Wed Mar 22 09:12:51 2017
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.commons.PathUtils.ROOT_PATH;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
@@ -59,7 +60,8 @@ public final class JournalEntry extends
/**
* The revision format for external changes:
* <clusterId>-<timestamp>-<counter>. The string is prefixed with
- * "b" if it denotes a branch revision.
+ * "b" if it denotes a branch revision or id of an {@link #INVALIDATE_ONLY}
+ * entry.
*/
private static final String REVISION_FORMAT = "%d-%0" +
Long.toHexString(Long.MAX_VALUE).length() + "x-%0" +
@@ -71,6 +73,8 @@ public final class JournalEntry extends
static final String BRANCH_COMMITS = "_bc";
+ private static final String INVALIDATE_ONLY = "_inv";
+
public static final String MODIFIED = "_modified";
private static final int READ_CHUNK_SIZE = 100;
@@ -123,7 +127,7 @@ public final class JournalEntry extends
});
}
- static void applyTo(@Nonnull StringSort externalSort,
+ static void applyTo(@Nonnull Iterable<String> changedPaths,
@Nonnull DiffCache diffCache,
@Nonnull String path,
@Nonnull RevisionVector from,
@@ -134,7 +138,7 @@ public final class JournalEntry extends
final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
- final Iterator<String> it = externalSort.getIds();
+ final Iterator<String> it = changedPaths.iterator();
if (!it.hasNext()) {
// nothing at all? that's quite unusual..
@@ -218,20 +222,25 @@ public final class JournalEntry extends
* {@code to} revision, this method will fill external changes from the
* next higher journal entry that contains the revision.
*
- * @param sorter the StringSort to which all externally changed paths
+ * @param externalChanges the StringSort to which all externally changed paths
* between the provided revisions will be added
+ * @param invalidate the StringSort to which paths of documents will be
+ * added that must be invalidated if cached.
* @param from the lower bound of the revision range (exclusive).
* @param to the upper bound of the revision range (inclusive).
* @param store the document store to query.
* @return the number of journal entries read from the store.
- * @throws IOException
+ * @throws IOException if adding external changes to the {@code StringSort}
+ * instances fails with an exception.
*/
- static int fillExternalChanges(@Nonnull StringSort sorter,
+ static int fillExternalChanges(@Nonnull StringSort externalChanges,
+ @Nonnull StringSort invalidate,
@Nonnull Revision from,
@Nonnull Revision to,
@Nonnull DocumentStore store)
throws IOException {
- return fillExternalChanges(sorter, PathUtils.ROOT_PATH, from, to, store, null, null);
+ return fillExternalChanges(externalChanges, invalidate, ROOT_PATH,
+ from, to, store, null, null);
}
/**
@@ -243,8 +252,10 @@ public final class JournalEntry extends
* defines the scope of the external changes that should be read and filled
* into the {@code sorter}.
*
- * @param sorter the StringSort to which all externally changed paths
+ * @param externalChanges the StringSort to which all externally changed paths
* between the provided revisions will be added
+ * @param invalidate the StringSort to which paths of documents will be
+ * added that must be invalidated if cached.
* @param path a path that defines the scope of the changes to read.
* @param from the lower bound of the revision range (exclusive).
* @param to the upper bound of the revision range (inclusive).
@@ -254,9 +265,11 @@ public final class JournalEntry extends
* @param journalPropertyHandler a nullable JournalPropertyHandler to read
* stored journal properties for builders from JournalPropertyService
* @return the number of journal entries read from the store.
- * @throws IOException
+ * @throws IOException if adding external changes to the {@code StringSort}
+ * instances fails with an exception.
*/
- static int fillExternalChanges(@Nonnull StringSort sorter,
+ static int fillExternalChanges(@Nonnull StringSort externalChanges,
+ @Nonnull StringSort invalidate,
@Nonnull String path,
@Nonnull Revision from,
@Nonnull Revision to,
@@ -300,7 +313,8 @@ public final class JournalEntry extends
}
for (JournalEntry d : partialResult) {
- fillFromJournalEntry(sorter, path, changeSetBuilder, journalPropertyHandler, d);
+ fillFromJournalEntry(externalChanges, invalidate, path,
+ changeSetBuilder, journalPropertyHandler, d);
}
if (partialResult.size() < READ_CHUNK_SIZE) {
break;
@@ -317,18 +331,22 @@ public final class JournalEntry extends
|| (lastEntry != null && !lastEntry.getId().equals(inclusiveToId))) {
String maxId = asId(new Revision(Long.MAX_VALUE, 0, to.getClusterId()));
for (JournalEntry d : store.query(JOURNAL, inclusiveToId, maxId, 1)) {
- fillFromJournalEntry(sorter, path, changeSetBuilder, journalPropertyHandler, d);
+ fillFromJournalEntry(externalChanges, invalidate, path,
+ changeSetBuilder, journalPropertyHandler, d);
numEntries++;
}
}
return numEntries;
}
- private static void fillFromJournalEntry(@Nonnull StringSort sorter, @Nonnull String path,
+ private static void fillFromJournalEntry(@Nonnull StringSort externalChanges,
+ @Nonnull StringSort invalidate,
+ @Nonnull String path,
@Nullable ChangeSetBuilder changeSetBuilder,
@Nullable JournalPropertyHandler journalPropertyHandler,
JournalEntry d) throws IOException {
- d.addTo(sorter, path);
+ d.addTo(externalChanges, path);
+ d.addInvalidateOnlyTo(invalidate);
if (changeSetBuilder != null) {
d.addTo(changeSetBuilder);
}
@@ -430,10 +448,34 @@ public final class JournalEntry extends
if (bc != null) {
op.set(BRANCH_COMMITS, bc);
}
+ String inv = (String) get(INVALIDATE_ONLY);
+ if (inv != null) {
+ op.set(INVALIDATE_ONLY, inv);
+ }
return op;
}
/**
+ * Add the given {@code revisions} to the list of referenced journal entries
+ * that contain paths of documents that must be invalidated.
+ *
+ * @param revisions the references to invalidate-only journal entries.
+ */
+ void invalidate(@Nonnull Iterable<Revision> revisions) {
+ String value = (String) get(INVALIDATE_ONLY);
+ if (value == null) {
+ value = "";
+ }
+ for (Revision r : revisions) {
+ if (value.length() > 0) {
+ value += ",";
+ }
+ value += asId(r.asBranchRevision());
+ }
+ put(INVALIDATE_ONLY, value);
+ }
+
+ /**
* Add changed paths in this journal entry that are in the scope of
* {@code path} to {@code sort}.
*
@@ -468,8 +510,52 @@ public final class JournalEntry extends
*/
@Nonnull
Iterable<JournalEntry> getBranchCommits() {
+ return getLinkedEntries(BRANCH_COMMITS);
+ }
+
+ /**
+ * @return number of changed nodes being tracked by this journal entry.
+ */
+ int getNumChangedNodes() {
+ return numChangedNodes;
+ }
+
+ /**
+ * @return if this entry has some changes to be pushed
+ */
+ boolean hasChanges() {
+ return numChangedNodes > 0 || hasBranchCommits;
+ }
+
+ //-----------------------------< internal >---------------------------------
+
+ private void addInvalidateOnlyTo(final StringSort sort) throws IOException {
+ TraversingVisitor v = new TraversingVisitor() {
+
+ @Override
+ public void node(TreeNode node, String path) throws IOException {
+ sort.add(path);
+ }
+ };
+ for (JournalEntry e : getInvalidateOnly()) {
+ e.getChanges().accept(v, "/");
+ }
+ }
+
+ /**
+ * Returns the invalidate-only entries that are related to this journal
+ * entry.
+ *
+ * @return the invalidate-only entries.
+ */
+ @Nonnull
+ private Iterable<JournalEntry> getInvalidateOnly() {
+ return getLinkedEntries(INVALIDATE_ONLY);
+ }
+
+ private Iterable<JournalEntry> getLinkedEntries(final String name) {
final List<String> ids = Lists.newArrayList();
- String bc = (String) get(BRANCH_COMMITS);
+ String bc = (String) get(name);
if (bc != null) {
for (String id : bc.split(",")) {
if (id.length() != 0) {
@@ -493,7 +579,7 @@ public final class JournalEntry extends
JournalEntry d = store.find(JOURNAL, id);
if (d == null) {
throw new IllegalStateException(
- "Missing external change for branch revision: " + id);
+ "Missing " + name + " entry for revision: " + id);
}
return d;
}
@@ -502,22 +588,6 @@ public final class JournalEntry extends
};
}
- /**
- * @return number of changed nodes being tracked by this journal entry.
- */
- int getNumChangedNodes() {
- return numChangedNodes;
- }
-
- /**
- * @return if this entry has some changes to be pushed
- */
- boolean hasChanges() {
- return numChangedNodes > 0 || hasBranchCommits;
- }
-
- //-----------------------------< internal >---------------------------------
-
private static String getChanges(TreeNode node) {
JsopBuilder builder = new JsopBuilder();
for (String name : node.keySet()) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java Wed Mar 22 09:12:51 2017
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Timestamp;
import java.util.Comparator;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -814,4 +816,50 @@ public class Utils {
}
};
}
+
+ /**
+ * Makes sure the current time is after the most recent external revision
+ * timestamp in the _lastRev map of the given root document. If necessary
+ * the current thread waits until {@code clock} is after the external
+ * revision timestamp.
+ *
+ * @param rootDoc the root document.
+ * @param clock the clock.
+ * @param clusterId the local clusterId.
+ * @throws InterruptedException if the current thread is interrupted while
+ * waiting. The interrupted status on the current thread is cleared
+ * when this exception is thrown.
+ */
+ public static void alignWithExternalRevisions(@Nonnull NodeDocument rootDoc,
+ @Nonnull Clock clock,
+ int clusterId)
+ throws InterruptedException {
+ Map<Integer, Revision> lastRevMap = checkNotNull(rootDoc).getLastRev();
+ long externalTime = Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
+ long localTime = clock.getTime();
+ if (localTime < externalTime) {
+ LOG.warn("Detected clock differences. Local time is '{}', " +
+ "while most recent external time is '{}'. " +
+ "Current _lastRev entries: {}",
+ new Date(localTime), new Date(externalTime), lastRevMap.values());
+ double delay = ((double) externalTime - localTime) / 1000d;
+ String fmt = "Background read will be delayed by %.1f seconds. " +
+ "Please check system time on cluster nodes.";
+ String msg = String.format(fmt, delay);
+ LOG.warn(msg);
+ while (localTime + 60000 < externalTime) {
+ clock.waitUntil(localTime + 60000);
+ localTime = clock.getTime();
+ delay = ((double) externalTime - localTime) / 1000d;
+ LOG.warn(String.format(fmt, delay));
+ }
+ clock.waitUntil(externalTime + 1);
+ } else if (localTime == externalTime) {
+ // make sure local time is past external time
+ // but only log at debug
+ LOG.debug("Local and external time are equal. Waiting until local" +
+ "time is more recent than external reported time.");
+ clock.waitUntil(externalTime + 1);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Wed Mar 22 09:12:51 2017
@@ -34,6 +34,8 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.junit.Test;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -146,17 +148,61 @@ public class JournalEntryTest {
assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
StringSort sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r2, r3, store);
+ StringSort inv = JournalEntry.newSorter();
+ JournalEntry.fillExternalChanges(sort, inv, r2, r3, store);
assertEquals(0, sort.getSize());
+ assertEquals(0, inv.getSize());
- JournalEntry.fillExternalChanges(sort, r1, r2, store);
+ JournalEntry.fillExternalChanges(sort, inv, r1, r2, store);
assertEquals(paths.size(), sort.getSize());
+ assertEquals(0, inv.getSize());
sort.close();
sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r1, r3, store);
+ JournalEntry.fillExternalChanges(sort, inv, r1, r3, store);
assertEquals(paths.size(), sort.getSize());
+ assertEquals(0, inv.getSize());
sort.close();
+ inv.close();
+ }
+
+ @Test
+ public void invalidateOnly() throws Exception {
+ DocumentStore store = new MemoryDocumentStore();
+ JournalEntry invalidateEntry = JOURNAL.newDocument(store);
+ Set<String> paths = Sets.newHashSet();
+ addRandomPaths(paths);
+ invalidateEntry.modified(paths);
+ Revision r1 = new Revision(1, 0, 1);
+ Revision r2 = new Revision(2, 0, 1);
+ Revision r3 = new Revision(3, 0, 1);
+ UpdateOp op = invalidateEntry.asUpdateOp(r1.asBranchRevision());
+ assertTrue(store.create(JOURNAL, singletonList(op)));
+
+ JournalEntry entry = JOURNAL.newDocument(store);
+ entry.invalidate(singleton(r1));
+ op = entry.asUpdateOp(r2);
+ assertTrue(store.create(JOURNAL, singletonList(op)));
+
+ StringSort sort = JournalEntry.newSorter();
+ StringSort inv = JournalEntry.newSorter();
+ JournalEntry.fillExternalChanges(sort, inv, r2, r3, store);
+ assertEquals(0, sort.getSize());
+ assertEquals(0, inv.getSize());
+
+ JournalEntry.fillExternalChanges(sort, inv, r1, r2, store);
+ assertEquals(1, sort.getSize());
+ assertEquals(paths.size(), inv.getSize());
+ inv.close();
+ sort.close();
+
+ sort = JournalEntry.newSorter();
+ inv = JournalEntry.newSorter();
+ JournalEntry.fillExternalChanges(sort, inv, r1, r3, store);
+ assertEquals(1, sort.getSize());
+ assertEquals(paths.size(), inv.getSize());
+ sort.close();
+ inv.close();
}
@Test
@@ -178,53 +224,43 @@ public class JournalEntryTest {
op = entry.asUpdateOp(r4);
assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
- StringSort sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r1, r1, store);
+ StringSort sort = externalChanges(r1, r1, store);
assertEquals(0, sort.getSize());
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r1, r2, store);
+ sort = externalChanges(r1, r2, store);
assertEquals(Sets.newHashSet("/", "/foo"), Sets.newHashSet(sort));
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r1, r3, store);
+ sort = externalChanges(r1, r3, store);
assertEquals(Sets.newHashSet("/", "/foo", "/bar"), Sets.newHashSet(sort));
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r1, r4, store);
+ sort = externalChanges(r1, r4, store);
assertEquals(Sets.newHashSet("/", "/foo", "/bar"), Sets.newHashSet(sort));
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r2, r2, store);
+ sort = externalChanges(r2, r2, store);
assertEquals(0, sort.getSize());
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r2, r3, store);
+ sort = externalChanges(r2, r3, store);
assertEquals(Sets.newHashSet("/", "/bar"), Sets.newHashSet(sort));
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r2, r4, store);
+ sort = externalChanges(r2, r4, store);
assertEquals(Sets.newHashSet("/", "/bar"), Sets.newHashSet(sort));
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r3, r3, store);
+ sort = externalChanges(r3, r3, store);
assertEquals(0, sort.getSize());
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r3, r4, store);
+ sort = externalChanges(r3, r4, store);
assertEquals(Sets.newHashSet("/", "/bar"), Sets.newHashSet(sort));
sort.close();
- sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, r4, r4, store);
+ sort = externalChanges(r4, r4, store);
assertEquals(0, sort.getSize());
sort.close();
}
@@ -249,9 +285,12 @@ public class JournalEntryTest {
assertTrue(store.create(JOURNAL, Collections.singletonList(op)));
StringSort sort = JournalEntry.newSorter();
- JournalEntry.fillExternalChanges(sort, "/foo", r1, r2, store, null, null);
+ StringSort inv = JournalEntry.newSorter();
+ JournalEntry.fillExternalChanges(sort, inv, "/foo", r1, r2, store, null, null);
assertEquals(4, sort.getSize());
+ assertEquals(0, inv.getSize());
sort.close();
+ inv.close();
}
@Test
@@ -432,4 +471,17 @@ public class JournalEntryTest {
assertTrue(loaderCalled.get());
}
}
+
+ private static StringSort externalChanges(Revision from,
+ Revision to,
+ DocumentStore store)
+ throws IOException {
+ StringSort changes = JournalEntry.newSorter();
+ StringSort invalidate = JournalEntry.newSorter();
+ try {
+ JournalEntry.fillExternalChanges(changes, invalidate, from, to, store);
+ } finally {
+ invalidate.close();
+ } return changes;
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java?rev=1788073&r1=1788072&r2=1788073&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java Wed Mar 22 09:12:51 2017
@@ -26,18 +26,27 @@ import org.apache.jackrabbit.oak.api.Com
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.Ignore;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -250,4 +259,29 @@ public class UtilsTest {
public void getDepthFromIdIllegalArgumentException2() {
Utils.getDepthFromId("42");
}
+
+ @Test
+ public void alignWithExternalRevisions() throws Exception {
+ Clock c = new Clock.Virtual();
+ c.waitUntil(System.currentTimeMillis());
+ // past
+ Revision lastRev1 = new Revision(c.getTime() - 1000, 0, 1);
+ // future
+ Revision lastRev2 = new Revision(c.getTime() + 1000, 0, 2);
+
+ // create a root document
+ NodeDocument doc = new NodeDocument(new MemoryDocumentStore(), c.getTime());
+ UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), true);
+ NodeDocument.setLastRev(op, lastRev1);
+ NodeDocument.setLastRev(op, lastRev2);
+ UpdateUtils.applyChanges(doc, op);
+
+ // must not wait even if revision is in the future
+ Utils.alignWithExternalRevisions(doc, c, 2);
+ assertThat(c.getTime(), is(lessThan(lastRev2.getTimestamp())));
+
+ // must wait until after lastRev2 timestamp
+ Utils.alignWithExternalRevisions(doc, c, 1);
+ assertThat(c.getTime(), is(greaterThan(lastRev2.getTimestamp())));
+ }
}