You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the mailing-list archive.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2013/02/20 08:07:18 UTC

svn commit: r1448015 - in /jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk: impl/MongoMicroKernel.java prototype/Commit.java prototype/MongoDocumentStore.java prototype/MongoMK.java

Author: thomasm
Date: Wed Feb 20 07:07:18 2013
New Revision: 1448015

URL: http://svn.apache.org/r1448015
Log:
OAK-619 Lock-free MongoMK implementation (WIP)

Modified:
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
    jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/MongoMicroKernel.java Wed Feb 20 07:07:18 2013
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.mk.util.Nod
 import org.apache.jackrabbit.mongomk.api.NodeStore;
 import org.apache.jackrabbit.mongomk.api.model.Commit;
 import org.apache.jackrabbit.mongomk.api.model.Node;
+import org.apache.jackrabbit.mongomk.impl.blob.MongoBlobStore;
 import org.apache.jackrabbit.mongomk.impl.json.JsonUtil;
 import org.apache.jackrabbit.mongomk.impl.model.CommitBuilder;
 import org.apache.jackrabbit.mongomk.impl.model.MongoCommit;
@@ -48,6 +49,16 @@ public class MongoMicroKernel implements
     private final MongoConnection mongoConnection;
     private final BlobStore blobStore;
     private final NodeStore nodeStore;
+    
+    /**
+     * Constructs a new {@code MongoMicroKernel}.
+     * 
+     * @param conn Connection to MongoDB.
+     */
+    public MongoMicroKernel(MongoConnection conn) {
+        this(conn, new MongoNodeStore(conn.getDB()),
+                new MongoBlobStore(conn.getDB()));
+    }
 
     /**
      * Constructs a new {@code MongoMicroKernel}.

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/Commit.java Wed Feb 20 07:07:18 2013
@@ -127,7 +127,12 @@ public class Commit {
 
     private void addChangedParent(String path) {
         while (true) {
-            changedParents.add(path);
+            UpdateOp op = operations.get(path);
+            if (op == null || !op.isNew) {
+                // no need to update the write count 
+                // for new nodes
+                changedParents.add(path);
+            }
             if (PathUtils.denotesRoot(path)) {
                 break;
             }

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoDocumentStore.java Wed Feb 20 07:07:18 2013
@@ -17,14 +17,13 @@
 package org.apache.jackrabbit.mongomk.prototype;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.jackrabbit.mk.api.MicroKernelException;
 import org.apache.jackrabbit.mongomk.prototype.UpdateOp.Operation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
@@ -39,54 +38,91 @@ public class MongoDocumentStore implemen
 
     public static final String KEY_PATH = "_id";
 
-    private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class);
-
     private final DBCollection nodesCollection;
+    
+    private final boolean LOG = false;
+    private final boolean LOG_TIME = true;
+    private long time;
 
     public MongoDocumentStore(DB db) {
         nodesCollection = db.getCollection(Collection.NODES.toString());
         ensureIndex();
     }
+    
+    private long start() {
+        return LOG_TIME ? System.currentTimeMillis() : 0;
+    }
+    
+    private void end(long start) {
+        if (LOG_TIME) {
+            time += System.currentTimeMillis() - start;
+        }
+    }
+    
+    public void finalize() {
+        // TODO should not be needed, but it seems
+        // oak-jcr doesn't call dispose()
+        dispose();
+    }
 
     @Override
     public Map<String, Object> find(Collection collection, String path) {
+        log("find", path);
         DBCollection dbCollection = getDBCollection(collection);
-        DBObject doc = dbCollection.findOne(getByPathQuery(path));
-        if (doc == null) {
-            return null;
+        long start = start();
+        try {
+            DBObject doc = dbCollection.findOne(getByPathQuery(path));
+            if (doc == null) {
+                return null;
+            }
+            return convertFromDBObject(doc);
+        } finally {
+            end(start);
         }
-        return convertFromDBObject(doc);
     }
     
     @Override
     public List<Map<String, Object>> query(Collection collection,
             String fromKey, String toKey, int limit) {
+        log("query", fromKey, toKey, limit);
         DBCollection dbCollection = getDBCollection(collection);
         QueryBuilder queryBuilder = QueryBuilder.start(KEY_PATH);
         queryBuilder.greaterThanEquals(fromKey);
         queryBuilder.lessThan(toKey);
         DBObject query = queryBuilder.get();
-        DBCursor cursor = dbCollection.find(query);
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-        for (int i=0; i<limit && cursor.hasNext(); i++) {
-            DBObject o = cursor.next();
-            Map<String, Object> map = convertFromDBObject(o);
-            list.add(map);
+        long start = start();
+        try {
+            DBCursor cursor = dbCollection.find(query);
+            List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+            for (int i=0; i<limit && cursor.hasNext(); i++) {
+                DBObject o = cursor.next();
+                Map<String, Object> map = convertFromDBObject(o);
+                list.add(map);
+            }
+            return list;
+        } finally {
+            end(start);
         }
-        return list;
     }
 
     @Override
     public void remove(Collection collection, String path) {
+        log("remove", path);        
         DBCollection dbCollection = getDBCollection(collection);
-        WriteResult writeResult = dbCollection.remove(getByPathQuery(path), WriteConcern.SAFE);
-        if (writeResult.getError() != null) {
-            LOG.error("Remove failed: {}", writeResult.getError());
+        long start = start();
+        try {
+            WriteResult writeResult = dbCollection.remove(getByPathQuery(path), WriteConcern.SAFE);
+            if (writeResult.getError() != null) {
+                throw new MicroKernelException("Remove failed: " + writeResult.getError());
+            }
+        } finally {
+            end(start);
         }
     }
 
     @Override
     public Map<String, Object> createOrUpdate(Collection collection, UpdateOp updateOp) {
+        log("createOrUpdate", updateOp);        
         DBCollection dbCollection = getDBCollection(collection);
 
         BasicDBObject setUpdates = new BasicDBObject();
@@ -128,6 +164,7 @@ public class MongoDocumentStore implemen
 //                WriteConcern.SAFE);
 //        return null;
 
+        long start = start();
         try {
             DBObject oldNode = dbCollection.findAndModify(query, null /*fields*/,
                     null /*sort*/, false /*remove*/, update, false /*returnNew*/,
