You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/05/02 16:27:37 UTC

svn commit: r1478384 - in /jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk: Branch.java MongoMK.java UnmergedBranches.java

Author: mreutegg
Date: Thu May  2 14:27:36 2013
New Revision: 1478384

URL: http://svn.apache.org/r1478384
Log:
OAK-619 Lock-free MongoMK implementation
- Speed up large branches

Added:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java   (with props)
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java

Added: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java?rev=1478384&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java Thu May  2 14:27:36 2013
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.annotation.Nonnull;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Contains commit information about a branch and its base revision.
+ */
+class Branch {
+
+    /**
+     * The commits to the branch, ordered by the ordering of the set passed
+     * to the constructor. All access goes through the synchronized methods
+     * of this class.
+     */
+    private final SortedSet<Revision> commits;
+
+    /**
+     * The revision this branch is based on.
+     */
+    private final Revision base;
+
+    /**
+     * Creates a branch from its commits and its base revision.
+     * The passed set is used as is (it is not copied), hence the caller
+     * must not modify it afterwards.
+     *
+     * @param commits the commits to the branch.
+     * @param base the base revision of the branch.
+     * @throws NullPointerException if any of the arguments is
+     *          <code>null</code>.
+     */
+    Branch(@Nonnull SortedSet<Revision> commits, @Nonnull Revision base) {
+        this.commits = checkNotNull(commits);
+        this.base = checkNotNull(base);
+    }
+
+    /**
+     * @return the head of this branch, which is the last commit according
+     *          to the ordering of the commits.
+     * @throws java.util.NoSuchElementException if this branch does not
+     *          have any commits.
+     */
+    synchronized Revision getHead() {
+        return commits.last();
+    }
+
+    /**
+     * @return the base revision of this branch.
+     */
+    Revision getBase() {
+        return base;
+    }
+
+    /**
+     * Adds a commit to this branch. The revision must be newer than all
+     * commits currently present in this branch.
+     *
+     * @param r the revision of the commit to add.
+     * @throws NullPointerException if <code>r</code> is <code>null</code>.
+     * @throws IllegalArgumentException if <code>r</code> is not newer than
+     *          the current head of this branch.
+     */
+    synchronized void addCommit(@Nonnull Revision r) {
+        checkNotNull(r);
+        // tailSet(r) contains all commits greater than or equal to r. It is
+        // empty only when r is strictly greater than the current head. In
+        // contrast to comparator().compare(r, last()), this also works for
+        // a set without explicit comparator and for a branch that does not
+        // have any commits left.
+        checkArgument(commits.tailSet(r).isEmpty(),
+                "Revision %s is not newer than the head of the branch", r);
+        commits.add(r);
+    }
+
+    /**
+     * @return a copy of the commits to this branch. Changes to the returned
+     *          set are not reflected in this branch.
+     */
+    synchronized SortedSet<Revision> getCommits() {
+        return new TreeSet<Revision>(commits);
+    }
+
+    /**
+     * @param r a revision.
+     * @return <code>true</code> if the given revision is a commit to this
+     *          branch; <code>false</code> otherwise.
+     */
+    synchronized boolean containsCommit(@Nonnull Revision r) {
+        return commits.contains(r);
+    }
+
+    /**
+     * Removes the commit with the given revision from this branch. Does
+     * nothing if there is no such commit.
+     *
+     * @param rev the revision of the commit to remove.
+     */
+    public synchronized void removeCommit(@Nonnull Revision rev) {
+        commits.remove(rev);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1478384&r1=1478383&r2=1478384&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Thu May  2 14:27:36 2013
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -159,13 +161,12 @@ public class MongoMK implements MicroKer
     private AtomicInteger simpleRevisionCounter;
 
     /**
-     * Maps branch commit revision to revision it is based on
+     * Unmerged branches of this MongoMK instance.
      */
-    // TODO at some point, open (unmerged) branches 
+    // TODO at some point, open (unmerged) branches
     // need to be garbage collected (in-memory and on disk)
-    private final Map<Revision, Revision> branchCommits
-            = new ConcurrentHashMap<Revision, Revision>();
-    
+    private final UnmergedBranches branches = new UnmergedBranches();
+
     /**
      * The comparator for revisions.
      */
@@ -215,21 +216,7 @@ public class MongoMK implements MicroKer
             commit.applyToDocumentStore();
         } else {
             // initialize branchCommits
-            Map<String, Object> nodeMap = store.find(
-                    DocumentStore.Collection.NODES, Utils.getIdFromPath("/"));
-            @SuppressWarnings("unchecked")
-            Map<String, String> valueMap = (Map<String, String>) nodeMap.get(UpdateOp.REVISIONS);
-            if (valueMap != null) {
-                for (Map.Entry<String, String> commit : valueMap.entrySet()) {
-                    if (!commit.getValue().equals("true")) {
-                        Revision r = Revision.fromString(commit.getKey());
-                        if (r.getClusterId() == clusterId) {
-                            Revision b = Revision.fromString(commit.getValue());
-                            branchCommits.put(r, b);
-                        }
-                    }
-                }
-            }
+            branches.init(store, clusterId);
         }
     }
     
