You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/04/24 17:00:10 UTC

svn commit: r1471450 - in /jackrabbit/oak/trunk/oak-mongomk/src: main/java/org/apache/jackrabbit/mongomk/ main/java/org/apache/jackrabbit/mongomk/util/ test/java/org/apache/jackrabbit/mongomk/

Author: mreutegg
Date: Wed Apr 24 15:00:09 2013
New Revision: 1471450

URL: http://svn.apache.org/r1471450
Log:
OAK-619 Lock-free MongoMK implementation
- Conflict detection for branch and regular commits (work in progress)
- Introduced conditional updates with DocumentStore.findAndUpdate()

Added:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java   (with props)
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UpdateOp.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Utils.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoMKBranchMergeTest.java
    jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoUtils.java

Added: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java?rev=1471450&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java Wed Apr 24 15:00:09 2013
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A <code>Collision</code> happens when a commit modifies a node, which was
+ * also modified in a branch commit, but the branch commit is not yet merged.
+ */
+class Collision {
+
+    private final Map<String, Object> document;
+    private final String revision;
+
+    Collision(@Nonnull Map<String, Object> document,
+              @Nonnull Revision revision) {
+        this.document = checkNotNull(document);
+        this.revision = checkNotNull(revision).toString();
+    }
+
+    boolean mark(DocumentStore store) {
+        @SuppressWarnings("unchecked")
+        Map<String, Integer> commitRoots = (Map<String, Integer>) document.get(UpdateOp.COMMIT_ROOT);
+        if (commitRoots != null) {
+            Integer depth = commitRoots.get(revision);
+            if (depth != null) {
+                String p = Utils.getPathFromId((String) document.get(UpdateOp.ID));
+                String commitRootPath = PathUtils.getAncestorPath(p, PathUtils.getDepth(p) - depth);
+                UpdateOp op = new UpdateOp(commitRootPath,
+                        Utils.getIdFromPath(commitRootPath), false);
+                op.setMapEntry(UpdateOp.COLLISIONS, revision, true);
+                store.createOrUpdate(DocumentStore.Collection.NODES, op);
+            }
+        }
+        // TODO: detect concurrent commit of previously un-merged changes
+        return true;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Collision.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java?rev=1471450&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java Wed Apr 24 15:00:09 2013
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+/**
+ * <code>CollisionHandler</code> is a callback notified of uncommitted modifications.
+ */
+abstract class CollisionHandler {
+
+    static final CollisionHandler DEFAULT = new CollisionHandler() {
+        @Override
+        void uncommittedModification(Revision uncommitted) {
+            // do nothing
+        }
+    };
+
+    /**
+     * Callback for an uncommitted modification in {@link Revision}
+     * <code>uncommitted</code>.
+     *
+     * @param uncommitted the uncommitted revision of the change.
+     */
+    abstract void uncommittedModification(Revision uncommitted);
+}

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/CollisionHandler.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Commit.java Wed Apr 24 15:00:09 2013
@@ -19,8 +19,10 @@ package org.apache.jackrabbit.mongomk;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mk.json.JsopStream;
@@ -251,7 +253,7 @@ public class Commit {
 
     /**
      * Try to create or update the node. If there was a conflict, this method
-     * throws an exception, even thought the change is still applied.
+     * throws an exception, even though the change is still applied.
      * 
      * @param store the store
      * @param op the operation
@@ -259,7 +261,17 @@ public class Commit {
     private void createOrUpdateNode(DocumentStore store, UpdateOp op) {
         Map<String, Object> map = store.createOrUpdate(Collection.NODES, op);
         if (baseRevision != null) {
-            Revision newestRev = mk.getNewestRevision(map, revision, true);
+            final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
+            Revision newestRev = mk.getNewestRevision(map, revision, true,
+                    new CollisionHandler() {
+                @Override
+                void uncommittedModification(Revision uncommitted) {
+                    if (collisions.get() == null) {
+                        collisions.set(new ArrayList<Revision>());
+                    }
+                    collisions.get().add(uncommitted);
+                }
+            });
             if (newestRev == null) {
                 if (op.isDelete || !op.isNew) {
                     throw new MicroKernelException("The node " + 
@@ -280,6 +292,19 @@ public class Commit {
                             baseRevision + "; before " + revision + "; document " + map);
                 }
             }
+            // if we get here the modification was successful
+            // -> check for collisions and conflict (concurrent updates
+            // on a node are possible if property updates do not overlap)
+            if (collisions.get() != null && isConflicting(map, op)) {
+                for (Revision r : collisions.get()) {
+                    // mark collisions on commit root
+                    Collision c = new Collision(map, r);
+                    boolean success = c.mark(store);
+                    if (!success) {
+                        // TODO: fail this commit
+                    }
+                }
+            }
         }
 
         int size = Utils.estimateMemoryUsage(map);

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/DocumentStore.java Wed Apr 24 15:00:09 2013
@@ -137,6 +137,20 @@ public interface DocumentStore {
             throws MicroKernelException;
 
     /**
+     * Performs a conditional update (e.g. using
+     * {@link UpdateOp.Operation.Type#CONTAINS_MAP_ENTRY}) and only updates the
+     * document if the condition is <code>true</code>.
+     *
+     * @param collection the collection
+     * @param update the update operation with the condition
+     * @return the old document or <code>null</code> if the condition is not met.
+     * @throws MicroKernelException if the operation failed.
+     */
+    @CheckForNull
+    Map<String, Object> findAndUpdate(Collection collection, UpdateOp update)
+            throws MicroKernelException;
+
+    /**
      * Invalidate the document cache.
      */
     void invalidateCache();

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MemoryDocumentStore.java Wed Apr 24 15:00:09 2013
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
@@ -102,8 +103,10 @@ public class MemoryDocumentStore impleme
         }
     }
 
-    @Nonnull
-    public Map<String, Object> createOrUpdate(Collection collection, UpdateOp update) {
+    @CheckForNull
+    private Map<String, Object> internalCreateOrUpdate(Collection collection,
+                                                       UpdateOp update,
+                                                       boolean checkConditions) {
         ConcurrentSkipListMap<String, Map<String, Object>> map = getMap(collection);
         Map<String, Object> n;
         Map<String, Object> oldNode;
@@ -124,6 +127,9 @@ public class MemoryDocumentStore impleme
             }
         }
         synchronized (n) {
+            if (checkConditions && !checkConditions(n, update)) {
+                return null;
+            }
             if (oldNode != null) {
                 // clone the old node
                 // (document level operations are synchronized)
@@ -138,7 +144,56 @@ public class MemoryDocumentStore impleme
         }
         return oldNode;
     }
-    
+
+    @Nonnull
+    @Override
+    public Map<String, Object> createOrUpdate(Collection collection,
+                                              UpdateOp update)
+            throws MicroKernelException {
+        return internalCreateOrUpdate(collection, update, false);
+    }
+
+    @Override
+    public Map<String, Object> findAndUpdate(Collection collection,
+                                             UpdateOp update)
+            throws MicroKernelException {
+        return internalCreateOrUpdate(collection, update, true);
+    }
+
+    private static boolean checkConditions(Map<String, Object> target,
+                                           UpdateOp update) {
+        for (Map.Entry<String, Operation> change : update.changes.entrySet()) {
+            Operation op = change.getValue();
+            if (op.type == Operation.Type.CONTAINS_MAP_ENTRY) {
+                String k = change.getKey();
+                String[] kv = k.split("\\.");
+                Object value = target.get(kv[0]);
+                if (value == null) {
+                    if (Boolean.TRUE.equals(op.value)) {
+                        return false;
+                    }
+                } else {
+                    if (value instanceof java.util.Collection) {
+                        java.util.Collection col = (java.util.Collection) value;
+                        if (Boolean.TRUE.equals(op.value)) {
+                            if (!col.contains(kv[1])) {
+                                return false;
+                            }
+                        } else {
+                            if (col.contains(kv[1])) {
+                                return false;
+                            }
+                        }
+                    } else {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+
     /**
      * Apply the changes to the in-memory map.
      * 
@@ -194,7 +249,7 @@ public class MemoryDocumentStore impleme
                 }
                 m.put(kv[1], op.value);
                 break;
-                
+
             }
             }
         }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoDocumentStore.java Wed Apr 24 15:00:09 2013
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
@@ -148,7 +149,7 @@ public class MongoDocumentStore implemen
         DBCollection dbCollection = getDBCollection(collection);
         long start = start();
         try {
-            DBObject doc = dbCollection.findOne(getByKeyQuery(key));
+            DBObject doc = dbCollection.findOne(getByKeyQuery(key).get());
             if (doc == null) {
                 return null;
             }
@@ -196,7 +197,7 @@ public class MongoDocumentStore implemen
             if (collection == Collection.NODES) {
                 nodesCache.invalidate(key);
             }
-            WriteResult writeResult = dbCollection.remove(getByKeyQuery(key), WriteConcern.SAFE);
+            WriteResult writeResult = dbCollection.remove(getByKeyQuery(key).get(), WriteConcern.SAFE);
             if (writeResult.getError() != null) {
                 throw new MicroKernelException("Remove failed: " + writeResult.getError());
             }
@@ -205,11 +206,12 @@ public class MongoDocumentStore implemen
         }
     }
 
-    @Nonnull
-    @Override
-    public Map<String, Object> createOrUpdate(Collection collection, UpdateOp updateOp) {
-        log("createOrUpdate", updateOp);        
+    @CheckForNull
+    private Map<String, Object> internalCreateOrUpdate(Collection collection,
+                                                       UpdateOp updateOp,
+                                                       boolean checkConditions) {
         DBCollection dbCollection = getDBCollection(collection);
+        QueryBuilder query = getByKeyQuery(updateOp.key);
 
         BasicDBObject setUpdates = new BasicDBObject();
         BasicDBObject incUpdates = new BasicDBObject();
@@ -246,10 +248,15 @@ public class MongoDocumentStore implemen
                     setUpdates.append(kv[0], sub);
                     break;
                 }
+                case CONTAINS_MAP_ENTRY: {
+                    if (checkConditions) {
+                        query.and(k).exists(op.value);
+                    }
+                    break;
+                }
             }
         }
 
-        DBObject query = getByKeyQuery(updateOp.key);
         BasicDBObject update = new BasicDBObject();
         if (!setUpdates.isEmpty()) {
             update.append("$set", setUpdates);
@@ -267,9 +274,12 @@ public class MongoDocumentStore implemen
 
         long start = start();
         try {
-            DBObject oldNode = dbCollection.findAndModify(query, null /*fields*/,
+            DBObject oldNode = dbCollection.findAndModify(query.get(), null /*fields*/,
                     null /*sort*/, false /*remove*/, update, false /*returnNew*/,
                     true /*upsert*/);
+            if (checkConditions && oldNode == null) {
+                return null;
+            }
             Map<String, Object> map = convertFromDBObject(oldNode);
             
             // cache the new document
@@ -281,7 +291,6 @@ public class MongoDocumentStore implemen
                 nodesCache.put(key, new CachedDocument(newMap));
             }
             
-            log("createOrUpdate returns ", map);
             return map;
         } catch (Exception e) {
             throw new MicroKernelException(e);
@@ -290,6 +299,27 @@ public class MongoDocumentStore implemen
         }
     }
 
+    @Nonnull
+    @Override
+    public Map<String, Object> createOrUpdate(Collection collection,
+                                              UpdateOp update)
+            throws MicroKernelException {
+        log("createOrUpdate", update);
+        Map<String, Object> map = internalCreateOrUpdate(collection, update, false);
+        log("createOrUpdate returns ", map);
+        return map;
+    }
+
+    @Override
+    public Map<String, Object> findAndUpdate(Collection collection,
+                                             UpdateOp update)
+            throws MicroKernelException {
+        log("findAndUpdate", update);
+        Map<String, Object> map = internalCreateOrUpdate(collection, update, true);
+        log("findAndUpdate returns ", map);
+        return map;
+    }
+
     @Override
     public boolean create(Collection collection, List<UpdateOp> updateOps) {
         log("create", updateOps);       
@@ -306,10 +336,7 @@ public class MongoDocumentStore implemen
                 String k = entry.getKey();
                 Operation op = entry.getValue();
                 switch (op.type) {
-                    case SET: {
-                        inserts[i].put(k, op.value);
-                        break;
-                    }
+                    case SET:
                     case INCREMENT: {
                         inserts[i].put(k, op.value);
                         break;
@@ -379,8 +406,8 @@ public class MongoDocumentStore implemen
         }
     }
 
-    private static DBObject getByKeyQuery(String key) {
-        return QueryBuilder.start(UpdateOp.ID).is(key).get();
+    private static QueryBuilder getByKeyQuery(String key) {
+        return QueryBuilder.start(UpdateOp.ID).is(key);
     }
     
     @Override

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Wed Apr 24 15:00:09 2013
@@ -51,7 +51,6 @@ import org.apache.jackrabbit.oak.commons
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
 import com.mongodb.DB;
 
 /**
@@ -1049,31 +1048,38 @@ public class MongoMK implements MicroKer
      * @param nodeMap the document
      * @param before the returned value is guaranteed to be older than this revision
      * @param onlyCommitted whether only committed changes should be considered
+     * @param handler the collision handler, which is called for un-committed revisions
+     *                preceding <code>before</code>.
      * @return the revision, or null if deleted
      */
     @SuppressWarnings("unchecked")
     @Nullable Revision getNewestRevision(Map<String, Object> nodeMap,
-                                         Revision before, boolean onlyCommitted) {
+                                         Revision before, boolean onlyCommitted,
+                                         CollisionHandler handler) {
         if (nodeMap == null) {
             return null;
         }
-        Map<String, String> revisions = Maps.newHashMap();
+        Set<String> revisions = Utils.newSet();
         if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
-            revisions.putAll((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS));
+            revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
+        }
+        if (nodeMap.containsKey(UpdateOp.COMMIT_ROOT)) {
+            revisions.addAll(((Map<String, Integer>) nodeMap.get(UpdateOp.COMMIT_ROOT)).keySet());
         }
         Map<String, String> deletedMap = (Map<String, String>) nodeMap
                 .get(UpdateOp.DELETED);
         if (deletedMap != null) {
-            revisions.putAll(deletedMap);
+            revisions.addAll(deletedMap.keySet());
         }
         Revision newestRev = null;
-        for (String r : revisions.keySet()) {
+        for (String r : revisions) {
             Revision propRev = Revision.fromString(r);
             if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
                 if (isRevisionNewer(before, propRev)) {
-                    if (!onlyCommitted
-                            || isValidRevision(propRev, before,
-                                nodeMap, new HashSet<Revision>())) {
+                    if (onlyCommitted && !isValidRevision(
+                            propRev, before, nodeMap, new HashSet<Revision>())) {
+                        handler.uncommittedModification(propRev);
+                    } else {
                         newestRev = propRev;
                     }
                 }
@@ -1143,15 +1149,19 @@ public class MongoMK implements MicroKer
         while (baseRevId != null) {
             branchRevisions.add(baseRevId);
             op.setMapEntry(UpdateOp.REVISIONS, baseRevId.toString(), "true");
+            op.containsMapEntry(UpdateOp.COLLISIONS, baseRevId.toString(), false);
             baseRevId = branchCommits.get(baseRevId);
         }
-        store.createOrUpdate(DocumentStore.Collection.NODES, op);
-        // remove from branchCommits map after successful update
-        for (Revision r : branchRevisions) {
-            branchCommits.remove(r);
+        if (store.findAndUpdate(DocumentStore.Collection.NODES, op) != null) {
+            // remove from branchCommits map after successful update
+            for (Revision r : branchRevisions) {
+                branchCommits.remove(r);
+            }
+            headRevision = newRevision();
+            return headRevision.toString();
+        } else {
+            throw new MicroKernelException("Conflicting concurrent change");
         }
-        headRevision = newRevision();
-        return headRevision.toString();
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UpdateOp.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UpdateOp.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UpdateOp.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UpdateOp.java Wed Apr 24 15:00:09 2013
@@ -38,7 +38,8 @@ public class UpdateOp {
     
     /**
      * The list of recent revisions for this node, where this node is the
-     * root of the commit. Key: revision, value: true.
+     * root of the commit. Key: revision, value: true or the base revision of an
+     * un-merged branch commit.
      */
     static final String REVISIONS = "_revisions";
 
@@ -60,7 +61,14 @@ public class UpdateOp {
      * Whether this node is deleted. Key: revision, value: true/false.
      */
     static final String DELETED = "_deleted";
-    
+
+    /**
+     * Revision collision markers set by commits with modifications, which
+     * overlap with un-merged branch commits.
+     * Key: revision, value: true.
+     */
+    static final String COLLISIONS = "_collisions";
+
     /**
      * The modified time (5 second resolution).
      */
@@ -147,7 +155,7 @@ public class UpdateOp {
         op.value = value;
         changes.put(property + "." + subName, op);
     }
-    
+
     /**
      * Set the property to the given value.
      * 
@@ -182,6 +190,23 @@ public class UpdateOp {
     }
 
     /**
+     * Checks if the named key exists or is absent in the MongoDB document. This
+     * method can be used to make a conditional update.
+     *
+     * @param property the property name
+     * @param subName the entry name
+     */
+    void containsMapEntry(String property, String subName, boolean exists) {
+        if (isNew) {
+            throw new IllegalStateException("Cannot use containsMapEntry() on new document");
+        }
+        Operation op = new Operation();
+        op.type = Operation.Type.CONTAINS_MAP_ENTRY;
+        op.value = exists;
+        changes.put(property + "." + subName, op);
+    }
+
+    /**
      * Increment the value.
      * 
      * @param property the key
@@ -246,20 +271,25 @@ public class UpdateOp {
              * Add the sub-key / value pair.
              * The value in the stored node is a map.
              */ 
-             SET_MAP_ENTRY, 
+            SET_MAP_ENTRY,
              
-             /**
-              * Remove the sub-key / value pair.
-              * The value in the stored node is a map.
-              */ 
-             REMOVE_MAP_ENTRY,
-             
-             /**
-              * Set the sub-key / value pair.
-              * The value in the stored node is a map.
-              */
-             SET_MAP,
+            /**
+             * Remove the sub-key / value pair.
+             * The value in the stored node is a map.
+             */
+            REMOVE_MAP_ENTRY,
+
+            /**
+             * Checks if the sub-key is present in a map or not.
+             */
+            CONTAINS_MAP_ENTRY,
              
+            /**
+             * Set the sub-key / value pair.
+             * The value in the stored node is a map.
+             */
+            SET_MAP,
+
          }
              
         

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Utils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Utils.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Utils.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Utils.java Wed Apr 24 15:00:09 2013
@@ -16,8 +16,10 @@
  */
 package org.apache.jackrabbit.mongomk;
 
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.bson.types.ObjectId;
@@ -44,6 +46,10 @@ public class Utils {
         return new TreeMap<K, V>();
     }
 
+    static <E> Set<E> newSet() {
+        return new HashSet<E>();
+    }
+
     @SuppressWarnings("unchecked")
     public static int estimateMemoryUsage(Map<String, Object> map) {
         if (map == null) {
@@ -175,5 +181,5 @@ public class Utils {
             target.put(e.getKey(), value);
         }
     }
-    
+
 }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/util/LoggingDocumentStoreWrapper.java Wed Apr 24 15:00:09 2013
@@ -119,6 +119,19 @@ public class LoggingDocumentStoreWrapper
     }
 
     @Override
+    public Map<String, Object> findAndUpdate(Collection collection, // logging pass-through to store.findAndUpdate
+                                             UpdateOp update)
+            throws MicroKernelException {
+        try {
+            logMethod("findAndUpdate", collection, update); // hand the method name and both arguments to logMethod first
+            return logResult(store.findAndUpdate(collection, update)); // delegate; the returned value is whatever logResult gives back
+        } catch (Exception e) {
+            logException(e); // pass the failure to logException before rethrowing
+            throw convert(e); // presumably yields a MicroKernelException - confirm in convert()
+        }
+    }
+
+    @Override
     public void invalidateCache() {
         try {
             logMethod("invalidateCache");

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoMKBranchMergeTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoMKBranchMergeTest.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoMKBranchMergeTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoMKBranchMergeTest.java Wed Apr 24 15:00:09 2013
@@ -279,7 +279,6 @@ public class MongoMKBranchMergeTest exte
     }
 
     @Test
-    @Ignore
     public void oneBranchChangedPropertiesWithConflict() {
         addNodes(null, "/trunk");
         setProp(null, "/trunk/prop1", "value1");
@@ -300,6 +299,40 @@ public class MongoMKBranchMergeTest exte
     }
 
     @Test
+    public void twoBranchChangedPropertiesWithConflict() { // two branches, each changing one property that is later committed without a branch revision; both merges must fail
+        addNodes(null, "/trunk");
+        setProp(null, "/trunk/prop1", "value1");
+        setProp(null, "/trunk/prop2", "value1");
+        assertPropExists(null, "/trunk", "prop1");
+        assertPropExists(null, "/trunk", "prop2");
+
+        String branchRev1 = mk.branch(null); // first branch: only sets prop1
+        branchRev1 = setProp(branchRev1, "/trunk/prop1", "value1-b1");
+        assertPropValue(branchRev1, "/trunk", "prop1", "value1-b1");
+
+        String branchRev2 = mk.branch(null); // second branch: only sets prop2
+        branchRev2 = setProp(branchRev2, "/trunk/prop2", "value1-b2");
+        assertPropValue(branchRev2, "/trunk", "prop2", "value1-b2");
+
+        // creates a conflict for both branches: one commit, made without a branch revision, sets prop1 and prop2 after both branches exist
+        mk.commit("/", "^\"trunk/prop1\":\"value1-modified\"" +
+                "^\"trunk/prop2\":\"value1-modified\"", null, null);
+        try {
+            mk.merge(branchRev1, "");
+            fail("Expected: Concurrent modification exception");
+        } catch (Exception expected) {
+            // expected: the merge must be rejected because prop1 was changed in branch 1 and by the commit above
+        }
+
+        try {
+            mk.merge(branchRev2, "");
+            fail("Expected: Concurrent modification exception");
+        } catch (Exception expected) {
+            // expected: the merge must be rejected because prop2 was changed in branch 2 and by the commit above
+        }
+    }
+
+    @Test
     public void addExistingRootInBranch() {
         addNodes(null, "/root");
         assertNodesExist(null, "/root");

Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoUtils.java?rev=1471450&r1=1471449&r2=1471450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoUtils.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/MongoUtils.java Wed Apr 24 15:00:09 2013
@@ -50,14 +50,13 @@ public class MongoUtils {
             return null;
         }
         MongoConnection mongoConnection = null;
-        if (mongoConnection == null) {
-            try {
-                mongoConnection = new MongoConnection(HOST, PORT, DB);
-                mongoConnection.getDB().command(new BasicDBObject("ping", 1));
-                // dropCollections(mongoConnection.getDB());
-            } catch (Exception e) {
-                exception = e;
-            }
+        try {
+            mongoConnection = new MongoConnection(HOST, PORT, DB);
+            mongoConnection.getDB().command(new BasicDBObject("ping", 1));
+            // dropCollections(mongoConnection.getDB());
+        } catch (Exception e) {
+            exception = e;
+            mongoConnection = null;
         }
         return mongoConnection;
     }