@@ -135,11 +172,14 @@ public class MongoDocumentStore implemen
             return convertFromDBObject(oldNode);
         } catch (Exception e) {
             throw new MicroKernelException(e);
+        } finally {
+            end(start);
         }
     }
 
     @Override
     public void create(Collection collection, List<UpdateOp> updateOps) {
+        log("create", updateOps);        
         DBObject[] inserts = new DBObject[updateOps.size()];
 
         for (int i = 0; i < updateOps.size(); i++) {
@@ -170,10 +210,15 @@ public class MongoDocumentStore implemen
         }
 
         DBCollection dbCollection = getDBCollection(collection);
-        WriteResult writeResult = dbCollection.insert(inserts, WriteConcern.SAFE);
-        if (writeResult.getError() != null) {
-            LOG.error("Batch create failed: {}", writeResult.getError());
-        }
+        long start = start();
+        try {
+            WriteResult writeResult = dbCollection.insert(inserts, WriteConcern.SAFE);
+            if (writeResult.getError() != null) {
+                throw new MicroKernelException("Batch create failed: " + writeResult.getError());
+            }
+        } finally {
+            end(start);
+        }        
     }
 
     private void ensureIndex() {
@@ -213,7 +258,16 @@ public class MongoDocumentStore implemen
     
     @Override
     public void dispose() {
+        if (LOG_TIME) {
+            System.out.println("MongoDB time: " + time);
+        }
         nodesCollection.getDB().getMongo().close();
     }
+    
+    private void log(Object... args) {
+        if (LOG) {
+            System.out.println(Arrays.toString(args));
+        }
+    }
 
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java?rev=1448015&r1=1448014&r2=1448015&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java (original)
+++ jackrabbit/oak/trunk/oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/prototype/MongoMK.java Wed Feb 20 07:07:18 2013
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.mongomk.prototype;
 
 import java.io.InputStream;
+import java.lang.ref.WeakReference;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -134,20 +135,11 @@ public class MongoMK implements MicroKer
         this.store = store;
         this.blobStore = blobStore;
         this.clusterId = clusterId;
-        backgroundThread = new Thread(new Runnable() {
-            public void run() {
-                while (!isDisposed.get()) {
-                    synchronized (isDisposed) {
-                        try {
-                            isDisposed.wait(ASYNC_DELAY);
-                        } catch (InterruptedException e) {
-                            // ignore
-                        }
-                    }
-                    runBackgroundOperations();
-                }
-            }
-        }, "MongoMK background thread");
+        // ensure the MK can be garbage collected
+        final WeakReference<MongoMK> ref = new WeakReference<MongoMK>(this);
+        backgroundThread = new Thread(
+            new BackgroundOperation(this, isDisposed),
+            "MongoMK background thread");
         backgroundThread.setDaemon(true);
         backgroundThread.start();
         headRevision = Revision.newRevision(clusterId);
@@ -568,6 +560,10 @@ public class MongoMK implements MicroKer
         }
     }
 
+    public DocumentStore getDocumentStore() {
+        return store;
+    }
+    
     static class Cache<K, V> extends LinkedHashMap<K, V> {
 
         private static final long serialVersionUID = 1L;
@@ -584,8 +580,28 @@ public class MongoMK implements MicroKer
 
     }
 
-    public DocumentStore getDocumentStore() {
-        return store;
+    static class BackgroundOperation implements Runnable {
+        private final AtomicBoolean isDisposed;
+        final WeakReference<MongoMK> ref;
+        BackgroundOperation(MongoMK mk, AtomicBoolean isDisposed) {
+            ref = new WeakReference<MongoMK>(mk);
+            this.isDisposed = isDisposed;
+        }
+        public void run() {
+            while (!isDisposed.get()) {
+                synchronized (isDisposed) {
+                    try {
+                        isDisposed.wait(ASYNC_DELAY);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+                MongoMK mk = ref.get();
+                if (mk != null) {
+                    mk.runBackgroundOperations();
+                }
+            }
+        }
     }
 
 }