You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2016/12/13 20:15:09 UTC

svn commit: r1774094 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: catholicon
Date: Tue Dec 13 20:15:08 2016
New Revision: 1774094

URL: http://svn.apache.org/viewvc?rev=1774094&view=rev
Log:
OAK-3976: journal should support large(r) entries

JournalEntry tracks and provides number of modified nodes and if it had any branch commits.
If number of changed nodes reaches force-push-threshold (configurable), then journal entry is pushed.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Dec 13 20:15:08 2016
@@ -186,6 +186,12 @@ public final class DocumentNodeStore
             Boolean.getBoolean(SYS_PROP_DISABLE_JOURNAL);
 
     /**
+     * Threshold for number of paths in journal entry to require a force push during commit
+     * (instead of at background write)
+     */
+    private int journalPushThreshold = Integer.getInteger("oak.journalPushThreshold", 100000);
+
+    /**
      * The document store (might be used by multiple node stores).
      */
     protected final DocumentStore store;
@@ -789,7 +795,13 @@ public final class DocumentNodeStore
                         changes.modified(c.getModifiedPaths());
                         changes.addChangeSet(getChangeSet(info));
                         // update head revision
-                        newHead[0] = before.update(c.getRevision());
+                        Revision r = c.getRevision();
+                        newHead[0] = before.update(r);
+                        if (changes.getNumChangedNodes() >= journalPushThreshold) {
+                            LOG.info("Pushing journal entry at {} as number of changes ({}) have reached {}",
+                                    r, changes.getNumChangedNodes(), journalPushThreshold);
+                            pushJournalEntry(r);
+                        }
                         setRoot(newHead[0]);
                         commitQueue.headRevisionChanged();
                         dispatcher.contentChanged(getRoot(), info);
@@ -861,6 +873,14 @@ public final class DocumentNodeStore
         return enableConcurrentAddRemove;
     }
 
+    int getJournalPushThreshold() {
+        return journalPushThreshold;
+    }
+
+    void setJournalPushThreshold(int journalPushThreshold) {
+        this.journalPushThreshold = journalPushThreshold;
+    }
+
     @Nonnull
     public ClusterNodeInfo getClusterInfo() {
         return clusterNodeInfo;
@@ -2142,20 +2162,26 @@ public final class DocumentNodeStore
         return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() {
             @Override
             public void acquiring(Revision mostRecent) {
-                if (store.create(JOURNAL, singletonList(changes.asUpdateOp(mostRecent)))) {
-                    // success: start with a new document
-                    changes = newJournalEntry();
-                } else {
-                    // fail: log and keep the changes
-                    LOG.error("Failed to write to journal, accumulating changes for future write (~" + changes.getMemory()
-                            + " bytes).");
-                }
+                pushJournalEntry(mostRecent);
             }
         }, backgroundOperationLock.writeLock());
     }
 
     //-----------------------------< internal >---------------------------------
 
+    void pushJournalEntry(Revision r) {
+        if (!changes.hasChanges()) {
+            LOG.debug("Not pushing journal as there are no changes");
+        } else if (store.create(JOURNAL, singletonList(changes.asUpdateOp(r)))) {
+            // success: start with a new document
+            changes = newJournalEntry();
+        } else {
+            // fail: log and keep the changes
+            LOG.error("Failed to write to journal({}), accumulating changes for future write (~{} bytes, {} paths)",
+                    r, changes.getMemory(), changes.getNumChangedNodes());
+        }
+    }
+
     /**
      * Returns the binary size of a property value represented as a JSON or
      * {@code -1} if the property is not of type binary.

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java Tue Dec 13 20:15:08 2016
@@ -86,6 +86,17 @@ public final class JournalEntry extends
 
     private volatile TreeNode changes = null;
 
+    /**
+     * Counts number of paths changed due to {@code modified()} calls.
+     * Applicable for entries being prepared to be persisted.
+     */
+    private volatile int numChangedNodes = 0;
+    /**
+     * Tracks if this entry has branch commits or not
+     * Applicable for entries being prepared to be persisted.
+     */
+    private boolean hasBranchCommits = false;
+
     private boolean concurrent;
 
     JournalEntry(DocumentStore store) {
@@ -316,6 +327,9 @@ public final class JournalEntry extends
     void modified(String path) {
         TreeNode node = getChanges();
         for (String name : PathUtils.elements(path)) {
+            if (node.get(name) == null) {
+                numChangedNodes++;
+            }
             node = node.getOrCreate(name);
         }
     }
@@ -364,6 +378,7 @@ public final class JournalEntry extends
                 branchCommits += ",";
             }
             branchCommits += asId(r.asBranchRevision());
