You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/05/07 13:16:32 UTC

svn commit: r1479858 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: kernel/ plugins/segment/

Author: jukka
Date: Tue May  7 11:16:32 2013
New Revision: 1479858

URL: http://svn.apache.org/r1479858
Log:
OAK-775: Implement backward compatible observation

Simplify Kernel- and SegmentNodeStore to make it easier to inject local observation support

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java Tue May  7 11:16:32 2013
@@ -22,12 +22,9 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.api.Type.STRINGS;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
 
 import javax.jcr.PropertyType;
 
-import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.json.JsopBuilder;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -35,50 +32,36 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TODO document
  */
 class JsopDiff implements NodeStateDiff {
-    private static final Logger log = LoggerFactory.getLogger(JsopDiff.class);
 
-    private final MicroKernel kernel;
+    private final KernelNodeStore store;
 
     protected final JsopBuilder jsop;
 
     protected final String path;
 
-    public JsopDiff(MicroKernel kernel, JsopBuilder jsop, String path) {
-        this.kernel = kernel;
+    JsopDiff(KernelNodeStore store, JsopBuilder jsop, String path) {
+        this.store = store;
         this.jsop = jsop;
         this.path = path;
     }
 
-    public JsopDiff(MicroKernel kernel) {
-        this(kernel, new JsopBuilder(), "/");
+    JsopDiff(KernelNodeStore store) {
+        this(store, new JsopBuilder(), "/");
     }
 
     public static void diffToJsop(
-            MicroKernel kernel, NodeState before, NodeState after,
+            KernelNodeStore store, NodeState before, NodeState after,
             String path, JsopBuilder jsop) {
-        after.compareAgainstBaseState(before, new JsopDiff(kernel, jsop, path));
-    }
-
-    public static String diffToJsop(NodeState before, NodeState after) {
-        JsopDiff diff = new JsopDiff(null) {
-            @Override
-            protected String writeBlob(Blob blob) {
-                return "Blob{" + Arrays.toString(blob.sha256()) + '}';
-            }
-        };
-        after.compareAgainstBaseState(before, diff);
-        return diff.toString();
+        after.compareAgainstBaseState(before, new JsopDiff(store, jsop, path));
     }
 
     protected JsopDiff createChildDiff(JsopBuilder jsop, String path) {
-        return new JsopDiff(kernel, jsop, path);
+        return new JsopDiff(store, jsop, path);
     }
 
     //-----------------------------------------------------< NodeStateDiff >--
@@ -194,24 +177,18 @@ class JsopDiff implements NodeStateDiff 
      * @param blob  blob to persist
      * @return  id of the persisted blob
      */
-    protected String writeBlob(Blob blob) {
-        String blobId;
+    private String writeBlob(Blob blob) {
+        KernelBlob kernelBlob;
         if (blob instanceof KernelBlob) {
-            blobId = ((KernelBlob) blob).getBinaryID();
+            kernelBlob = (KernelBlob) blob;
         } else {
-            InputStream is = blob.getNewStream();
-            blobId = kernel.write(is);
-            close(is);
+            try {
+                kernelBlob = store.createBlob(blob.getNewStream());
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
         }
-        return blobId;
+        return kernelBlob.getBinaryID();
     }
 
-    private static void close(InputStream stream) {
-        try {
-            stream.close();
-        }
-        catch (IOException e) {
-            log.warn("Error closing stream", e);
-        }
-    }
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java Tue May  7 11:16:32 2013
@@ -64,7 +64,6 @@ public class KernelNodeStore extends Abs
      */
     private KernelNodeState root;
 
-
     public KernelNodeStore(final MicroKernel kernel, long cacheSize) {
         this.kernel = checkNotNull(kernel);
         this.cache = CacheBuilder.newBuilder()
@@ -160,4 +159,12 @@ public class KernelNodeStore extends Abs
         }
     }
 
+    NodeState commit(String jsop, String baseRevision) {
+        return getRootState(kernel.commit("", jsop, baseRevision, null));
+    }
+
+    NodeState merge(String headRevision) {
+        return getRootState(kernel.merge(headRevision, null));
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Tue May  7 11:16:32 2013
@@ -43,10 +43,7 @@ class KernelNodeStoreBranch extends Abst
     private final KernelNodeStore store;
 
     /** Root state of the base revision of this branch */
-    private NodeState base;
-
-    /** Revision of the base state of this branch*/
-    private String baseRevision;
+    private KernelNodeState base;
 
     /** Root state of the transient head revision on top of persisted branch, null if merged. */
     private NodeState head;
@@ -60,7 +57,6 @@ class KernelNodeStoreBranch extends Abst
     KernelNodeStoreBranch(KernelNodeStore store, KernelNodeState root) {
         this.store = store;
         this.base = root;
-        this.baseRevision = root.getRevision();
         this.head = root;
     }
 
@@ -151,25 +147,25 @@ class KernelNodeStoreBranch extends Abst
                 head = null;  // Mark as merged
                 return base;
             } else {
-                MicroKernel kernel = store.getKernel();
-                String newRevision;
-                JsopDiff diff = new JsopDiff(kernel);
+                NodeState newRoot;
+                JsopDiff diff = new JsopDiff(store);
                 if (headRevision == null) {
                     // no branch created yet, commit directly
                     head.compareAgainstBaseState(base, diff);
-                    newRevision = kernel.commit("", diff.toString(), baseRevision, null);
+                    newRoot = store.commit(diff.toString(), base.getRevision());
                 } else {
                     // commit into branch and merge
                     head.compareAgainstBaseState(store.getRootState(headRevision), diff);
                     String jsop = diff.toString();
                     if (!jsop.isEmpty()) {
-                        headRevision = kernel.commit("", jsop, headRevision, null);
+                        headRevision = store.getKernel().commit(
+                                "", jsop, headRevision, null);
                     }
-                    newRevision = kernel.merge(headRevision, null);
+                    newRoot = store.merge(headRevision);
                     headRevision = null;
                 }
                 head = null;  // Mark as merged
-                return store.getRootState(newRevision);
+                return newRoot;
             }
         } catch (MicroKernelException e) {
             head = oldRoot;
@@ -186,7 +182,6 @@ class KernelNodeStoreBranch extends Abst
             // Nothing was written to this branch: set new base revision
             head = root;
             base = root;
-            baseRevision = root.getRevision();
         } else if (headRevision == null) {
             // Nothing written to persistent branch yet
             // perform rebase in memory
@@ -194,14 +189,12 @@ class KernelNodeStoreBranch extends Abst
             getHead().compareAgainstBaseState(getBase(), new ConflictAnnotatingRebaseDiff(builder));
             head = builder.getNodeState();
             base = root;
-            baseRevision = root.getRevision();
         } else {
             // perform rebase in kernel
             persistTransientHead();
             headRevision = store.getKernel().rebase(headRevision, root.getRevision());
             head = store.getRootState(headRevision);
             base = root;
-            baseRevision = root.getRevision();
         }
     }
 
@@ -224,7 +217,7 @@ class KernelNodeStoreBranch extends Abst
         MicroKernel kernel = store.getKernel();
         if (headRevision == null) {
             // create the branch if this is the first commit
-            headRevision = kernel.branch(baseRevision);
+            headRevision = kernel.branch(base.getRevision());
         }
 
         // persist transient changes first
@@ -235,14 +228,13 @@ class KernelNodeStoreBranch extends Abst
     }
 
     private void persistTransientHead() {
-        NodeState oldBase = base;
-        String oldBaseRevision = baseRevision;
+        KernelNodeState oldBase = base;
         NodeState oldHead = head;
         String oldHeadRevision = headRevision;
         boolean success = false;
         try {
             MicroKernel kernel = store.getKernel();
-            JsopDiff diff = new JsopDiff(store.getKernel());
+            JsopDiff diff = new JsopDiff(store);
             if (headRevision == null) {
                 // no persistent branch yet
                 if (head.equals(base)) {
@@ -251,7 +243,7 @@ class KernelNodeStoreBranch extends Abst
                     return;
                 } else {
                     // create branch
-                    headRevision = kernel.branch(baseRevision);
+                    headRevision = kernel.branch(base.getRevision());
                     head.compareAgainstBaseState(base, diff);
                 }
             } else {
@@ -274,7 +266,6 @@ class KernelNodeStoreBranch extends Abst
             // revert to old state if unsuccessful
             if (!success) {
                 base = oldBase;
-                baseRevision = oldBaseRevision;
                 head = oldHead;
                 headRevision = oldHeadRevision;
             }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue May  7 11:16:32 2013
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.AbstractNodeStore;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
@@ -34,24 +36,37 @@ public class SegmentNodeStore extends Ab
 
     private final SegmentReader reader;
 
+    private final Observer observer;
+
+    private SegmentNodeState root;
+
     public SegmentNodeStore(SegmentStore store, String journal) {
         this.store = store;
         this.journal = store.getJournal(journal);
         this.reader = new SegmentReader(store);
+        this.observer = EmptyObserver.INSTANCE;
+        this.root = new SegmentNodeState(store, this.journal.getHead());
     }
 
     public SegmentNodeStore(SegmentStore store) {
         this(store, "root");
     }
 
+    boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+        return journal.setHead(base.getRecordId(), head.getRecordId());
+    }
+
     @Override @Nonnull
-    public NodeState getRoot() {
-        return new SegmentNodeState(store, journal.getHead());
+    public synchronized SegmentNodeState getRoot() {
+        NodeState before = root;
+        root = new SegmentNodeState(store, journal.getHead());
+        observer.contentChanged(before, root);
+        return root;
     }
 
     @Override @Nonnull
     public NodeStoreBranch branch() {
-        return new SegmentNodeStoreBranch(store, journal);
+        return new SegmentNodeStoreBranch(this, new SegmentWriter(store));
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Tue May  7 11:16:32 2013
@@ -32,49 +32,46 @@ class SegmentNodeStoreBranch extends Abs
 
     private static final Random RANDOM = new Random();
 
-    private final SegmentStore store;
-
-    private final Journal journal;
+    private final SegmentNodeStore store;
 
     private final SegmentWriter writer;
 
-    private RecordId baseId;
+    private SegmentNodeState base;
 
-    private RecordId rootId;
+    private SegmentNodeState head;
 
-    SegmentNodeStoreBranch(SegmentStore store, Journal journal) {
+    SegmentNodeStoreBranch(SegmentNodeStore store, SegmentWriter writer) {
         this.store = store;
-        this.journal = journal;
-        this.writer = new SegmentWriter(store);
-        this.baseId = journal.getHead();
-        this.rootId = baseId;
+        this.writer = writer;
+        this.base = store.getRoot();
+        this.head = base;
     }
 
     @Override @Nonnull
     public NodeState getBase() {
-        return new SegmentNodeState(store, baseId);
+        return base;
     }
 
     @Override @Nonnull
     public synchronized NodeState getHead() {
-        return new SegmentNodeState(store, rootId);
+        return head;
     }
 
     @Override
     public synchronized void setRoot(NodeState newRoot) {
-        this.rootId = writer.writeNode(newRoot).getRecordId();
+        head = writer.writeNode(newRoot);
         writer.flush();
     }
 
     @Override
     public synchronized void rebase() {
-        RecordId newBaseId = journal.getHead();
-        if (!baseId.equals(newBaseId)) {
-            NodeBuilder builder =
-                    new SegmentNodeState(store, newBaseId).builder();
-            getHead().compareAgainstBaseState(getBase(), new ConflictAnnotatingRebaseDiff(builder));
-            this.baseId = newBaseId;
-            this.rootId = writer.writeNode(builder.getNodeState()).getRecordId();
+        SegmentNodeState newBase = store.getRoot();
+        if (!base.getRecordId().equals(newBase.getRecordId())) {
+            NodeBuilder builder = newBase.builder();
+            head.compareAgainstBaseState(
+                    base, new ConflictAnnotatingRebaseDiff(builder));
+            base = newBase;
+            head = writer.writeNode(builder.getNodeState());
             writer.flush();
         }
     }
@@ -82,23 +79,23 @@ class SegmentNodeStoreBranch extends Abs
     @Override @Nonnull
     public synchronized NodeState merge(CommitHook hook)
             throws CommitFailedException {
-        RecordId originalBaseId = baseId;
-        RecordId originalRootId = rootId;
+        SegmentNodeState originalBase = base;
+        SegmentNodeState originalHead = head;
         long backoff = 1;
-        while (baseId != rootId) {
+        while (base != head) {
             // apply commit hooks on the rebased changes
-            RecordId headId = writer.writeNode(
-                    hook.processCommit(getBase(), getHead())).getRecordId();
+            SegmentNodeState root = writer.writeNode(
+                    hook.processCommit(base, head));
             writer.flush();
 
             // use optimistic locking to update the journal
-            if (journal.setHead(baseId, headId)) {
-                baseId = headId;
-                rootId = headId;
+            if (store.setHead(base, root)) {
+                base = root;
+                head = root;
             } else {
                 // someone else was faster, so restore state and retry later
-                baseId = originalBaseId;
-                rootId = originalRootId;
+                base = originalBase;
+                head = originalHead;
 
                 // use exponential backoff to reduce contention
                 if (backoff < 10000) {