You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2016/12/13 20:15:09 UTC
svn commit: r1774094 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: catholicon
Date: Tue Dec 13 20:15:08 2016
New Revision: 1774094
URL: http://svn.apache.org/viewvc?rev=1774094&view=rev
Log:
OAK-3976: journal should support large(r) entries
JournalEntry now tracks and exposes the number of modified nodes and whether it has any branch commits.
If the number of changed nodes reaches the (configurable) force-push threshold, the journal entry is pushed during commit.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Dec 13 20:15:08 2016
@@ -186,6 +186,12 @@ public final class DocumentNodeStore
Boolean.getBoolean(SYS_PROP_DISABLE_JOURNAL);
/**
+ * Threshold for the number of changed paths in the journal entry at which the entry is
+ * force-pushed during commit (instead of at background write).
+ */
+ private int journalPushThreshold = Integer.getInteger("oak.journalPushThreshold", 100000);
+
+ /**
* The document store (might be used by multiple node stores).
*/
protected final DocumentStore store;
@@ -789,7 +795,13 @@ public final class DocumentNodeStore
changes.modified(c.getModifiedPaths());
changes.addChangeSet(getChangeSet(info));
// update head revision
- newHead[0] = before.update(c.getRevision());
+ Revision r = c.getRevision();
+ newHead[0] = before.update(r);
+ if (changes.getNumChangedNodes() >= journalPushThreshold) {
+ LOG.info("Pushing journal entry at {} as number of changes ({}) have reached {}",
+ r, changes.getNumChangedNodes(), journalPushThreshold);
+ pushJournalEntry(r);
+ }
setRoot(newHead[0]);
commitQueue.headRevisionChanged();
dispatcher.contentChanged(getRoot(), info);
@@ -861,6 +873,14 @@ public final class DocumentNodeStore
return enableConcurrentAddRemove;
}
+ int getJournalPushThreshold() {
+ return journalPushThreshold;
+ }
+
+ void setJournalPushThreshold(int journalPushThreshold) {
+ this.journalPushThreshold = journalPushThreshold;
+ }
+
@Nonnull
public ClusterNodeInfo getClusterInfo() {
return clusterNodeInfo;
@@ -2142,20 +2162,26 @@ public final class DocumentNodeStore
return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() {
@Override
public void acquiring(Revision mostRecent) {
- if (store.create(JOURNAL, singletonList(changes.asUpdateOp(mostRecent)))) {
- // success: start with a new document
- changes = newJournalEntry();
- } else {
- // fail: log and keep the changes
- LOG.error("Failed to write to journal, accumulating changes for future write (~" + changes.getMemory()
- + " bytes).");
- }
+ pushJournalEntry(mostRecent);
}
}, backgroundOperationLock.writeLock());
}
//-----------------------------< internal >---------------------------------
+ void pushJournalEntry(Revision r) {
+ if (!changes.hasChanges()) {
+ LOG.debug("Not pushing journal as there are no changes");
+ } else if (store.create(JOURNAL, singletonList(changes.asUpdateOp(r)))) {
+ // success: start with a new document
+ changes = newJournalEntry();
+ } else {
+ // fail: log and keep the changes
+ LOG.error("Failed to write to journal({}), accumulating changes for future write (~{} bytes, {} paths)",
+ r, changes.getMemory(), changes.getNumChangedNodes());
+ }
+ }
+
/**
* Returns the binary size of a property value represented as a JSON or
* {@code -1} if the property is not of type binary.
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Tue Dec 13 20:15:08 2016
@@ -86,6 +86,17 @@ public final class JournalEntry extends
private volatile TreeNode changes = null;
+ /**
+ * Counts the nodes (including intermediate ancestors) newly added to the change tree by
+ * {@code modified()} calls. Applicable for entries being prepared to be persisted.
+ */
+ private volatile int numChangedNodes = 0;
+ /**
+ * Tracks whether this entry has any branch commits.
+ * Applicable for entries being prepared to be persisted.
+ */
+ private boolean hasBranchCommits = false;
+
private boolean concurrent;
JournalEntry(DocumentStore store) {
@@ -316,6 +327,9 @@ public final class JournalEntry extends
void modified(String path) {
TreeNode node = getChanges();
for (String name : PathUtils.elements(path)) {
+ if (node.get(name) == null) {
+ numChangedNodes++;
+ }
node = node.getOrCreate(name);
}
}
@@ -364,6 +378,7 @@ public final class JournalEntry extends
branchCommits += ",";
}
branchCommits += asId(r.asBranchRevision());
+ hasBranchCommits = true;
}
put(BRANCH_COMMITS, branchCommits);
}
@@ -455,6 +470,20 @@ public final class JournalEntry extends
};
}
+ /**
+ * @return number of changed nodes being tracked by this journal entry.
+ */
+ int getNumChangedNodes() {
+ return numChangedNodes;
+ }
+
+ /**
+ * @return {@code true} if this entry has changed nodes or branch commits to be pushed
+ */
+ boolean hasChanges() {
+ return numChangedNodes > 0 || hasBranchCommits;
+ }
+
//-----------------------------< internal >---------------------------------
private static String getChanges(TreeNode node) {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Dec 13 20:15:08 2016
@@ -65,7 +65,6 @@ import java.util.concurrent.locks.Reentr
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@@ -2868,6 +2867,25 @@ public class DocumentNodeStoreTest {
}
}
+ @Test
+ public void forceJournalFlush() throws Exception {
+ DocumentNodeStore ns = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore();
+ ns.setJournalPushThreshold(2);
+ int numChangedPaths;
+
+ NodeBuilder builder = ns.getRoot().builder();
+ builder.child("foo");
+ merge(ns, builder);
+ numChangedPaths = ns.getCurrentJournalEntry().getNumChangedNodes();
+ assertTrue("Single path change shouldn't flush", numChangedPaths > 0);
+
+ builder = ns.getRoot().builder();
+ builder.child("bar");
+ merge(ns, builder);
+ numChangedPaths = ns.getCurrentJournalEntry().getNumChangedNodes();
+ assertTrue("Two added paths should have forced flush", numChangedPaths == 0);
+ }
+
private static class TestException extends RuntimeException {
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Tue Dec 13 20:15:08 2016
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -311,6 +312,50 @@ public class JournalEntryTest {
sort.close();
}
+ @Test
+ public void countUpdatedPaths() {
+ DocumentStore store = new MemoryDocumentStore();
+ JournalEntry entry = JOURNAL.newDocument(store);
+
+ assertEquals("Incorrect number of initial paths", 0, entry.getNumChangedNodes());
+ assertFalse("Incorrect hasChanges", entry.hasChanges());
+
+ entry.modified("/foo");
+ entry.modified("/bar");
+ assertEquals("Incorrect number of paths", 2, entry.getNumChangedNodes());
+ assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+ entry.modified(Arrays.asList("/foo1", "/bar1"));
+ assertEquals("Incorrect number of paths", 4, entry.getNumChangedNodes());
+ assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+ entry.modified("/foo/bar2");
+ assertEquals("Incorrect number of paths", 5, entry.getNumChangedNodes());
+ assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+ entry.modified("/foo3/bar3");
+ assertEquals("Incorrect number of paths", 7, entry.getNumChangedNodes());
+ assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+ entry.modified(Arrays.asList("/foo/bar4", "/foo5/bar5"));
+ assertEquals("Incorrect number of paths", 10, entry.getNumChangedNodes());
+ assertTrue("Incorrect hasChanges", entry.hasChanges());
+ }
+
+ @Test
+ public void branchAdditionMarksChanges() {
+ DocumentStore store = new MemoryDocumentStore();
+ JournalEntry entry = JOURNAL.newDocument(store);
+
+ assertFalse("Incorrect hasChanges", entry.hasChanges());
+
+ entry.branchCommit(Collections.<Revision>emptyList());
+ assertFalse("Incorrect hasChanges", entry.hasChanges());
+
+ entry.branchCommit(Collections.singleton(Revision.fromString("r123-0-1")));
+ assertTrue("Incorrect hasChanges", entry.hasChanges());
+ }
+
private static void addRandomPaths(java.util.Collection<String> paths) throws IOException {
paths.add("/");
Random random = new Random(42);
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java Tue Dec 13 20:15:08 2016
@@ -102,10 +102,8 @@ public class JournalGCTest {
c.waitUntil(c.getTime() + TimeUnit.HOURS.toMillis(1));
- // must collect all journal entries. the first created when
- // DocumentNodeStore was initialized and the second created
- // by the background update
- assertEquals(2, jgc.gc(1, TimeUnit.HOURS));
+ // must collect the journal entry created by the background update
+ assertEquals(1, jgc.gc(1, TimeUnit.HOURS));
// current time, but without the increment done by getTime()
now = c.getTime() - 1;
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Tue Dec 13 20:15:08 2016
@@ -337,11 +337,9 @@ public class JournalTest extends Abstrac
DocumentNodeStore ds2 = mk2.getNodeStore();
final int c2Id = ds2.getClusterId();
- // should have 1 each with just the root changed
- assertJournalEntries(ds1, "{}");
- assertJournalEntries(ds2, "{}");
- assertEquals(1, countJournalEntries(ds1, 10));
- assertEquals(1, countJournalEntries(ds2, 10));
+ // should have none yet
+ assertEquals(0, countJournalEntries(ds1, 10));
+ assertEquals(0, countJournalEntries(ds2, 10));
//1. Create base structure /x/y
NodeBuilder b1 = ds1.getRoot().builder();
@@ -381,11 +379,11 @@ public class JournalTest extends Abstrac
final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1);
- // besides the former root change, now 1 also has
+ // now 1 also has
final String change1 = "{\"x\":{\"y\":{}}}";
- assertJournalEntries(ds1, "{}", change1);
+ assertJournalEntries(ds1, change1);
final String change2 = "{\"x\":{}}";
- assertJournalEntries(ds2, "{}", change2);
+ assertJournalEntries(ds2, change2);
String change2b = "{\"x\":{\"y\":{\"z\":{}}}}";
@@ -400,14 +398,14 @@ public class JournalTest extends Abstrac
assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id));
// now 1 is unchanged, but 2 was recovered now, so has one more:
- assertJournalEntries(ds1, "{}", change1); // unchanged
- assertJournalEntries(ds2, "{}", change2, change2b);
+ assertJournalEntries(ds1, change1); // unchanged
+ assertJournalEntries(ds2, change2, change2b);
// just some no-ops:
recovery.recover(c2Id);
recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id);
- assertJournalEntries(ds1, "{}", change1); // unchanged
- assertJournalEntries(ds2, "{}", change2, change2b);
+ assertJournalEntries(ds1, change1); // unchanged
+ assertJournalEntries(ds2, change2, change2b);
} else {
@@ -439,8 +437,8 @@ public class JournalTest extends Abstrac
ready.await(5, TimeUnit.SECONDS);
start.countDown();
assertTrue(end.await(20, TimeUnit.SECONDS));
- assertJournalEntries(ds1, "{}", change1); // unchanged
- assertJournalEntries(ds2, "{}", change2, change2b);
+ assertJournalEntries(ds1, change1); // unchanged
+ assertJournalEntries(ds2, change2, change2b);
for (Exception ex : exceptions) {
throw ex;
}