You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/10/22 15:12:01 UTC
svn commit: r1534626 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/mongomk/
test/java/org/apache/jackrabbit/oak/plugins/mongomk/
Author: mreutegg
Date: Tue Oct 22 13:12:01 2013
New Revision: 1534626
URL: http://svn.apache.org/r1534626
Log:
OAK-1080: MongoMK: improved concurrency
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * <code>CommitQueue</code> ensures a sequence of commits consistent with the
+ * commit revision even if commits did not complete in this sequence.
+ */
+class CommitQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class);
+
+ private final MongoNodeStore store;
+
+ private final SortedMap<Revision, Entry> commits = new TreeMap<Revision, Entry>(new StableRevisionComparator());
+
+ private final ChangeDispatcher dispatcher;
+
+ CommitQueue(MongoNodeStore store, ChangeDispatcher dispatcher) {
+ this.store = store;
+ this.dispatcher = dispatcher;
+ }
+
+ Commit createCommit(Revision base) {
+ synchronized (this) {
+ Commit c = new Commit(store, base, store.newRevision());
+ commits.put(c.getRevision(), new Entry(c));
+ LOG.debug("created commit {}", c.getRevision());
+ return c;
+ }
+ }
+
+ void done(@Nonnull Commit c, boolean isBranch, @Nullable CommitInfo info) {
+ checkNotNull(c);
+ if (isBranch) {
+ removeCommit(c);
+ } else {
+ checkArgument(info != null, "Trunk commit must have a CommitInfo");
+ afterTrunkCommit(c, info);
+ }
+ }
+
+ void canceled(@Nonnull Commit c) {
+ removeCommit(c);
+ }
+
+ //------------------------< internal >--------------------------------------
+
+ private void removeCommit(@Nonnull Commit c) {
+ // simply remove and notify next head if any
+ synchronized (this) {
+ boolean wasHead = commits.firstKey().equals(c.getRevision());
+ commits.remove(c.getRevision());
+ LOG.debug("removed commit {}, wasHead={}", c.getRevision(), wasHead);
+ if (wasHead) {
+ notifyHead();
+ }
+ }
+ }
+
+ private void afterTrunkCommit(Commit c, CommitInfo info) {
+ assert(!commits.isEmpty());
+
+ boolean isHead;
+ Entry commitEntry;
+ synchronized (this) {
+ isHead = commits.firstKey().equals(c.getRevision());
+ commitEntry = commits.get(c.getRevision());
+ }
+ if (!isHead) {
+ LOG.debug("not head: {}, waiting...", c.getRevision());
+ commitEntry.await();
+ }
+ synchronized (this) {
+ commits.remove(c.getRevision());
+ try {
+ LOG.debug("removed {}, head is now {}", c.getRevision(), commits.isEmpty() ? null : commits.firstKey());
+ // remember before revision
+ Revision before = store.getHeadRevision();
+ // update head revision
+ store.setHeadRevision(c.getRevision());
+ NodeState root = store.getRoot();
+ // TODO: correct?
+ dispatcher.beforeCommit(store.getRoot(before));
+ try {
+ dispatcher.localCommit(root, info);
+ } finally {
+ dispatcher.afterCommit(root);
+ }
+ } finally {
+ // notify next if there is any
+ notifyHead();
+ }
+ }
+ }
+
+ private void notifyHead() {
+ if (!commits.isEmpty()) {
+ LOG.debug("count down for {}", commits.firstKey());
+ commits.get(commits.firstKey()).latch.countDown();
+ }
+ }
+
+ private static final class Entry {
+
+ final Commit commit;
+ final CountDownLatch latch;
+
+ Entry(Commit commit) {
+ this.commit = commit;
+ this.latch = new CountDownLatch(1);
+ }
+
+ void await() {
+ for (;;) {
+ try {
+ LOG.debug("awaiting {}", commit.getRevision());
+ latch.await();
+ break;
+ } catch (InterruptedException e) {
+ // retry
+ }
+ }
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueue.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1534626&r1=1534625&r2=1534626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Tue Oct 22 13:12:01 2013
@@ -42,8 +42,6 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.plugins.mongomk.Node.Children;
import org.apache.jackrabbit.oak.plugins.mongomk.blob.MongoBlobStore;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -66,8 +64,6 @@ public class MongoMK implements MicroKer
*/
static final boolean LIRS_CACHE = Boolean.parseBoolean(System.getProperty("oak.mongoMK.lirsCache", "false"));
- private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
-
/**
* Enable fast diff operations.
*/
@@ -96,7 +92,6 @@ public class MongoMK implements MicroKer
private final CacheStats diffCacheStats;
MongoMK(Builder builder) {
-
this.nodeStore = builder.getNodeStore();
this.store = nodeStore.getDocumentStore();
this.blobStore = builder.getBlobStore();
@@ -104,11 +99,8 @@ public class MongoMK implements MicroKer
diffCache = builder.buildCache(builder.getDiffCacheSize());
diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
builder.getWeigher(), builder.getDiffCacheSize());
-
- LOG.info("Initialized MongoMK with clusterNodeId: {}", nodeStore.getClusterId());
}
-
public void dispose() {
nodeStore.dispose();
}
@@ -380,88 +372,18 @@ public class MongoMK implements MicroKer
}
@Override
- public String commit(String rootPath, String json, String baseRevId,
+ public String commit(String rootPath, String jsonDiff, String baseRevId,
String message) throws MicroKernelException {
- synchronized (nodeStore) {
- Revision baseRev;
- if (baseRevId == null) {
- baseRev = nodeStore.getHeadRevision();
- baseRevId = baseRev.toString();
- } else {
- baseRev = Revision.fromString(baseRevId);
- }
- JsopReader t = new JsopTokenizer(json);
- Revision rev = nodeStore.newRevision();
- Commit commit = new Commit(nodeStore, baseRev, rev);
- while (true) {
- int r = t.read();
- if (r == JsopReader.END) {
- break;
- }
- String path = PathUtils.concat(rootPath, t.readString());
- switch (r) {
- case '+':
- t.read(':');
- t.read('{');
- parseAddNode(commit, t, path);
- break;
- case '-':
- commit.removeNode(path);
- nodeStore.markAsDeleted(path, commit, true);
- commit.removeNodeDiff(path);
- break;
- case '^':
- t.read(':');
- String value;
- if (t.matches(JsopReader.NULL)) {
- value = null;
- } else {
- value = t.readRawValue().trim();
- }
- String p = PathUtils.getParentPath(path);
- String propertyName = PathUtils.getName(path);
- commit.updateProperty(p, propertyName, value);
- commit.updatePropertyDiff(p, propertyName, value);
- break;
- case '>': {
- // TODO support moving nodes that were modified within this commit
- t.read(':');
- String sourcePath = path;
- String targetPath = t.readString();
- if (!PathUtils.isAbsolute(targetPath)) {
- targetPath = PathUtils.concat(rootPath, targetPath);
- }
- if (!nodeExists(sourcePath, baseRevId)) {
- throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
- } else if (nodeExists(targetPath, baseRevId)) {
- throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
- }
- commit.moveNode(sourcePath, targetPath);
- nodeStore.moveNode(sourcePath, targetPath, commit);
- break;
- }
- case '*': {
- // TODO support copying nodes that were modified within this commit
- t.read(':');
- String sourcePath = path;
- String targetPath = t.readString();
- if (!PathUtils.isAbsolute(targetPath)) {
- targetPath = PathUtils.concat(rootPath, targetPath);
- }
- if (!nodeExists(sourcePath, baseRevId)) {
- throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
- } else if (nodeExists(targetPath, baseRevId)) {
- throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
- }
- commit.copyNode(sourcePath, targetPath);
- nodeStore.copyNode(sourcePath, targetPath, commit);
- break;
- }
- default:
- throw new MicroKernelException("token: " + (char) t.getTokenType());
- }
- }
- if (baseRev.isBranch()) {
+ boolean success = false;
+ boolean isBranch = false;
+ Revision rev;
+ Commit commit = nodeStore.newCommit(baseRevId != null ? Revision.fromString(baseRevId) : null);
+ try {
+ Revision baseRev = commit.getBaseRevision();
+ isBranch = baseRev != null && baseRev.isBranch();
+ rev = commit.getRevision();
+ parseJsonDiff(commit, jsonDiff, rootPath);
+ if (isBranch) {
rev = rev.asBranchRevision();
// remember branch commit
Branch b = nodeStore.getBranches().getBranch(baseRev);
@@ -471,7 +393,6 @@ public class MongoMK implements MicroKer
} else {
b.addCommit(rev);
}
- boolean success = false;
try {
// prepare commit
commit.prepare(baseRev);
@@ -484,34 +405,19 @@ public class MongoMK implements MicroKer
}
}
}
-
- return rev.toString();
} else {
commit.apply();
nodeStore.setHeadRevision(commit.getRevision());
- return rev.toString();
+ success = true;
+ }
+ } finally {
+ if (!success) {
+ nodeStore.canceled(commit);
+ } else {
+ nodeStore.done(commit, isBranch);
}
}
- }
-
- public static void parseAddNode(Commit commit, JsopReader t, String path) {
- Node n = new Node(path, commit.getRevision());
- if (!t.matches('}')) {
- do {
- String key = t.readString();
- t.read(':');
- if (t.matches('{')) {
- String childPath = PathUtils.concat(path, key);
- parseAddNode(commit, t, childPath);
- } else {
- String value = t.readRawValue().trim();
- n.setProperty(key, value);
- }
- } while (t.matches(','));
- t.read('}');
- }
- commit.addNode(n);
- commit.addNodeDiff(n);
+ return rev.toString();
}
@Override
@@ -526,27 +432,32 @@ public class MongoMK implements MicroKer
@Override
public String merge(String branchRevisionId, String message)
throws MicroKernelException {
- synchronized (nodeStore) {
- // TODO improve implementation if needed
- Revision revision = Revision.fromString(branchRevisionId);
- if (!revision.isBranch()) {
- throw new MicroKernelException("Not a branch: " + branchRevisionId);
- }
-
+ // TODO improve implementation if needed
+ Revision revision = Revision.fromString(branchRevisionId);
+ if (!revision.isBranch()) {
+ throw new MicroKernelException("Not a branch: " + branchRevisionId);
+ }
+ Branch b = nodeStore.getBranches().getBranch(revision);
+ Revision base = revision;
+ if (b != null) {
+ base = b.getBase(revision);
+ }
+ boolean success = false;
+ Commit commit = nodeStore.newCommit(base);
+ try {
// make branch commits visible
UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
- Branch b = nodeStore.getBranches().getBranch(revision);
- Revision mergeCommit = nodeStore.newRevision();
- NodeDocument.setModified(op, mergeCommit);
+ NodeDocument.setModified(op, commit.getRevision());
if (b != null) {
+ String commitTag = "c-" + commit.getRevision().toString();
for (Revision rev : b.getCommits()) {
rev = rev.asTrunkRevision();
- NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
+ NodeDocument.setRevision(op, rev, commitTag);
op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
}
if (store.findAndUpdate(Collection.NODES, op) != null) {
// remove from branchCommits map after successful update
- b.applyTo(nodeStore.getPendingModifications(), mergeCommit);
+ b.applyTo(nodeStore.getPendingModifications(), commit.getRevision());
nodeStore.getBranches().remove(b);
} else {
throw new MicroKernelException("Conflicting concurrent change. Update operation failed: " + op);
@@ -554,9 +465,15 @@ public class MongoMK implements MicroKer
} else {
// no commits in this branch -> do nothing
}
- nodeStore.setHeadRevision(mergeCommit);
- return mergeCommit.toString();
+ success = true;
+ } finally {
+ if (!success) {
+ nodeStore.canceled(commit);
+ } else {
+ nodeStore.done(commit, false);
+ }
}
+ return commit.getRevision().toString();
}
@Override
@@ -612,6 +529,8 @@ public class MongoMK implements MicroKer
}
}
+ //-------------------------< accessors >------------------------------------
+
public DocumentStore getDocumentStore() {
return store;
}
@@ -635,7 +554,103 @@ public class MongoMK implements MicroKer
public boolean isCached(String path) {
return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
}
-
+
+ //------------------------------< internal >--------------------------------
+
+ private void parseJsonDiff(Commit commit, String json, String rootPath) {
+ String baseRevId = commit.getBaseRevision() != null ?
+ commit.getBaseRevision().toString() : null;
+ JsopReader t = new JsopTokenizer(json);
+ while (true) {
+ int r = t.read();
+ if (r == JsopReader.END) {
+ break;
+ }
+ String path = PathUtils.concat(rootPath, t.readString());
+ switch (r) {
+ case '+':
+ t.read(':');
+ t.read('{');
+ parseAddNode(commit, t, path);
+ break;
+ case '-':
+ commit.removeNode(path);
+ nodeStore.markAsDeleted(path, commit, true);
+ commit.removeNodeDiff(path);
+ break;
+ case '^':
+ t.read(':');
+ String value;
+ if (t.matches(JsopReader.NULL)) {
+ value = null;
+ } else {
+ value = t.readRawValue().trim();
+ }
+ String p = PathUtils.getParentPath(path);
+ String propertyName = PathUtils.getName(path);
+ commit.updateProperty(p, propertyName, value);
+ commit.updatePropertyDiff(p, propertyName, value);
+ break;
+ case '>': {
+ // TODO support moving nodes that were modified within this commit
+ t.read(':');
+ String sourcePath = path;
+ String targetPath = t.readString();
+ if (!PathUtils.isAbsolute(targetPath)) {
+ targetPath = PathUtils.concat(rootPath, targetPath);
+ }
+ if (!nodeExists(sourcePath, baseRevId)) {
+ throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+ } else if (nodeExists(targetPath, baseRevId)) {
+ throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+ }
+ commit.moveNode(sourcePath, targetPath);
+ nodeStore.moveNode(sourcePath, targetPath, commit);
+ break;
+ }
+ case '*': {
+ // TODO support copying nodes that were modified within this commit
+ t.read(':');
+ String sourcePath = path;
+ String targetPath = t.readString();
+ if (!PathUtils.isAbsolute(targetPath)) {
+ targetPath = PathUtils.concat(rootPath, targetPath);
+ }
+ if (!nodeExists(sourcePath, baseRevId)) {
+ throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+ } else if (nodeExists(targetPath, baseRevId)) {
+ throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+ }
+ commit.copyNode(sourcePath, targetPath);
+ nodeStore.copyNode(sourcePath, targetPath, commit);
+ break;
+ }
+ default:
+ throw new MicroKernelException("token: " + (char) t.getTokenType());
+ }
+ }
+ }
+
+ private static void parseAddNode(Commit commit, JsopReader t, String path) {
+ Node n = new Node(path, commit.getRevision());
+ if (!t.matches('}')) {
+ do {
+ String key = t.readString();
+ t.read(':');
+ if (t.matches('{')) {
+ String childPath = PathUtils.concat(path, key);
+ parseAddNode(commit, t, childPath);
+ } else {
+ String value = t.readRawValue().trim();
+ n.setProperty(key, value);
+ }
+ } while (t.matches(','));
+ t.read('}');
+ }
+ commit.addNode(n);
+ commit.addNodeDiff(n);
+ }
+
/**
* A (cached) result of the diff operation.
*/
@@ -654,6 +669,8 @@ public class MongoMK implements MicroKer
}
+ //----------------------------< Builder >-----------------------------------
+
/**
* A builder for a MongoMK instance.
*/
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.Collections;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.AbstractNodeState;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link NodeState} implementation for the {@link MongoNodeStore}.
+ */
+class MongoNodeState extends AbstractNodeState {
+
+ private final MongoNodeStore store;
+
+ private final String path;
+
+ private final Revision revision;
+
+ MongoNodeState(@Nonnull MongoNodeStore store,
+ @Nonnull String path,
+ @Nonnull Revision revision) {
+ this.store = checkNotNull(store);
+ this.path = checkNotNull(path);
+ this.revision = checkNotNull(revision);
+ }
+
+ String getPath() {
+ return path;
+ }
+
+ Revision getRevision() {
+ return revision;
+ }
+
+ //--------------------------< NodeState >-----------------------------------
+
+
+ @Override
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ } else if (that instanceof MongoNodeState) {
+ MongoNodeState other = (MongoNodeState) that;
+ if (revision.equals(other.revision) && path.equals(other.path)) {
+ return true;
+ } else {
+ // TODO: optimize equals check for this case
+ }
+ }
+ if (that instanceof NodeState) {
+ return AbstractNodeState.equals(this, (NodeState) that);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean exists() {
+ // TODO: implement
+ return true;
+ }
+
+ @Nonnull
+ @Override
+ public Iterable<? extends PropertyState> getProperties() {
+ // TODO: implement
+ return Collections.emptyList();
+ }
+
+ @Nonnull
+ @Override
+ public NodeState getChildNode(@Nonnull String name) {
+ // TODO: implement
+ return EmptyNodeState.MISSING_NODE;
+ }
+
+ @Nonnull
+ @Override
+ public Iterable<? extends ChildNodeEntry> getChildNodeEntries() {
+ // TODO: implement
+ return Collections.emptyList();
+ }
+
+ @Nonnull
+ @Override
+ public NodeBuilder builder() {
+ // TODO: implement
+ return new MemoryNodeBuilder(this);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeState.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java?rev=1534626&r1=1534625&r2=1534626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java Tue Oct 22 13:12:01 2013
@@ -36,9 +36,13 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
@@ -52,6 +56,8 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.plugins.observation.Observable;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -63,7 +69,8 @@ import org.slf4j.LoggerFactory;
/**
* Implementation of a NodeStore on MongoDB.
*/
-public final class MongoNodeStore implements NodeStore, RevisionContext {
+public final class MongoNodeStore
+ implements NodeStore, RevisionContext, Observable {
private static final Logger LOG = LoggerFactory.getLogger(MongoNodeStore.class);
@@ -96,6 +103,16 @@ public final class MongoNodeStore implem
protected final DocumentStore store;
/**
+ * The commit queue to coordinate the commits.
+ */
+ protected final CommitQueue commitQueue;
+
+ /**
+ * The change dispatcher for this node store.
+ */
+ protected final ChangeDispatcher dispatcher;
+
+ /**
* Whether this instance is disposed.
*/
private final AtomicBoolean isDisposed = new AtomicBoolean();
@@ -168,6 +185,12 @@ public final class MongoNodeStore implem
private Thread backgroundThread;
/**
+ * Read/Write lock for background operations. Regular commits will acquire
+ * a shared lock, while a background write acquires an exclusive lock.
+ */
+ private final ReadWriteLock backgroundOperationLock = new ReentrantReadWriteLock();
+
+ /**
* Enable using simple revisions (just a counter). This feature is useful
* for testing.
*/
@@ -245,6 +268,10 @@ public final class MongoNodeStore implem
backgroundRead();
getRevisionComparator().add(headRevision, Revision.newRevision(0));
headRevision = newRevision();
+ dispatcher = new ChangeDispatcher(this);
+ commitQueue = new CommitQueue(this, dispatcher);
+
+ LOG.info("Initialized MongoNodeStore with clusterNodeId: {}", clusterId);
}
void init() {
@@ -262,7 +289,7 @@ public final class MongoNodeStore implem
}
backgroundThread = new Thread(
new BackgroundOperation(this, isDisposed),
- "MongoMK background thread");
+ "MongoNodeStore background thread");
backgroundThread.setDaemon(true);
backgroundThread.start();
}
@@ -286,14 +313,10 @@ public final class MongoNodeStore implem
clusterNodeInfo.dispose();
}
store.dispose();
- LOG.info("Disposed MongoMK with clusterNodeId: {}", clusterId);
+ LOG.info("Disposed MongoNodeStore with clusterNodeId: {}", clusterId);
}
}
- public void stopBackground() {
- stopBackground = true;
- }
-
@Nonnull
Revision getHeadRevision() {
return headRevision;
@@ -323,6 +346,34 @@ public final class MongoNodeStore implem
return Revision.newRevision(clusterId);
}
+ /**
+ * Creates a new commit. The caller must acknowledge the commit either with
+ * {@link #done(Commit, boolean)} or {@link #canceled(Commit)}, depending
+ * on the result of the commit.
+ *
+ * @param base the base revision for the commit or <code>null</code> if the
+ * commit should use the current head revision as base.
+ * @return a new commit.
+ */
+ @Nonnull
+ Commit newCommit(@Nullable Revision base) {
+ if (base == null) {
+ base = headRevision;
+ }
+ backgroundOperationLock.readLock().lock();
+ return commitQueue.createCommit(base);
+ }
+
+ void done(@Nonnull Commit c, boolean isBranch) {
+ backgroundOperationLock.readLock().unlock();
+ commitQueue.done(c, isBranch, CommitInfo.EMPTY);
+ }
+
+ void canceled(Commit c) {
+ backgroundOperationLock.readLock().unlock();
+ commitQueue.canceled(c);
+ }
+
public void setAsyncDelay(int delay) {
this.asyncDelay = delay;
}
@@ -351,10 +402,6 @@ public final class MongoNodeStore implem
return unsavedLastRevisions.getPaths().size();
}
- public long getSplitDocumentAgeMillis() {
- return this.splitDocumentAgeMillis;
- }
-
/**
* Checks that revision x is newer than another revision.
*
@@ -533,7 +580,7 @@ public final class MongoNodeStore implem
docChildrenCache.put(path, clone);
c = clone;
}
- Iterable<NodeDocument> it = Iterables.transform(c.childNames,
+ Iterable<NodeDocument> it = Iterables.transform(c.childNames,
new Function<String, NodeDocument>() {
@Override
public NodeDocument apply(String name) {
@@ -656,13 +703,30 @@ public final class MongoNodeStore implem
}
}
+ /**
+ * Returns the root node state at the given revision.
+ *
+ * @param revision a revision.
+ * @return the root node state at the given revision.
+ */
+ @Nonnull
+ NodeState getRoot(@Nonnull Revision revision) {
+ return new MongoNodeState(this, "/", revision);
+ }
+
+ //------------------------< Observable >------------------------------------
+
+ @Override
+ public ChangeDispatcher.Listener newListener() {
+ return dispatcher.newListener();
+ }
+
//-------------------------< NodeStore >------------------------------------
@Nonnull
@Override
public NodeState getRoot() {
- // TODO: implement
- return null;
+ return getRoot(headRevision);
}
@Nonnull
@@ -865,61 +929,67 @@ public final class MongoNodeStore implem
if (unsavedLastRevisions.getPaths().size() == 0) {
return;
}
- ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
- // sort by depth (high depth first), then path
- Collections.sort(paths, new Comparator<String>() {
+ Lock writeLock = backgroundOperationLock.writeLock();
+ writeLock.lock();
+ try {
+ ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
+ // sort by depth (high depth first), then path
+ Collections.sort(paths, new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- int d1 = Utils.pathDepth(o1);
- int d2 = Utils.pathDepth(o1);
- if (d1 != d2) {
- return Integer.signum(d1 - d2);
+ @Override
+ public int compare(String o1, String o2) {
+ int d1 = Utils.pathDepth(o1);
+ int d2 = Utils.pathDepth(o1);
+ if (d1 != d2) {
+ return Integer.signum(d1 - d2);
+ }
+ return o1.compareTo(o2);
}
- return o1.compareTo(o2);
- }
- });
+ });
- long now = Revision.getCurrentTimestamp();
- UpdateOp updateOp = null;
- Revision lastRev = null;
- List<String> ids = new ArrayList<String>();
- for (int i = 0; i < paths.size(); i++) {
- String p = paths.get(i);
- Revision r = unsavedLastRevisions.get(p);
- if (r == null) {
- continue;
- }
- // FIXME: with below code fragment the root (and other nodes
- // 'close' to the root) will not be updated in MongoDB when there
- // are frequent changes.
- if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
- continue;
- }
- int size = ids.size();
- if (updateOp == null) {
- // create UpdateOp
- Commit commit = new Commit(this, null, r);
- commit.touchNode(p);
- updateOp = commit.getUpdateOperationForNode(p);
- lastRev = r;
- ids.add(Utils.getIdFromPath(p));
- } else if (r.equals(lastRev)) {
- // use multi update when possible
- ids.add(Utils.getIdFromPath(p));
- }
- // update if this is the last path or
- // revision is not equal to last revision
- if (i + 1 >= paths.size() || size == ids.size()) {
- store.update(Collection.NODES, ids, updateOp);
- for (String id : ids) {
- unsavedLastRevisions.remove(Utils.getPathFromId(id));
- }
- ids.clear();
- updateOp = null;
- lastRev = null;
+ long now = Revision.getCurrentTimestamp();
+ UpdateOp updateOp = null;
+ Revision lastRev = null;
+ List<String> ids = new ArrayList<String>();
+ for (int i = 0; i < paths.size(); i++) {
+ String p = paths.get(i);
+ Revision r = unsavedLastRevisions.get(p);
+ if (r == null) {
+ continue;
+ }
+ // FIXME: with below code fragment the root (and other nodes
+ // 'close' to the root) will not be updated in MongoDB when there
+ // are frequent changes.
+ if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
+ continue;
+ }
+ int size = ids.size();
+ if (updateOp == null) {
+ // create UpdateOp
+ Commit commit = new Commit(this, null, r);
+ commit.touchNode(p);
+ updateOp = commit.getUpdateOperationForNode(p);
+ lastRev = r;
+ ids.add(Utils.getIdFromPath(p));
+ } else if (r.equals(lastRev)) {
+ // use multi update when possible
+ ids.add(Utils.getIdFromPath(p));
+ }
+ // update if this is the last path or
+ // revision is not equal to last revision
+ if (i + 1 >= paths.size() || size == ids.size()) {
+ store.update(Collection.NODES, ids, updateOp);
+ for (String id : ids) {
+ unsavedLastRevisions.remove(Utils.getPathFromId(id));
+ }
+ ids.clear();
+ updateOp = null;
+ lastRev = null;
+ }
}
+ } finally {
+ writeLock.unlock();
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java?rev=1534626&r1=1534625&r2=1534626&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/UnsavedModifications.java Tue Oct 22 13:12:01 2013
@@ -33,9 +33,37 @@ class UnsavedModifications {
private final ConcurrentHashMap<String, Revision> map = new ConcurrentHashMap<String, Revision>();
+ /**
+ * Puts a revision for the given path. The revision for the given path is
+ * only put if there is no modification present for the revision or if the
+ * current modification revision is older than the passed revision.
+ *
+ * @param path the path of the modified node.
+ * @param revision the revision of the modification.
+ * @return the previously set revision for the given path or null if there
+ * was none or the current revision is newer.
+ */
@CheckForNull
public Revision put(@Nonnull String path, @Nonnull Revision revision) {
- return map.put(checkNotNull(path), checkNotNull(revision));
+ checkNotNull(path);
+ checkNotNull(revision);
+ for (;;) {
+ Revision previous = map.get(path);
+ if (previous == null) {
+ if (map.putIfAbsent(path, revision) == null) {
+ return null;
+ }
+ } else {
+ if (previous.compareRevisionTime(revision) < 0) {
+ if (map.replace(path, previous, revision)) {
+ return previous;
+ }
+ } else {
+ // revision is earlier, do not update
+ return null;
+ }
+ }
+ }
}
@CheckForNull
@@ -64,12 +92,7 @@ class UnsavedModifications {
*/
public void applyTo(UnsavedModifications other, Revision mergeCommit) {
for (Map.Entry<String, Revision> entry : map.entrySet()) {
- Revision r = other.map.putIfAbsent(entry.getKey(), mergeCommit);
- if (r != null) {
- if (r.compareRevisionTime(mergeCommit) < 0) {
- other.map.put(entry.getKey(), mergeCommit);
- }
- }
+ other.put(entry.getKey(), mergeCommit);
}
}
}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CommitQueue}.
+ */
+public class CommitQueueTest {
+
+ private static final int NUM_WRITERS = 10;
+
+ private static final int COMMITS_PER_WRITER = 100;
+
+ @Test
+ public void concurrentCommits() throws Exception {
+ final MongoNodeStore store = new MongoMK.Builder().getNodeStore();
+ ChangeDispatcher dispatcher = new ChangeDispatcher(store);
+ final ChangeDispatcher.Listener listener = dispatcher.newListener();
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final CommitQueue queue = new CommitQueue(store, dispatcher);
+ final List<Exception> exceptions = Collections.synchronizedList(
+ new ArrayList<Exception>());
+ Thread reader = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Revision before = new Revision(0, 0, store.getClusterId());
+ while (running.get()) {
+ ChangeDispatcher.ChangeSet changes = listener.getChanges();
+ if (changes != null) {
+ MongoNodeState after = (MongoNodeState) changes.getAfterState();
+ Revision r = after.getRevision();
+ // System.out.println("seen: " + r);
+ if (r.compareRevisionTime(before) < 1) {
+ exceptions.add(new Exception(
+ "Inconsistent revision sequence. Before: " +
+ before + ", after: " + r));
+ break;
+ }
+ before = r;
+ } else {
+ // avoid busy wait
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ });
+ reader.start();
+
+ // perform commits with multiple threads
+ List<Thread> writers = new ArrayList<Thread>();
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ final Random random = new Random(i);
+ writers.add(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < COMMITS_PER_WRITER; i++) {
+ Revision base = store.getHeadRevision();
+ Commit c = queue.createCommit(base);
+ try {
+ Thread.sleep(0, random.nextInt(1000));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ if (random.nextInt(5) == 0) {
+ // cancel 20% of the commits
+ queue.canceled(c);
+ } else {
+ boolean isBranch = random.nextInt(5) == 0;
+ queue.done(c, isBranch, CommitInfo.create(
+ null, null, System.currentTimeMillis()));
+ }
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ }));
+ }
+ for (Thread t : writers) {
+ t.start();
+ }
+ for (Thread t : writers) {
+ t.join();
+ }
+ running.set(false);
+ reader.join();
+ for (Exception e : exceptions) {
+ throw e;
+ }
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/CommitQueueTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java?rev=1534626&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java Tue Oct 22 13:12:01 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.mongomk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+/**
+ * Performs non-conflicting property updates on MongoMK with multiple threads.
+ */
+public class ConcurrentUpdatesTest extends AbstractMongoConnectionTest {
+
+ private static final int NUM_WRITERS = 10;
+
+ private static final int NUM_OPS = 100;
+
+ @Test
+ public void test() throws Exception {
+ final List<Exception> exceptions = Collections.synchronizedList(
+ new ArrayList<Exception>());
+ List<Thread> writers = new ArrayList<Thread>();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ final String nodeName = "test-" + i;
+ sb.append("+\"").append(nodeName).append("\":{}");
+ writers.add(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < NUM_OPS; i++) {
+ mk.commit("/" + nodeName, "^\"prop\":" + i, null, null);
+ }
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ }));
+ }
+ mk.commit("/", sb.toString(), null, null);
+ long time = System.currentTimeMillis();
+ for (Thread t : writers) {
+ t.start();
+ }
+ for (Thread t : writers) {
+ t.join();
+ }
+ time = System.currentTimeMillis() - time;
+ // System.out.println(time);
+ for (Exception e : exceptions) {
+ throw e;
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/ConcurrentUpdatesTest.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL