You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/05/02 16:27:37 UTC
svn commit: r1478384 - in
/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk:
Branch.java MongoMK.java UnmergedBranches.java
Author: mreutegg
Date: Thu May 2 14:27:36 2013
New Revision: 1478384
URL: http://svn.apache.org/r1478384
Log:
OAK-619 Lock-free MongoMK implementation
- Speed up large branches
Added:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java (with props)
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java (with props)
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
Added: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java?rev=1478384&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java Thu May 2 14:27:36 2013
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.annotation.Nonnull;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Contains commit information about a branch and its base revision.
+ */
+class Branch {
+
+ /**
+ * The commits to the branch
+ */
+ private final SortedSet<Revision> commits;
+
+ private final Revision base;
+
+ Branch(@Nonnull SortedSet<Revision> commits, @Nonnull Revision base) {
+ this.commits = checkNotNull(commits);
+ this.base = base;
+ }
+
+ synchronized Revision getHead() {
+ return commits.last();
+ }
+
+ Revision getBase() {
+ return base;
+ }
+
+ synchronized void addCommit(@Nonnull Revision r) {
+ checkArgument(commits.comparator().compare(r, commits.last()) > 0);
+ commits.add(r);
+ }
+
+ synchronized SortedSet<Revision> getCommits() {
+ return new TreeSet<Revision>(commits);
+ }
+
+ synchronized boolean containsCommit(@Nonnull Revision r) {
+ return commits.contains(r);
+ }
+
+ synchronized public void removeCommit(@Nonnull Revision rev) {
+ commits.remove(rev);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/Branch.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java?rev=1478384&r1=1478383&r2=1478384&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/MongoMK.java Thu May 2 14:27:36 2013
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -159,13 +161,12 @@ public class MongoMK implements MicroKer
private AtomicInteger simpleRevisionCounter;
/**
- * Maps branch commit revision to revision it is based on
+ * Unmerged branches of this MongoMK instance.
*/
- // TODO at some point, open (unmerged) branches
+ // TODO at some point, open (unmerged) branches
// need to be garbage collected (in-memory and on disk)
- private final Map<Revision, Revision> branchCommits
- = new ConcurrentHashMap<Revision, Revision>();
-
+ private final UnmergedBranches branches = new UnmergedBranches();
+
/**
* The comparator for revisions.
*/
@@ -215,21 +216,7 @@ public class MongoMK implements MicroKer
commit.applyToDocumentStore();
} else {
// initialize branchCommits
- Map<String, Object> nodeMap = store.find(
- DocumentStore.Collection.NODES, Utils.getIdFromPath("/"));
- @SuppressWarnings("unchecked")
- Map<String, String> valueMap = (Map<String, String>) nodeMap.get(UpdateOp.REVISIONS);
- if (valueMap != null) {
- for (Map.Entry<String, String> commit : valueMap.entrySet()) {
- if (!commit.getValue().equals("true")) {
- Revision r = Revision.fromString(commit.getKey());
- if (r.getClusterId() == clusterId) {
- Revision b = Revision.fromString(commit.getValue());
- branchCommits.put(r, b);
- }
- }
- }
- }
+ branches.init(store, clusterId);
}
}
@@ -400,28 +387,27 @@ public class MongoMK implements MicroKer
}
private boolean includeRevision(Revision x, Revision requestRevision) {
- if (branchCommits.containsKey(x)) {
+ Branch b = branches.getBranch(x);
+ if (b != null) {
// only include if requested revision is also a branch revision
// with a history including x
- Revision rev = requestRevision;
- for (;;) {
- if (rev != null) {
- if (rev.equals(x)) {
- return true;
- }
- } else {
- // not part of branch identified by requestedRevision
- return false;
- }
- rev = branchCommits.get(rev);
+ if (b.containsCommit(requestRevision)) {
+ // in same branch, include if the same revision or
+ // requestRevision is newer
+ return x.equals(requestRevision) || isRevisionNewer(requestRevision, x);
+ } else {
+ // not part of branch identified by requestedRevision
+ return false;
}
+
}
// assert: x is not a branch commit
- while (branchCommits.containsKey(requestRevision)) {
+ b = branches.getBranch(requestRevision);
+ if (b != null) {
// reset requestRevision to branch base revision to make
// sure we don't include revisions committed after branch
// was created
- requestRevision = branchCommits.get(requestRevision);
+ requestRevision = b.getBase();
}
if (x.getClusterId() == this.clusterId &&
requestRevision.getClusterId() == this.clusterId) {
@@ -452,7 +438,7 @@ public class MongoMK implements MicroKer
* perform this check.
* This method also takes pending branches into consideration.
* The <code>readRevision</code> identifies the read revision used by the
- * client, which may be a branch revision logged in {@link #branchCommits}.
+ * client, which may be a branch revision logged in {@link #branches}.
* The revision <code>rev</code> is valid if it is part of the branch
* history of <code>readRevision</code>.
*
@@ -540,7 +526,7 @@ public class MongoMK implements MicroKer
return false;
}
if (value.equals("true")) {
- if (!branchCommits.containsKey(readRevision)) {
+ if (branches.getBranch(readRevision) == null) {
return true;
}
} else {
@@ -634,6 +620,10 @@ public class MongoMK implements MicroKer
@SuppressWarnings("unchecked")
Map<String, String> valueMap = (Map<String, String>) v;
if (valueMap != null) {
+ if (valueMap instanceof TreeMap) {
+ // use descending keys (newest first) if map is sorted
+ valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+ }
String value = getLatestValue(valueMap, min, rev);
String propertyName = Utils.unescapePropertyName(key);
n.setProperty(propertyName, value);
@@ -652,21 +642,22 @@ public class MongoMK implements MicroKer
* @param max the maximum revision
* @return the value, or null if not found
*/
- private String getLatestValue(Map<String, String> valueMap, Revision min, Revision max) {
+ private String getLatestValue(@Nonnull Map<String, String> valueMap,
+ @Nullable Revision min,
+ @Nonnull Revision max) {
String value = null;
Revision latestRev = null;
for (String r : valueMap.keySet()) {
Revision propRev = Revision.fromString(r);
- if (min != null) {
- if (isRevisionNewer(min, propRev)) {
- continue;
- }
+ if (min != null && isRevisionNewer(min, propRev)) {
+ continue;
+ }
+ if (latestRev != null && !isRevisionNewer(propRev, latestRev)) {
+ continue;
}
if (includeRevision(propRev, max)) {
- if (latestRev == null || isRevisionNewer(propRev, latestRev)) {
- latestRev = propRev;
- value = valueMap.get(r);
- }
+ latestRev = propRev;
+ value = valueMap.get(r);
}
}
return value;
@@ -913,7 +904,12 @@ public class MongoMK implements MicroKer
}
if (baseRevId.startsWith("b")) {
// remember branch commit
- branchCommits.put(rev, baseRev);
+ Branch b = branches.getBranch(baseRev);
+ if (b == null) {
+ b = branches.create(baseRev, rev);
+ } else {
+ b.addCommit(rev);
+ }
boolean success = false;
try {
// prepare commit
@@ -921,17 +917,14 @@ public class MongoMK implements MicroKer
success = true;
} finally {
if (!success) {
- branchCommits.remove(rev);
+ b.removeCommit(rev);
+ if (b.getCommits().isEmpty()) {
+ branches.remove(b);
+ }
}
}
return "b" + rev.toString();
-
- // String jsonBranch = branchCommits.remove(revisionId);
- // jsonBranch += commit.getDiff().toString();
- // String branchRev = revisionId + "+";
- // branchCommits.put(branchRev, jsonBranch);
- // return branchRev;
}
commit.apply();
headRevision = commit.getRevision();
@@ -1043,6 +1036,10 @@ public class MongoMK implements MicroKer
if (valueMap == null) {
return null;
}
+ if (valueMap instanceof TreeMap) {
+ // use descending keys (newest first) if map is sorted
+ valueMap = ((TreeMap<String, String>) valueMap).descendingMap();
+ }
for (String r : valueMap.keySet()) {
Revision propRev = Revision.fromString(r);
if (isRevisionNewer(propRev, maxRev)
@@ -1077,7 +1074,7 @@ public class MongoMK implements MicroKer
if (nodeMap == null) {
return null;
}
- Set<String> revisions = Utils.newSet();
+ SortedSet<String> revisions = new TreeSet<String>(Collections.reverseOrder());
if (nodeMap.containsKey(UpdateOp.REVISIONS)) {
revisions.addAll(((Map<String, String>) nodeMap.get(UpdateOp.REVISIONS)).keySet());
}
@@ -1161,24 +1158,25 @@ public class MongoMK implements MicroKer
String revisionId = stripBranchRevMarker(branchRevisionId);
// make branch commits visible
- List<Revision> branchRevisions = new ArrayList<Revision>();
UpdateOp op = new UpdateOp("/", Utils.getIdFromPath("/"), false);
- Revision baseRevId = Revision.fromString(revisionId);
- while (baseRevId != null) {
- branchRevisions.add(baseRevId);
- op.setMapEntry(UpdateOp.REVISIONS, baseRevId.toString(), "true");
- op.containsMapEntry(UpdateOp.COLLISIONS, baseRevId.toString(), false);
- baseRevId = branchCommits.get(baseRevId);
- }
- if (store.findAndUpdate(DocumentStore.Collection.NODES, op) != null) {
- // remove from branchCommits map after successful update
- for (Revision r : branchRevisions) {
- branchCommits.remove(r);
+ Revision revision = Revision.fromString(revisionId);
+ Branch b = branches.getBranch(revision);
+ if (b != null) {
+ for (Revision rev : b.getCommits()) {
+ op.setMapEntry(UpdateOp.REVISIONS, rev.toString(), "true");
+ op.containsMapEntry(UpdateOp.COLLISIONS, rev.toString(), false);
+ }
+ if (store.findAndUpdate(DocumentStore.Collection.NODES, op) != null) {
+ // remove from branchCommits map after successful update
+ branches.remove(b);
+ } else {
+ throw new MicroKernelException("Conflicting concurrent change");
}
- headRevision = newRevision();
- return headRevision.toString();
+ } else {
+ // no commits in this branch -> do nothing
}
- throw new MicroKernelException("Conflicting concurrent change");
+ headRevision = newRevision();
+ return headRevision.toString();
}
@Override
@@ -1420,5 +1418,4 @@ public class MongoMK implements MicroKer
public boolean isCached(String path) {
return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
}
-
}
Added: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java?rev=1478384&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java Thu May 2 14:27:36 2013
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.mongomk.util.Utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <code>UnmergedBranches</code> contains all un-merged branches of a MongoMK
+ * instance.
+ */
+class UnmergedBranches {
+
+ /**
+ * Map of branches with the head of the branch as key.
+ */
+ private final List<Branch> branches = new ArrayList<Branch>();
+
+ /**
+ * Set to <code>true</code> once initialized.
+ */
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+ /**
+ * The revision comparator.
+ */
+ private final Revision.RevisionComparator comparator = new Revision.RevisionComparator();
+
+ /**
+ * Initialize with un-merged branches from <code>store</code> for this
+ * <code>clusterId</code>.
+ *
+ * @param store the document store.
+ * @param clusterId the cluster node id of the local MongoMK.
+ */
+ void init(DocumentStore store, int clusterId) {
+ if (!initialized.compareAndSet(false, true)) {
+ throw new IllegalStateException("already initialized");
+ }
+ Map<String, Object> nodeMap = store.find(
+ DocumentStore.Collection.NODES, Utils.getIdFromPath("/"));
+ @SuppressWarnings("unchecked")
+ Map<String, String> valueMap = (Map<String, String>) nodeMap.get(UpdateOp.REVISIONS);
+ if (valueMap != null) {
+ SortedMap<Revision, Revision> tmp =
+ new TreeMap<Revision, Revision>(comparator);
+ for (Map.Entry<String, String> commit : valueMap.entrySet()) {
+ if (!commit.getValue().equals("true")) {
+ Revision r = Revision.fromString(commit.getKey());
+ if (r.getClusterId() == clusterId) {
+ Revision b = Revision.fromString(commit.getValue());
+ tmp.put(r, b);
+ }
+ }
+ }
+ while (!tmp.isEmpty()) {
+ SortedSet<Revision> commits = new TreeSet<Revision>(comparator);
+ Revision head = tmp.lastKey();
+ commits.add(head);
+ Revision base = tmp.remove(head);
+ while (tmp.containsKey(base)) {
+ commits.add(base);
+ base = tmp.remove(base);
+ }
+ branches.add(new Branch(commits, base));
+ }
+ }
+ }
+
+ /**
+ * Create a branch with an initial commit revision.
+ *
+ * @param base the base revision of the branch.
+ * @param initial the initial commit to the branch.
+ * @return the branch.
+ */
+ @Nonnull
+ Branch create(@Nonnull Revision base, @Nonnull Revision initial) {
+ SortedSet<Revision> commits = new TreeSet<Revision>(comparator);
+ commits.add(checkNotNull(initial));
+ Branch b = new Branch(commits, checkNotNull(base));
+ synchronized (branches) {
+ branches.add(b);
+ }
+ return b;
+ }
+
+ /**
+ * Returns the branch, which contains the given revision or <code>null</code>
+ * if there is no such branch.
+ *
+ * @param r a revision.
+ * @return the branch containing the given revision or <code>null</code>.
+ */
+ @CheckForNull
+ Branch getBranch(@Nonnull Revision r) {
+ synchronized (branches) {
+ for (Branch b : branches) {
+ if (b.containsCommit(r)) {
+ return b;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Removes the given branch.
+ * @param b the branch to remove.
+ */
+ void remove(Branch b) {
+ synchronized (branches) {
+ for (int i = 0; i < branches.size(); i++) {
+ if (branches.get(i) == b) {
+ branches.remove(i);
+ return;
+ }
+ }
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/UnmergedBranches.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL