You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/07/01 15:37:36 UTC
svn commit: r1688649 [1/2] - in /jackrabbit/oak/branches/1.2: ./
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugi...
Author: mreutegg
Date: Wed Jul 1 13:37:35 2015
New Revision: 1688649
URL: http://svn.apache.org/r1688649
Log:
OAK-2829: Comparing node states for external changes is too slow
OAK-3002: Optimize docCache and docChildrenCache invalidation by filtering using journal
Merged revisions 1678023,1678171,1684820,1685590,1685964,1685977,1685989,1686023,1686032,1688179 from trunk
Added:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
- copied, changed from r1684820, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
- copied, changed from r1684820, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
- copied unchanged from r1688179, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
- copied, changed from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
- copied, changed from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
- copied, changed from r1685977, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
- copied, changed from r1684820, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
- copied unchanged from r1688179, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
Modified:
jackrabbit/oak/branches/1.2/ (props changed)
jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java
jackrabbit/oak/branches/1.2/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
jackrabbit/oak/branches/1.2/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jul 1 13:37:35 2015
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684868,1685023,1685370,1685552,1685589,1685840,1685999,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688349,1688421,1688436,1688453,1688616,1688622,1688636
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688636
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java (original)
+++ jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java Wed Jul 1 13:37:35 2015
@@ -35,6 +35,7 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.io.Files;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
* the list would be maintained in memory. If the size crosses the required threshold then
* the sorting would be performed externally
*/
-public class StringSort implements Closeable {
+public class StringSort implements Iterable<String>, Closeable {
private final Logger log = LoggerFactory.getLogger(getClass());
public static final int BATCH_SIZE = 2048;
@@ -117,6 +118,17 @@ public class StringSort implements Close
}
}
+ @Override
+ public Iterator<String> iterator() {
+ try {
+ return getIds();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ //--------------------------< internal >------------------------------------
+
private void addToBatch(String id) throws IOException {
inMemBatch.add(id);
if (inMemBatch.size() >= BATCH_SIZE) {
Modified: jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java (original)
+++ jackrabbit/oak/branches/1.2/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java Wed Jul 1 13:37:35 2015
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("1.0")
+@Version("1.1")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.commons.sort;
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java Wed Jul 1 13:37:35 2015
@@ -70,6 +70,20 @@ public abstract class Collection<T exten
}
};
+ /**
+ * The 'journal' collection contains documents with consolidated
+ * diffs for changes performed by a cluster node between two background
+ * updates.
+ */
+ public static final Collection<JournalEntry> JOURNAL =
+ new Collection<JournalEntry>("journal") {
+ @Nonnull
+ @Override
+ public JournalEntry newDocument(DocumentStore store) {
+ return new JournalEntry(store);
+ }
+ };
+
private final String name;
public Collection(String name) {
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java Wed Jul 1 13:37:35 2015
@@ -40,7 +40,9 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
+import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD;
@@ -52,7 +54,7 @@ public class Commit {
private static final Logger LOG = LoggerFactory.getLogger(Commit.class);
- private final DocumentNodeStore nodeStore;
+ protected final DocumentNodeStore nodeStore;
private final DocumentNodeStoreBranch branch;
private final Revision baseRevision;
private final Revision revision;
@@ -128,6 +130,15 @@ public class Commit {
return baseRevision;
}
+ /**
+ * @return all modified paths, including ancestors without explicit
+ * modifications.
+ */
+ @Nonnull
+ Iterable<String> getModifiedPaths() {
+ return modifiedNodes;
+ }
+
void addNodeDiff(DocumentNodeState n) {
diff.tag('+').key(n.getPath());
diff.object();
@@ -275,7 +286,7 @@ public class Commit {
// so that all operations can be rolled back if there is a conflict
ArrayList<UpdateOp> opLog = new ArrayList<UpdateOp>();
- //Compute the commit root
+ // Compute the commit root
for (String p : operations.keySet()) {
markChanged(p);
if (commitRootPath == null) {
@@ -289,6 +300,16 @@ public class Commit {
}
}
}
+
+ // push branch changes to journal
+ if (baseBranchRevision != null) {
+ // store as external change
+ JournalEntry doc = JOURNAL.newDocument(store);
+ doc.modified(modifiedNodes);
+ Revision r = revision.asBranchRevision();
+ store.create(JOURNAL, singletonList(doc.asUpdateOp(r)));
+ }
+
int commitRootDepth = PathUtils.getDepth(commitRootPath);
// check if there are real changes on the commit root
boolean commitRootHasChanges = operations.containsKey(commitRootPath);
@@ -574,7 +595,7 @@ public class Commit {
}
list.add(p);
}
- DiffCache.Entry cacheEntry = nodeStore.getDiffCache().newEntry(before, revision);
+ DiffCache.Entry cacheEntry = nodeStore.getDiffCache().newEntry(before, revision, true);
LastRevTracker tracker = nodeStore.createTracker(revision, isBranchCommit);
List<String> added = new ArrayList<String>();
List<String> removed = new ArrayList<String>();
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java Wed Jul 1 13:37:35 2015
@@ -56,11 +56,14 @@ public interface DiffCache {
*
* @param from the from revision.
* @param to the to revision.
+ * @param local true indicates that the entry results from a local change,
+ * false if it results from an external change
* @return the cache entry.
*/
@Nonnull
Entry newEntry(@Nonnull Revision from,
- @Nonnull Revision to);
+ @Nonnull Revision to,
+ boolean local);
/**
* @return the statistics for this cache.
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java Wed Jul 1 13:37:35 2015
@@ -396,10 +396,9 @@ public class DocumentNodeState extends A
@Override
public String toString() {
StringBuilder buff = new StringBuilder();
- buff.append("path: ").append(path).append('\n');
- buff.append("rev: ").append(rev).append('\n');
- buff.append(properties);
- buff.append('\n');
+ buff.append("{ path: '").append(path).append("', ");
+ buff.append("rev: '").append(rev).append("', ");
+ buff.append("properties: '").append(properties.values()).append("' }");
return buff.toString();
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Jul 1 13:37:35 2015
@@ -21,14 +21,19 @@ import static com.google.common.base.Pre
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
+import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.MANY_CHILDREN_THRESHOLD;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.asStringValueIterable;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.pathToId;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.unshareString;
import java.io.Closeable;
@@ -84,6 +89,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.commons.json.JsopStream;
import org.apache.jackrabbit.oak.commons.json.JsopWriter;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -246,6 +252,12 @@ public final class DocumentNodeStore
private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
/**
+ * Summary of changes done by this cluster node, to be persisted by the
+ * background update thread.
+ */
+ private JournalEntry changes;
+
+ /**
* The last known revision for each cluster instance.
*
* Key: the machine id, value: revision.
@@ -359,6 +371,8 @@ public final class DocumentNodeStore
private final VersionGarbageCollector versionGarbageCollector;
+ private final JournalGarbageCollector journalGarbageCollector;
+
private final Executor executor;
private final LastRevRecoveryAgent lastRevRecoveryAgent;
@@ -382,6 +396,7 @@ public final class DocumentNodeStore
s = new LoggingDocumentStoreWrapper(s);
}
this.store = s;
+ this.changes = Collection.JOURNAL.newDocument(s);
this.executor = builder.getExecutor();
this.clock = builder.getClock();
int cid = builder.getClusterId();
@@ -401,6 +416,7 @@ public final class DocumentNodeStore
this.asyncDelay = builder.getAsyncDelay();
this.versionGarbageCollector = new VersionGarbageCollector(
this, builder.createVersionGCSupport());
+ this.journalGarbageCollector = new JournalGarbageCollector(this);
this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this);
this.disableBranches = builder.isDisableBranches();
this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
@@ -428,7 +444,8 @@ public final class DocumentNodeStore
checkpoints = new Checkpoints(this);
// check if root node exists
- if (store.find(Collection.NODES, Utils.getIdFromPath("/")) == null) {
+ NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath("/"));
+ if (rootDoc == null) {
// root node is missing: repository is not initialized
Revision head = newRevision();
Commit commit = new Commit(this, head, null, null);
@@ -449,6 +466,11 @@ public final class DocumentNodeStore
// no revision read from other cluster nodes
setHeadRevision(newRevision());
}
+ // check if _lastRev for our clusterId exists
+ if (!rootDoc.getLastRev().containsKey(clusterId)) {
+ unsavedLastRevisions.put("/", headRevision);
+ backgroundWrite();
+ }
}
getRevisionComparator().add(headRevision, Revision.newRevision(0));
@@ -642,6 +664,8 @@ public final class DocumentNodeStore
Revision before = getHeadRevision();
// apply changes to cache based on before revision
c.applyToCache(before, false);
+ // track modified paths
+ changes.modified(c.getModifiedPaths());
// update head revision
setHeadRevision(c.getRevision());
dispatcher.contentChanged(getRoot(), info);
@@ -1023,15 +1047,13 @@ public final class DocumentNodeStore
}
final Revision readRevision = parent.getLastRevision();
- return transform(getChildren(parent, name, limit).children,
- new Function<String, DocumentNodeState>() {
+ return transform(getChildren(parent, name, limit).children, new Function<String, DocumentNodeState>() {
@Override
public DocumentNodeState apply(String input) {
String p = concat(parent.getPath(), input);
DocumentNodeState result = getNode(p, readRevision);
if (result == null) {
- throw new DocumentStoreException("DocumentNodeState is null for revision " + readRevision + " of " + p
- + " (aborting getChildNodes())");
+ throw new DocumentStoreException("DocumentNodeState is null for revision " + readRevision + " of " + p + " (aborting getChildNodes())");
}
return result;
}
@@ -1050,10 +1072,8 @@ public final class DocumentNodeStore
path, readRevision);
return null;
}
- final DocumentNodeState result = doc.getNodeAtRevision(this,
- readRevision, lastRevision);
- PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path,
- readRevision);
+ final DocumentNodeState result = doc.getNodeAtRevision(this, readRevision, lastRevision);
+ PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path, readRevision);
return result;
}
@@ -1068,10 +1088,10 @@ public final class DocumentNodeStore
* @param changed the list of changed child nodes.
*
*/
- public void applyChanges(Revision rev, String path,
- boolean isNew, List<String> added,
- List<String> removed, List<String> changed,
- DiffCache.Entry cacheEntry) {
+ void applyChanges(Revision rev, String path,
+ boolean isNew, List<String> added,
+ List<String> removed, List<String> changed,
+ DiffCache.Entry cacheEntry) {
if (isNew && !added.isEmpty()) {
DocumentNodeState.Children c = new DocumentNodeState.Children();
Set<String> set = Sets.newTreeSet();
@@ -1086,13 +1106,13 @@ public final class DocumentNodeStore
// update diff cache
JsopWriter w = new JsopStream();
for (String p : added) {
- w.tag('+').key(PathUtils.getName(p)).object().endObject().newline();
+ w.tag('+').key(PathUtils.getName(p)).object().endObject();
}
for (String p : removed) {
- w.tag('-').value(PathUtils.getName(p)).newline();
+ w.tag('-').value(PathUtils.getName(p));
}
for (String p : changed) {
- w.tag('^').key(PathUtils.getName(p)).object().endObject().newline();
+ w.tag('^').key(PathUtils.getName(p)).object().endObject();
}
cacheEntry.append(path, w.toString());
@@ -1133,6 +1153,15 @@ public final class DocumentNodeStore
}
/**
+ * Called when a branch is merged.
+ *
+ * @param revisions the revisions of the merged branch commits.
+ */
+ void revisionsMerged(@Nonnull Iterable<Revision> revisions) {
+ changes.branchCommit(revisions);
+ }
+
+ /**
* Updates a commit root document.
*
* @param commit the updates to apply on the commit root document.
@@ -1308,6 +1337,7 @@ public final class DocumentNodeStore
UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
NodeDocument.setModified(op, commit.getRevision());
if (b != null) {
+ commit.addBranchCommits(b);
Iterator<Revision> mergeCommits = commit.getMergeRevisions().iterator();
for (Revision rev : b.getCommits()) {
rev = rev.asTrunkRevision();
@@ -1731,6 +1761,8 @@ public final class DocumentNodeStore
// then we saw this new revision (from another cluster node)
Revision otherSeen = Revision.newRevision(0);
+ StringSort externalSort = JournalEntry.newSorter();
+
Map<Revision, Revision> externalChanges = Maps.newHashMap();
for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
int machineId = e.getKey();
@@ -1751,6 +1783,16 @@ public final class DocumentNodeStore
|| r.getTimestamp() > revisionPurgeMillis()) {
externalChanges.put(r, otherSeen);
}
+ // collect external changes
+ if (last != null && externalSort != null) {
+ // add changes for this particular clusterId to the externalSort
+ try {
+ fillExternalChanges(externalSort, last, r, store);
+ } catch (IOException e1) {
+ LOG.error("backgroundRead: Exception while reading external changes from journal: "+e1, e1);
+ externalSort = null;
+ }
+ }
}
}
@@ -1759,9 +1801,38 @@ public final class DocumentNodeStore
if (!externalChanges.isEmpty()) {
// invalidate caches
- stats.cacheStats = store.invalidateCache();
- // TODO only invalidate affected items
- docChildrenCache.invalidateAll();
+ if (externalSort == null) {
+ // if no externalSort available, then invalidate the classic way: everything
+ stats.cacheStats = store.invalidateCache();
+ docChildrenCache.invalidateAll();
+ } else {
+ try {
+ externalSort.sort();
+ stats.cacheStats = store.invalidateCache(pathToId(externalSort));
+ // OAK-3002: only invalidate affected items (using journal)
+ long origSize = docChildrenCache.size();
+ if (origSize == 0) {
+ // if docChildrenCache is empty, don't bother
+ // calling invalidateAll either way
+ // (esp calling invalidateAll(Iterable) will
+ // potentially iterate over all keys even though
+ // there's nothing to be deleted)
+ LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
+ } else {
+ // however, if the docChildrenCache is not empty,
+ // use the invalidateAll(Iterable) variant,
+ // passing it an Iterable<StringValue>, as that's
+ // what is contained in the cache
+ docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
+ long newSize = docChildrenCache.size();
+ LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
+ }
+ } catch (Exception ioe) {
+ LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+ stats.cacheStats = store.invalidateCache();
+ docChildrenCache.invalidateAll();
+ }
+ }
stats.cacheInvalidationTime = clock.getTime() - time;
time = clock.getTime();
@@ -1770,7 +1841,6 @@ public final class DocumentNodeStore
backgroundOperationLock.writeLock().lock();
try {
stats.lock = clock.getTime() - time;
- time = clock.getTime();
// the latest revisions of the current cluster node
// happened before the latest revisions of other cluster nodes
@@ -1779,9 +1849,24 @@ public final class DocumentNodeStore
for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
revisionComparator.add(e.getKey(), e.getValue());
}
+
+ Revision oldHead = headRevision;
// the new head revision is after other revisions
setHeadRevision(newRevision());
if (dispatchChange) {
+ time = clock.getTime();
+ if (externalSort != null) {
+ // then there were external changes and reading them
+ // was successful -> apply them to the diff cache
+ try {
+ JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
+ } catch (Exception e1) {
+ LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+ }
+ }
+ stats.populateDiffCache = clock.getTime() - time;
+ time = clock.getTime();
+
dispatcher.contentChanged(getRoot().fromExternalChange(), null);
}
} finally {
@@ -1800,6 +1885,7 @@ public final class DocumentNodeStore
CacheInvalidationStats cacheStats;
long readHead;
long cacheInvalidationTime;
+ long populateDiffCache;
long lock;
long dispatchChanges;
long purge;
@@ -1814,6 +1900,7 @@ public final class DocumentNodeStore
"cacheStats:" + cacheStatsMsg +
", head:" + readHead +
", cache:" + cacheInvalidationTime +
+ ", diff: " + populateDiffCache +
", lock:" + lock +
", dispatch:" + dispatchChanges +
", purge:" + purge +
@@ -1898,7 +1985,15 @@ public final class DocumentNodeStore
}
BackgroundWriteStats backgroundWrite() {
- return unsavedLastRevisions.persist(this, backgroundOperationLock.writeLock());
+ return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() {
+ @Override
+ public void acquiring() {
+ if (store.create(JOURNAL,
+ singletonList(changes.asUpdateOp(getHeadRevision())))) {
+ changes = JOURNAL.newDocument(getDocumentStore());
+ }
+ }
+ }, backgroundOperationLock.writeLock());
}
//-----------------------------< internal >---------------------------------
@@ -1981,19 +2076,23 @@ public final class DocumentNodeStore
case '^': {
String name = unshareString(t.readString());
t.read(':');
- if (t.matches('{')) {
- t.read('}');
- continueComparison = diff.childNodeChanged(name,
- base.getChildNode(name),
- node.getChildNode(name));
- } else if (t.matches('[')) {
- // ignore multi valued property
- while (t.read() != ']') {
- // skip values
+ t.read('{');
+ t.read('}');
+ NodeState baseChild = base.getChildNode(name);
+ NodeState nodeChild = node.getChildNode(name);
+ if (baseChild.exists()) {
+ if (nodeChild.exists()) {
+ continueComparison = diff.childNodeChanged(name,
+ baseChild, nodeChild);
+ } else {
+ continueComparison = diff.childNodeDeleted(name,
+ baseChild);
}
} else {
- // ignore single valued property
- t.read();
+ if (nodeChild.exists()) {
+ continueComparison = diff.childNodeAdded(name,
+ nodeChild);
+ }
}
break;
}
@@ -2108,13 +2207,14 @@ public final class DocumentNodeStore
}
}
+ String diff = w.toString();
if (debug) {
long end = now();
- LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms)",
+ LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms), diff '{}'",
diffAlgo, from.getPath(), fromRev, toRev,
- end - start, getChildrenDoneIn - start);
+ end - start, getChildrenDoneIn - start, diff);
}
- return w.toString();
+ return diff;
}
private void diffManyChildren(JsopWriter w, String path, Revision fromRev, Revision toRev) {
@@ -2162,17 +2262,17 @@ public final class DocumentNodeStore
if (a == null && b == null) {
// ok
} else if (a == null || b == null || !a.equals(b)) {
- w.tag('^').key(name).object().endObject().newline();
+ w.tag('^').key(name).object().endObject();
}
} else {
// does not exist in toRev -> was removed
- w.tag('-').value(name).newline();
+ w.tag('-').value(name);
}
} else {
// does not exist in fromRev
if (toNode != null) {
// exists in toRev
- w.tag('+').key(name).object().endObject().newline();
+ w.tag('+').key(name).object().endObject();
} else {
// does not exist in either revisions
// -> do nothing
@@ -2199,7 +2299,7 @@ public final class DocumentNodeStore
Set<String> childrenSet = Sets.newHashSet(toChildren.children);
for (String n : fromChildren.children) {
if (!childrenSet.contains(n)) {
- w.tag('-').value(n).newline();
+ w.tag('-').value(n);
} else {
String path = concat(parentPath, n);
DocumentNodeState n1 = getNode(path, fromRev);
@@ -2211,14 +2311,14 @@ public final class DocumentNodeStore
checkNotNull(n1, "Node at [%s] not found for fromRev [%s]", path, fromRev);
checkNotNull(n2, "Node at [%s] not found for toRev [%s]", path, toRev);
if (!n1.getLastRevision().equals(n2.getLastRevision())) {
- w.tag('^').key(n).object().endObject().newline();
+ w.tag('^').key(n).object().endObject();
}
}
}
childrenSet = Sets.newHashSet(fromChildren.children);
for (String n : toChildren.children) {
if (!childrenSet.contains(n)) {
- w.tag('+').key(n).object().endObject().newline();
+ w.tag('+').key(n).object().endObject();
}
}
}
@@ -2518,6 +2618,12 @@ public final class DocumentNodeStore
public VersionGarbageCollector getVersionGarbageCollector() {
return versionGarbageCollector;
}
+
+ @Nonnull
+ public JournalGarbageCollector getJournalGarbageCollector() {
+ return journalGarbageCollector;
+ }
+
@Nonnull
public LastRevRecoveryAgent getLastRevRecoveryAgent() {
return lastRevRecoveryAgent;
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Wed Jul 1 13:37:35 2015
@@ -27,6 +27,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_NODE_CACHE_PERCENTAGE;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -44,6 +45,7 @@ import com.mongodb.DB;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientURI;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -209,6 +211,23 @@ public class DocumentNodeStoreService {
)
public static final String CUSTOM_BLOB_STORE = "customBlobStore";
+ private static final long DEFAULT_JOURNAL_GC_INTERVAL_MILLIS = 5*60*1000; // default is 5min
+ @Property(longValue = DEFAULT_JOURNAL_GC_INTERVAL_MILLIS,
+ label = "Journal Garbage Collection Interval (millis)",
+ description = "Long value indicating interval (in milliseconds) with which the "
+ + "journal (for external changes) is cleaned up. Default is " + DEFAULT_JOURNAL_GC_INTERVAL_MILLIS
+ )
+ private static final String PROP_JOURNAL_GC_INTERVAL_MILLIS = "journalGCInterval";
+
+ private static final long DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS = 6*60*60*1000; // default is 6hours
+ @Property(longValue = DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS,
+ label = "Maximum Age of Journal Entries (millis)",
+ description = "Long value indicating max age (in milliseconds) that "
+ + "journal (for external changes) entries are kept (older ones are candidates for gc). "
+ + "Default is " + DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS
+ )
+ private static final String PROP_JOURNAL_GC_MAX_AGE_MILLIS = "journalGCMaxAge";
+
private static final long MB = 1024 * 1024;
private static enum DocumentStoreType {
@@ -417,6 +436,7 @@ public class DocumentNodeStoreService {
registerJMXBeans(mk.getNodeStore());
registerLastRevRecoveryJob(mk.getNodeStore());
+ registerJournalGC(mk.getNodeStore());
NodeStore store;
DocumentNodeStore mns = mk.getNodeStore();
@@ -599,6 +619,23 @@ public class DocumentNodeStoreService {
recoverJob, TimeUnit.MILLISECONDS.toSeconds(leaseTime)));
}
+ private void registerJournalGC(final DocumentNodeStore nodeStore) {
+ long journalGCInterval = toLong(context.getProperties().get(PROP_JOURNAL_GC_INTERVAL_MILLIS),
+ DEFAULT_JOURNAL_GC_INTERVAL_MILLIS);
+ final long journalGCMaxAge = toLong(context.getProperties().get(PROP_JOURNAL_GC_MAX_AGE_MILLIS),
+ DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS);
+ Runnable journalGCJob = new Runnable() {
+
+ @Override
+ public void run() {
+ nodeStore.getJournalGarbageCollector().gc(journalGCMaxAge, TimeUnit.MILLISECONDS);
+ }
+
+ };
+ registrations.add(WhiteboardUtils.scheduleWithFixedDelay(whiteboard,
+ journalGCJob, TimeUnit.MILLISECONDS.toSeconds(journalGCInterval), true/*runOnSingleClusterNode*/));
+ }
+
private Object prop(String propName) {
return prop(propName, PREFIX + propName);
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java Wed Jul 1 13:37:35 2015
@@ -233,6 +233,13 @@ public interface DocumentStore {
CacheInvalidationStats invalidateCache();
/**
+ * Invalidate the document cache but only with entries that match one
+ * of the keys provided.
+ */
+ @CheckForNull
+ CacheInvalidationStats invalidateCache(Iterable<String> keys);
+
+ /**
* Invalidate the document cache for the given key.
*
* @param <T> the document type
Copied: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java&r1=1684820&r2=1688649&rev=1688649&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Wed Jul 1 13:37:35 2015
@@ -16,8 +16,8 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
-
import java.io.IOException;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -27,6 +27,7 @@ import java.util.Set;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,27 +47,6 @@ import static org.apache.jackrabbit.oak.
/**
* Keeps track of changes performed between two consecutive background updates.
- *
- * Done:
- * Query external changes in chunks.
- * {@link #getChanges(Revision, Revision, DocumentStore)} current reads
- * all JournalEntry documents in one go with a limit of Integer.MAX_VALUE.
- * Done:
- * Use external sort when changes are applied to diffCache. See usage of
- * {@link #applyTo(DiffCache, Revision, Revision)} in
- * {@link DocumentNodeStore#backgroundRead(boolean)}.
- * The utility {@link StringSort} can be used for this purpose.
- * Done:
- * Push changes to {@link MemoryDiffCache} instead of {@link LocalDiffCache}.
- * See {@link TieredDiffCache#newEntry(Revision, Revision)}. Maybe a new
- * method is needed for this purpose?
- * Done (incl junit)
- * Create JournalEntry for external changes related to _lastRev recovery.
- * See {@link LastRevRecoveryAgent#recover(Iterator, int, boolean)}.
- * Done (incl junit)
- * Cleanup old journal entries in the document store.
- * Done:
- * integrate the JournalGarbageCollector similarly to the VersionGarbageCollector
*/
public final class JournalEntry extends Document {
@@ -85,10 +65,13 @@ public final class JournalEntry extends
private static final String BRANCH_COMMITS = "_bc";
- private static final int READ_CHUNK_SIZE = 1024;
-
- private static final int STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD = 1024 * 1024; // switch to disk after 1MB
-
+ private static final int READ_CHUNK_SIZE = 100;
+
+ /**
+ * switch to disk after 1MB
+ */
+ private static final int STRING_SORT_OVERFLOW_TO_DISK_THRESHOLD = 1024 * 1024;
+
private final DocumentStore store;
private volatile TreeNode changes = null;
@@ -96,143 +79,147 @@ public final class JournalEntry extends
JournalEntry(DocumentStore store) {
this.store = store;
}
-
+
static StringSort newSorter() {
- return new StringSort(STRINGSORT_OVERFLOW_TO_DISK_THRESHOLD, new Comparator<String>() {
+ return new StringSort(STRING_SORT_OVERFLOW_TO_DISK_THRESHOLD, new Comparator<String>() {
@Override
public int compare(String arg0, String arg1) {
return arg0.compareTo(arg1);
}
});
}
-
+
static void applyTo(@Nonnull StringSort externalSort,
- @Nonnull DiffCache diffCache,
- @Nonnull Revision from,
- @Nonnull Revision to) throws IOException {
+ @Nonnull DiffCache diffCache,
+ @Nonnull Revision from,
+ @Nonnull Revision to) throws IOException {
LOG.debug("applyTo: starting for {} to {}", from, to);
- externalSort.sort();
- // note that it is not deduplicated yet
- LOG.debug("applyTo: sorting done.");
-
- final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
-
- final Iterator<String> it = externalSort.getIds();
- if (!it.hasNext()) {
- // nothing at all? that's quite unusual..
-
- // we apply this diff as one '/' to the entry then
- entry.append("/", "");
- entry.done();
- return;
- }
- String previousPath = it.next();
- TreeNode node = new TreeNode(null, "");
- node = node.getOrCreatePath(previousPath);
- int totalCnt = 0;
- int deDuplicatedCnt = 0;
- while(it.hasNext()) {
- totalCnt++;
- final String currentPath = it.next();
- if (previousPath.equals(currentPath)) {
- // de-duplication
- continue;
- }
-
- // 'node' contains one hierarchy line, eg /a, /a/b, /a/b/c, /a/b/c/d
- // including the children on each level.
- // these children have not yet been appended to the diffCache entry
- // and have to be added as soon as the 'currentPath' is not
- // part of that hierarchy anymore and we 'move elsewhere'.
- // eg if 'currentPath' is /a/b/e, then we must flush /a/b/c/d and /a/b/c
- while(node!=null && !node.isParentOf(currentPath)) {
- // add parent to the diff entry
- entry.append(node.getPath(), getChanges(node));
- deDuplicatedCnt++;
- node = node.parent;
- }
-
- if (node==null) {
- // we should never go 'passed' the root, hence node should
- // never be null - if it becomes null anyway, start with
- // a fresh root:
- node = new TreeNode(null, "");
- node = node.getOrCreatePath(currentPath);
- } else {
- // this is the normal route: we add a direct or grand-child
- // node to the current node:
- node = node.getOrCreatePath(currentPath);
- }
- previousPath = currentPath;
- }
-
- // once we're done we still have the last hierarchy line contained in 'node',
- // eg /x, /x/y, /x/y/z
- // and that one we must now append to the diffcache entry:
- while(node!=null) {
+ // note that it is not de-duplicated yet
+ LOG.debug("applyTo: sorting done.");
+
+ final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
+
+ final Iterator<String> it = externalSort.getIds();
+ if (!it.hasNext()) {
+ // nothing at all? that's quite unusual..
+
+ // we apply this diff as one '/' to the entry then
+ entry.append("/", "");
+ entry.done();
+ return;
+ }
+ String previousPath = it.next();
+ TreeNode node = new TreeNode();
+ node = node.getOrCreatePath(previousPath);
+ int totalCnt = 0;
+ int deDuplicatedCnt = 0;
+ while (it.hasNext()) {
+ totalCnt++;
+ final String currentPath = it.next();
+ if (previousPath.equals(currentPath)) {
+ // de-duplication
+ continue;
+ }
+ final TreeNode currentNode = node.getOrCreatePath(currentPath);
+
+ // 'node' contains one hierarchy line, eg /a, /a/b, /a/b/c, /a/b/c/d
+ // including the children on each level.
+ // these children have not yet been appended to the diffCache entry
+ // and have to be added as soon as the 'currentPath' is not
+ // part of that hierarchy anymore and we 'move elsewhere'.
+ // eg if 'currentPath' is /a/b/e, then we must flush /a/b/c/d and /a/b/c
+ while (node != null && !node.isAncestorOf(currentNode)) {
+ // add parent to the diff entry
+ entry.append(node.getPath(), getChanges(node));
+ deDuplicatedCnt++;
+ node = node.parent;
+ }
+
+ if (node == null) {
+ // we should never go past the root, hence node should
+ // never be null - if it becomes null anyway, start with
+ // a fresh root:
+ node = new TreeNode();
+ node = node.getOrCreatePath(currentPath);
+ } else {
+ // this is the normal route: we add a direct or grand-child
+ // node to the current node:
+ node = currentNode;
+ }
+ previousPath = currentPath;
+ }
+
+ // once we're done we still have the last hierarchy line contained in 'node',
+ // eg /x, /x/y, /x/y/z
+ // and that one we must now append to the diff cache entry:
+ while (node != null) {
entry.append(node.getPath(), getChanges(node));
- deDuplicatedCnt++;
- node = node.parent;
- }
-
- // and finally: mark the diffcache entry as 'done':
+ deDuplicatedCnt++;
+ node = node.parent;
+ }
+
+ // and finally: mark the diff cache entry as 'done':
entry.done();
LOG.debug("applyTo: done. totalCnt: {}, deDuplicatedCnt: {}", totalCnt, deDuplicatedCnt);
}
-
+
/**
- * Reads all external changes between the two given revisions (with the same clusterId)
- * from the journal and appends the paths therein to the provided sorter.
+ * Reads all external changes between the two given revisions (with the same
+ * clusterId) from the journal and appends the paths therein to the provided
+ * sorter.
*
- * @param sorter the StringSort to which all externally changed paths between
- * the provided revisions will be added
- * @param from the lower bound of the revision range (exclusive).
- * @param to the upper bound of the revision range (inclusive).
- * @param store the document store to query.
- * @throws IOException
+ * @param sorter the StringSort to which all externally changed paths
+ * between the provided revisions will be added
+ * @param from the lower bound of the revision range (exclusive).
+ * @param to the upper bound of the revision range (inclusive).
+ * @param store the document store to query.
+ * @throws IOException
*/
static void fillExternalChanges(@Nonnull StringSort sorter,
@Nonnull Revision from,
@Nonnull Revision to,
- @Nonnull DocumentStore store) throws IOException {
+ @Nonnull DocumentStore store)
+ throws IOException {
checkArgument(checkNotNull(from).getClusterId() == checkNotNull(to).getClusterId());
-
+
// to is inclusive, but DocumentStore.query() toKey is exclusive
final String inclusiveToId = asId(to);
to = new Revision(to.getTimestamp(), to.getCounter() + 1,
to.getClusterId(), to.isBranch());
- // read in chunks to support very large sets of changes between subsequent background reads
- // to do this, provide a (TODO eventually configurable) limit for the number of entries to be returned per query
- // if the number of elements returned by the query is exactly the provided limit, then
- // loop and do subsequent queries
+ // read in chunks to support very large sets of changes between
+ // subsequent background reads. To do this, provide a limit (TODO:
+ // eventually configurable) for the number of entries returned per
+ // query. If the number of elements returned by the query is exactly
+ // the provided limit, then loop and do subsequent queries
final String toId = asId(to);
String fromId = asId(from);
- while(true) {
- if (fromId.equals(inclusiveToId)) {
- // avoid query if from and to are off by just 1 counter (which we do due to exclusiveness of query borders)
- // as in this case the query will always be empty anyway - so avoid doing the query in the first place
- break;
- }
- List<JournalEntry> partialResult = store.query(JOURNAL, fromId, toId, READ_CHUNK_SIZE);
- if (partialResult==null) {
- break;
- }
- for(JournalEntry d: partialResult) {
- d.addTo(sorter);
- }
- if (partialResult.size()<READ_CHUNK_SIZE) {
- break;
- }
- // otherwise set 'fromId' to the last entry just processed
- // that works fine as the query is non-inclusive (ie does not include the from which we'd otherwise double-process)
- fromId = partialResult.get(partialResult.size()-1).getId();
+ while (true) {
+ if (fromId.equals(inclusiveToId)) {
+ // avoid query if from and to are off by just 1 counter (which
+ // we do due to exclusiveness of query borders) as in this case
+ // the query will always be empty anyway - so avoid doing the
+ // query in the first place
+ break;
+ }
+ List<JournalEntry> partialResult = store.query(JOURNAL, fromId, toId, READ_CHUNK_SIZE);
+
+ for (JournalEntry d : partialResult) {
+ d.addTo(sorter);
+ }
+ if (partialResult.size() < READ_CHUNK_SIZE) {
+ break;
+ }
+ // otherwise set 'fromId' to the last entry just processed;
+ // that works fine as the query is non-inclusive (i.e. it does not
+ // include the 'from' key, which we'd otherwise double-process)
+ fromId = partialResult.get(partialResult.size() - 1).getId();
}
}
long getRevisionTimestamp() {
- final String[] parts = getId().split("_");
- return Long.parseLong(parts[1]);
+ final String[] parts = getId().split("-");
+ return Long.parseLong(parts[1], 16);
}
void modified(String path) {
@@ -281,17 +268,17 @@ public final class JournalEntry extends
}
return op;
}
-
+
void addTo(final StringSort sort) throws IOException {
TreeNode n = getChanges();
TraversingVisitor v = new TraversingVisitor() {
-
+
@Override
public void node(TreeNode node, String path) throws IOException {
sort.add(path);
}
};
- n.accept(v, "/");
+ n.accept(v, "/");
for (JournalEntry e : getBranchCommits()) {
e.getChanges().accept(v, "/");
}
@@ -304,20 +291,36 @@ public final class JournalEntry extends
*/
@Nonnull
Iterable<JournalEntry> getBranchCommits() {
- List<JournalEntry> commits = Lists.newArrayList();
+ final List<String> ids = Lists.newArrayList();
String bc = (String) get(BRANCH_COMMITS);
if (bc != null) {
for (String id : bc.split(",")) {
- JournalEntry d = store.find(JOURNAL, id);
- if (d == null) {
- throw new IllegalStateException(
- "Missing external change for branch revision: " + id);
- }
- //TODO: could this also be a problem with very large number of branches ???
- commits.add(d);
+ ids.add(id);
}
}
- return commits;
+ return new Iterable<JournalEntry>() {
+ @Override
+ public Iterator<JournalEntry> iterator() {
+ return new AbstractIterator<JournalEntry>() {
+
+ private final Iterator<String> it = ids.iterator();
+
+ @Override
+ protected JournalEntry computeNext() {
+ if (!it.hasNext()) {
+ return endOfData();
+ }
+ String id = it.next();
+ JournalEntry d = store.find(JOURNAL, id);
+ if (d == null) {
+ throw new IllegalStateException(
+ "Missing external change for branch revision: " + id);
+ }
+ return d;
+ }
+ };
+ }
+ };
}
//-----------------------------< internal >---------------------------------
@@ -356,7 +359,7 @@ public final class JournalEntry extends
@Nonnull
private TreeNode getChanges() {
if (changes == null) {
- TreeNode node = new TreeNode(null, "");
+ TreeNode node = new TreeNode();
String c = (String) get(CHANGES);
if (c != null) {
node.parse(new JsopTokenizer(c));
@@ -368,66 +371,70 @@ public final class JournalEntry extends
private static final class TreeNode {
- private final Map<String, TreeNode> children = Maps.newHashMap();
+ private static final Map<String, TreeNode> NO_CHILDREN = Collections.emptyMap();
+
+ private Map<String, TreeNode> children = NO_CHILDREN;
- private final String path;
private final TreeNode parent;
-
+ private final String name;
+
+ TreeNode() {
+ this(null, "");
+ }
+
TreeNode(TreeNode parent, String name) {
- if (name.contains("/")) {
- throw new IllegalArgumentException("name must not contain /: "+name);
- }
+ checkArgument(!name.contains("/"),
+ "name must not contain '/': {}", name);
+
this.parent = parent;
- if (parent==null) {
- this.path = "/";
- } else if (parent.parent==null) {
- this.path = "/" + name;
- } else {
- this.path = parent.path + "/" + name;
- }
+ this.name = name;
}
-
- public TreeNode getOrCreatePath(String path) {
- if (path.equals(this.path)) {
- // then path denotes the same as myself, hence return myself
- return this;
- }
- if (!path.startsWith(this.path)) {
- // this must never happen
- throw new IllegalStateException("path not child of myself. path: "+path+", myself: "+this.path);
+
+ TreeNode getOrCreatePath(String path) {
+ TreeNode n = getRoot();
+ for (String name : PathUtils.elements(path)) {
+ n = n.getOrCreate(name);
}
- String sub = this.path.equals("/") ? path.substring(1) : path.substring(this.path.length()+1);
- String[] parts = sub.split("/");
- TreeNode n = this;
- for (int i = 0; i < parts.length; i++) {
- if (parts[i]!=null && parts[i].length()>0) {
- n = n.getOrCreate(parts[i]);
+ return n;
+ }
+
+ boolean isAncestorOf(TreeNode other) {
+ TreeNode n = other;
+ while (n.parent != null) {
+ if (this == n.parent) {
+ return true;
}
+ n = n.parent;
}
- return n;
+ return false;
}
- public boolean isParentOf(String path) {
- if (this.path.equals("/")) {
- // root is parent of everything
- return true;
- }
- if (!path.startsWith(this.path+"/")) {
- // then I'm not parent of that path
- return false;
- }
- final String sub = path.substring(this.path.length()+1);
- if (sub.indexOf("/", 1)!=-1) {
- // if the 'sub' part contains a / then
- // it is not a direct child of myself,
- // so I'm a grand-parent but not a direct-parent
- return false;
+ @Nonnull
+ private TreeNode getRoot() {
+ TreeNode n = this;
+ while (n.parent != null) {
+ n = n.parent;
}
- return true;
+ return n;
}
private String getPath() {
- return path;
+ return buildPath(new StringBuilder()).toString();
+ }
+
+ private StringBuilder buildPath(StringBuilder sb) {
+ if (parent != null) {
+ parent.buildPath(sb);
+ if (parent.parent != null) {
+ // only add slash if parent is not the root
+ sb.append("/");
+ }
+ } else {
+ // this is the root
+ sb.append("/");
+ }
+ sb.append(name);
+ return sb;
}
void parse(JsopReader reader) {
@@ -476,7 +483,11 @@ public final class JournalEntry extends
}
}
+ @Nonnull
private TreeNode getOrCreate(String name) {
+ if (children == NO_CHILDREN) {
+ children = Maps.newHashMap();
+ }
TreeNode c = children.get(name);
if (c == null) {
c = new TreeNode(this, name);
Copied: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java (from r1684820, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java?p2=jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java&r1=1684820&r2=1688649&rev=1688649&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java Wed Jul 1 13:37:35 2015
@@ -16,11 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,19 +28,19 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Stopwatch;
/**
- * The JournalGarbageCollector can clean up JournalEntries that are
- * older than a particular age.
- * <p>
+ * The JournalGarbageCollector can clean up JournalEntries that are older than a
+ * particular age.
+ * <p/>
* It would typically be invoked in conjunction with the VersionGarbageCollector
* but must not be confused with that one - 'journal' refers to the separate
- * collection that contains changed paths per background writes used for
+ * collection that contains changed paths per background writes used for
* observation.
*/
public class JournalGarbageCollector {
- //copied from VersionGarbageCollector:
+ //copied from VersionGarbageCollector:
private static final int DELETE_BATCH_SIZE = 450;
-
+
private final DocumentStore ds;
private static final Logger log = LoggerFactory.getLogger(JournalGarbageCollector.class);
@@ -52,9 +50,11 @@ public class JournalGarbageCollector {
}
/**
- * Deletes entries in the journal that are older than the given maxRevisionAge.
+ * Deletes entries in the journal that are older than the given
+ * maxRevisionAge.
+ *
* @param maxRevisionAge entries older than this age will be removed
- * @param unit the timeunit for maxRevisionAge
+ * @param unit the time unit for maxRevisionAge
* @return the number of entries that have been removed
*/
public int gc(long maxRevisionAge, TimeUnit unit) {
@@ -63,7 +63,7 @@ public class JournalGarbageCollector {
log.debug("gc: Journal garbage collection starts with maxAge: {} min.", TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis));
}
Stopwatch sw = Stopwatch.createStarted();
-
+
// the journal has ids of the following format:
// 1-0000014db9aaf710-00000001
// whereas the first number is the cluster node id.
@@ -81,64 +81,61 @@ public class JournalGarbageCollector {
// if it's run on multiple concurrently, then they
// will compete at deletion, which is not optimal
// due to performance, but does not harm.
-
+
// 1. get the list of cluster node ids
final List<ClusterNodeInfoDocument> clusterNodeInfos = ClusterNodeInfoDocument.all(ds);
int numDeleted = 0;
- for (Iterator<ClusterNodeInfoDocument> it = clusterNodeInfos.iterator(); it
- .hasNext();) {
- // current algorithm is to simply look at all cluster nodes
- // irrespective of whether they are active or inactive etc.
- // this could be optimized for inactive ones: at some point, all
- // journal entries of inactive ones would have been cleaned up
- // and at that point we could stop including those long-time-inactive ones.
- // that 'long time' aspect would have to be tracked though, to be sure
- // we don't leave garbage.
- // so simpler is to quickly do a query even for long-time inactive ones
- final ClusterNodeInfoDocument clusterNodeInfoDocument = it.next();
- final int clusterNodeId = clusterNodeInfoDocument.getClusterId();
-
- // 2. iterate over that list and do a query with
- // a limit of 'batch size'
- boolean branch = false;
- long startPointer = 0;
- while(true) {
- String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch));
- String toKey = JournalEntry.asId(new Revision(
- System.currentTimeMillis() - maxRevisionAgeInMillis, Integer.MAX_VALUE, clusterNodeId, branch));
- int limit = DELETE_BATCH_SIZE;
- List<JournalEntry> deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, limit);
- if (deletionBatch.size()>0) {
- ds.remove(Collection.JOURNAL, asKeys(deletionBatch));
- numDeleted+=deletionBatch.size();
- }
- if (deletionBatch.size()<limit) {
- if (!branch) {
- // do the same for branches:
- // this will start at the beginning again with branch set to true
- // and eventually finish too
- startPointer = 0;
- branch = true;
- continue;
- }
- break;
- }
- startPointer = deletionBatch.get(deletionBatch.size()-1).getRevisionTimestamp();
- }
- }
-
+ for (ClusterNodeInfoDocument clusterNodeInfoDocument : clusterNodeInfos) {
+ // current algorithm is to simply look at all cluster nodes
+ // irrespective of whether they are active or inactive etc.
+ // this could be optimized for inactive ones: at some point, all
+ // journal entries of inactive ones would have been cleaned up
+ // and at that point we could stop including those long-time-inactive ones.
+ // that 'long time' aspect would have to be tracked though, to be sure
+ // we don't leave garbage.
+ // so simpler is to quickly do a query even for long-time inactive ones
+ final int clusterNodeId = clusterNodeInfoDocument.getClusterId();
+
+ // 2. iterate over that list and do a query with
+ // a limit of 'batch size'
+ boolean branch = false;
+ long startPointer = 0;
+ while (true) {
+ String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch));
+ String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis() - maxRevisionAgeInMillis, Integer.MAX_VALUE, clusterNodeId, branch));
+ int limit = DELETE_BATCH_SIZE;
+ List<JournalEntry> deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, limit);
+ if (deletionBatch.size() > 0) {
+ ds.remove(Collection.JOURNAL, asKeys(deletionBatch));
+ numDeleted += deletionBatch.size();
+ }
+ if (deletionBatch.size() < limit) {
+ if (!branch) {
+ // do the same for branches:
+ // this will start at the beginning again with branch set to true
+ // and eventually finish too
+ startPointer = 0;
+ branch = true;
+ continue;
+ }
+ break;
+ }
+ startPointer = deletionBatch.get(deletionBatch.size() - 1).getRevisionTimestamp();
+ }
+ }
+
sw.stop();
-
+
log.info("gc: Journal garbage collection took {}, deleted {} entries that were older than {} min.", sw, numDeleted, TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis));
return numDeleted;
}
- private List<String> asKeys(List<JournalEntry> deletionBatch) {
- final List<String> keys = new ArrayList<String>(deletionBatch.size());
- for (JournalEntry e: deletionBatch) {
- keys.add(e.getId());
- }
- return keys;
- }
+ private List<String> asKeys(List<JournalEntry> deletionBatch) {
+ final List<String> keys = new ArrayList<String>(deletionBatch.size());
+ for (JournalEntry e : deletionBatch) {
+ keys.add(e.getId());
+ }
+ return keys;
+ }
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Jul 1 13:37:35 2015
@@ -22,6 +22,9 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.mergeSorted;
+import static java.util.Collections.singletonList;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
+import static org.apache.jackrabbit.oak.plugins.document.UnsavedModifications.Snapshot.IGNORE;
import java.util.Iterator;
import java.util.List;
@@ -137,6 +140,8 @@ public class LastRevRecoveryAgent {
//Map of known last rev of checked paths
Map<String, Revision> knownLastRevs = MapFactory.getInstance().create();
+ final DocumentStore docStore = nodeStore.getDocumentStore();
+ final JournalEntry changes = JOURNAL.newDocument(docStore);
long count = 0;
while (suspects.hasNext()) {
@@ -165,6 +170,7 @@ public class LastRevRecoveryAgent {
//2. Update lastRev for parent paths aka rollup
if (lastRevForParents != null) {
String path = doc.getPath();
+ changes.modified(path); // track all changes
while (true) {
if (PathUtils.denotesRoot(path)) {
break;
@@ -187,6 +193,9 @@ public class LastRevRecoveryAgent {
unsaved.put(parentPath, calcLastRev);
}
}
+
+ // take the root's lastRev
+ final Revision lastRootRev = unsaved.get("/");
//Note the size before persist as persist operation
//would empty the internal state
@@ -200,7 +209,41 @@ public class LastRevRecoveryAgent {
//UnsavedModifications is designed to be used in concurrent
//access mode. For recovery case there is no concurrent access
//involve so just pass a new lock instance
- unsaved.persist(nodeStore, new ReentrantLock());
+
+ // the lock used for persisting is a plain reentrant lock,
+ // thus it doesn't matter where exactly the check is done
+ // as to whether the recovered lastRev has already been
+ // written to the journal.
+ unsaved.persist(nodeStore, new UnsavedModifications.Snapshot() {
+
+ @Override
+ public void acquiring() {
+ if (lastRootRev == null) {
+ // this should never happen: when unsaved has no changes,
+ // its 'map' is empty, and in that case 'persist()'
+ // quits early and never calls
+ // acquiring() here.
+ //
+ // but even if it did occur: without a lastRootRev we
+ // cannot (and probably don't have to) persist anything
+ return;
+ }
+
+ final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point
+ final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id);
+ if (existingEntry != null) {
+ // then the journal entry was already written - as can happen if
+ // someone else (or the original instance itself) wrote the
+ // journal entry, then died.
+ // in this case, don't write it again.
+ // hence: nothing to be done here. return.
+ return;
+ }
+
+ // otherwise store a new journal entry now
+ docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev)));
+ }
+ }, new ReentrantLock());
log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
"cluster node [{}]: {}", size, clusterId, updates);
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java Wed Jul 1 13:37:35 2015
@@ -73,7 +73,8 @@ public class LocalDiffCache implements D
@Nonnull
@Override
public Entry newEntry(final @Nonnull Revision from,
- final @Nonnull Revision to) {
+ final @Nonnull Revision to,
+ boolean local /*ignored*/) {
return new Entry() {
private final Map<String, String> changesPerPath = Maps.newHashMap();
private int size;
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java Wed Jul 1 13:37:35 2015
@@ -80,7 +80,8 @@ public class MemoryDiffCache implements
@Nonnull
@Override
public Entry newEntry(@Nonnull Revision from,
- @Nonnull Revision to) {
+ @Nonnull Revision to,
+ boolean local /*ignored*/) {
return new MemoryEntry(from, to);
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java Wed Jul 1 13:37:35 2015
@@ -16,8 +16,13 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import java.util.Set;
import java.util.SortedSet;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.Sets;
+
/**
* A merge commit containing multiple commit revisions. One for each branch
* commit to merge.
@@ -25,6 +30,7 @@ import java.util.SortedSet;
class MergeCommit extends Commit {
private final SortedSet<Revision> mergeRevs;
+ private final Set<Revision> branchCommits = Sets.newHashSet();
MergeCommit(DocumentNodeStore nodeStore,
Revision baseRevision,
@@ -37,8 +43,18 @@ class MergeCommit extends Commit {
return mergeRevs;
}
+ void addBranchCommits(@Nonnull Branch branch) {
+ for (Revision r : branch.getCommits()) {
+ if (!branch.getCommit(r).isRebase()) {
+ branchCommits.add(r);
+ }
+ }
+ }
+
@Override
public void applyToCache(Revision before, boolean isBranchCommit) {
- // do nothing for a merge commit
+ // do nothing for a merge commit, only notify node
+ // store about merged revisions
+ nodeStore.revisionsMerged(branchCommits);
}
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java Wed Jul 1 13:37:35 2015
@@ -29,8 +29,8 @@ import org.apache.jackrabbit.oak.cache.C
*/
class TieredDiffCache implements DiffCache {
- private final LocalDiffCache localCache;
- private final MemoryDiffCache memoryCache;
+ private final DiffCache localCache;
+ private final DiffCache memoryCache;
TieredDiffCache(DocumentMK.Builder builder) {
this.localCache = new LocalDiffCache(builder);
@@ -51,7 +51,8 @@ class TieredDiffCache implements DiffCac
}
/**
- * Creates a new entry in the {@link LocalDiffCache} only!
+ * Creates a new entry in the {@link LocalDiffCache} for local changes,
+ * or in the {@link MemoryDiffCache} for external changes.
*
* @param from the from revision.
* @param to the to revision.
@@ -59,8 +60,12 @@ class TieredDiffCache implements DiffCac
*/
@Nonnull
@Override
- public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) {
- return localCache.newEntry(from, to);
+ public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) {
+ if (local) {
+ return localCache.newEntry(from, to, true);
+ } else {
+ return memoryCache.newEntry(from, to, false);
+ }
}
@Nonnull
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java?rev=1688649&r1=1688648&r2=1688649&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java Wed Jul 1 13:37:35 2015
@@ -134,11 +134,14 @@ class UnsavedModifications {
* lock for a short period of time.
*
* @param store the document node store.
+ * @param snapshot callback invoked while the lock is held, just before
+ * the snapshot of the pending changes is taken.
* @param lock the lock to acquire to get a consistent snapshot of the
* revisions to write back.
* @return stats about the write operation.
*/
public BackgroundWriteStats persist(@Nonnull DocumentNodeStore store,
+ @Nonnull Snapshot snapshot,
@Nonnull Lock lock) {
BackgroundWriteStats stats = new BackgroundWriteStats();
if (map.size() == 0) {
@@ -150,12 +153,13 @@ class UnsavedModifications {
Clock clock = store.getClock();
long time = clock.getTime();
- // get a copy of the map while holding the lock
+ // get a copy of the map while holding the lock
lock.lock();
stats.lock = clock.getTime() - time;
time = clock.getTime();
Map<String, Revision> pending;
try {
+ snapshot.acquiring();
pending = Maps.newTreeMap(PathComparator.INSTANCE);
pending.putAll(map);
} finally {
@@ -218,4 +222,15 @@ class UnsavedModifications {
public String toString() {
return map.toString();
}
+
+ public interface Snapshot {
+
+ Snapshot IGNORE = new Snapshot() {
+ @Override
+ public void acquiring() {
+ }
+ };
+
+ void acquiring();
+ }
}