+            hasBranchCommits = true;
         }
         put(BRANCH_COMMITS, branchCommits);
     }
@@ -455,6 +470,20 @@ public final class JournalEntry extends
         };
     }
 
+    /**
+     * @return number of changed nodes being tracked by this journal entry.
+     */
+    int getNumChangedNodes() {
+        return numChangedNodes;
+    }
+
+    /**
+     * @return if this entry has some changes to be pushed
+     */
+    boolean hasChanges() {
+        return numChangedNodes > 0 || hasBranchCommits;
+    }
+
     //-----------------------------< internal >---------------------------------
 
     private static String getChanges(TreeNode node) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java Tue Dec 13 20:15:08 2016
@@ -65,7 +65,6 @@ import java.util.concurrent.locks.Reentr
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -2868,6 +2867,25 @@ public class DocumentNodeStoreTest {
         }
     }
 
+    @Test
+    public void forceJournalFlush() throws Exception {
+        DocumentNodeStore ns = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore();
+        ns.setJournalPushThreshold(2);
+        int numChangedPaths;
+
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("foo");
+        merge(ns, builder);
+        numChangedPaths = ns.getCurrentJournalEntry().getNumChangedNodes();
+        assertTrue("Single path change shouldn't flush", numChangedPaths > 0);
+
+        builder = ns.getRoot().builder();
+        builder.child("bar");
+        merge(ns, builder);
+        numChangedPaths = ns.getCurrentJournalEntry().getNumChangedNodes();
+        assertTrue("Two added paths should have forced flush", numChangedPaths == 0);
+    }
+
     private static class TestException extends RuntimeException {
 
     }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java Tue Dec 13 20:15:08 2016
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -311,6 +312,50 @@ public class JournalEntryTest {
         sort.close();
     }
 
