You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/02/20 08:07:18 UTC
svn commit: r1448015 - in
/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk:
impl/MongoMicroKernel.java prototype/Commit.java
prototype/MongoDocumentStore.java prototype/MongoMK.java
Author: thomasm
Date: Wed Feb 20 07:07:18 2013
New Revision: 1448015
URL: http://svn.apache.org/r1448015
Log:
OAK-619 Lock-free MongoMK implementation (WIP)
Modified:
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java Wed Feb 20 07:07:18 2013
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.mk.util.Nod
import org.apache.jackrabbit.mongomk.api.NodeStore;
import org.apache.jackrabbit.mongomk.api.model.Commit;
import org.apache.jackrabbit.mongomk.api.model.Node;
+import org.apache.jackrabbit.mongomk.impl.blob.MongoBlobStore;
import org.apache.jackrabbit.mongomk.impl.json.JsonUtil;
import org.apache.jackrabbit.mongomk.impl.model.CommitBuilder;
import org.apache.jackrabbit.mongomk.impl.model.MongoCommit;
@@ -48,6 +49,16 @@ public class MongoMicroKernel implements
private final MongoConnection mongoConnection;
private final BlobStore blobStore;
private final NodeStore nodeStore;
+
+ /**
+ * Constructs a new {@code MongoMicroKernel}.
+ *
+ * @param conn Connection to MongoDB.
+ */
+ public MongoMicroKernel(MongoConnection conn) {
+ this(conn, new MongoNodeStore(conn.getDB()),
+ new MongoBlobStore(conn.getDB()));
+ }
/**
* Constructs a new {@code MongoMicroKernel}.
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java Wed Feb 20 07:07:18 2013
@@ -127,7 +127,12 @@ public class Commit {
private void addChangedParent(String path) {
while (true) {
- changedParents.add(path);
+ UpdateOp op = operations.get(path);
+ if (op == null || !op.isNew) {
+ // no need to update the write count
+ // for new nodes
+ changedParents.add(path);
+ }
if (PathUtils.denotesRoot(path)) {
break;
}
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java Wed Feb 20 07:07:18 2013
@@ -17,14 +17,13 @@
package org.apache.jackrabbit.mongomk.prototype;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.mongomk.prototype.UpdateOp.Operation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
@@ -39,54 +38,91 @@ public class MongoDocumentStore implemen
public static final String KEY_PATH = "_id";
- private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class);
-
private final DBCollection nodesCollection;
+
+ private final boolean LOG = false;
+ private final boolean LOG_TIME = true;
+ private long time;
public MongoDocumentStore(DB db) {
nodesCollection = db.getCollection(Collection.NODES.toString());
ensureIndex();
}
+
+ private long start() {
+ return LOG_TIME ? System.currentTimeMillis() : 0;
+ }
+
+ private void end(long start) {
+ if (LOG_TIME) {
+ time += System.currentTimeMillis() - start;
+ }
+ }
+
+ public void finalize() {
+ // TODO should not be needed, but it seems
+ // oak-jcr doesn't call dispose()
+ dispose();
+ }
@Override
public Map<String, Object> find(Collection collection, String path) {
+ log("find", path);
DBCollection dbCollection = getDBCollection(collection);
- DBObject doc = dbCollection.findOne(getByPathQuery(path));
- if (doc == null) {
- return null;
+ long start = start();
+ try {
+ DBObject doc = dbCollection.findOne(getByPathQuery(path));
+ if (doc == null) {
+ return null;
+ }
+ return convertFromDBObject(doc);
+ } finally {
+ end(start);
}
- return convertFromDBObject(doc);
}
@Override
public List<Map<String, Object>> query(Collection collection,
String fromKey, String toKey, int limit) {
+ log("query", fromKey, toKey, limit);
DBCollection dbCollection = getDBCollection(collection);
QueryBuilder queryBuilder = QueryBuilder.start(KEY_PATH);
queryBuilder.greaterThanEquals(fromKey);
queryBuilder.lessThan(toKey);
DBObject query = queryBuilder.get();
- DBCursor cursor = dbCollection.find(query);
- List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
- for (int i=0; i<limit && cursor.hasNext(); i++) {
- DBObject o = cursor.next();
- Map<String, Object> map = convertFromDBObject(o);
- list.add(map);
+ long start = start();
+ try {
+ DBCursor cursor = dbCollection.find(query);
+ List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+ for (int i=0; i<limit && cursor.hasNext(); i++) {
+ DBObject o = cursor.next();
+ Map<String, Object> map = convertFromDBObject(o);
+ list.add(map);
+ }
+ return list;
+ } finally {
+ end(start);
}
- return list;
}
@Override
public void remove(Collection collection, String path) {
+ log("remove", path);
DBCollection dbCollection = getDBCollection(collection);
- WriteResult writeResult = dbCollection.remove(getByPathQuery(path), WriteConcern.SAFE);
- if (writeResult.getError() != null) {
- LOG.error("Remove failed: {}", writeResult.getError());
+ long start = start();
+ try {
+ WriteResult writeResult = dbCollection.remove(getByPathQuery(path), WriteConcern.SAFE);
+ if (writeResult.getError() != null) {
+ throw new MicroKernelException("Remove failed: " + writeResult.getError());
+ }
+ } finally {
+ end(start);
}
}
@Override
public Map<String, Object> createOrUpdate(Collection collection, UpdateOp updateOp) {
+ log("createOrUpdate", updateOp);
DBCollection dbCollection = getDBCollection(collection);
BasicDBObject setUpdates = new BasicDBObject();
@@ -128,6 +164,7 @@ public class MongoDocumentStore implemen
// WriteConcern.SAFE);
// return null;
+ long start = start();
try {
DBObject oldNode = dbCollection.findAndModify(query, null /*fields*/,
null /*sort*/, false /*remove*/, update, false /*returnNew*/,
@@ -135,11 +172,14 @@ public class MongoDocumentStore implemen
return convertFromDBObject(oldNode);
} catch (Exception e) {
throw new MicroKernelException(e);
+ } finally {
+ end(start);
}
}
@Override
public void create(Collection collection, List<UpdateOp> updateOps) {
+ log("create", updateOps);
DBObject[] inserts = new DBObject[updateOps.size()];
for (int i = 0; i < updateOps.size(); i++) {
@@ -170,10 +210,15 @@ public class MongoDocumentStore implemen
}
DBCollection dbCollection = getDBCollection(collection);
- WriteResult writeResult = dbCollection.insert(inserts, WriteConcern.SAFE);
- if (writeResult.getError() != null) {
- LOG.error("Batch create failed: {}", writeResult.getError());
- }
+ long start = start();
+ try {
+ WriteResult writeResult = dbCollection.insert(inserts, WriteConcern.SAFE);
+ if (writeResult.getError() != null) {
+ throw new MicroKernelException("Batch create failed: " + writeResult.getError());
+ }
+ } finally {
+ end(start);
+ }
}
private void ensureIndex() {
@@ -213,7 +258,16 @@ public class MongoDocumentStore implemen
@Override
public void dispose() {
+ if (LOG_TIME) {
+ System.out.println("MongoDB time: " + time);
+ }
nodesCollection.getDB().getMongo().close();
}
+
+ private void log(Object... args) {
+ if (LOG) {
+ System.out.println(Arrays.toString(args));
+ }
+ }
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java Wed Feb 20 07:07:18 2013
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.mongomk.prototype;
import java.io.InputStream;
+import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -134,20 +135,11 @@ public class MongoMK implements MicroKer
this.store = store;
this.blobStore = blobStore;
this.clusterId = clusterId;
- backgroundThread = new Thread(new Runnable() {
- public void run() {
- while (!isDisposed.get()) {
- synchronized (isDisposed) {
- try {
- isDisposed.wait(ASYNC_DELAY);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- runBackgroundOperations();
- }
- }
- }, "MongoMK background thread");
+ // ensure the MK can be garbage collected
+ final WeakReference<MongoMK> ref = new WeakReference<MongoMK>(this);
+ backgroundThread = new Thread(
+ new BackgroundOperation(this, isDisposed),
+ "MongoMK background thread");
backgroundThread.setDaemon(true);
backgroundThread.start();
headRevision = Revision.newRevision(clusterId);
@@ -568,6 +560,10 @@ public class MongoMK implements MicroKer
}
}
+ public DocumentStore getDocumentStore() {
+ return store;
+ }
+
static class Cache<K, V> extends LinkedHashMap<K, V> {
private static final long serialVersionUID = 1L;
@@ -584,8 +580,28 @@ public class MongoMK implements MicroKer
}
- public DocumentStore getDocumentStore() {
- return store;
+ static class BackgroundOperation implements Runnable {
+ private final AtomicBoolean isDisposed;
+ final WeakReference<MongoMK> ref;
+ BackgroundOperation(MongoMK mk, AtomicBoolean isDisposed) {
+ ref = new WeakReference<MongoMK>(mk);
+ this.isDisposed = isDisposed;
+ }
+ public void run() {
+ while (!isDisposed.get()) {
+ synchronized (isDisposed) {
+ try {
+ isDisposed.wait(ASYNC_DELAY);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ MongoMK mk = ref.get();
+ if (mk != null) {
+ mk.runBackgroundOperations();
+ }
+ }
+ }
}
}