You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/03/13 11:41:01 UTC
svn commit: r1455872 - in
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment:
MongoJournal.java MongoStore.java
Author: jukka
Date: Wed Mar 13 10:41:01 2013
New Revision: 1455872
URL: http://svn.apache.org/r1455872
Log:
OAK-593: Segment-based MK
Cache Mongo journal documents locally
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java?rev=1455872&r1=1455871&r2=1455872&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java Wed Mar 13 10:41:01 2013
@@ -23,6 +23,8 @@ import static com.google.common.collect.
import static com.mongodb.ReadPreference.nearest;
import static com.mongodb.ReadPreference.primaryPreferred;
+import java.util.concurrent.TimeUnit;
+
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -30,11 +32,15 @@ import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
+import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
class MongoJournal implements Journal {
+ private static final long UPDATE_INTERVAL =
+ TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
private final SegmentStore store;
private final DBCollection journals;
@@ -43,6 +49,10 @@ class MongoJournal implements Journal {
private final String name;
+ private DBObject state;
+
+ private long stateLastUpdated;
+
MongoJournal(
SegmentStore store, DBCollection journals,
WriteConcern concern, NodeState root) {
@@ -52,8 +62,7 @@ class MongoJournal implements Journal {
this.name = "root";
DBObject id = new BasicDBObject("_id", "root");
- DBObject fields = new BasicDBObject("head", 1);
- DBObject state = journals.findOne(id, fields, primaryPreferred());
+ state = journals.findOne(id, null, primaryPreferred());
if (state == null) {
SegmentWriter writer = new SegmentWriter(store);
RecordId head = writer.writeNode(root).getRecordId();
@@ -61,8 +70,16 @@ class MongoJournal implements Journal {
state = new BasicDBObject(of(
"_id", "root",
"head", head.toString()));
- journals.insert(state, concern);
+ try {
+ journals.insert(state, concern);
+ } catch (MongoException.DuplicateKey e) {
+ // Someone else managed to concurrently create the journal,
+ // so let's just re-read it from the database
+ state = journals.findOne(id, null, primaryPreferred());
+ checkState(state != null);
+ }
}
+ stateLastUpdated = System.nanoTime();
}
MongoJournal(
@@ -75,7 +92,7 @@ class MongoJournal implements Journal {
checkArgument(!"root".equals(name));
DBObject id = new BasicDBObject("_id", name);
- DBObject state = journals.findOne(id, null, primaryPreferred());
+ state = journals.findOne(id, null, primaryPreferred());
if (state == null) {
Journal root = store.getJournal("root");
String head = root.getHead().toString();
@@ -84,27 +101,37 @@ class MongoJournal implements Journal {
"parent", "root",
"base", head,
"head", head));
- journals.insert(state, concern);
+ try {
+ journals.insert(state, concern);
+ } catch (MongoException.DuplicateKey e) {
+ // Someone else managed to concurrently create the journal,
+ // so let's just re-read it from the database
+ state = journals.findOne(id, null, primaryPreferred());
+ checkState(state != null);
+ }
}
+ stateLastUpdated = System.nanoTime();
}
@Override
public RecordId getHead() {
- DBObject id = new BasicDBObject("_id", name);
- DBObject state = journals.findOne(id, null, nearest());
- if (state == null) {
- state = journals.findOne(id, null, primaryPreferred());
+ long now = System.nanoTime();
+ if (now - stateLastUpdated > UPDATE_INTERVAL) { // elapsed-time form: stays correct if nanoTime() values wrap around
+ DBObject id = new BasicDBObject("_id", name);
+ DBObject freshState = journals.findOne(id, null, nearest());
+ if (freshState == null) {
+ freshState = journals.findOne(id, null, primaryPreferred());
+ checkState(freshState != null);
+ }
+ state = freshState;
+ stateLastUpdated = now;
}
- checkState(state != null);
return RecordId.fromString(state.get("head").toString());
}
@Override
public synchronized boolean setHead(RecordId base, RecordId head) {
- DBObject id = new BasicDBObject("_id", name);
- DBObject state = journals.findOne(id, null, primaryPreferred());
- checkState(state != null);
- if (!base.toString().equals(state.get("head"))) {
+ if (!base.equals(getHead())) {
return false;
}
@@ -121,7 +148,15 @@ class MongoJournal implements Journal {
WriteResult result =
journals.update(state, nextState, false, false, concern);
- return result.getN() != 0;
+ if (result.getN() == 1) {
+ state = nextState;
+ stateLastUpdated = System.nanoTime();
+ return true;
+ } else {
+ // force refresh when next accessed
+ stateLastUpdated -= UPDATE_INTERVAL;
+ return false;
+ }
}
@Override
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java?rev=1455872&r1=1455871&r2=1455872&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java Wed Mar 13 10:41:01 2013
@@ -16,16 +16,19 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.ImmutableMap.of;
import static com.mongodb.ReadPreference.nearest;
import static com.mongodb.ReadPreference.primary;
import static org.apache.jackrabbit.oak.plugins.memory.MemoryNodeState.EMPTY_NODE;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
@@ -37,17 +40,20 @@ public class MongoStore implements Segme
private final WriteConcern concern = WriteConcern.SAFE; // TODO: MAJORITY?
+ private final DB db;
+
private final DBCollection segments;
- private final DBCollection journals;
+ private final Map<String, Journal> journals = Maps.newHashMap();
private final SegmentCache cache;
public MongoStore(DB db, SegmentCache cache) {
+ this.db = checkNotNull(db);
this.segments = db.getCollection("segments");
- this.journals = db.getCollection("journals");
-
this.cache = cache;
+ journals.put("root", new MongoJournal(
+ this, db.getCollection("journals"), concern, EMPTY_NODE));
}
public MongoStore(DB db, long cacheSize) {
@@ -60,12 +66,14 @@ public class MongoStore implements Segme
}
@Override
- public Journal getJournal(String name) {
- if ("root".equals(name)) {
- return new MongoJournal(this, journals, concern, EMPTY_NODE);
- } else {
- return new MongoJournal(this, journals, concern, name);
+ public synchronized Journal getJournal(String name) {
+ Journal journal = journals.get(name);
+ if (journal == null) {
+ journal = new MongoJournal(
+ this, db.getCollection("journals"), concern, name);
+ journals.put(name, journal);
}
+ return journal;
}
@Override