You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/03/25 12:18:18 UTC
svn commit: r1460619 - in /jackrabbit/oak/trunk/oak-mongomk/src:
main/java/org/apache/jackrabbit/mongomk/prototype/
test/java/org/apache/jackrabbit/mongomk/prototype/
Author: thomasm
Date: Mon Mar 25 11:18:17 2013
New Revision: 1460619
URL: http://svn.apache.org/r1460619
Log:
OAK-619 Lock-free MongoMK implementation (detect conflicts; WIP)
Added:
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java?rev=1460619&r1=1460618&r2=1460619&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java Mon Mar 25 11:18:17 2013
@@ -51,6 +51,7 @@ public class Commit {
private static final boolean PURGE_OLD_REVISIONS = true;
private final MongoMK mk;
+ private final Revision baseRevision;
private final Revision revision;
private HashMap<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
private JsopWriter diff = new JsopStream();
@@ -59,7 +60,8 @@ public class Commit {
private HashSet<String> addedNodes = new HashSet<String>();
private HashSet<String> removedNodes = new HashSet<String>();
- Commit(MongoMK mk, Revision revision) {
+ Commit(MongoMK mk, Revision baseRevision, Revision revision) {
+ this.baseRevision = baseRevision;
this.revision = revision;
this.mk = mk;
}
@@ -118,57 +120,65 @@ public class Commit {
*/
void applyToDocumentStore() {
DocumentStore store = mk.getDocumentStore();
- String commitRoot = null;
+ String commitRootPath = null;
ArrayList<UpdateOp> newNodes = new ArrayList<UpdateOp>();
ArrayList<UpdateOp> changedNodes = new ArrayList<UpdateOp>();
for (String p : operations.keySet()) {
markChanged(p);
- if (commitRoot == null) {
- commitRoot = p;
+ if (commitRootPath == null) {
+ commitRootPath = p;
} else {
- while (!PathUtils.isAncestor(commitRoot, p)) {
- commitRoot = PathUtils.getParentPath(commitRoot);
- if (PathUtils.denotesRoot(commitRoot)) {
+ while (!PathUtils.isAncestor(commitRootPath, p)) {
+ commitRootPath = PathUtils.getParentPath(commitRootPath);
+ if (PathUtils.denotesRoot(commitRootPath)) {
break;
}
}
}
}
- int commitRootDepth = PathUtils.getDepth(commitRoot);
+ int commitRootDepth = PathUtils.getDepth(commitRootPath);
// create a "root of the commit" if there is none
- UpdateOp root = getUpdateOperationForNode(commitRoot);
+ UpdateOp commitRoot = getUpdateOperationForNode(commitRootPath);
for (String p : operations.keySet()) {
UpdateOp op = operations.get(p);
- if (op == root) {
+ op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
+ if (op.isNew) {
+ op.addMapEntry(UpdateOp.DELETED + "." + revision.toString(), "false");
+ }
+ if (op == commitRoot) {
// apply at the end
- } else if (op.isNew()) {
- newNodes.add(op);
} else {
- changedNodes.add(op);
+ op.addMapEntry(UpdateOp.COMMIT_ROOT + "." + revision.toString(), commitRootDepth);
+ if (op.isNew()) {
+ newNodes.add(op);
+ } else {
+ changedNodes.add(op);
+ }
}
}
- if (changedNodes.size() == 0 && root.isNew) {
+ if (changedNodes.size() == 0 && commitRoot.isNew) {
// no updates and root of commit is also new. that is,
// it is the root of a subtree added in a commit.
- // so we just add the root like the others
- root.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
- newNodes.add(root);
+ // so we try to add the root like all other nodes
+ commitRoot.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
+ newNodes.add(commitRoot);
}
try {
if (newNodes.size() > 0) {
// set commit root on new nodes
- for (UpdateOp op : newNodes) {
- if (op != root) {
- op.addMapEntry(UpdateOp.COMMIT_ROOT + "." + revision.toString(), commitRootDepth);
- }
- }
if (!store.create(Collection.NODES, newNodes)) {
+ // some of the documents already exist:
+ // try to apply all changes one by one
for (UpdateOp op : newNodes) {
op.unset(UpdateOp.ID);
- op.addMapEntry(UpdateOp.DELETED + "." + revision.toString(), "false");
- op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
- createOrUpdateNode(store, op);
+ if (op == commitRoot) {
+ // don't write the commit root just yet
+ // (because there might be a conflict)
+ commitRoot.unset(UpdateOp.REVISIONS + "." + revision.toString());
+ }
+ changedNodes.add(op);
}
+ newNodes.clear();
}
}
for (UpdateOp op : changedNodes) {
@@ -176,13 +186,14 @@ public class Commit {
op.addMapEntry(UpdateOp.COMMIT_ROOT + "." + revision.toString(), commitRootDepth);
createOrUpdateNode(store, op);
}
- // finally write commit, unless it was already written
- // with added nodes.
- if (changedNodes.size() != 0 || !root.isNew) {
- root.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
- root.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
- createOrUpdateNode(store, root);
- operations.put(commitRoot, root);
+ // finally write the commit root, unless it was already written
+ // with added nodes (the commit root might be written twice,
+ // first to check if there was a conflict, and only then to commit
+ // the revision, with the revision property set)
+ if (changedNodes.size() > 0 || !commitRoot.isNew) {
+ commitRoot.addMapEntry(UpdateOp.REVISIONS + "." + revision.toString(), "true");
+ createOrUpdateNode(store, commitRoot);
+ operations.put(commitRootPath, commitRoot);
}
} catch (MicroKernelException e) {
String msg = "Exception committing " + diff.toString();
@@ -193,6 +204,19 @@ public class Commit {
private void createOrUpdateNode(DocumentStore store, UpdateOp op) {
Map<String, Object> map = store.createOrUpdate(Collection.NODES, op);
+ if (baseRevision != null) {
+ // TODO detect conflicts here
+ Revision newestRev = mk.getNewestRevision(map, revision, true);
+ if (mk.isRevisionNewer(newestRev, baseRevision)) {
+ // TODO transaction rollback
+ throw new MicroKernelException("The node " +
+ op.path + " was changed in revision " +
+ newestRev +
+ ", which was applied after the base revision " +
+ baseRevision);
+ }
+ }
+
int size = Utils.getMapSize(map);
if (size > MAX_DOCUMENT_SIZE) {
UpdateOp[] split = splitDocument(map);
@@ -212,8 +236,6 @@ public class Commit {
store.createOrUpdate(Collection.NODES, main);
}
}
- // TODO detect conflicts here
- op.setMapEntry(UpdateOp.LAST_REV + "." + revision.getClusterId(), revision.toString());
}
private UpdateOp[] splitDocument(Map<String, Object> map) {
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1460619&r1=1460618&r2=1460619&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java Mon Mar 25 11:18:17 2013
@@ -196,7 +196,7 @@ public class MongoMK implements MicroKer
Node n = readNode("/", headRevision);
if (n == null) {
// root node is missing: repository is not initialized
- Commit commit = new Commit(this, headRevision);
+ Commit commit = new Commit(this, null, headRevision);
n = new Node("/", headRevision);
commit.addNode(n);
commit.applyToDocumentStore();
@@ -597,12 +597,18 @@ public class MongoMK implements MicroKer
}
@Override
- public synchronized String commit(String rootPath, String json, String revisionId,
+ public synchronized String commit(String rootPath, String json, String baseRevId,
String message) throws MicroKernelException {
- revisionId = revisionId == null ? headRevision.toString() : revisionId;
+ Revision baseRev;
+ if (baseRevId == null) {
+ baseRev = headRevision;
+ baseRevId = baseRev.toString();
+ } else {
+ baseRev = Revision.fromString(stripBranchRevMarker(baseRevId));
+ }
JsopReader t = new JsopTokenizer(json);
Revision rev = newRevision();
- Commit commit = new Commit(this, rev);
+ Commit commit = new Commit(this, baseRev, rev);
while (true) {
int r = t.read();
if (r == JsopReader.END) {
@@ -644,7 +650,7 @@ public class MongoMK implements MicroKer
targetPath = PathUtils.concat(path, targetPath);
}
commit.moveNode(sourcePath, targetPath);
- moveNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(revisionId)), commit);
+ moveNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(baseRevId)), commit);
break;
}
case '*': {
@@ -656,18 +662,18 @@ public class MongoMK implements MicroKer
targetPath = PathUtils.concat(path, targetPath);
}
commit.copyNode(sourcePath, targetPath);
- copyNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(revisionId)), commit);
+ copyNode(sourcePath, targetPath, Revision.fromString(stripBranchRevMarker(baseRevId)), commit);
break;
}
default:
throw new MicroKernelException("token: " + (char) t.getTokenType());
}
}
- if (revisionId.startsWith("b")) {
+ if (baseRevId.startsWith("b")) {
// just commit to head currently
commit.apply();
// remember branch commit
- branchCommits.put(rev.toString(), revisionId.substring(1));
+ branchCommits.put(rev.toString(), baseRevId.substring(1));
headRevision = commit.getRevision();
return "b" + rev.toString();
@@ -774,10 +780,9 @@ public class MongoMK implements MicroKer
if (isRevisionNewer(propRev, maxRev)) {
continue;
}
- String v = valueMap.get(r);
if (firstRev == null || isRevisionNewer(propRev, firstRev)) {
firstRev = propRev;
- value = v;
+ value = valueMap.get(r);
}
}
if ("true".equals(value)) {
@@ -786,6 +791,35 @@ public class MongoMK implements MicroKer
return firstRev;
}
+    /**
+     * Get the revision of the latest change made to this node (so far, only
+     * the keys of the {@code UpdateOp.DELETED} map are inspected).
+     * @param nodeMap the document, may be null
+     * @param before the returned value is guaranteed to be older than this revision
+     * @param onlyCommitted whether only committed changes should be considered
+     * @return the revision, or null if no entry older than {@code before} exists
+     */
+    Revision getNewestRevision(Map<String, Object> nodeMap, Revision before, boolean onlyCommitted) {
+        @SuppressWarnings("unchecked")
+        Map<String, String> valueMap = nodeMap == null ? null
+                : (Map<String, String>) nodeMap.get(UpdateOp.DELETED);
+        if (valueMap == null) {
+            return null;
+        }
+        Revision newestRev = null;
+        for (String r : valueMap.keySet()) {
+            Revision propRev = Revision.fromString(r);
+            if (newestRev == null || isRevisionNewer(propRev, newestRev)) {
+                // TODO check if propRev is really committed, if
+                // onlyCommitted is set
+                if (isRevisionNewer(before, propRev)) {
+                    newestRev = propRev;
+                }
+            }
+        }
+        return newestRev;
+    }
+
private static String stripBranchRevMarker(String revisionId) {
if (revisionId.startsWith("b")) {
return revisionId.substring(1);
Added: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java?rev=1460619&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java (added)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/ClusterTest.java Mon Mar 25 11:18:17 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.mongomk.prototype;
+
+import static org.junit.Assert.fail;
+
+import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.blobs.MemoryBlobStore;
+import org.junit.Test;
+
+import com.mongodb.DB;
+
+/**
+ * A set of simple cluster tests.
+ */
+public class ClusterTest {
+
+    // set to true to run against MongoDB instead of the in-memory stores
+    private static final boolean MONGO_DB = false;
+
+    // created on first use and reused by every in-memory MongoMK that createMK returns
+    private MemoryDocumentStore ds;
+    private MemoryBlobStore bs;
+
+    /** The same node is added by two instances; the second commit must fail. */
+    @Test
+    public void conflict() {
+        MongoMK mk1 = createMK(1);
+        MongoMK mk2 = createMK(2);
+        try {
+            String m1r0 = mk1.getHeadRevision();
+            String m2r0 = mk2.getHeadRevision();
+            mk1.commit("/", "+\"test\":{}", m1r0, null);
+            try {
+                mk2.commit("/", "+\"test\":{}", m2r0, null);
+                fail("expected a conflict: /test was already added by the first instance");
+            } catch (MicroKernelException e) {
+                // expected
+            }
+        } finally {
+            // release both instances even if the assertion above fails
+            mk1.dispose();
+            mk2.dispose();
+        }
+    }
+
+    // creates an instance backed by MongoDB or by the shared in-memory stores
+    private MongoMK createMK(int clusterId) {
+        if (MONGO_DB) {
+            DB db = MongoUtils.getConnection().getDB();
+            MongoUtils.dropCollections(db);
+            return new MongoMK(db, clusterId);
+        }
+        if (ds == null) {
+            ds = new MemoryDocumentStore();
+        }
+        if (bs == null) {
+            bs = new MemoryBlobStore();
+        }
+        return new MongoMK(ds, bs, clusterId);
+    }
+}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java?rev=1460619&r1=1460618&r2=1460619&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/prototype/SimpleTest.java Mon Mar 25 11:18:17 2013
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTru
import org.apache.jackrabbit.mongomk.prototype.DocumentStore.Collection;
import org.apache.jackrabbit.mongomk.prototype.Node.Children;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Lists;