+    @Test
+    public void countUpdatedPaths() {
+        DocumentStore store = new MemoryDocumentStore();
+        JournalEntry entry = JOURNAL.newDocument(store);
+
+        assertEquals("Incorrect number of initial paths", 0, entry.getNumChangedNodes());
+        assertFalse("Incorrect hasChanges", entry.hasChanges());
+
+        entry.modified("/foo");
+        entry.modified("/bar");
+        assertEquals("Incorrect number of paths", 2, entry.getNumChangedNodes());
+        assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+        entry.modified(Arrays.asList("/foo1", "/bar1"));
+        assertEquals("Incorrect number of paths", 4, entry.getNumChangedNodes());
+        assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+        entry.modified("/foo/bar2");
+        assertEquals("Incorrect number of paths", 5, entry.getNumChangedNodes());
+        assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+        entry.modified("/foo3/bar3");
+        assertEquals("Incorrect number of paths", 7, entry.getNumChangedNodes());
+        assertTrue("Incorrect hasChanges", entry.hasChanges());
+
+        entry.modified(Arrays.asList("/foo/bar4", "/foo5/bar5"));
+        assertEquals("Incorrect number of paths", 10, entry.getNumChangedNodes());
+        assertTrue("Incorrect hasChanges", entry.hasChanges());
+    }
+
+    @Test
+    public void branchAdditionMarksChanges() {
+        DocumentStore store = new MemoryDocumentStore();
+        JournalEntry entry = JOURNAL.newDocument(store);
+
+        assertFalse("Incorrect hasChanges", entry.hasChanges());
+
+        entry.branchCommit(Collections.<Revision>emptyList());
+        assertFalse("Incorrect hasChanges", entry.hasChanges());
+
+        entry.branchCommit(Collections.singleton(Revision.fromString("r123-0-1")));
+        assertTrue("Incorrect hasChanges", entry.hasChanges());
+    }
+
     private static void addRandomPaths(java.util.Collection<String> paths) throws IOException {
         paths.add("/");
         Random random = new Random(42);

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java Tue Dec 13 20:15:08 2016
@@ -102,10 +102,8 @@ public class JournalGCTest {
 
         c.waitUntil(c.getTime() + TimeUnit.HOURS.toMillis(1));
 
-        // must collect all journal entries. the first created when
-        // DocumentNodeStore was initialized and the second created
-        // by the background update
-        assertEquals(2, jgc.gc(1, TimeUnit.HOURS));
+        // must collect the journal entry created by the background update
+        assertEquals(1, jgc.gc(1, TimeUnit.HOURS));
 
         // current time, but without the increment done by getTime()
         now = c.getTime() - 1;

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java?rev=1774094&r1=1774093&r2=1774094&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java Tue Dec 13 20:15:08 2016
@@ -337,11 +337,9 @@ public class JournalTest extends Abstrac
         DocumentNodeStore ds2 = mk2.getNodeStore();
         final int c2Id = ds2.getClusterId();
         
-        // should have 1 each with just the root changed
-        assertJournalEntries(ds1, "{}");
-        assertJournalEntries(ds2, "{}");
-        assertEquals(1, countJournalEntries(ds1, 10)); 
-        assertEquals(1, countJournalEntries(ds2, 10));
+        // should have none yet
+        assertEquals(0, countJournalEntries(ds1, 10));
+        assertEquals(0, countJournalEntries(ds2, 10));
         
         //1. Create base structure /x/y
         NodeBuilder b1 = ds1.getRoot().builder();
@@ -381,11 +379,11 @@ public class JournalTest extends Abstrac
 
         final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1);
 
-        // besides the former root change, now 1 also has 
+        // now 1 also has
         final String change1 = "{\"x\":{\"y\":{}}}";
-        assertJournalEntries(ds1, "{}", change1);
+        assertJournalEntries(ds1, change1);
         final String change2 = "{\"x\":{}}";
-        assertJournalEntries(ds2, "{}", change2);
+        assertJournalEntries(ds2, change2);
 
 
         String change2b = "{\"x\":{\"y\":{\"z\":{}}}}";
@@ -400,14 +398,14 @@ public class JournalTest extends Abstrac
             assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id));
     
             // now 1 is unchanged, but 2 was recovered now, so has one more:
-            assertJournalEntries(ds1, "{}", change1); // unchanged
-            assertJournalEntries(ds2, "{}", change2, change2b);
+            assertJournalEntries(ds1, change1); // unchanged
+            assertJournalEntries(ds2, change2, change2b);
             
             // just some no-ops:
             recovery.recover(c2Id);
             recovery.recover(Iterators.<NodeDocument>emptyIterator(), c2Id);
-            assertJournalEntries(ds1, "{}", change1); // unchanged
-            assertJournalEntries(ds2, "{}", change2, change2b);
+            assertJournalEntries(ds1, change1); // unchanged
+            assertJournalEntries(ds2, change2, change2b);
 
         } else {
         
@@ -439,8 +437,8 @@ public class JournalTest extends Abstrac
             ready.await(5, TimeUnit.SECONDS);
             start.countDown();
             assertTrue(end.await(20, TimeUnit.SECONDS));
-            assertJournalEntries(ds1, "{}", change1); // unchanged
-            assertJournalEntries(ds2, "{}", change2, change2b);
+            assertJournalEntries(ds1, change1); // unchanged
+            assertJournalEntries(ds2, change2, change2b);
             for (Exception ex : exceptions) {
                 throw ex;
             }