You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/05/07 13:16:32 UTC
svn commit: r1479858 - in
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak:
kernel/ plugins/segment/
Author: jukka
Date: Tue May 7 11:16:32 2013
New Revision: 1479858
URL: http://svn.apache.org/r1479858
Log:
OAK-775: Implement backward compatible observation
Simplify Kernel- and SegmentNodeStore to make it easier to inject local observation support
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/JsopDiff.java Tue May 7 11:16:32 2013
@@ -22,12 +22,9 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.api.Type.STRINGS;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
import javax.jcr.PropertyType;
-import org.apache.jackrabbit.mk.api.MicroKernel;
import org.apache.jackrabbit.mk.json.JsopBuilder;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
@@ -35,50 +32,36 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TODO document
*/
class JsopDiff implements NodeStateDiff {
- private static final Logger log = LoggerFactory.getLogger(JsopDiff.class);
- private final MicroKernel kernel;
+ private final KernelNodeStore store;
protected final JsopBuilder jsop;
protected final String path;
- public JsopDiff(MicroKernel kernel, JsopBuilder jsop, String path) {
- this.kernel = kernel;
+ JsopDiff(KernelNodeStore store, JsopBuilder jsop, String path) {
+ this.store = store;
this.jsop = jsop;
this.path = path;
}
- public JsopDiff(MicroKernel kernel) {
- this(kernel, new JsopBuilder(), "/");
+ JsopDiff(KernelNodeStore store) {
+ this(store, new JsopBuilder(), "/");
}
public static void diffToJsop(
- MicroKernel kernel, NodeState before, NodeState after,
+ KernelNodeStore store, NodeState before, NodeState after,
String path, JsopBuilder jsop) {
- after.compareAgainstBaseState(before, new JsopDiff(kernel, jsop, path));
- }
-
- public static String diffToJsop(NodeState before, NodeState after) {
- JsopDiff diff = new JsopDiff(null) {
- @Override
- protected String writeBlob(Blob blob) {
- return "Blob{" + Arrays.toString(blob.sha256()) + '}';
- }
- };
- after.compareAgainstBaseState(before, diff);
- return diff.toString();
+ after.compareAgainstBaseState(before, new JsopDiff(store, jsop, path));
}
protected JsopDiff createChildDiff(JsopBuilder jsop, String path) {
- return new JsopDiff(kernel, jsop, path);
+ return new JsopDiff(store, jsop, path);
}
//-----------------------------------------------------< NodeStateDiff >--
@@ -194,24 +177,18 @@ class JsopDiff implements NodeStateDiff
* @param blob blob to persist
* @return id of the persisted blob
*/
- protected String writeBlob(Blob blob) {
- String blobId;
+ private String writeBlob(Blob blob) {
+ KernelBlob kernelBlob;
if (blob instanceof KernelBlob) {
- blobId = ((KernelBlob) blob).getBinaryID();
+ kernelBlob = (KernelBlob) blob;
} else {
- InputStream is = blob.getNewStream();
- blobId = kernel.write(is);
- close(is);
+ try {
+ kernelBlob = store.createBlob(blob.getNewStream());
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
- return blobId;
+ return kernelBlob.getBinaryID();
}
- private static void close(InputStream stream) {
- try {
- stream.close();
- }
- catch (IOException e) {
- log.warn("Error closing stream", e);
- }
- }
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java Tue May 7 11:16:32 2013
@@ -64,7 +64,6 @@ public class KernelNodeStore extends Abs
*/
private KernelNodeState root;
-
public KernelNodeStore(final MicroKernel kernel, long cacheSize) {
this.kernel = checkNotNull(kernel);
this.cache = CacheBuilder.newBuilder()
@@ -160,4 +159,12 @@ public class KernelNodeStore extends Abs
}
}
+ NodeState commit(String jsop, String baseRevision) {
+ return getRootState(kernel.commit("", jsop, baseRevision, null));
+ }
+
+ NodeState merge(String headRevision) {
+ return getRootState(kernel.merge(headRevision, null));
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStoreBranch.java Tue May 7 11:16:32 2013
@@ -43,10 +43,7 @@ class KernelNodeStoreBranch extends Abst
private final KernelNodeStore store;
/** Root state of the base revision of this branch */
- private NodeState base;
-
- /** Revision of the base state of this branch*/
- private String baseRevision;
+ private KernelNodeState base;
/** Root state of the transient head revision on top of persisted branch, null if merged. */
private NodeState head;
@@ -60,7 +57,6 @@ class KernelNodeStoreBranch extends Abst
KernelNodeStoreBranch(KernelNodeStore store, KernelNodeState root) {
this.store = store;
this.base = root;
- this.baseRevision = root.getRevision();
this.head = root;
}
@@ -151,25 +147,25 @@ class KernelNodeStoreBranch extends Abst
head = null; // Mark as merged
return base;
} else {
- MicroKernel kernel = store.getKernel();
- String newRevision;
- JsopDiff diff = new JsopDiff(kernel);
+ NodeState newRoot;
+ JsopDiff diff = new JsopDiff(store);
if (headRevision == null) {
// no branch created yet, commit directly
head.compareAgainstBaseState(base, diff);
- newRevision = kernel.commit("", diff.toString(), baseRevision, null);
+ newRoot = store.commit(diff.toString(), base.getRevision());
} else {
// commit into branch and merge
head.compareAgainstBaseState(store.getRootState(headRevision), diff);
String jsop = diff.toString();
if (!jsop.isEmpty()) {
- headRevision = kernel.commit("", jsop, headRevision, null);
+ headRevision = store.getKernel().commit(
+ "", jsop, headRevision, null);
}
- newRevision = kernel.merge(headRevision, null);
+ newRoot = store.merge(headRevision);
headRevision = null;
}
head = null; // Mark as merged
- return store.getRootState(newRevision);
+ return newRoot;
}
} catch (MicroKernelException e) {
head = oldRoot;
@@ -186,7 +182,6 @@ class KernelNodeStoreBranch extends Abst
// Nothing was written to this branch: set new base revision
head = root;
base = root;
- baseRevision = root.getRevision();
} else if (headRevision == null) {
// Nothing written to persistent branch yet
// perform rebase in memory
@@ -194,14 +189,12 @@ class KernelNodeStoreBranch extends Abst
getHead().compareAgainstBaseState(getBase(), new ConflictAnnotatingRebaseDiff(builder));
head = builder.getNodeState();
base = root;
- baseRevision = root.getRevision();
} else {
// perform rebase in kernel
persistTransientHead();
headRevision = store.getKernel().rebase(headRevision, root.getRevision());
head = store.getRootState(headRevision);
base = root;
- baseRevision = root.getRevision();
}
}
@@ -224,7 +217,7 @@ class KernelNodeStoreBranch extends Abst
MicroKernel kernel = store.getKernel();
if (headRevision == null) {
// create the branch if this is the first commit
- headRevision = kernel.branch(baseRevision);
+ headRevision = kernel.branch(base.getRevision());
}
// persist transient changes first
@@ -235,14 +228,13 @@ class KernelNodeStoreBranch extends Abst
}
private void persistTransientHead() {
- NodeState oldBase = base;
- String oldBaseRevision = baseRevision;
+ KernelNodeState oldBase = base;
NodeState oldHead = head;
String oldHeadRevision = headRevision;
boolean success = false;
try {
MicroKernel kernel = store.getKernel();
- JsopDiff diff = new JsopDiff(store.getKernel());
+ JsopDiff diff = new JsopDiff(store);
if (headRevision == null) {
// no persistent branch yet
if (head.equals(base)) {
@@ -251,7 +243,7 @@ class KernelNodeStoreBranch extends Abst
return;
} else {
// create branch
- headRevision = kernel.branch(baseRevision);
+ headRevision = kernel.branch(base.getRevision());
head.compareAgainstBaseState(base, diff);
}
} else {
@@ -274,7 +266,6 @@ class KernelNodeStoreBranch extends Abst
// revert to old state if unsuccessful
if (!success) {
base = oldBase;
- baseRevision = oldBaseRevision;
head = oldHead;
headRevision = oldHeadRevision;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Tue May 7 11:16:32 2013
@@ -22,6 +22,8 @@ import java.io.InputStream;
import javax.annotation.Nonnull;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.commit.EmptyObserver;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.AbstractNodeStore;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
@@ -34,24 +36,37 @@ public class SegmentNodeStore extends Ab
private final SegmentReader reader;
+ private final Observer observer;
+
+ private SegmentNodeState root;
+
public SegmentNodeStore(SegmentStore store, String journal) {
this.store = store;
this.journal = store.getJournal(journal);
this.reader = new SegmentReader(store);
+ this.observer = EmptyObserver.INSTANCE;
+ this.root = new SegmentNodeState(store, this.journal.getHead());
}
public SegmentNodeStore(SegmentStore store) {
this(store, "root");
}
+ boolean setHead(SegmentNodeState base, SegmentNodeState head) {
+ return journal.setHead(base.getRecordId(), head.getRecordId());
+ }
+
@Override @Nonnull
- public NodeState getRoot() {
- return new SegmentNodeState(store, journal.getHead());
+ public synchronized SegmentNodeState getRoot() {
+ NodeState before = root;
+ root = new SegmentNodeState(store, journal.getHead());
+ observer.contentChanged(before, root);
+ return root;
}
@Override @Nonnull
public NodeStoreBranch branch() {
- return new SegmentNodeStoreBranch(store, journal);
+ return new SegmentNodeStoreBranch(this, new SegmentWriter(store));
}
@Override
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java?rev=1479858&r1=1479857&r2=1479858&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBranch.java Tue May 7 11:16:32 2013
@@ -32,49 +32,46 @@ class SegmentNodeStoreBranch extends Abs
private static final Random RANDOM = new Random();
- private final SegmentStore store;
-
- private final Journal journal;
+ private final SegmentNodeStore store;
private final SegmentWriter writer;
- private RecordId baseId;
+ private SegmentNodeState base;
- private RecordId rootId;
+ private SegmentNodeState head;
- SegmentNodeStoreBranch(SegmentStore store, Journal journal) {
+ SegmentNodeStoreBranch(SegmentNodeStore store, SegmentWriter writer) {
this.store = store;
- this.journal = journal;
- this.writer = new SegmentWriter(store);
- this.baseId = journal.getHead();
- this.rootId = baseId;
+ this.writer = writer;
+ this.base = store.getRoot();
+ this.head = base;
}
@Override @Nonnull
public NodeState getBase() {
- return new SegmentNodeState(store, baseId);
+ return base;
}
@Override @Nonnull
public synchronized NodeState getHead() {
- return new SegmentNodeState(store, rootId);
+ return head;
}
@Override
public synchronized void setRoot(NodeState newRoot) {
- this.rootId = writer.writeNode(newRoot).getRecordId();
+ head = writer.writeNode(newRoot);
writer.flush();
}
@Override
public synchronized void rebase() {
- RecordId newBaseId = journal.getHead();
- if (!baseId.equals(newBaseId)) {
- NodeBuilder builder =
- new SegmentNodeState(store, newBaseId).builder();
- getHead().compareAgainstBaseState(getBase(), new ConflictAnnotatingRebaseDiff(builder));
- this.baseId = newBaseId;
- this.rootId = writer.writeNode(builder.getNodeState()).getRecordId();
+ SegmentNodeState newBase = store.getRoot();
+ if (!base.getRecordId().equals(newBase.getRecordId())) {
+ NodeBuilder builder = newBase.builder();
+ head.compareAgainstBaseState(
+ base, new ConflictAnnotatingRebaseDiff(builder));
+ base = newBase;
+ head = writer.writeNode(builder.getNodeState());
writer.flush();
}
}
@@ -82,23 +79,23 @@ class SegmentNodeStoreBranch extends Abs
@Override @Nonnull
public synchronized NodeState merge(CommitHook hook)
throws CommitFailedException {
- RecordId originalBaseId = baseId;
- RecordId originalRootId = rootId;
+ SegmentNodeState originalBase = base;
+ SegmentNodeState originalHead = head;
long backoff = 1;
- while (baseId != rootId) {
+ while (base != head) {
// apply commit hooks on the rebased changes
- RecordId headId = writer.writeNode(
- hook.processCommit(getBase(), getHead())).getRecordId();
+ SegmentNodeState root = writer.writeNode(
+ hook.processCommit(base, head));
writer.flush();
// use optimistic locking to update the journal
- if (journal.setHead(baseId, headId)) {
- baseId = headId;
- rootId = headId;
+ if (store.setHead(base, root)) {
+ base = root;
+ head = root;
} else {
// someone else was faster, so restore state and retry later
- baseId = originalBaseId;
- rootId = originalRootId;
+ base = originalBase;
+ head = originalHead;
// use exponential backoff to reduce contention
if (backoff < 10000) {