You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2013/10/16 15:23:58 UTC
svn commit: r1532757 [1/2] - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/mongomk/
test/java/org/apache/jackrabbit/oak/plugins/mongomk/
Author: mreutegg
Date: Wed Oct 16 13:23:58 2013
New Revision: 1532757
URL: http://svn.apache.org/r1532757
Log:
OAK-1102: Isolate MicroKernel specific code in MongoMK
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoNodeStore.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/DocumentSplitTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/mongomk/SimpleTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java?rev=1532757&r1=1532756&r2=1532757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/Commit.java Wed Oct 16 13:23:58 2013
@@ -43,7 +43,7 @@ public class Commit {
private static final Logger LOG = LoggerFactory.getLogger(Commit.class);
- private final MongoMK mk;
+ private final MongoNodeStore nodeStore;
private final Revision baseRevision;
private final Revision revision;
private HashMap<String, UpdateOp> operations = new HashMap<String, UpdateOp>();
@@ -58,10 +58,10 @@ public class Commit {
private HashSet<String> addedNodes = new HashSet<String>();
private HashSet<String> removedNodes = new HashSet<String>();
- Commit(MongoMK mk, Revision baseRevision, Revision revision) {
+ Commit(MongoNodeStore nodeStore, Revision baseRevision, Revision revision) {
this.baseRevision = baseRevision;
this.revision = revision;
- this.mk = mk;
+ this.nodeStore = nodeStore;
}
UpdateOp getUpdateOperationForNode(String path) {
@@ -169,7 +169,7 @@ public class Commit {
// other readers. branch commits use the base revision to indicate
// the visibility of the commit
String commitValue = baseBranchRevision != null ? baseBranchRevision.toString() : "c";
- DocumentStore store = mk.getDocumentStore();
+ DocumentStore store = nodeStore.getDocumentStore();
String commitRootPath = null;
if (baseBranchRevision != null) {
// branch commits always use root node as commit root
@@ -271,7 +271,7 @@ public class Commit {
}
private void rollback(ArrayList<UpdateOp> newDocuments, ArrayList<UpdateOp> changed) {
- DocumentStore store = mk.getDocumentStore();
+ DocumentStore store = nodeStore.getDocumentStore();
for (UpdateOp op : changed) {
UpdateOp reverse = op.getReverseOperation();
store.createOrUpdate(Collection.NODES, reverse);
@@ -294,7 +294,7 @@ public class Commit {
final AtomicReference<List<Revision>> collisions = new AtomicReference<List<Revision>>();
Revision newestRev = null;
if (doc != null) {
- newestRev = doc.getNewestRevision(mk, revision,
+ newestRev = doc.getNewestRevision(nodeStore, revision,
new CollisionHandler() {
@Override
void concurrentModification(Revision other) {
@@ -316,7 +316,7 @@ public class Commit {
conflictMessage = "The node " +
op.getId() + " was already added in revision\n" +
newestRev;
- } else if (mk.isRevisionNewer(newestRev, baseRevision)
+ } else if (nodeStore.isRevisionNewer(newestRev, baseRevision)
&& (op.isDelete() || isConflicting(doc, op))) {
conflictMessage = "The node " +
op.getId() + " was changed in revision\n" + newestRev +
@@ -327,7 +327,7 @@ public class Commit {
if (conflictMessage != null) {
conflictMessage += ", before\n" + revision +
"; document:\n" + (doc == null ? "" : doc.format()) +
- ",\nrevision order:\n" + mk.getRevisionComparator();
+ ",\nrevision order:\n" + nodeStore.getRevisionComparator();
throw new MicroKernelException(conflictMessage);
}
// if we get here the modification was successful
@@ -336,13 +336,13 @@ public class Commit {
if (collisions.get() != null && isConflicting(doc, op)) {
for (Revision r : collisions.get()) {
// mark collisions on commit root
- new Collision(doc, r, op, revision, mk).mark(store);
+ new Collision(doc, r, op, revision, nodeStore).mark(store);
}
}
}
if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
- mk.addSplitCandidate(doc.getId());
+ nodeStore.addSplitCandidate(doc.getId());
}
}
@@ -365,7 +365,7 @@ public class Commit {
// or document did not exist before
return false;
}
- return doc.isConflicting(op, baseRevision, mk);
+ return doc.isConflicting(op, baseRevision, nodeStore);
}
/**
@@ -404,9 +404,7 @@ public class Commit {
boolean isNew = op != null && op.isNew();
boolean isWritten = op != null;
boolean isDelete = op != null && op.isDelete();
- mk.applyChanges(revision, path,
- isNew, isDelete, isWritten, isBranchCommit,
- added, removed);
+ nodeStore.applyChanges(revision, path, isNew, isDelete, isWritten, isBranchCommit, added, removed);
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java?rev=1532757&r1=1532756&r2=1532757&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/mongomk/MongoMK.java Wed Oct 16 13:23:58 2013
@@ -17,24 +17,12 @@
package org.apache.jackrabbit.oak.plugins.mongomk;
import java.io.InputStream;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -52,29 +40,20 @@ import org.apache.jackrabbit.oak.cache.C
import org.apache.jackrabbit.oak.cache.EmpiricalWeigher;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.mongomk.Node.Children;
-import org.apache.jackrabbit.oak.plugins.mongomk.Revision.RevisionComparator;
import org.apache.jackrabbit.oak.plugins.mongomk.blob.MongoBlobStore;
-import org.apache.jackrabbit.oak.plugins.mongomk.util.LoggingDocumentStoreWrapper;
-import org.apache.jackrabbit.oak.plugins.mongomk.util.TimingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.mongomk.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
import com.mongodb.DB;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* A MicroKernel implementation that stores the data in a MongoDB.
*/
-public class MongoMK implements MicroKernel, RevisionContext {
+public class MongoMK implements MicroKernel {
/**
* The threshold where special handling for many child node starts.
@@ -85,41 +64,20 @@ public class MongoMK implements MicroKer
/**
* Enable the LIRS cache.
*/
- static final boolean LIRS_CACHE = Boolean.parseBoolean(
- System.getProperty("oak.mongoMK.lirsCache", "false"));
+ static final boolean LIRS_CACHE = Boolean.parseBoolean(System.getProperty("oak.mongoMK.lirsCache", "false"));
private static final Logger LOG = LoggerFactory.getLogger(MongoMK.class);
/**
- * Do not cache more than this number of children for a document.
- */
- private static final int NUM_CHILDREN_CACHE_LIMIT = Integer.getInteger(
- "oak.mongoMK.childrenCacheLimit", 16 * 1024);
-
- /**
- * When trying to access revisions that are older than this many
- * milliseconds, a warning is logged. The default is one minute.
- */
- private static final int WARN_REVISION_AGE =
- Integer.getInteger("oak.mongoMK.revisionAge", 60 * 1000);
-
- /**
- * Enable background operations
- */
- private static final boolean ENABLE_BACKGROUND_OPS = Boolean.parseBoolean(
- System.getProperty("oak.mongoMK.backgroundOps", "true"));
-
- /**
* Enable fast diff operations.
*/
private static final boolean FAST_DIFF = Boolean.parseBoolean(
System.getProperty("oak.mongoMK.fastDiff", "true"));
/**
- * How long to remember the relative order of old revision of all cluster
- * nodes, in milliseconds. The default is one hour.
+ * The MongoDB store.
*/
- private static final int REMEMBER_REVISION_ORDER_MILLIS = 60 * 60 * 1000;
+ protected final MongoNodeStore nodeStore;
/**
* The MongoDB store (might be used by multiple MongoMKs).
@@ -127,581 +85,57 @@ public class MongoMK implements MicroKer
protected final DocumentStore store;
/**
- * The delay for asynchronous operations (delayed commit propagation and
- * cache update).
- */
- protected int asyncDelay = 1000;
-
- /**
- * Whether this instance is disposed.
- */
- private final AtomicBoolean isDisposed = new AtomicBoolean();
-
- /**
* The MongoDB blob store.
*/
private final BlobStore blobStore;
-
- /**
- * The cluster instance info.
- */
- private final ClusterNodeInfo clusterNodeInfo;
-
- /**
- * The unique cluster id, similar to the unique machine id in MongoDB.
- */
- private final int clusterId;
-
- /**
- * The splitting point in milliseconds. If a document is split, revisions
- * older than this number of milliseconds are moved to a different document.
- * The default is 0, meaning documents are never split. Revisions that are
- * newer than this are kept in the newest document.
- */
- private final long splitDocumentAgeMillis;
-
- /**
- * The node cache.
- *
- * Key: path@rev, value: node
- */
- private final Cache<String, Node> nodeCache;
- private final CacheStats nodeCacheStats;
/**
- * Child node cache.
- *
- * Key: path@rev, value: children
- */
- private final Cache<String, Node.Children> nodeChildrenCache;
- private final CacheStats nodeChildrenCacheStats;
-
- /**
* Diff cache.
*/
private final Cache<String, Diff> diffCache;
private final CacheStats diffCacheStats;
- /**
- * Child doc cache.
- */
- private final Cache<String, NodeDocument.Children> docChildrenCache;
- private final CacheStats docChildrenCacheStats;
-
- /**
- * The unsaved last revisions. This contains the parents of all changed
- * nodes, once those nodes are committed but the parent node itself wasn't
- * committed yet. The parents are not immediately persisted as this would
- * cause each commit to change all parents (including the root node), which
- * would limit write scalability.
- *
- * Key: path, value: revision.
- */
- private final UnsavedModifications unsavedLastRevisions = new UnsavedModifications();
-
- /**
- * Set of IDs for documents that may need to be split.
- */
- private final Map<String, String> splitCandidates = Maps.newConcurrentMap();
-
- /**
- * The last known revision for each cluster instance.
- *
- * Key: the machine id, value: revision.
- */
- private final Map<Integer, Revision> lastKnownRevision =
- new ConcurrentHashMap<Integer, Revision>();
-
- /**
- * The last known head revision. This is the last-known revision.
- */
- private volatile Revision headRevision;
-
- private Thread backgroundThread;
-
- /**
- * Enable using simple revisions (just a counter). This feature is useful
- * for testing.
- */
- private AtomicInteger simpleRevisionCounter;
-
- /**
- * The comparator for revisions.
- */
- private final RevisionComparator revisionComparator;
-
- /**
- * Unmerged branches of this MongoMK instance.
- */
- // TODO at some point, open (unmerged) branches
- // need to be garbage collected (in-memory and on disk)
- private final UnmergedBranches branches;
-
- private boolean stopBackground;
-
MongoMK(Builder builder) {
- if (builder.isUseSimpleRevision()) {
- this.simpleRevisionCounter = new AtomicInteger(0);
- }
-
- DocumentStore s = builder.getDocumentStore();
- if (builder.getTiming()) {
- s = new TimingDocumentStoreWrapper(s);
- }
- if (builder.getLogging()) {
- s = new LoggingDocumentStoreWrapper(s);
- }
- this.store = s;
+ this.nodeStore = builder.getNodeStore();
+ this.store = nodeStore.getDocumentStore();
this.blobStore = builder.getBlobStore();
- int cid = builder.getClusterId();
- splitDocumentAgeMillis = builder.getSplitDocumentAgeMillis();
- cid = Integer.getInteger("oak.mongoMK.clusterId", cid);
- if (cid == 0) {
- clusterNodeInfo = ClusterNodeInfo.getInstance(store);
- // TODO we should ensure revisions generated from now on
- // are never "older" than revisions already in the repository for
- // this cluster id
- cid = clusterNodeInfo.getId();
- } else {
- clusterNodeInfo = null;
- }
- this.clusterId = cid;
-
- this.revisionComparator = new RevisionComparator(clusterId);
- this.asyncDelay = builder.getAsyncDelay();
- this.branches = new UnmergedBranches(getRevisionComparator());
-
- //TODO Make stats collection configurable as it add slight overhead
- //TODO Expose the stats as JMX beans
-
- nodeCache = builder.buildCache(builder.getNodeCacheSize());
- nodeCacheStats = new CacheStats(nodeCache, "MongoMk-Node",
- builder.getWeigher(), builder.getNodeCacheSize());
-
- nodeChildrenCache = builder.buildCache(builder.getChildrenCacheSize());
- nodeChildrenCacheStats = new CacheStats(nodeChildrenCache, "MongoMk-NodeChildren",
- builder.getWeigher(), builder.getChildrenCacheSize());
diffCache = builder.buildCache(builder.getDiffCacheSize());
diffCacheStats = new CacheStats(diffCache, "MongoMk-DiffCache",
builder.getWeigher(), builder.getDiffCacheSize());
- docChildrenCache = builder.buildCache(builder.getDocChildrenCacheSize());
- docChildrenCacheStats = new CacheStats(docChildrenCache, "MongoMk-DocChildren",
- builder.getWeigher(), builder.getDocChildrenCacheSize());
-
- init();
- // initial reading of the revisions of other cluster nodes
- backgroundRead();
- getRevisionComparator().add(headRevision, Revision.newRevision(0));
- headRevision = newRevision();
- LOG.info("Initialized MongoMK with clusterNodeId: {}", clusterId);
+ LOG.info("Initialized MongoMK with clusterNodeId: {}", nodeStore.getClusterId());
}
- void init() {
- headRevision = newRevision();
- Node n = readNode("/", headRevision);
- if (n == null) {
- // root node is missing: repository is not initialized
- Commit commit = new Commit(this, null, headRevision);
- n = new Node("/", headRevision);
- commit.addNode(n);
- commit.applyToDocumentStore();
- } else {
- // initialize branchCommits
- branches.init(store, this);
- }
- backgroundThread = new Thread(
- new BackgroundOperation(this, isDisposed),
- "MongoMK background thread");
- backgroundThread.setDaemon(true);
- backgroundThread.start();
- }
-
- /**
- * Create a new revision.
- *
- * @return the revision
- */
- Revision newRevision() {
- if (simpleRevisionCounter != null) {
- return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId);
- }
- return Revision.newRevision(clusterId);
- }
-
- void runBackgroundOperations() {
- if (isDisposed.get()) {
- return;
- }
- backgroundRenewClusterIdLease();
- if (simpleRevisionCounter != null) {
- // only when using timestamp
- return;
- }
- if (!ENABLE_BACKGROUND_OPS || stopBackground) {
- return;
- }
- synchronized (this) {
- try {
- backgroundSplit();
- backgroundWrite();
- backgroundRead();
- } catch (RuntimeException e) {
- if (isDisposed.get()) {
- return;
- }
- LOG.warn("Background operation failed: " + e.toString(), e);
- }
- }
+ public void dispose() {
+ nodeStore.dispose();
}
- private void backgroundRenewClusterIdLease() {
- if (clusterNodeInfo == null) {
- return;
- }
- clusterNodeInfo.renewLease(asyncDelay);
- }
-
void backgroundRead() {
- String id = Utils.getIdFromPath("/");
- NodeDocument doc = store.find(Collection.NODES, id, asyncDelay);
- if (doc == null) {
- return;
- }
- Map<Integer, Revision> lastRevMap = doc.getLastRev();
-
- RevisionComparator revisionComparator = getRevisionComparator();
- boolean hasNewRevisions = false;
- // the (old) head occurred first
- Revision headSeen = Revision.newRevision(0);
- // then we saw this new revision (from another cluster node)
- Revision otherSeen = Revision.newRevision(0);
- for (Entry<Integer, Revision> e : lastRevMap.entrySet()) {
- int machineId = e.getKey();
- if (machineId == clusterId) {
- continue;
- }
- Revision r = e.getValue();
- Revision last = lastKnownRevision.get(machineId);
- if (last == null || r.compareRevisionTime(last) > 0) {
- lastKnownRevision.put(machineId, r);
- hasNewRevisions = true;
- revisionComparator.add(r, otherSeen);
- }
- }
- if (hasNewRevisions) {
- // TODO invalidating the whole cache is not really needed,
- // instead only those children that are cached could be checked
- store.invalidateCache();
- // TODO only invalidate affected items
- docChildrenCache.invalidateAll();
- // add a new revision, so that changes are visible
- Revision r = Revision.newRevision(clusterId);
- // the latest revisions of the current cluster node
- // happened before the latest revisions of other cluster nodes
- revisionComparator.add(r, headSeen);
- // the head revision is after other revisions
- headRevision = Revision.newRevision(clusterId);
- }
- revisionComparator.purge(Revision.getCurrentTimestamp() - REMEMBER_REVISION_ORDER_MILLIS);
- }
-
- private void backgroundSplit() {
- for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
- String id = it.next();
- NodeDocument doc = store.find(Collection.NODES, id);
- if (doc == null) {
- continue;
- }
- for (UpdateOp op : doc.split(this)) {
- NodeDocument before = store.createOrUpdate(Collection.NODES, op);
- if (before != null) {
- NodeDocument after = store.find(Collection.NODES, op.getId());
- if (after != null) {
- LOG.info("Split operation on {}. Size before: {}, after: {}",
- new Object[]{id, before.getMemory(), after.getMemory()});
- }
- }
- }
- it.remove();
- }
+ nodeStore.backgroundRead();
}
void backgroundWrite() {
- if (unsavedLastRevisions.getPaths().size() == 0) {
- return;
- }
- ArrayList<String> paths = new ArrayList<String>(unsavedLastRevisions.getPaths());
- // sort by depth (high depth first), then path
- Collections.sort(paths, new Comparator<String>() {
-
- @Override
- public int compare(String o1, String o2) {
- int d1 = Utils.pathDepth(o1);
- int d2 = Utils.pathDepth(o1);
- if (d1 != d2) {
- return Integer.signum(d1 - d2);
- }
- return o1.compareTo(o2);
- }
-
- });
-
- long now = Revision.getCurrentTimestamp();
- UpdateOp updateOp = null;
- Revision lastRev = null;
- List<String> ids = new ArrayList<String>();
- for (int i = 0; i < paths.size(); i++) {
- String p = paths.get(i);
- Revision r = unsavedLastRevisions.get(p);
- if (r == null) {
- continue;
- }
- // FIXME: with below code fragment the root (and other nodes
- // 'close' to the root) will not be updated in MongoDB when there
- // are frequent changes.
- if (Revision.getTimestampDifference(now, r.getTimestamp()) < asyncDelay) {
- continue;
- }
- int size = ids.size();
- if (updateOp == null) {
- // create UpdateOp
- Commit commit = new Commit(this, null, r);
- commit.touchNode(p);
- updateOp = commit.getUpdateOperationForNode(p);
- lastRev = r;
- ids.add(Utils.getIdFromPath(p));
- } else if (r.equals(lastRev)) {
- // use multi update when possible
- ids.add(Utils.getIdFromPath(p));
- }
- // update if this is the last path or
- // revision is not equal to last revision
- if (i + 1 >= paths.size() || size == ids.size()) {
- store.update(Collection.NODES, ids, updateOp);
- for (String id : ids) {
- unsavedLastRevisions.remove(Utils.getPathFromId(id));
- }
- ids.clear();
- updateOp = null;
- lastRev = null;
- }
- }
- }
-
- public void dispose() {
- // force background write (with asyncDelay > 0, the root wouldn't be written)
- // TODO make this more obvious / explicit
- // TODO tests should also work if this is not done
- asyncDelay = 0;
- runBackgroundOperations();
- if (!isDisposed.getAndSet(true)) {
- synchronized (isDisposed) {
- isDisposed.notifyAll();
- }
- try {
- backgroundThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- if (clusterNodeInfo != null) {
- clusterNodeInfo.dispose();
- }
- store.dispose();
- LOG.info("Disposed MongoMK with clusterNodeId: {}", clusterId);
- }
+ nodeStore.backgroundWrite();
}
- /**
- * Get the node for the given path and revision. The returned object might
- * not be modified directly.
- *
- * @param path the path of the node.
- * @param rev the read revision.
- * @return the node or <code>null</code> if the node does not exist at the
- * given revision.
- */
- @CheckForNull
- Node getNode(final @Nonnull String path, final @Nonnull Revision rev) {
- checkRevisionAge(checkNotNull(rev), checkNotNull(path));
- try {
- String key = path + "@" + rev;
- Node node = nodeCache.get(key, new Callable<Node>() {
- @Override
- public Node call() throws Exception {
- Node n = readNode(path, rev);
- if (n == null) {
- n = Node.MISSING;
- }
- return n;
- }
- });
- return node == Node.MISSING ? null : node;
- } catch (ExecutionException e) {
- throw new MicroKernelException(e);
- }
+ MongoNodeStore getNodeStore() {
+ return nodeStore;
}
- /**
- * Enqueue the document with the given id as a split candidate.
- *
- * @param id the id of the document to check if it needs to be split.
- */
- void addSplitCandidate(String id) {
- splitCandidates.put(id, id);
- }
-
- private void checkRevisionAge(Revision r, String path) {
- // TODO only log if there are new revisions available for the given node
- if (LOG.isDebugEnabled()) {
- if (headRevision.getTimestamp() - r.getTimestamp() > WARN_REVISION_AGE) {
- LOG.debug("Requesting an old revision for path " + path + ", " +
- ((headRevision.getTimestamp() - r.getTimestamp()) / 1000) + " seconds old");
- }
- }
- }
-
- /**
- * Checks that revision x is newer than another revision.
- *
- * @param x the revision to check
- * @param previous the presumed earlier revision
- * @return true if x is newer
- */
- boolean isRevisionNewer(@Nonnull Revision x, @Nonnull Revision previous) {
- return getRevisionComparator().compare(x, previous) > 0;
- }
-
- public long getSplitDocumentAgeMillis() {
- return this.splitDocumentAgeMillis;
- }
-
- public Children getChildren(final String path, final Revision rev, final int limit) throws MicroKernelException {
- checkRevisionAge(rev, path);
- String key = path + "@" + rev;
- Children children;
- try {
- children = nodeChildrenCache.get(key, new Callable<Children>() {
- @Override
- public Children call() throws Exception {
- return readChildren(path, rev, limit);
- }
- });
- } catch (ExecutionException e) {
- throw new MicroKernelException("Error occurred while fetching children nodes for path "+path, e);
- }
-
- //In case the limit > cached children size and there are more child nodes
- //available then refresh the cache
- if (children.hasMore) {
- if (limit > children.children.size()) {
- children = readChildren(path, rev, limit);
- if (children != null) {
- nodeChildrenCache.put(key, children);
- }
- }
- }
- return children;
- }
-
- Node.Children readChildren(String path, Revision rev, int limit) {
- // TODO use offset, to avoid O(n^2) and running out of memory
- // to do that, use the *name* of the last entry of the previous batch of children
- // as the starting point
- Iterable<NodeDocument> docs;
- Children c = new Children();
- int rawLimit = limit;
- Set<Revision> validRevisions = new HashSet<Revision>();
- do {
- c.children.clear();
- c.hasMore = true;
- docs = readChildren(path, rawLimit);
- int numReturned = 0;
- for (NodeDocument doc : docs) {
- numReturned++;
- // filter out deleted children
- if (doc.isDeleted(this, rev, validRevisions)) {
- continue;
- }
- String p = Utils.getPathFromId(doc.getId());
- if (c.children.size() < limit) {
- // add to children until limit is reached
- c.children.add(p);
- }
- }
- if (numReturned < rawLimit) {
- // fewer documents returned than requested
- // -> no more documents
- c.hasMore = false;
- }
- // double rawLimit for next round
- rawLimit = (int) Math.min(((long) rawLimit) * 2, Integer.MAX_VALUE);
- } while (c.children.size() < limit && c.hasMore);
- return c;
+ ClusterNodeInfo getClusterInfo() {
+ return nodeStore.getClusterInfo();
}
- @Nonnull
- Iterable<NodeDocument> readChildren(final String path, int limit) {
- String from = Utils.getKeyLowerLimit(path);
- String to = Utils.getKeyUpperLimit(path);
- if (limit > NUM_CHILDREN_CACHE_LIMIT) {
- // do not use cache
- return store.query(Collection.NODES, from, to, limit);
- }
- // check cache
- NodeDocument.Children c = docChildrenCache.getIfPresent(path);
- if (c == null) {
- c = new NodeDocument.Children();
- List<NodeDocument> docs = store.query(Collection.NODES, from, to, limit);
- for (NodeDocument doc : docs) {
- String p = Utils.getPathFromId(doc.getId());
- c.childNames.add(PathUtils.getName(p));
- }
- c.isComplete = docs.size() < limit;
- docChildrenCache.put(path, c);
- } else if (c.childNames.size() < limit && !c.isComplete) {
- // fetch more and update cache
- String lastName = c.childNames.get(c.childNames.size() - 1);
- String lastPath = PathUtils.concat(path, lastName);
- from = Utils.getIdFromPath(lastPath);
- int remainingLimit = limit - c.childNames.size();
- List<NodeDocument> docs = store.query(Collection.NODES,
- from, to, remainingLimit);
- NodeDocument.Children clone = c.clone();
- for (NodeDocument doc : docs) {
- String p = Utils.getPathFromId(doc.getId());
- clone.childNames.add(PathUtils.getName(p));
- }
- clone.isComplete = docs.size() < remainingLimit;
- docChildrenCache.put(path, clone);
- c = clone;
- }
- return Iterables.transform(c.childNames, new Function<String, NodeDocument>() {
- @Override
- public NodeDocument apply(String name) {
- String p = PathUtils.concat(path, name);
- return store.find(Collection.NODES, Utils.getIdFromPath(p));
- }
- });
+ int getPendingWriteCount() {
+ return nodeStore.getPendingWriteCount();
}
- @CheckForNull
- private Node readNode(String path, Revision readRevision) {
- String id = Utils.getIdFromPath(path);
- Revision lastRevision = getPendingModifications().get(path);
- NodeDocument doc = store.find(Collection.NODES, id);
- if (doc == null) {
- return null;
- }
- return doc.getNodeAtRevision(this, readRevision, lastRevision);
- }
-
@Override
public String getHeadRevision() throws MicroKernelException {
- return headRevision.toString();
+ return nodeStore.getHeadRevision().toString();
}
@Override @Nonnull
@@ -767,8 +201,8 @@ public class MongoMK implements MicroKer
}
Revision fromRev = Revision.fromString(fromRevisionId);
Revision toRev = Revision.fromString(toRevisionId);
- Node from = getNode(path, fromRev);
- Node to = getNode(path, toRev);
+ Node from = nodeStore.getNode(path, fromRev);
+ Node to = nodeStore.getNode(path, toRev);
if (from == null || to == null) {
// TODO implement correct behavior if the node does't/didn't exist
@@ -798,8 +232,8 @@ public class MongoMK implements MicroKer
// use a MongoDB index instead
int max = MANY_CHILDREN_THRESHOLD;
Children fromChildren, toChildren;
- fromChildren = getChildren(path, fromRev, max);
- toChildren = getChildren(path, toRev, max);
+ fromChildren = nodeStore.getChildren(path, fromRev, max);
+ toChildren = nodeStore.getChildren(path, toRev, max);
if (!fromChildren.hasMore && !toChildren.hasMore) {
diffFewChildren(w, fromChildren, fromRev, toChildren, toRev);
} else {
@@ -807,8 +241,8 @@ public class MongoMK implements MicroKer
diffManyChildren(w, path, fromRev, toRev);
} else {
max = Integer.MAX_VALUE;
- fromChildren = getChildren(path, fromRev, max);
- toChildren = getChildren(path, toRev, max);
+ fromChildren = nodeStore.getChildren(path, fromRev, max);
+ toChildren = nodeStore.getChildren(path, toRev, max);
diffFewChildren(w, fromChildren, fromRev, toChildren, toRev);
}
}
@@ -825,8 +259,8 @@ public class MongoMK implements MicroKer
for (NodeDocument doc : list) {
String id = doc.getId();
String p = Utils.getPathFromId(id);
- Node fromNode = getNode(p, fromRev);
- Node toNode = getNode(p, toRev);
+ Node fromNode = nodeStore.getNode(p, fromRev);
+ Node toNode = nodeStore.getNode(p, toRev);
if (fromNode != null) {
// exists in fromRev
if (toNode != null) {
@@ -858,8 +292,8 @@ public class MongoMK implements MicroKer
if (!childrenSet.contains(n)) {
w.tag('-').value(n).newline();
} else {
- Node n1 = getNode(n, fromRev);
- Node n2 = getNode(n, toRev);
+ Node n1 = nodeStore.getNode(n, fromRev);
+ Node n2 = nodeStore.getNode(n, toRev);
// this is not fully correct:
// a change is detected if the node changed recently,
// even if the revisions are well in the past
@@ -883,9 +317,9 @@ public class MongoMK implements MicroKer
if (!PathUtils.isAbsolute(path)) {
throw new MicroKernelException("Path is not absolute: " + path);
}
- revisionId = revisionId != null ? revisionId : headRevision.toString();
+ revisionId = revisionId != null ? revisionId : nodeStore.getHeadRevision().toString();
Revision rev = Revision.fromString(revisionId);
- Node n = getNode(path, rev);
+ Node n = nodeStore.getNode(path, rev);
return n != null;
}
@@ -903,9 +337,9 @@ public class MongoMK implements MicroKer
if (depth != 0) {
throw new MicroKernelException("Only depth 0 is supported, depth is " + depth);
}
- revisionId = revisionId != null ? revisionId : headRevision.toString();
+ revisionId = revisionId != null ? revisionId : nodeStore.getHeadRevision().toString();
Revision rev = Revision.fromString(revisionId);
- Node n = getNode(path, rev);
+ Node n = nodeStore.getNode(path, rev);
if (n == null) {
return null;
}
@@ -923,7 +357,7 @@ public class MongoMK implements MicroKer
long m = ((long) maxChildNodes) + offset;
max = (int) Math.min(m, Integer.MAX_VALUE);
}
- Children c = getChildren(path, rev, max);
+ Children c = nodeStore.getChildren(path, rev, max);
for (long i = offset; i < c.children.size(); i++) {
if (maxChildNodes-- <= 0) {
break;
@@ -942,228 +376,116 @@ public class MongoMK implements MicroKer
}
@Override
- public synchronized String commit(String rootPath, String json, String baseRevId,
+ public String commit(String rootPath, String json, String baseRevId,
String message) throws MicroKernelException {
- Revision baseRev;
- if (baseRevId == null) {
- baseRev = headRevision;
- baseRevId = baseRev.toString();
- } else {
- baseRev = Revision.fromString(baseRevId);
- }
- JsopReader t = new JsopTokenizer(json);
- Revision rev = newRevision();
- Commit commit = new Commit(this, baseRev, rev);
- while (true) {
- int r = t.read();
- if (r == JsopReader.END) {
- break;
+ synchronized (nodeStore) {
+ Revision baseRev;
+ if (baseRevId == null) {
+ baseRev = nodeStore.getHeadRevision();
+ baseRevId = baseRev.toString();
+ } else {
+ baseRev = Revision.fromString(baseRevId);
}
- String path = PathUtils.concat(rootPath, t.readString());
- switch (r) {
- case '+':
- t.read(':');
- t.read('{');
- parseAddNode(commit, t, path);
- break;
- case '-':
- commit.removeNode(path);
- markAsDeleted(path, commit, true);
- commit.removeNodeDiff(path);
- break;
- case '^':
- t.read(':');
- String value;
- if (t.matches(JsopReader.NULL)) {
- value = null;
- } else {
- value = t.readRawValue().trim();
- }
- String p = PathUtils.getParentPath(path);
- String propertyName = PathUtils.getName(path);
- commit.updateProperty(p, propertyName, value);
- commit.updatePropertyDiff(p, propertyName, value);
- break;
- case '>': {
- // TODO support moving nodes that were modified within this commit
- t.read(':');
- String sourcePath = path;
- String targetPath = t.readString();
- if (!PathUtils.isAbsolute(targetPath)) {
- targetPath = PathUtils.concat(rootPath, targetPath);
- }
- if (!nodeExists(sourcePath, baseRevId)) {
- throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
- } else if (nodeExists(targetPath, baseRevId)) {
- throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+ JsopReader t = new JsopTokenizer(json);
+ Revision rev = nodeStore.newRevision();
+ Commit commit = new Commit(nodeStore, baseRev, rev);
+ while (true) {
+ int r = t.read();
+ if (r == JsopReader.END) {
+ break;
+ }
+ String path = PathUtils.concat(rootPath, t.readString());
+ switch (r) {
+ case '+':
+ t.read(':');
+ t.read('{');
+ parseAddNode(commit, t, path);
+ break;
+ case '-':
+ commit.removeNode(path);
+ nodeStore.markAsDeleted(path, commit, true);
+ commit.removeNodeDiff(path);
+ break;
+ case '^':
+ t.read(':');
+ String value;
+ if (t.matches(JsopReader.NULL)) {
+ value = null;
+ } else {
+ value = t.readRawValue().trim();
+ }
+ String p = PathUtils.getParentPath(path);
+ String propertyName = PathUtils.getName(path);
+ commit.updateProperty(p, propertyName, value);
+ commit.updatePropertyDiff(p, propertyName, value);
+ break;
+ case '>': {
+ // TODO support moving nodes that were modified within this commit
+ t.read(':');
+ String sourcePath = path;
+ String targetPath = t.readString();
+ if (!PathUtils.isAbsolute(targetPath)) {
+ targetPath = PathUtils.concat(rootPath, targetPath);
+ }
+ if (!nodeExists(sourcePath, baseRevId)) {
+ throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+ } else if (nodeExists(targetPath, baseRevId)) {
+ throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+ }
+ commit.moveNode(sourcePath, targetPath);
+ nodeStore.moveNode(sourcePath, targetPath, commit);
+ break;
+ }
+ case '*': {
+ // TODO support copying nodes that were modified within this commit
+ t.read(':');
+ String sourcePath = path;
+ String targetPath = t.readString();
+ if (!PathUtils.isAbsolute(targetPath)) {
+ targetPath = PathUtils.concat(rootPath, targetPath);
+ }
+ if (!nodeExists(sourcePath, baseRevId)) {
+ throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
+ } else if (nodeExists(targetPath, baseRevId)) {
+ throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+ }
+ commit.copyNode(sourcePath, targetPath);
+ nodeStore.copyNode(sourcePath, targetPath, commit);
+ break;
+ }
+ default:
+ throw new MicroKernelException("token: " + (char) t.getTokenType());
}
- commit.moveNode(sourcePath, targetPath);
- moveNode(sourcePath, targetPath, commit);
- break;
}
- case '*': {
- // TODO support copying nodes that were modified within this commit
- t.read(':');
- String sourcePath = path;
- String targetPath = t.readString();
- if (!PathUtils.isAbsolute(targetPath)) {
- targetPath = PathUtils.concat(rootPath, targetPath);
- }
- if (!nodeExists(sourcePath, baseRevId)) {
- throw new MicroKernelException("Node not found: " + sourcePath + " in revision " + baseRevId);
- } else if (nodeExists(targetPath, baseRevId)) {
- throw new MicroKernelException("Node already exists: " + targetPath + " in revision " + baseRevId);
+ if (baseRev.isBranch()) {
+ rev = rev.asBranchRevision();
+ // remember branch commit
+ Branch b = nodeStore.getBranches().getBranch(baseRev);
+ if (b == null) {
+ // baseRev is marker for new branch
+ b = nodeStore.getBranches().create(baseRev.asTrunkRevision(), rev);
+ } else {
+ b.addCommit(rev);
}
- commit.copyNode(sourcePath, targetPath);
- copyNode(sourcePath, targetPath, commit);
- break;
- }
- default:
- throw new MicroKernelException("token: " + (char) t.getTokenType());
- }
- }
- if (baseRev.isBranch()) {
- rev = rev.asBranchRevision();
- // remember branch commit
- Branch b = branches.getBranch(baseRev);
- if (b == null) {
- // baseRev is marker for new branch
- b = branches.create(baseRev.asTrunkRevision(), rev);
- } else {
- b.addCommit(rev);
- }
- boolean success = false;
- try {
- // prepare commit
- commit.prepare(baseRev);
- success = true;
- } finally {
- if (!success) {
- b.removeCommit(rev);
- if (!b.hasCommits()) {
- branches.remove(b);
+ boolean success = false;
+ try {
+ // prepare commit
+ commit.prepare(baseRev);
+ success = true;
+ } finally {
+ if (!success) {
+ b.removeCommit(rev);
+ if (!b.hasCommits()) {
+ nodeStore.getBranches().remove(b);
+ }
}
}
- }
-
- return rev.toString();
- } else {
- commit.apply();
- headRevision = commit.getRevision();
- return rev.toString();
- }
- }
-
- //------------------------< RevisionContext >-------------------------------
- @Override
- public UnmergedBranches getBranches() {
- return branches;
- }
-
- @Override
- public UnsavedModifications getPendingModifications() {
- return unsavedLastRevisions;
- }
-
- @Override
- public RevisionComparator getRevisionComparator() {
- return revisionComparator;
- }
-
- @Override
- public void publishRevision(Revision foreignRevision, Revision changeRevision) {
- RevisionComparator revisionComparator = getRevisionComparator();
- if (revisionComparator.compare(headRevision, foreignRevision) >= 0) {
- // already visible
- return;
- }
- int clusterNodeId = foreignRevision.getClusterId();
- if (clusterNodeId == this.clusterId) {
- return;
- }
- // the (old) head occurred first
- Revision headSeen = Revision.newRevision(0);
- // then we saw this new revision (from another cluster node)
- Revision otherSeen = Revision.newRevision(0);
- // and after that, the current change
- Revision changeSeen = Revision.newRevision(0);
- revisionComparator.add(foreignRevision, otherSeen);
- // TODO invalidating the whole cache is not really needed,
- // but how to ensure we invalidate the right part of the cache?
- // possibly simply wait for the background thread to pick
- // up the changes, but this depends on how often this method is called
- store.invalidateCache();
- // the latest revisions of the current cluster node
- // happened before the latest revisions of other cluster nodes
- revisionComparator.add(headRevision, headSeen);
- revisionComparator.add(changeRevision, changeSeen);
- // the head revision is after other revisions
- headRevision = Revision.newRevision(clusterId);
- }
-
- @Override
- public int getClusterId() {
- return clusterId;
- }
-
- private void copyNode(String sourcePath, String targetPath, Commit commit) {
- moveOrCopyNode(false, sourcePath, targetPath, commit);
- }
-
- private void moveNode(String sourcePath, String targetPath, Commit commit) {
- moveOrCopyNode(true, sourcePath, targetPath, commit);
- }
-
- private void moveOrCopyNode(boolean move,
- String sourcePath,
- String targetPath,
- Commit commit) {
- // TODO Optimize - Move logic would not work well with very move of very large subtrees
- // At minimum we can optimize by traversing breadth wise and collect node id
- // and fetch them via '$in' queries
-
- // TODO Transient Node - Current logic does not account for operations which are part
- // of this commit i.e. transient nodes. If its required it would need to be looked
- // into
-
- Node n = getNode(sourcePath, commit.getBaseRevision());
-
- // Node might be deleted already
- if (n == null) {
- return;
- }
-
- Node newNode = new Node(targetPath, commit.getRevision());
- n.copyTo(newNode);
-
- commit.addNode(newNode);
- if (move) {
- markAsDeleted(sourcePath, commit, false);
- }
- Node.Children c = getChildren(sourcePath, commit.getBaseRevision(), Integer.MAX_VALUE);
- for (String srcChildPath : c.children) {
- String childName = PathUtils.getName(srcChildPath);
- String destChildPath = PathUtils.concat(targetPath, childName);
- moveOrCopyNode(move, srcChildPath, destChildPath, commit);
- }
- }
-
- private void markAsDeleted(String path, Commit commit, boolean subTreeAlso) {
- Revision rev = commit.getBaseRevision();
- checkState(rev != null, "Base revision of commit must not be null");
- commit.removeNode(path);
-
- if (subTreeAlso) {
- // recurse down the tree
- // TODO causes issue with large number of children
- Node n = getNode(path, rev);
-
- if (n != null) {
- Node.Children c = getChildren(path, rev, Integer.MAX_VALUE);
- for (String childPath : c.children) {
- markAsDeleted(childPath, commit, true);
- }
+ return rev.toString();
+ } else {
+ commit.apply();
+ nodeStore.setHeadRevision(commit.getRevision());
+ return rev.toString();
}
}
}
@@ -1193,42 +515,44 @@ public class MongoMK implements MicroKer
// nothing is written when the branch is created, the returned
// revision simply acts as a reference to the branch base revision
Revision revision = trunkRevisionId != null
- ? Revision.fromString(trunkRevisionId) : headRevision;
+ ? Revision.fromString(trunkRevisionId) : nodeStore.getHeadRevision();
return revision.asBranchRevision().toString();
}
@Override
- public synchronized String merge(String branchRevisionId, String message)
+ public String merge(String branchRevisionId, String message)
throws MicroKernelException {
- // TODO improve implementation if needed
- Revision revision = Revision.fromString(branchRevisionId);
- if (!revision.isBranch()) {
- throw new MicroKernelException("Not a branch: " + branchRevisionId);
- }
-
- // make branch commits visible
- UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
- Branch b = branches.getBranch(revision);
- Revision mergeCommit = newRevision();
- NodeDocument.setModified(op, mergeCommit);
- if (b != null) {
- for (Revision rev : b.getCommits()) {
- rev = rev.asTrunkRevision();
- NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
- op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
- }
- if (store.findAndUpdate(Collection.NODES, op) != null) {
- // remove from branchCommits map after successful update
- b.applyTo(unsavedLastRevisions, mergeCommit);
- branches.remove(b);
+ synchronized (nodeStore) {
+ // TODO improve implementation if needed
+ Revision revision = Revision.fromString(branchRevisionId);
+ if (!revision.isBranch()) {
+ throw new MicroKernelException("Not a branch: " + branchRevisionId);
+ }
+
+ // make branch commits visible
+ UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
+ Branch b = nodeStore.getBranches().getBranch(revision);
+ Revision mergeCommit = nodeStore.newRevision();
+ NodeDocument.setModified(op, mergeCommit);
+ if (b != null) {
+ for (Revision rev : b.getCommits()) {
+ rev = rev.asTrunkRevision();
+ NodeDocument.setRevision(op, rev, "c-" + mergeCommit.toString());
+ op.containsMapEntry(NodeDocument.COLLISIONS, rev, false);
+ }
+ if (store.findAndUpdate(Collection.NODES, op) != null) {
+ // remove from branchCommits map after successful update
+ b.applyTo(nodeStore.getPendingModifications(), mergeCommit);
+ nodeStore.getBranches().remove(b);
+ } else {
+ throw new MicroKernelException("Conflicting concurrent change. Update operation failed: " + op);
+ }
} else {
- throw new MicroKernelException("Conflicting concurrent change. Update operation failed: " + op);
+ // no commits in this branch -> do nothing
}
- } else {
- // no commits in this branch -> do nothing
+ nodeStore.setHeadRevision(mergeCommit);
+ return mergeCommit.toString();
}
- headRevision = mergeCommit;
- return headRevision.toString();
}
@Override
@@ -1240,8 +564,8 @@ public class MongoMK implements MicroKer
Revision r = Revision.fromString(branchRevisionId);
Revision base = newBaseRevisionId != null ?
Revision.fromString(newBaseRevisionId) :
- headRevision;
- Branch b = branches.getBranch(r);
+ nodeStore.getHeadRevision();
+ Branch b = nodeStore.getBranches().getBranch(r);
if (b == null) {
// empty branch
return base.asBranchRevision().toString();
@@ -1251,7 +575,7 @@ public class MongoMK implements MicroKer
}
// add a pseudo commit to make sure current head of branch
// has a higher revision than base of branch
- Revision head = newRevision().asBranchRevision();
+ Revision head = nodeStore.newRevision().asBranchRevision();
b.rebase(head, base);
return head.toString();
}
@@ -1288,118 +612,12 @@ public class MongoMK implements MicroKer
return store;
}
- public void setAsyncDelay(int delay) {
- this.asyncDelay = delay;
- }
-
- public int getAsyncDelay() {
- return asyncDelay;
- }
-
- /**
- * Apply the changes of a node to the cache.
- *
- * @param rev the revision
- * @param path the path
- * @param isNew whether this is a new node
- * @param isDelete whether the node is deleted
- * @param isWritten whether the MongoDB documented was added / updated
- * @param isBranchCommit whether this is from a branch commit
- * @param added the list of added child nodes
- * @param removed the list of removed child nodes
- *
- */
- public void applyChanges(Revision rev, String path,
- boolean isNew, boolean isDelete, boolean isWritten,
- boolean isBranchCommit, ArrayList<String> added,
- ArrayList<String> removed) {
- UnsavedModifications unsaved = unsavedLastRevisions;
- if (isBranchCommit) {
- Revision branchRev = rev.asBranchRevision();
- unsaved = branches.getBranch(branchRev).getModifications(branchRev);
- }
- // track unsaved modifications of nodes that were not
- // written in the commit (implicitly modified parent)
- // or any modification if this is a branch commit
- if (!isWritten || isBranchCommit) {
- Revision prev = unsaved.put(path, rev);
- if (prev != null) {
- if (isRevisionNewer(prev, rev)) {
- // revert
- unsaved.put(path, prev);
- String msg = String.format("Attempt to update " +
- "unsavedLastRevision for %s with %s, which is " +
- "older than current %s.",
- path, rev, prev);
- throw new MicroKernelException(msg);
- }
- }
- } else {
- // the document was updated:
- // we no longer need to update it in a background process
- unsaved.remove(path);
- }
- String key = path + "@" + rev;
- Children c = nodeChildrenCache.getIfPresent(key);
- if (isNew || (!isDelete && c != null)) {
- Children c2 = new Children();
- TreeSet<String> set = new TreeSet<String>();
- if (c != null) {
- set.addAll(c.children);
- }
- set.removeAll(removed);
- for (String name : added) {
- // make sure the name string does not contain
- // unnecessary baggage
- set.add(new String(name));
- }
- c2.children.addAll(set);
- nodeChildrenCache.put(key, c2);
- }
- if (!added.isEmpty()) {
- NodeDocument.Children docChildren = docChildrenCache.getIfPresent(path);
- if (docChildren != null) {
- int currentSize = docChildren.childNames.size();
- TreeSet<String> names = new TreeSet<String>(docChildren.childNames);
- // incomplete cache entries must not be updated with
- // names at the end of the list because there might be
- // a next name in MongoDB smaller than the one added
- if (!docChildren.isComplete) {
- for (String childPath : added) {
- String name = PathUtils.getName(childPath);
- if (names.higher(name) != null) {
- // make sure the name string does not contain
- // unnecessary baggage
- names.add(new String(name));
- }
- }
- } else {
- // add all
- for (String childPath : added) {
- // make sure the name string does not contain
- // unnecessary baggage
- names.add(new String(PathUtils.getName(childPath)));
- }
- }
- // any changes?
- if (names.size() != currentSize) {
- // create new cache entry with updated names
- boolean complete = docChildren.isComplete;
- docChildren = new NodeDocument.Children();
- docChildren.isComplete = complete;
- docChildren.childNames.addAll(names);
- docChildrenCache.put(path, docChildren);
- }
- }
- }
- }
-
public CacheStats getNodeCacheStats() {
- return nodeCacheStats;
+ return nodeStore.getNodeCacheStats();
}
public CacheStats getNodeChildrenCacheStats() {
- return nodeChildrenCacheStats;
+ return nodeStore.getNodeChildrenCacheStats();
}
public CacheStats getDiffCacheStats() {
@@ -1407,65 +625,20 @@ public class MongoMK implements MicroKer
}
public CacheStats getDocChildrenCacheStats() {
- return docChildrenCacheStats;
- }
-
- public ClusterNodeInfo getClusterInfo() {
- return clusterNodeInfo;
- }
-
- public int getPendingWriteCount() {
- return unsavedLastRevisions.getPaths().size();
+ return nodeStore.getDocChildrenCacheStats();
}
public boolean isCached(String path) {
return store.isCached(Collection.NODES, Utils.getIdFromPath(path));
}
- public void stopBackground() {
- stopBackground = true;
- }
-
- /**
- * A background thread.
- */
- static class BackgroundOperation implements Runnable {
- final WeakReference<MongoMK> ref;
- private final AtomicBoolean isDisposed;
- private int delay;
-
- BackgroundOperation(MongoMK mk, AtomicBoolean isDisposed) {
- ref = new WeakReference<MongoMK>(mk);
- delay = mk.getAsyncDelay();
- this.isDisposed = isDisposed;
- }
-
- @Override
- public void run() {
- while (delay != 0 && !isDisposed.get()) {
- synchronized (isDisposed) {
- try {
- isDisposed.wait(delay);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- MongoMK mk = ref.get();
- if (mk != null) {
- mk.runBackgroundOperations();
- delay = mk.getAsyncDelay();
- }
- }
- }
- }
-
/**
* A (cached) result of the diff operation.
*/
private static class Diff implements CacheValue {
-
+
final String diff;
-
+
Diff(String diff) {
this.diff = diff;
}
@@ -1474,7 +647,7 @@ public class MongoMK implements MicroKer
public int getMemory() {
return diff.length() * 2;
}
-
+
}
/**
@@ -1482,6 +655,7 @@ public class MongoMK implements MicroKer
*/
public static class Builder {
private static final long DEFAULT_MEMORY_CACHE_SIZE = 256 * 1024 * 1024;
+ private MongoNodeStore nodeStore;
private DocumentStore documentStore;
private BlobStore blobStore;
private int clusterId = Integer.getInteger("oak.mongoMK.clusterId", 0);
@@ -1557,6 +731,13 @@ public class MongoMK implements MicroKer
return documentStore;
}
+ public MongoNodeStore getNodeStore() {
+ if (nodeStore == null) {
+ nodeStore = new MongoNodeStore(this);
+ }
+ return nodeStore;
+ }
+
/**
* Set the blob store to use. By default an in-memory store is used.
*