Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/03/12 18:24:35 UTC

svn commit: r1455635 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: MongoJournal.java MongoStore.java

Author: jukka
Date: Tue Mar 12 17:24:34 2013
New Revision: 1455635

URL: http://svn.apache.org/r1455635
Log:
OAK-633: SegmentMK: Hierarchy of journals

Add hierarchy support for Mongo journals as well
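
For context, a minimal sketch of how the new journal hierarchy might be used. This is not part of the commit: the journal name and the "edited" property are invented for illustration, and only getJournal(), getHead(), setHead() and merge(), plus the SegmentWriter/SegmentNodeState calls, come from the diff below. The sketch assumes it lives in the same org.apache.jackrabbit.oak.plugins.segment package so that the package-private types resolve.

    package org.apache.jackrabbit.oak.plugins.segment;

    import org.apache.jackrabbit.oak.spi.state.NodeBuilder;

    class JournalHierarchyExample {

        // Hypothetical helper: make a small change on a named (non-root)
        // journal and merge it back into the parent "root" journal.
        static void editAndMerge(SegmentStore store, String journalName) {
            // Non-root journals are created on first access with "root"
            // as their parent (see the new MongoJournal constructor below).
            Journal child = store.getJournal(journalName);

            // Build a modified node state on top of the child's current head.
            SegmentWriter writer = new SegmentWriter(store);
            RecordId base = child.getHead();
            NodeBuilder builder = new SegmentNodeState(store, base).builder();
            builder.setProperty("edited", true); // illustrative property only
            RecordId head = writer.writeNode(builder.getNodeState()).getRecordId();
            writer.flush();

            // Advance the child head (fails if it moved concurrently), then
            // rebase the change onto the parent journal's current head.
            if (child.setHead(base, head)) {
                child.merge();
            }
        }
    }

The merge() call is where the hierarchy pays off: it replays the child's changes (base to head) onto whatever the parent journal's head is at that moment, retrying with a compare-and-set until the parent accepts the rebased head.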

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java?rev=1455635&r1=1455634&r2=1455635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoJournal.java Tue Mar 12 17:24:34 2013
@@ -16,62 +16,126 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
-import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableMap.of;
+
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 
-import com.google.common.collect.ImmutableMap;
 import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
 import com.mongodb.DBCollection;
 import com.mongodb.DBObject;
 
 class MongoJournal implements Journal {
 
-    private static final long UPDATE_INTERVAL =
-            TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SegmentStore store;
 
     private final DBCollection journals;
 
     private final String name;
 
-    private long nextUpdate = System.nanoTime() - 2 * UPDATE_INTERVAL;
-
-    private RecordId head;
+    MongoJournal(SegmentStore store, DBCollection journals, NodeState root) {
+        this.store = checkNotNull(store);
+        this.journals = checkNotNull(journals);
+        this.name = "root";
+
+        DBObject state = journals.findOne(new BasicDBObject("_id", "root"));
+        if (state == null) {
+            SegmentWriter writer = new SegmentWriter(store);
+            RecordId id = writer.writeNode(root).getRecordId();
+            writer.flush();
+            state = new BasicDBObject(of("_id", "root", "head", id.toString()));
+            journals.insert(state);
+        }
+    }
 
-    MongoJournal(DBCollection journals, String name) {
-        this.journals = journals;
-        this.name = name;
-        head = getHead();
+    MongoJournal(SegmentStore store, DBCollection journals, String name) {
+        this.store = checkNotNull(store);
+        this.journals = checkNotNull(journals);
+        this.name = checkNotNull(name);
+        checkArgument(!"root".equals(name));
+
+        DBObject state = journals.findOne(new BasicDBObject("_id", name));
+        if (state == null) {
+            Journal root = store.getJournal("root");
+            String head = root.getHead().toString();
+            state = new BasicDBObject(of(
+                    "_id",    name,
+                    "parent", "root",
+                    "base",   head,
+                    "head",   head));
+            journals.insert(state);
+        }
     }
 
     @Override
-    public synchronized RecordId getHead() {
-        long now = System.nanoTime();
-        if (now >= nextUpdate) {
-            DBObject journal = journals.findOne(new BasicDBObject("_id", name));
-            head = RecordId.fromString(journal.get("head").toString());
-            nextUpdate = now + UPDATE_INTERVAL;
-        }
-        return head;
+    public RecordId getHead() {
+        DBObject state = journals.findOne(new BasicDBObject("_id", name));
+        checkState(state != null);
+        return RecordId.fromString(state.get("head").toString());
     }
 
     @Override
-    public boolean setHead(RecordId base, RecordId head) {
-        DBObject baseObject = new BasicDBObject(
-                ImmutableMap.of("_id", name, "head", base.toString()));
-        DBObject headObject = new BasicDBObject(
-                ImmutableMap.of("_id", name, "head", head.toString()));
-        if (journals.findAndModify(baseObject, headObject) != null) {
-            this.head = head;
-            nextUpdate = System.nanoTime() + UPDATE_INTERVAL;
-            return true;
-        } else if (base.equals(this.head)) {
-            // force an update at next getHead() call
-            nextUpdate = System.nanoTime();
+    public synchronized boolean setHead(RecordId base, RecordId head) {
+        DBObject state = journals.findOne(new BasicDBObject("_id", name));
+        checkState(state != null);
+        if (!base.toString().equals(state.get("head"))) {
+            return false;
         }
-        return false;
+
+        BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
+        builder.add("_id", name);
+        if (state.containsField("parent")) {
+            builder.add("parent", state.get("parent"));
+        }
+        if (state.containsField("base")) {
+            builder.add("base", state.get("base"));
+        }
+        builder.add("head", head.toString());
+        DBObject nextState = builder.get();
+
+        return journals.findAndModify(state, nextState) != null;
     }
 
     @Override
     public void merge() {
-        throw new UnsupportedOperationException();
+        DBObject state = journals.findOne(new BasicDBObject("_id", name));
+        checkState(state != null);
+
+        if (state.containsField("parent")) {
+            RecordId base = RecordId.fromString(state.get("base").toString());
+            RecordId head = RecordId.fromString(state.get("head").toString());
+
+            NodeState before = new SegmentNodeState(store, base);
+            NodeState after = new SegmentNodeState(store, head);
+
+            Journal parent = store.getJournal(state.get("parent").toString());
+            SegmentWriter writer = new SegmentWriter(store);
+            while (!parent.setHead(base, head)) {
+                RecordId newBase = parent.getHead();
+                NodeBuilder builder =
+                        new SegmentNodeState(store, newBase).builder();
+                after.compareAgainstBaseState(before, new MergeDiff(builder));
+                RecordId newHead =
+                        writer.writeNode(builder.getNodeState()).getRecordId();
+                writer.flush();
+
+                base = newBase;
+                head = newHead;
+            }
+
+            base = head;
+
+            BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
+            builder.add("_id", name);
+            builder.add("parent", state.get("parent"));
+            builder.add("base", base.toString());
+            builder.add("head", head.toString());
+            journals.update(state, builder.get());
+        }
     }
-}
\ No newline at end of file
+
+}

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java?rev=1455635&r1=1455634&r2=1455635&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MongoStore.java Tue Mar 12 17:24:34 2013
@@ -16,13 +16,12 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static org.apache.jackrabbit.oak.plugins.memory.MemoryNodeState.EMPTY_NODE;
+
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
-import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeState;
-
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
@@ -43,14 +42,6 @@ public class MongoStore implements Segme
         this.journals = db.getCollection("journals");
 
         this.cache = cache;
-
-        if (journals.findOne(new BasicDBObject("_id", "root")) == null) {
-            SegmentWriter writer = new SegmentWriter(this);
-            RecordId id = writer.writeNode(MemoryNodeState.EMPTY_NODE).getRecordId();
-            writer.flush();
-            journals.insert(new BasicDBObject(ImmutableMap.of(
-                    "_id", "root", "head", id.toString())));
-        }
     }
 
     public MongoStore(DB db, long cacheSize) {
@@ -64,7 +55,11 @@ public class MongoStore implements Segme
 
     @Override
     public Journal getJournal(String name) {
-        return new MongoJournal(journals, name);
+        if ("root".equals(name)) {
+            return new MongoJournal(this, journals, EMPTY_NODE);
+        } else {
+            return new MongoJournal(this, journals, name);
+        }
     }
 
     @Override