You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/03/13 11:41:01 UTC

svn commit: r1455872 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: MongoJournal.java MongoStore.java

Author: jukka
Date: Wed Mar 13 10:41:01 2013
New Revision: 1455872

URL: http://svn.apache.org/r1455872
Log:
OAK-593: Segment-based MK

Cache Mongo journal documents locally

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java?rev=1455872&r1=1455871&r2=1455872&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java Wed Mar 13 10:41:01 2013
@@ -23,6 +23,8 @@ import static com.google.common.collect.
 import static com.mongodb.ReadPreference.nearest;
 import static com.mongodb.ReadPreference.primaryPreferred;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
@@ -30,11 +32,15 @@ import com.mongodb.BasicDBObject;
 import com.mongodb.BasicDBObjectBuilder;
 import com.mongodb.DBCollection;
 import com.mongodb.DBObject;
+import com.mongodb.MongoException;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
 
 class MongoJournal implements Journal {
 
+    private static final long UPDATE_INTERVAL =
+            TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
     private final SegmentStore store;
 
     private final DBCollection journals;
@@ -43,6 +49,10 @@ class MongoJournal implements Journal {
 
     private final String name;
 
+    private DBObject state;
+
+    private long stateLastUpdated;
+
     MongoJournal(
             SegmentStore store, DBCollection journals,
             WriteConcern concern, NodeState root) {
@@ -52,8 +62,7 @@ class MongoJournal implements Journal {
         this.name = "root";
 
         DBObject id = new BasicDBObject("_id", "root");
-        DBObject fields = new BasicDBObject("head", 1);
-        DBObject state = journals.findOne(id, fields, primaryPreferred());
+        state = journals.findOne(id, null, primaryPreferred());
         if (state == null) {
             SegmentWriter writer = new SegmentWriter(store);
             RecordId head = writer.writeNode(root).getRecordId();
@@ -61,8 +70,16 @@ class MongoJournal implements Journal {
             state = new BasicDBObject(of(
                     "_id",  "root",
                     "head", head.toString()));
-            journals.insert(state, concern);
+            try {
+                journals.insert(state, concern);
+            } catch (MongoException.DuplicateKey e) {
+                // Someone else managed to concurrently create the journal,
+                // so let's just re-read it from the database
+                state = journals.findOne(id, null, primaryPreferred());
+                checkState(state != null);
+            }
         }
+        stateLastUpdated = System.nanoTime();
     }
 
     MongoJournal(
@@ -75,7 +92,7 @@ class MongoJournal implements Journal {
         checkArgument(!"root".equals(name));
 
         DBObject id = new BasicDBObject("_id", name);
-        DBObject state = journals.findOne(id, null, primaryPreferred());
+        state = journals.findOne(id, null, primaryPreferred());
         if (state == null) {
             Journal root = store.getJournal("root");
             String head = root.getHead().toString();
@@ -84,27 +101,37 @@ class MongoJournal implements Journal {
                     "parent", "root",
                     "base",   head,
                     "head",   head));
-            journals.insert(state, concern);
+            try {
+                journals.insert(state, concern);
+            } catch (MongoException.DuplicateKey e) {
+                // Someone else managed to concurrently create the journal,
+                // so let's just re-read it from the database
+                state = journals.findOne(id, null, primaryPreferred());
+                checkState(state != null);
+            }
         }
+        stateLastUpdated = System.nanoTime();
     }
 
     @Override
     public RecordId getHead() {
-        DBObject id = new BasicDBObject("_id", name);
-        DBObject state = journals.findOne(id, null, nearest());
-        if (state == null) {
-            state = journals.findOne(id, null, primaryPreferred());
+        long now = System.nanoTime();
+        if (stateLastUpdated + UPDATE_INTERVAL < now) {
+            DBObject id = new BasicDBObject("_id", name);
+            DBObject freshState = journals.findOne(id, null, nearest());
+            if (freshState == null) {
+                freshState = journals.findOne(id, null, primaryPreferred());
+                checkState(freshState != null);
+            }
+            state = freshState;
+            stateLastUpdated = now;
         }
-        checkState(state != null);
         return RecordId.fromString(state.get("head").toString());
     }
 
     @Override
     public synchronized boolean setHead(RecordId base, RecordId head) {
-        DBObject id = new BasicDBObject("_id", name);
-        DBObject state = journals.findOne(id, null, primaryPreferred());
-        checkState(state != null);
-        if (!base.toString().equals(state.get("head"))) {
+        if (!base.equals(getHead())) {
             return false;
         }
 
@@ -121,7 +148,15 @@ class MongoJournal implements Journal {
 
         WriteResult result =
                 journals.update(state, nextState, false, false, concern);
-        return result.getN() != 0;
+        if (result.getN() == 1) {
+            state = nextState;
+            stateLastUpdated = System.nanoTime();
+            return true;
+        } else {
+            // force refresh when next accessed
+            stateLastUpdated -= UPDATE_INTERVAL;
+            return false;
+        }
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java?rev=1455872&r1=1455871&r2=1455872&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java Wed Mar 13 10:41:01 2013
@@ -16,16 +16,19 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.ImmutableMap.of;
 import static com.mongodb.ReadPreference.nearest;
 import static com.mongodb.ReadPreference.primary;
 import static org.apache.jackrabbit.oak.plugins.memory.MemoryNodeState.EMPTY_NODE;
 
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
@@ -37,17 +40,20 @@ public class MongoStore implements Segme
 
     private final WriteConcern concern = WriteConcern.SAFE; // TODO: MAJORITY?
 
+    private final DB db;
+
     private final DBCollection segments;
 
-    private final DBCollection journals;
+    private final Map<String, Journal> journals = Maps.newHashMap();
 
     private final SegmentCache cache;
 
     public MongoStore(DB db, SegmentCache cache) {
+        this.db = checkNotNull(db);
         this.segments = db.getCollection("segments");
-        this.journals = db.getCollection("journals");
-
         this.cache = cache;
+        journals.put("root", new MongoJournal(
+                this, db.getCollection("journals"), concern, EMPTY_NODE));
     }
 
     public MongoStore(DB db, long cacheSize) {
@@ -60,12 +66,14 @@ public class MongoStore implements Segme
     }
 
     @Override
-    public Journal getJournal(String name) {
-        if ("root".equals(name)) {
-            return new MongoJournal(this, journals, concern, EMPTY_NODE);
-        } else {
-            return new MongoJournal(this, journals, concern, name);
+    public synchronized Journal getJournal(String name) {
+        Journal journal = journals.get(name);
+        if (journal == null) {
+            journal = new MongoJournal(
+                    this, db.getCollection("journals"), concern, name);
+            journals.put(name, journal);
         }
+        return journal;
     }
 
     @Override