You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/03/25 12:18:18 UTC

svn commit: r1460619 - in /jackrabbit/oak/trunk/oak-mongomk/src: main/java/org/apache/jackrabbit/mongomk/prototype/ test/java/org/apache/jackrabbit/mongomk/prototype/

Author: thomasm
Date: Mon Mar 25 11:18:17 2013
New Revision: 1460619

URL: http://svn.apache.org/r1460619
Log:
OAK-619 Lock-free MongoMK implementation (detect conflicts; WIP)

Added:
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java?rev=1460619&r1=1460618&r2=1460619&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java Mon Mar 25 11:18:17 2013
@@ -51,6 +51,7 @@ public class Commit {
     private static final boolean PURGE_OLD_REVISIONS = true;
     
     private final MongoMK mk;
+    private final Revision baseRevision;
     private final Revision revision;
     private HashMap<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
     private JsopWriter diff = new JsopStream();
@@ -59,7 +60,8 @@ public class Commit {
     private HashSet<String> addedNodes = new HashSet<String>();
     private HashSet<String> removedNodes = new HashSet<String>();
     
-    Commit(MongoMK mk, Revision revision) {
+    Commit(MongoMK mk, Revision baseRevision, Revision revision) {
+        this.baseRevision = baseRevision;
         this.revision = revision;
         this.mk = mk;
     }
@@ -118,57 +120,65 @@ public class Commit {
      */
     void applyToDocumentStore() {
         DocumentStore store = mk.getDocumentStore();
-        String commitRoot = null;
+        String commitRootPath = null;
         ArrayList<UpdateOp> newNodes = new ArrayList<UpdateOp>();
         ArrayList<UpdateOp> changedNodes = new ArrayList<UpdateOp>();
         for (String p : operations.keySet()) {
             markChanged(p);
-            if (commitRoot == null) {
-                commitRoot = p;
+            if (commitRootPath == null) {
+                commitRootPath = p;
             } else {
-                while (!PathUtils.isAncestor(commitRoot, p)) {
-                    commitRoot = PathUtils.getParentPath(commitRoot);
-                    if (PathUtils.denotesRoot(commitRoot)) {
+                while (!PathUtils.isAncestor(commitRootPath, p)) {
+                    commitRootPath = PathUtils.getParentPath(commitRootPath);
+                    if (PathUtils.denotesRoot(commitRootPath)) {
                         break;
                     }
                 }
             }
         }
-        int commitRootDepth = PathUtils.getDepth(commitRoot);
+        int commitRootDepth = PathUtils.getDepth(commitRootPath);
         // create a "root of the commit" if there is none
-        UpdateOp root = getUpdateOperationForNode(commitRoot);
+        UpdateOp commitRoot = getUpdateOperationForNode(commitRootPath);
         for (String p : operations.keySet()) {
             UpdateOp op = operations.get(p);
-            if (op == root) {
+            op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
+            if (op.isNew) {
+                op.addMapEntry(UpdateOp.DELETED + "." + revision.toString(), "false");
+            }
+            if (op == commitRoot) {
                 // apply at the end
-            } else if (op.isNew()) {
-                newNodes.add(op);
             } else {
-                changedNodes.add(op);
+                op.addMapEntry(UpdateOp.COMMIT_ROOT + "." + revision.toString(), commitRootDepth);
+                if (op.isNew()) {
+                    newNodes.add(op);
+                } else {
+                    changedNodes.add(op);
+                }
             }
         }
-        if (changedNodes.size() == 0 && root.isNew) {
+        if (changedNodes.size() == 0 && commitRoot.isNew) {
             // no updates and root of commit is also new. that is,
             // it is the root of a subtree added in a commit.
-            // so we just add the root like the others
-            root.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
-            newNodes.add(root);
+            // so we try to add the root like all other nodes
+            commitRoot.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
+            newNodes.add(commitRoot);
         }
         try {
             if (newNodes.size() > 0) {
                 // set commit root on new nodes
-                for (UpdateOp op : newNodes) {
-                    if (op != root) {
-                        op.addMapEntry(UpdateOp.COMMIT_ROOT + "." + revision.toString(), commitRootDepth);
-                    }
-                }
                 if (!store.create(Collection.NODES, newNodes)) {
+                    // some of the documents already exist:
+                    // try to apply all changes one by one
                     for (UpdateOp op : newNodes) {
                         op.unset(UpdateOp.ID);
-                        op.addMapEntry(UpdateOp.DELETED + "." + revision.toString(), "false");
-                        op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
-                        createOrUpdateNode(store, op);
+                        if (op == commitRoot) {
+                            // don't write the commit root just yet
+                            // (because there might be a conflict)
+                            commitRoot.unset(UpdateOp.REVISIONS + "." + revision.toString());
+                        }
+                        changedNodes.add(op);
                     }
+                    newNodes.clear();
                 }
             }
             for (UpdateOp op : changedNodes) {
@@ -176,13 +186,14 @@ public class Commit {
                 op.addMapEntry(UpdateOp.COMMIT_ROOT + "." + revision.toString(), commitRootDepth);
                 createOrUpdateNode(store, op);
             }
-            // finally write commit, unless it was already written
-            // with added nodes.
-            if (changedNodes.size() != 0 || !root.isNew) {
-                root.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
-                root.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
-                createOrUpdateNode(store, root);
-                operations.put(commitRoot, root);
+            // finally write the commit root, unless it was already written
+            // with added nodes (the commit root might be written twice,
+            // first to check if there was a conflict, and only then to commit
+            // the revision, with the revision property set)
+            if (changedNodes.size() > 0 || !commitRoot.isNew) {
+                commitRoot.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
+                createOrUpdateNode(store, commitRoot);
+                operations.put(commitRootPath, commitRoot);
             }
         } catch (MicroKernelException e) {
             String msg = "Exception committing " + diff.toString();
@@ -193,6 +204,19 @@ public class Commit {
     
     private void createOrUpdateNode(DocumentStore store, UpdateOp op) {
         Map<String, Object> map = store.createOrUpdate(Collection.NODES, op);
+        if (baseRevision != null) {
+            // detect conflicts (TODO: incomplete; uncommitted changes are not yet filtered out)
+            Revision newestRev = mk.getNewestRevision(map, revision, true);
+            if (mk.isRevisionNewer(newestRev, baseRevision)) {
+                // TODO transaction rollback
+                throw new MicroKernelException("The node " + 
+                        op.path + " was changed in revision " + 
+                        newestRev + 
+                        ", which was applied after the base revision " + 
+                        baseRevision);
+            }
+        }
+
         int size = Utils.getMapSize(map);
         if (size > MAX_DOCUMENT_SIZE) {
             UpdateOp[] split = splitDocument(map);
@@ -212,8 +236,6 @@ public class Commit {
                 store.createOrUpdate(Collection.NODES, main);
             }
         }
-        // TODO detect conflicts here
-        op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
     }
     
     private UpdateOp[] splitDocument(Map<String, Object> map) {

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1460619&r1=1460618&r2=1460619&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java Mon Mar 25 11:18:17 2013
@@ -196,7 +196,7 @@ public class MongoMK implements MicroKer
         Node n = readNode("/", headRevision);
         if (n == null) {
             // root node is missing: repository is not initialized
-            Commit commit = new Commit(this, headRevision);
+            Commit commit = new Commit(this, null, headRevision);
             n = new Node("/", headRevision);
             commit.addNode(n);
             commit.applyToDocumentStore();
@@ -597,12 +597,18 @@ public class MongoMK implements MicroKer
     }
 
     @Override
-    public synchronized String commit(String rootPath, String json, String revisionId,
+    public synchronized String commit(String rootPath, String json, String baseRevId,
             String message) throws MicroKernelException {
-        revisionId = revisionId == null ? headRevision.toString() : revisionId;
+        Revision baseRev;
+        if (baseRevId == null) {
+            baseRev = headRevision;
+            baseRevId = baseRev.toString();
+        } else {
+            baseRev = Revision.fromString(stripBranchRevMarker(baseRevId));
+        }
         JsopReader t = new JsopTokenizer(json);
         Revision rev = newRevision();
-        Commit commit = new Commit(this, rev);
+        Commit commit = new Commit(this, baseRev, rev);
         while (true) {
             int r = t.read();
             if (r == JsopReader.END) {
@@ -644,7 +650,7 @@ public class MongoMK implements MicroKer
                     targetPath = PathUtils.concat(path, targetPath);
                 }
                 commit.moveNode(sourcePath, targetPath);
-                moveNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(revisionId)), commit);
+                moveNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(baseRevId)), commit);
                 break;
             }
             case '*': {
@@ -656,18 +662,18 @@ public class MongoMK implements MicroKer
                     targetPath = PathUtils.concat(path, targetPath);
                 }
                 commit.copyNode(sourcePath, targetPath);
-                copyNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(revisionId)), commit);
+                copyNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(baseRevId)), commit);
                 break;
             }
             default:
                 throw new MicroKernelException("token: " + (char) t.getTokenType());
             }
         }
-        if (revisionId.startsWith("b")) {
+        if (baseRevId.startsWith("b")) {
             // just commit to head currently
             commit.apply();
             // remember branch commit
-            branchCommits.put(rev.toString(), revisionId.substring(1));
+            branchCommits.put(rev.toString(), baseRevId.substring(1));
 
             headRevision = commit.getRevision();
             return "b" + rev.toString();
@@ -774,10 +780,9 @@ public class MongoMK implements MicroKer
             if (isRevisionNewer(propRev, maxRev)) {
                 continue;
             }
-            String v = valueMap.get(r);
             if (firstRev == null || isRevisionNewer(propRev, firstRev)) {
                 firstRev = propRev;
-                value = v;
+                value = valueMap.get(r);
             }
         }
         if ("true".equals(value)) {
@@ -786,6 +791,35 @@ public class MongoMK implements MicroKer
         return firstRev;
     }
     
+    /**
+     * Get the revision of the latest change made to this node.
+     * 
+     * @param nodeMap the document
+     * @param before the returned value is guaranteed to be older than this revision
+     * @param onlyCommitted whether only committed changes should be considered (TODO: not yet evaluated)
+     * @return the revision, or null if there is no document or no revision older than {@code before}
+     */
+    Revision getNewestRevision(Map<String, Object> nodeMap, Revision before, boolean onlyCommitted) {
+        if (nodeMap == null) {
+            return null;
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, String> valueMap = (Map<String, String>) nodeMap
+                .get(UpdateOp.DELETED);
+        Revision newestRev = null;
+        for (String r : valueMap.keySet()) {
+            Revision propRev = Revision.fromString(r);
+            if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
+                // TODO check if propRev is really committed, if
+                // onlyCommitted is set
+                if (isRevisionNewer(before, propRev)) {
+                    newestRev = propRev;
+                }
+            }
+        }
+        return newestRev;
+    }
+    
     private static String stripBranchRevMarker(String revisionId) {
         if (revisionId.startsWith("b")) {
             return revisionId.substring(1);

Added: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java?rev=1460619&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java Mon Mar 25 11:18:17 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk.prototype;
+
+import static org.junit.Assert.fail;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
+import org.junit.Test;
+
+import com.mongodb.DB;
+
+/**
+ * A set of simple cluster tests.
+ */
+public class ClusterTest {
+    
+    private static final boolean MONGO_DB = false;
+//    private static final boolean MONGO_DB = true;
+    
+    private MemoryDocumentStore ds;
+    private MemoryBlobStore bs;
+    
+    @Test
+    public void conflict() {
+        MongoMK mk1 = createMK(1);
+        MongoMK mk2 = createMK(2);
+        
+        String m1r0 = mk1.getHeadRevision();
+        String m2r0 = mk2.getHeadRevision();
+        
+//        String m1r1 = mk1.commit("/", "+\"testa\":{}", m1r0, null);
+//        String m2r1 = mk2.commit("/", "+\"testb\":{}", m2r0, null);
+
+        String m1r2 = mk1.commit("/", "+\"test\":{}", m1r0, null);
+        try {
+            String m2r2 = mk2.commit("/", "+\"test\":{}", m2r0, null);
+            fail();
+        } catch (MicroKernelException e) {
+            // expected
+        }
+        
+        mk1.dispose();
+        mk2.dispose();
+    }
+
+    private MongoMK createMK(int clusterId) {
+        if (MONGO_DB) {
+            DB db = MongoUtils.getConnection().getDB();
+            MongoUtils.dropCollections(db);
+            return new MongoMK(db, clusterId);
+        }
+        if (ds == null) {
+            ds = new MemoryDocumentStore();
+        }
+        if (bs == null) {
+            bs = new MemoryBlobStore();
+        }
+        return new MongoMK(ds, bs, clusterId);
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java?rev=1460619&r1=1460618&r2=1460619&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java Mon Mar 25 11:18:17 2013
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTru
 
 import org.apache.jackrabbit.mongomk.prototype.DocumentStore.Collection;
 import org.apache.jackrabbit.mongomk.prototype.Node.Children;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;