@@ -400,28 +387,27 @@ public class MongoMK implements MicroKer
     }
     
     private boolean includeRevision(Revision x, Revision requestRevision) {
-        if (branchCommits.containsKey(x)) {
+        Branch b = branches.getBranch(x);
+        if (b != null) {
             // only include if requested revision is also a branch revision
             // with a history including x
-            Revision rev = requestRevision;
-            for (;;) {
-                if (rev != null) {
-                    if (rev.equals(x)) {
-                        return true;
-                    }
-                } else {
-                    // not part of branch identified by requestedRevision
-                    return false;
-                }
-                rev = branchCommits.get(rev);
+            if (b.containsCommit(requestRevision)) {
+                // in same branch, include if the same revision or
+                // requestRevision is newer
+                return x.equals(requestRevision) || isRevisionNewer(requestRevision, x);
+            } else {
+                // not part of branch identified by requestedRevision
+                return false;
             }
+
         }
         // assert: x is not a branch commit
-        while (branchCommits.containsKey(requestRevision)) {
+        b = branches.getBranch(requestRevision);
+        if (b != null) {
             // reset requestRevision to branch base revision to make
             // sure we don't include revisions committed after branch
             // was created
-            requestRevision = branchCommits.get(requestRevision);
+            requestRevision = b.getBase();
         }
         if (x.getClusterId() == this.clusterId && 
                 requestRevision.getClusterId() == this.clusterId) {
@@ -452,7 +438,7 @@ public class MongoMK implements MicroKer
      * perform this check.
      * This method also takes pending branches into consideration.
      * The <code>readRevision</code> identifies the read revision used by the
-     * client, which may be a branch revision logged in {@link #branchCommits}.
+     * client, which may be a branch revision logged in {@link #branches}.
      * The revision <code>rev</code> is valid if it is part of the branch
      * history of <code>readRevision</code>.
      *
@@ -540,7 +526,7 @@ public class MongoMK implements MicroKer
             return false;
         }
         if (value.equals("true")) {
-            if (!branchCommits.containsKey(readRevision)) {
+            if (branches.getBranch(readRevision) == null) {
                 return true;
             }
         } else {
@@ -634,6 +620,10 @@ public class MongoMK implements MicroKer
             @SuppressWarnings("unchecked")
             Map<String, String> valueMap = (Map<String, String>) v;
             if (valueMap != null) {
+                if (valueMap instanceof TreeMap) {
+                    // use descending keys (newest first) if map is sorted
+                    valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+                }
                 String value = getLatestValue(valueMap, min, rev);
                 String propertyName = Utils.unescapePropertyName(key);
                 n.setProperty(propertyName, value);
@@ -652,21 +642,22 @@ public class MongoMK implements MicroKer
      * @param max the maximum revision
      * @return the value, or null if not found
      */
-    private String getLatestValue(Map<String, String> valueMap, Revision min, Revision max) {
+    private String getLatestValue(@Nonnull Map<String, String> valueMap,
+                                  @Nullable Revision min,
+                                  @Nonnull Revision max) {
         String value = null;
         Revision latestRev = null;
         for (String r : valueMap.keySet()) {
             Revision propRev = Revision.fromString(r);
-            if (min != null) {
-                if (isRevisionNewer(min, propRev)) {
-                    continue;
-                }
+            if (min != null && isRevisionNewer(min, propRev)) {
+                continue;
+            }
+            if (latestRev != null && !isRevisionNewer(propRev, latestRev)) {
+                continue;
             }
             if (includeRevision(propRev, max)) {
-                if (latestRev == null || isRevisionNewer(propRev, latestRev)) {
-                    latestRev = propRev;
-                    value = valueMap.get(r);
-                }
+                latestRev = propRev;
+                value = valueMap.get(r);
             }
         }
         return value;
@@ -913,7 +904,12 @@ public class MongoMK implements MicroKer
         }
         if (baseRevId.startsWith("b")) {
             // remember branch commit
-            branchCommits.put(rev, baseRev);
+            Branch b = branches.getBranch(baseRev);
+            if (b == null) {
+                b = branches.create(baseRev, rev);
+            } else {
+                b.addCommit(rev);
+            }
             boolean success = false;
             try {
                 // prepare commit
@@ -921,17 +917,14 @@ public class MongoMK implements MicroKer
                 success = true;
             } finally {
                 if (!success) {
-                    branchCommits.remove(rev);
+                    b.removeCommit(rev);
+                    if (b.getCommits().isEmpty()) {
+                        branches.remove(b);
+                    }
                 }
             }
 
             return "b" + rev.toString();
-
-            // String jsonBranch = branchCommits.remove(revisionId);
-            // jsonBranch += commit.getDiff().toString();
-            // String branchRev = revisionId + "+";
-            // branchCommits.put(branchRev, jsonBranch);
-            // return branchRev;
         }
         commit.apply();
         headRevision = commit.getRevision();
@@ -1043,6 +1036,10 @@ public class MongoMK implements MicroKer
         if (valueMap == null) {
             return null;
         }
+        if (valueMap instanceof TreeMap) {
+            // use descending keys (newest first) if map is sorted
+            valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+        }
         for (String r : valueMap.keySet()) {
             Revision propRev = Revision.fromString(r);
             if (isRevisionNewer(propRev, maxRev)
@@ -1077,7 +1074,7 @@ public class MongoMK implements MicroKer
         if (nodeMap == null) {
             return null;
         }
-        Set<String> revisions = Utils.newSet();
+        SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
         if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
             revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
         }
@@ -1161,24 +1158,25 @@ public class MongoMK implements MicroKer
 
         String revisionId = stripBranchRevMarker(branchRevisionId);
         // make branch commits visible
-        List<Revision> branchRevisions = new ArrayList<Revision>();
         UpdateOp op = new UpdateOp("/", Utils.getIdFromPath("/"), false);
-        Revision baseRevId = Revision.fromString(revisionId);
-        while (baseRevId != null) {
-            branchRevisions.add(baseRevId);
-            op.setMapEntry(UpdateOp.REVISIONS, baseRevId.toString(), "true");
-            op.containsMapEntry(UpdateOp.COLLISIONS, baseRevId.toString(), false);
-            baseRevId = branchCommits.get(baseRevId);
-        }
-        if (store.findAndUpdate(DocumentStore.Collection.NODES, op) != null) {
-            // remove from branchCommits map after successful update
-            for (Revision r : branchRevisions) {
-                branchCommits.remove(r);
+        Revision revision = Revision.fromString(revisionId);
+        Branch b = branches.getBranch(revision);
+        if (b != null) {
+            for (Revision rev : b.getCommits()) {
+                op.setMapEntry(UpdateOp.REVISIONS, rev.toString(), "true");
+                op.containsMapEntry(UpdateOp.COLLISIONS, rev.toString(), false);
+            }
+            if (store.findAndUpdate(DocumentStore.Collection.NODES, op) != null) {
+                // remove the merged branch from the unmerged branches after successful update
+                branches.remove(b);
+            } else {
+                throw new MicroKernelException("Conflicting concurrent change");
             }
-            headRevision = newRevision();
-            return headRevision.toString();
+        } else {
+            // no commits in this branch -> do nothing
         }
-        throw new MicroKernelException("Conflicting concurrent change");
+        headRevision = newRevision();
+        return headRevision.toString();
     }
 
     @Override
@@ -1420,5 +1418,4 @@ public class MongoMK implements MicroKer
     public boolean isCached(String path) {
         return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
     }
-
 }

Added: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java?rev=1478384&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java Thu May  2 14:27:36 2013
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mongomk.util.Utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <code>UnmergedBranches</code> contains all un-merged branches of a MongoMK
+ * instance.
+ */
+class UnmergedBranches {
+
+    /**
+     * List of un-merged branches. Every access to this list must be
+     * synchronized on the list itself.
+     */
+    private final List<Branch> branches = new ArrayList<Branch>();
+
+    /**
+     * Set to <code>true</code> once initialized.
+     */
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    /**
+     * The revision comparator.
+     */
+    private final Revision.RevisionComparator comparator = new Revision.RevisionComparator();
+
+    /**
+     * Initialize with un-merged branches from <code>store</code> for this
+     * <code>clusterId</code>.
+     * <p>
+     * The revisions map of the root document is read. Every entry with a
+     * value other than <code>"true"</code> is treated as a branch commit
+     * and its value is parsed as the revision the commit is based on.
+     * Only commits with the given <code>clusterId</code> are considered.
+     *
+     * @param store the document store.
+     * @param clusterId the cluster node id of the local MongoMK.
+     * @throws IllegalStateException if this method was called before.
+     */
+    void init(DocumentStore store, int clusterId) {
+        if (!initialized.compareAndSet(false, true)) {
+            throw new IllegalStateException("already initialized");
+        }
+        Map<String, Object> nodeMap = store.find(
+                DocumentStore.Collection.NODES, Utils.getIdFromPath("/"));
+        if (nodeMap == null) {
+            // no root document -> no branch commits to read
+            return;
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, String> valueMap = (Map<String, String>) nodeMap.get(UpdateOp.REVISIONS);
+        if (valueMap == null) {
+            return;
+        }
+        // maps a branch commit revision to the revision it is based on
+        SortedMap<Revision, Revision> tmp =
+                new TreeMap<Revision, Revision>(comparator);
+        for (Map.Entry<String, String> commit : valueMap.entrySet()) {
+            if (!commit.getValue().equals("true")) {
+                Revision r = Revision.fromString(commit.getKey());
+                if (r.getClusterId() == clusterId) {
+                    Revision b = Revision.fromString(commit.getValue());
+                    tmp.put(r, b);
+                }
+            }
+        }
+        // same lock as used by create(), getBranch() and remove()
+        synchronized (branches) {
+            while (!tmp.isEmpty()) {
+                // the newest remaining commit is the head of a branch
+                SortedSet<Revision> commits = new TreeSet<Revision>(comparator);
+                Revision head = tmp.lastKey();
+                commits.add(head);
+                Revision base = tmp.remove(head);
+                // follow the chain of commits. The first revision, which
+                // is not an un-merged commit itself, is the branch base
+                while (tmp.containsKey(base)) {
+                    commits.add(base);
+                    base = tmp.remove(base);
+                }
+                branches.add(new Branch(commits, base));
+            }
+        }
+    }
+
+    /**
+     * Create a branch with an initial commit revision.
+     *
+     * @param base the base revision of the branch.
+     * @param initial the initial commit to the branch.
+     * @return the branch.
+     * @throws NullPointerException if any of the arguments is
+     *          <code>null</code>.
+     */
+    @Nonnull
+    Branch create(@Nonnull Revision base, @Nonnull Revision initial) {
+        SortedSet<Revision> commits = new TreeSet<Revision>(comparator);
+        commits.add(checkNotNull(initial));
+        Branch b = new Branch(commits, checkNotNull(base));
+        synchronized (branches) {
+            branches.add(b);
+        }
+        return b;
+    }
+
+    /**
+     * Returns the branch, which contains the given revision or <code>null</code>
+     * if there is no such branch.
+     *
+     * @param r a revision.
+     * @return the branch containing the given revision or <code>null</code>.
+     */
+    @CheckForNull
+    Branch getBranch(@Nonnull Revision r) {
+        synchronized (branches) {
+            for (Branch b : branches) {
+                if (b.containsCommit(r)) {
+                    return b;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Removes the given branch. Does nothing if the branch is not present.
+     *
+     * @param b the branch to remove.
+     */
+    void remove(Branch b) {
+        synchronized (branches) {
+            // branches are compared by identity
+            for (int i = 0; i < branches.size(); i++) {
+                if (branches.get(i) == b) {
+                    branches.remove(i);
+                    return;
+                }
+            }
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL