You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/09/27 20:30:01 UTC

svn commit: r1527022 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: kernel/ plugins/segment/ plugins/segment/file/ plugins/segment/memory/ plugins/segment/mongo/ spi/state/

Author: jukka
Date: Fri Sep 27 18:30:00 2013
New Revision: 1527022

URL: http://svn.apache.org/r1527022
Log:
OAK-1031: SegmentMK: Fewer segment lookups

Make SegmentNodeState extend Record and use the segment tracking mechanism

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeState.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapEntry.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryJournal.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoJournal.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeState.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeState.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeState.java Fri Sep 27 18:30:00 2013
@@ -623,7 +623,7 @@ public final class KernelNodeState exten
         if (!hasChanges(jsonDiff)) {
             return true;
         }
-        if (!comparePropertiesAgainstBaseState(base, diff)) {
+        if (!AbstractNodeState.comparePropertiesAgainstBaseState(this, base, diff)) {
             return false;
         }
         JsopTokenizer t = new JsopTokenizer(jsonDiff);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapEntry.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MapEntry.java Fri Sep 27 18:30:00 2013
@@ -59,7 +59,7 @@ class MapEntry extends AbstractChildNode
     @Override @Nonnull
     public NodeState getNodeState() {
         checkState(value != null);
-        return new SegmentNodeState(segment.store, value);
+        return new SegmentNodeState(segment, value);
     }
 
     //---------------------------------------------------------< Map.Entry >--

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeState.java Fri Sep 27 18:30:00 2013
@@ -30,30 +30,24 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
 
-public class SegmentNodeState extends AbstractNodeState {
+import com.google.common.base.Objects;
+
+public class SegmentNodeState extends Record implements NodeState {
 
     static boolean fastEquals(NodeState a, NodeState b) {
         return a instanceof SegmentNodeState
                 && b instanceof SegmentNodeState
-                && ((SegmentNodeState) a).recordId.equals(
-                        ((SegmentNodeState) b).recordId);
+                && Objects.equal(
+                        ((SegmentNodeState) a).getRecordId(),
+                        ((SegmentNodeState) b).getRecordId());
     }
 
-    private final SegmentStore store;
-
-    private final RecordId recordId;
-
     private RecordId templateId = null;
 
     private Template template = null;
 
-    public SegmentNodeState(SegmentStore store, RecordId id) {
-        this.store = checkNotNull(store);
-        this.recordId = checkNotNull(id);
-    }
-
-    public RecordId getRecordId() {
-        return recordId;
+    public SegmentNodeState(Segment segment, RecordId id) {
+        super(segment, id);
     }
 
     RecordId getTemplateId() {
@@ -63,15 +57,15 @@ public class SegmentNodeState extends Ab
 
     synchronized Template getTemplate() {
         if (template == null) {
-            Segment segment = store.readSegment(recordId.getSegmentId());
-            templateId = segment.readRecordId(recordId.getOffset());
+            Segment segment = getSegment();
+            templateId = segment.readRecordId(getOffset(0));
             template = segment.readTemplate(templateId);
         }
         return template;
     }
 
     MapRecord getChildNodeMap() {
-        return getTemplate().getChildNodeMap(store, recordId);
+        return getTemplate().getChildNodeMap(getSegment(), getRecordId());
     }
 
     @Override
@@ -93,44 +87,70 @@ public class SegmentNodeState extends Ab
     @Override @CheckForNull
     public PropertyState getProperty(String name) {
         checkNotNull(name);
-        return getTemplate().getProperty(name, store, recordId);
+        return getTemplate().getProperty(name, getSegment(), getRecordId());
     }
 
     @Override @Nonnull
     public Iterable<PropertyState> getProperties() {
-        return getTemplate().getProperties(store, recordId);
+        return getTemplate().getProperties(getSegment(), getRecordId());
+    }
+
+    @Override
+    public boolean getBoolean(String name) {
+        return AbstractNodeState.getBoolean(this, name);
+    }
+
+    @Override
+    public long getLong(String name) {
+        return AbstractNodeState.getLong(this, name);
+    }
+
+    @Override
+    public String getString(String name) {
+        return AbstractNodeState.getString(this, name);
+    }
+
+    @Override
+    public String getName(String name) {
+        return AbstractNodeState.getName(this, name);
+    }
+
+    @Override
+    public Iterable<String> getNames(String name) {
+        return AbstractNodeState.getNames(this, name);
     }
 
     @Override
     public long getChildNodeCount(long max) {
-        return getTemplate().getChildNodeCount(store, recordId);
+        return getTemplate().getChildNodeCount(getSegment(), getRecordId());
     }
 
     @Override
     public boolean hasChildNode(String name) {
         checkArgument(!checkNotNull(name).isEmpty());
-        return getTemplate().hasChildNode(name, store, recordId);
+        return getTemplate().hasChildNode(name, getSegment(), getRecordId());
     }
 
     @Override @CheckForNull
     public NodeState getChildNode(String name) {
         // checkArgument(!checkNotNull(name).isEmpty()); // TODO
-        return getTemplate().getChildNode(name, store, recordId);
+        return getTemplate().getChildNode(name, getSegment(), getRecordId());
     }
 
     @Override
     public Iterable<String> getChildNodeNames() {
-        return getTemplate().getChildNodeNames(store, recordId);
+        return getTemplate().getChildNodeNames(getSegment(), getRecordId());
     }
 
     @Override @Nonnull
     public Iterable<? extends ChildNodeEntry> getChildNodeEntries() {
-        return getTemplate().getChildNodeEntries(store, recordId);
+        return getTemplate().getChildNodeEntries(getSegment(), getRecordId());
     }
 
     @Override @Nonnull
     public NodeBuilder builder() {
-        return new SegmentRootBuilder(this, store.getWriter());
+        // TODO: avoid the Segment.store reference
+        return new SegmentRootBuilder(this, getSegment().store.getWriter());
     }
 
     @Override
@@ -139,15 +159,17 @@ public class SegmentNodeState extends Ab
              return true; // no changes
         } else if (base == EMPTY_NODE || !base.exists()) { // special case
             return getTemplate().compareAgainstEmptyState(
-                    store, recordId, diff);
+                    getSegment(), getRecordId(), diff);
         } else if (base instanceof SegmentNodeState) {
             SegmentNodeState that = (SegmentNodeState) base;
-            return recordId.equals(that.recordId)
+            return getRecordId().equals(that.getRecordId())
                 || getTemplate().compareAgainstBaseState(
-                        store, recordId, that.getTemplate(), that.recordId,
+                        getSegment(), getRecordId(),
+                        that.getTemplate(), that.getRecordId(),
                         diff);
         } else {
-            return super.compareAgainstBaseState(base, diff); // fallback
+            // fallback
+            return AbstractNodeState.compareAgainstBaseState(this, base, diff);
         }
     }
 
@@ -157,15 +179,17 @@ public class SegmentNodeState extends Ab
             return true;
         } else if (object instanceof SegmentNodeState) {
             SegmentNodeState that = (SegmentNodeState) object;
-            if (recordId.equals(that.recordId)) {
+            if (getRecordId().equals(that.getRecordId())) {
                 return true;
             } else {
                 Template template = getTemplate();
                 return template.equals(that.getTemplate())
-                        && template.compare(store, recordId, that.store, that.recordId);
+                        && template.compare(
+                                getSegment(), getRecordId(),
+                                that.getSegment(), that.getRecordId());
             }
         } else {
-            return super.equals(object);
+            return super.equals(object); // TODO
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Fri Sep 27 18:30:00 2013
@@ -56,7 +56,8 @@ public class SegmentNodeStore implements
         this.store = store;
         this.journal = store.getJournal(journal);
         this.observer = EmptyObserver.INSTANCE;
-        this.head = new SegmentNodeState(store, this.journal.getHead());
+        this.head = new SegmentNodeState(
+                store.getWriter().getDummySegment(), this.journal.getHead());
     }
 
     public SegmentNodeStore(SegmentStore store) {
@@ -69,7 +70,8 @@ public class SegmentNodeStore implements
 
     synchronized SegmentNodeState getHead() {
         NodeState before = head.getChildNode(ROOT);
-        head = new SegmentNodeState(store, journal.getHead());
+        head = new SegmentNodeState(
+                store.getWriter().getDummySegment(), journal.getHead());
         NodeState after = head.getChildNode(ROOT);
         observer.contentChanged(before, after);
         return head;
@@ -141,7 +143,9 @@ public class SegmentNodeStore implements
     public synchronized NodeState retrieve(@Nonnull String checkpoint) {
         // TODO: Verify validity of the checkpoint
         RecordId id = RecordId.fromString(checkNotNull(checkpoint));
-        return new SegmentNodeState(store, id).getChildNode(ROOT);
+        SegmentNodeState root =
+                new SegmentNodeState(store.getWriter().getDummySegment(), id);
+        return root.getChildNode(ROOT);
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Fri Sep 27 18:30:00 2013
@@ -782,7 +782,7 @@ public class SegmentWriter {
             for (RecordId id : ids) {
                 writeRecordId(id);
             }
-            return new SegmentNodeState(store, recordId);
+            return new SegmentNodeState(dummySegment, recordId);
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Template.java Fri Sep 27 18:30:00 2013
@@ -202,7 +202,7 @@ public class Template {
     }
 
     public PropertyState getProperty(
-            String name, SegmentStore store, RecordId recordId) {
+            String name, Segment segment, RecordId recordId) {
         if (JCR_PRIMARYTYPE.equals(name) && primaryType != null) {
             return primaryType;
         } else if (JCR_MIXINTYPES.equals(name) && mixinTypes != null) {
@@ -217,7 +217,7 @@ public class Template {
             while (index < properties.length
                     && properties[index].getName().hashCode() == hash) {
                 if (name.equals(properties[index].getName())) {
-                    return getProperty(store, recordId, index);
+                    return getProperty(segment, recordId, index);
                 }
                 index++;
             }
@@ -226,23 +226,21 @@ public class Template {
     }
 
     private PropertyState getProperty(
-            SegmentStore store, RecordId recordId, int index) {
-        checkNotNull(store);
-        checkNotNull(recordId);
+            Segment segment, RecordId recordId, int index) {
         checkElementIndex(index, properties.length);
+        segment = checkNotNull(segment).getSegment(checkNotNull(recordId));
 
         int offset = recordId.getOffset() + RECORD_ID_BYTES;
         if (!hasNoChildNodes()) {
             offset += RECORD_ID_BYTES;
         }
         offset += index * RECORD_ID_BYTES;
-        Segment segment = store.readSegment(recordId.getSegmentId());
         return new SegmentPropertyState(
                 segment, segment.readRecordId(offset), properties[index]);
     }
 
     public Iterable<PropertyState> getProperties(
-            SegmentStore store, RecordId recordId) {
+            Segment segment, RecordId recordId) {
         List<PropertyState> list =
                 Lists.newArrayListWithCapacity(properties.length + 2);
         if (primaryType != null) {
@@ -255,7 +253,7 @@ public class Template {
         if (!hasNoChildNodes()) {
             offset += RECORD_ID_BYTES;
         }
-        Segment segment = store.readSegment(recordId.getSegmentId());
+        segment = segment.getSegment(recordId);
         for (int i = 0; i < properties.length; i++) {
             RecordId propertyId = segment.readRecordId(offset);
             list.add(new SegmentPropertyState(
@@ -265,31 +263,31 @@ public class Template {
         return list;
     }
 
-    public long getChildNodeCount(SegmentStore store, RecordId recordId) {
+    public long getChildNodeCount(Segment segment, RecordId recordId) {
         if (hasNoChildNodes()) {
             return 0;
         } else if (hasManyChildNodes()) {
-            MapRecord map = getChildNodeMap(store, recordId);
+            MapRecord map = getChildNodeMap(segment, recordId);
             return map.size();
         } else {
             return 1;
         }
     }
 
-    MapRecord getChildNodeMap(SegmentStore store, RecordId recordId) {
+    MapRecord getChildNodeMap(Segment segment, RecordId recordId) {
         checkState(hasManyChildNodes());
+        segment = segment.getSegment(recordId);
         int offset = recordId.getOffset() + RECORD_ID_BYTES;
-        Segment segment = store.readSegment(recordId.getSegmentId());
         RecordId childNodesId = segment.readRecordId(offset);
         return MapRecord.readMap(segment, childNodesId);
     }
 
     public boolean hasChildNode(
-            String name, SegmentStore store, RecordId recordId) {
+            String name, Segment segment, RecordId recordId) {
         if (hasNoChildNodes()) {
             return false;
         } else if (hasManyChildNodes()) {
-            MapRecord map = getChildNodeMap(store, recordId);
+            MapRecord map = getChildNodeMap(segment, recordId);
             return map.getEntry(name) != null;
         } else {
             return name.equals(childName);
@@ -297,32 +295,32 @@ public class Template {
     }
 
     public NodeState getChildNode(
-            String name, SegmentStore store, RecordId recordId) {
+            String name, Segment segment, RecordId recordId) {
         if (hasNoChildNodes()) {
             return MISSING_NODE;
         } else if (hasManyChildNodes()) {
-            MapRecord map = getChildNodeMap(store, recordId);
+            MapRecord map = getChildNodeMap(segment, recordId);
             RecordId childNodeId = map.getEntry(name);
             if (childNodeId != null) {
-                return new SegmentNodeState(store, childNodeId);
+                return new SegmentNodeState(segment, childNodeId);
             } else {
                 return MISSING_NODE;
             }
         } else if (name.equals(childName)) {
+            segment = segment.getSegment(recordId);
             int offset = recordId.getOffset() + RECORD_ID_BYTES;
-            Segment segment = store.readSegment(recordId.getSegmentId());
             RecordId childNodeId = segment.readRecordId(offset);
-            return new SegmentNodeState(store, childNodeId);
+            return new SegmentNodeState(segment, childNodeId);
         } else {
             return MISSING_NODE;
         }
     }
 
-    Iterable<String> getChildNodeNames(SegmentStore store, RecordId recordId) {
+    Iterable<String> getChildNodeNames(Segment segment, RecordId recordId) {
         if (hasNoChildNodes()) {
             return Collections.emptyList();
         } else if (hasManyChildNodes()) {
-            MapRecord map = getChildNodeMap(store, recordId);
+            MapRecord map = getChildNodeMap(segment, recordId);
             return map.getKeys();
         } else {
             return Collections.singletonList(childName);
@@ -330,30 +328,30 @@ public class Template {
     }
 
     Iterable<? extends ChildNodeEntry> getChildNodeEntries(
-            SegmentStore store, RecordId recordId) {
+            Segment segment, RecordId recordId) {
         if (hasNoChildNodes()) {
             return Collections.emptyList();
         } else if (hasManyChildNodes()) {
-            MapRecord map = getChildNodeMap(store, recordId);
+            MapRecord map = getChildNodeMap(segment, recordId);
             return map.getEntries();
         } else {
+            segment = segment.getSegment(recordId);
             int offset = recordId.getOffset() + RECORD_ID_BYTES;
-            Segment segment = store.readSegment(recordId.getSegmentId());
             RecordId childNodeId = segment.readRecordId(offset);
             return Collections.singletonList(new MemoryChildNodeEntry(
-                    childName, new SegmentNodeState(store, childNodeId)));
+                    childName, new SegmentNodeState(segment, childNodeId)));
         }
     }
 
     public boolean compare(
-            SegmentStore thisStore, RecordId thisId, SegmentStore thatStore, RecordId thatId) {
+            Segment thisSegment, RecordId thisId, Segment thatSegment, RecordId thatId) {
         checkNotNull(thisId);
         checkNotNull(thatId);
 
         // Compare properties
         for (int i = 0; i < properties.length; i++) {
-            PropertyState thisProperty = getProperty(thisStore, thisId, i);
-            PropertyState thatProperty = getProperty(thatStore, thatId, i);
+            PropertyState thisProperty = getProperty(thisSegment, thisId, i);
+            PropertyState thatProperty = getProperty(thatSegment, thatId, i);
             if (!thisProperty.equals(thatProperty)) {
                 return false;
             }
@@ -363,13 +361,13 @@ public class Template {
         if (hasNoChildNodes()) {
             return true;
         } else if (hasOneChildNode()) {
-            NodeState thisChild = getChildNode(childName, thisStore, thisId);
-            NodeState thatChild = getChildNode(childName, thatStore, thatId);
+            NodeState thisChild = getChildNode(childName, thisSegment, thisId);
+            NodeState thatChild = getChildNode(childName, thatSegment, thatId);
             return thisChild.equals(thatChild);
         } else {
             // TODO: Leverage the HAMT data structure for the comparison
-            MapRecord thisMap = getChildNodeMap(thisStore, thisId);
-            MapRecord thatMap = getChildNodeMap(thatStore, thatId);
+            MapRecord thisMap = getChildNodeMap(thisSegment, thisId);
+            MapRecord thatMap = getChildNodeMap(thatSegment, thatId);
             if (thisMap.getRecordId().equals(thatMap.getRecordId())) {
                 return true; // shortcut
             } else if (thisMap.size() != thatMap.size()) {
@@ -383,8 +381,8 @@ public class Template {
                     if (thatChild == null) {
                         return false;
                     } else if (!thisChild.equals(thatChild)
-                            && !new SegmentNodeState(thisStore, thisChild).equals(
-                                    new SegmentNodeState(thatStore, thatChild))) {
+                            && !new SegmentNodeState(thisSegment, thisChild).equals(
+                                    new SegmentNodeState(thatSegment, thatChild))) {
                         return false;
                     }
                 }
@@ -394,10 +392,10 @@ public class Template {
     }
 
     public boolean compareAgainstBaseState(
-            final SegmentStore store, RecordId afterId,
+            final Segment segment, RecordId afterId,
             Template beforeTemplate, RecordId beforeId,
             final NodeStateDiff diff) {
-        checkNotNull(store);
+        checkNotNull(segment);
         checkNotNull(afterId);
         checkNotNull(beforeTemplate);
         checkNotNull(beforeId);
@@ -409,6 +407,9 @@ public class Template {
             return false;
         }
 
+        final Segment afterSegment = segment.getSegment(afterId);
+        final Segment beforeSegment = segment.getSegment(beforeId);
+
         // Compare other properties, leveraging the ordering
         int beforeIndex = 0;
         int afterIndex = 0;
@@ -423,27 +424,27 @@ public class Template {
             PropertyState beforeProperty = null;
             PropertyState afterProperty = null;
             if (d < 0) {
-                afterProperty = getProperty(store, afterId, afterIndex++);
+                afterProperty = getProperty(afterSegment, afterId, afterIndex++);
             } else if (d > 0) {
                 beforeProperty = beforeTemplate.getProperty(
-                        store, beforeId, beforeIndex++);
+                        beforeSegment, beforeId, beforeIndex++);
             } else {
-                afterProperty = getProperty(store, afterId, afterIndex++);
+                afterProperty = getProperty(afterSegment, afterId, afterIndex++);
                 beforeProperty = beforeTemplate.getProperty(
-                        store, beforeId, beforeIndex++);
+                        beforeSegment, beforeId, beforeIndex++);
             }
             if (!compareProperties(beforeProperty, afterProperty, diff)) {
                 return false;
             }
         }
         while (afterIndex < properties.length) {
-            if (!diff.propertyAdded(getProperty(store, afterId, afterIndex++))) {
+            if (!diff.propertyAdded(getProperty(afterSegment, afterId, afterIndex++))) {
                 return false;
             }
         }
         while (beforeIndex < beforeTemplate.properties.length) {
             PropertyState beforeProperty = beforeTemplate.getProperty(
-                    store, beforeId, beforeIndex++);
+                    beforeSegment, beforeId, beforeIndex++);
             if (!diff.propertyDeleted(beforeProperty)) {
                 return false;
             }
@@ -452,7 +453,7 @@ public class Template {
         if (hasNoChildNodes()) {
             if (!beforeTemplate.hasNoChildNodes()) {
                 for (ChildNodeEntry entry :
-                        beforeTemplate.getChildNodeEntries(store, beforeId)) {
+                        beforeTemplate.getChildNodeEntries(beforeSegment, beforeId)) {
                     if (!diff.childNodeDeleted(
                             entry.getName(), entry.getNodeState())) {
                         return false;
@@ -460,9 +461,9 @@ public class Template {
                 }
             }
         } else if (hasOneChildNode()) {
-            NodeState afterNode = getChildNode(childName, store, afterId);
+            NodeState afterNode = getChildNode(childName, afterSegment, afterId);
             NodeState beforeNode = beforeTemplate.getChildNode(
-                    childName, store, beforeId);
+                    childName, beforeSegment, beforeId);
             if (!beforeNode.exists()) {
                 if (!diff.childNodeAdded(childName, afterNode)) {
                     return false;
@@ -475,7 +476,7 @@ public class Template {
             if ((beforeTemplate.hasOneChildNode() && !beforeNode.exists())
                     || beforeTemplate.hasManyChildNodes()) {
                 for (ChildNodeEntry entry :
-                    beforeTemplate.getChildNodeEntries(store, beforeId)) {
+                    beforeTemplate.getChildNodeEntries(beforeSegment, beforeId)) {
                     if (!childName.equals(entry.getName())) {
                         if (!diff.childNodeDeleted(
                                 entry.getName(), entry.getNodeState())) {
@@ -485,7 +486,7 @@ public class Template {
                 }
             }
         } else if (beforeTemplate.hasNoChildNodes()) {
-            for (ChildNodeEntry entry : getChildNodeEntries(store, afterId)) {
+            for (ChildNodeEntry entry : getChildNodeEntries(afterSegment, afterId)) {
                 if (!diff.childNodeAdded(
                         entry.getName(), entry.getNodeState())) {
                     return false;
@@ -493,12 +494,12 @@ public class Template {
             }
         } else if (beforeTemplate.hasOneChildNode()) {
             String name = beforeTemplate.getChildName();
-            for (ChildNodeEntry entry : getChildNodeEntries(store, afterId)) {
+            for (ChildNodeEntry entry : getChildNodeEntries(afterSegment, afterId)) {
                 String childName = entry.getName();
                 NodeState afterChild = entry.getNodeState();
                 if (name.equals(childName)) {
                     NodeState beforeChild =
-                            beforeTemplate.getChildNode(name, store, beforeId);
+                            beforeTemplate.getChildNode(name, beforeSegment, beforeId);
                     if (beforeChild.exists()) {
                         if (!fastEquals(afterChild, beforeChild)
                                 && !diff.childNodeChanged(
@@ -516,25 +517,25 @@ public class Template {
             }
         } else {
             // TODO: Leverage the HAMT data structure for the comparison
-            MapRecord afterMap = getChildNodeMap(store, afterId);
-            MapRecord beforeMap = beforeTemplate.getChildNodeMap(store, beforeId);
+            MapRecord afterMap = getChildNodeMap(afterSegment, afterId);
+            MapRecord beforeMap = beforeTemplate.getChildNodeMap(beforeSegment, beforeId);
             return afterMap.compare(beforeMap, new MapDiff() {
                 @Override
                 public boolean entryAdded(String key, RecordId after) {
                     return diff.childNodeAdded(
-                            key, new SegmentNodeState(store, after));
+                            key, new SegmentNodeState(afterSegment, after));
                 }
                 @Override
                 public boolean entryChanged(
                         String key, RecordId before, RecordId after) {
-                    SegmentNodeState b = new SegmentNodeState(store, before);
-                    SegmentNodeState a = new SegmentNodeState(store, after);
+                    SegmentNodeState b = new SegmentNodeState(beforeSegment, before);
+                    SegmentNodeState a = new SegmentNodeState(afterSegment, after);
                     return fastEquals(a, b) || diff.childNodeChanged(key, b, a);
                 }
                 @Override
                 public boolean entryDeleted(String key, RecordId before) {
                     return diff.childNodeDeleted(
-                            key, new SegmentNodeState(store, before));
+                            key, new SegmentNodeState(beforeSegment, before));
                 }
             });
         }
@@ -543,9 +544,8 @@ public class Template {
     }
 
     public boolean compareAgainstEmptyState(
-            final SegmentStore store, RecordId recordId,
-            final NodeStateDiff diff) {
-        checkNotNull(store);
+            Segment segment, RecordId recordId, final NodeStateDiff diff) {
+        checkNotNull(segment);
         checkNotNull(recordId);
         checkNotNull(diff);
 
@@ -557,17 +557,18 @@ public class Template {
             return false;
         }
 
-        Segment segment = store.readSegment(recordId.getSegmentId());
+        segment = segment.getSegment(recordId);
         int offset = recordId.getOffset() + RECORD_ID_BYTES;
 
         if (childName == MANY_CHILD_NODES) {
             RecordId childNodesId = segment.readRecordId(offset);
             MapRecord children = MapRecord.readMap(segment, childNodesId);
+            final Segment s = segment;
             children.compareAgainstEmptyMap(new MapDiff() {
                 @Override
                 public boolean entryAdded(String key, RecordId after) {
                     return diff.childNodeAdded(
-                            key, new SegmentNodeState(store, after));
+                            key, new SegmentNodeState(s, after));
                 }
                 @Override
                 public boolean entryChanged(
@@ -583,7 +584,7 @@ public class Template {
         } else if (childName != ZERO_CHILD_NODES) {
             RecordId childNodeId = segment.readRecordId(offset);
             if (!diff.childNodeAdded(
-                    childName, new SegmentNodeState(store, childNodeId))) {
+                    childName, new SegmentNodeState(segment, childNodeId))) {
                 return false;
             }
             offset += RECORD_ID_BYTES;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Fri Sep 27 18:30:00 2013
@@ -83,6 +83,7 @@ public class FileStore implements Segmen
             }
         }
 
+        Segment segment = writer.getDummySegment();
         for (TarFile tar : files) {
             ByteBuffer buffer = tar.readEntry(JOURNALS_UUID);
             if (buffer != null) {
@@ -96,7 +97,7 @@ public class FileStore implements Segmen
                             new UUID(buffer.getLong(), buffer.getLong()),
                             buffer.getInt());
                     journals.put(name, new FileJournal(
-                            this, new SegmentNodeState(this, recordId)));
+                            this, new SegmentNodeState(segment, recordId)));
                 }
             }
         }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryJournal.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryJournal.java Fri Sep 27 18:30:00 2013
@@ -21,6 +21,7 @@ import static com.google.common.base.Pre
 import org.apache.jackrabbit.oak.plugins.segment.Journal;
 import org.apache.jackrabbit.oak.plugins.segment.MergeDiff;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
@@ -78,14 +79,16 @@ public class MemoryJournal implements Jo
     @Override
     public synchronized void merge() {
         if (parent != null) {
-            NodeState before = new SegmentNodeState(store, base);
-            NodeState after = new SegmentNodeState(store, head);
-
             SegmentWriter writer = store.getWriter();
+
+            Segment segment = writer.getDummySegment();
+            NodeState before = new SegmentNodeState(segment, base);
+            NodeState after = new SegmentNodeState(segment, head);
+
             while (!parent.setHead(base, head)) {
                 RecordId newBase = parent.getHead();
                 NodeBuilder builder =
-                        new SegmentNodeState(store, newBase).builder();
+                        new SegmentNodeState(segment, newBase).builder();
                 after.compareAgainstBaseState(before, new MergeDiff(builder));
                 NodeState state = builder.getNodeState();
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoJournal.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoJournal.java Fri Sep 27 18:30:00 2013
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.jackrabbit.oak.plugins.segment.Journal;
 import org.apache.jackrabbit.oak.plugins.segment.MergeDiff;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
@@ -175,15 +176,17 @@ class MongoJournal implements Journal {
             RecordId base = RecordId.fromString(state.get("base").toString());
             RecordId head = RecordId.fromString(state.get("head").toString());
 
-            NodeState before = new SegmentNodeState(store, base);
-            NodeState after = new SegmentNodeState(store, head);
+            SegmentWriter writer = store.getWriter();
+            Segment segment = writer.getDummySegment();
+
+            NodeState before = new SegmentNodeState(segment, base);
+            NodeState after = new SegmentNodeState(segment, head);
 
             Journal parent = store.getJournal(state.get("parent").toString());
-            SegmentWriter writer = store.getWriter();
             while (!parent.setHead(base, head)) {
                 RecordId newBase = parent.getHead();
                 NodeBuilder builder =
-                        new SegmentNodeState(store, newBase).builder();
+                        new SegmentNodeState(segment, newBase).builder();
                 after.compareAgainstBaseState(before, new MergeDiff(builder));
                 RecordId newHead =
                         writer.writeNode(builder.getNodeState()).getRecordId();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeState.java?rev=1527022&r1=1527021&r2=1527022&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeState.java Fri Sep 27 18:30:00 2013
@@ -50,22 +50,15 @@ import org.apache.jackrabbit.oak.api.Pro
  */
 public abstract class AbstractNodeState implements NodeState {
 
-    @Override
-    public boolean hasProperty(String name) {
-        return getProperty(name) != null;
-    }
-
-    @Override
-    public boolean getBoolean(String name) {
-        PropertyState property = getProperty(name);
+    public static boolean getBoolean(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
         return property != null
                 && property.getType() == BOOLEAN
                 && property.getValue(BOOLEAN);
     }
 
-    @Override
-    public long getLong(String name) {
-        PropertyState property = getProperty(name);
+    public static long getLong(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
         if (property != null && property.getType() == LONG) {
             return property.getValue(LONG);
         } else {
@@ -73,9 +66,8 @@ public abstract class AbstractNodeState 
         }
     }
 
-    @Override
-    public String getString(String name) {
-        PropertyState property = getProperty(name);
+    public static String getString(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
         if (property != null && property.getType() == STRING) {
             return property.getValue(STRING);
         } else {
@@ -83,9 +75,8 @@ public abstract class AbstractNodeState 
         }
     }
 
-    @Override @CheckForNull
-    public String getName(@Nonnull String name) {
-        PropertyState property = getProperty(name);
+    public static String getName(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
         if (property != null && property.getType() == NAME) {
             return property.getValue(NAME);
         } else {
@@ -93,9 +84,8 @@ public abstract class AbstractNodeState 
         }
     }
 
-    @Override @Nonnull
-    public Iterable<String> getNames(@Nonnull String name) {
-        PropertyState property = getProperty(name);
+    public static Iterable<String> getNames(NodeState state, String name) {
+        PropertyState property = state.getProperty(name);
         if (property != null && property.getType() == NAMES) {
             return property.getValue(NAMES);
         } else {
@@ -103,6 +93,119 @@ public abstract class AbstractNodeState 
         }
     }
 
+    /**
+     * Generic default comparison algorithm that simply walks through the
+     * property and child node lists of the given base state and compares
+     * the entries one by one with corresponding ones (if any) in this state.
+     */
+    public static boolean compareAgainstBaseState(
+            NodeState state, NodeState base, NodeStateDiff diff) {
+        if (!comparePropertiesAgainstBaseState(state, base, diff)) {
+            return false;
+        }
+
+        Set<String> baseChildNodes = new HashSet<String>();
+        for (ChildNodeEntry beforeCNE : base.getChildNodeEntries()) {
+            String name = beforeCNE.getName();
+            NodeState beforeChild = beforeCNE.getNodeState();
+            NodeState afterChild = state.getChildNode(name);
+            if (!afterChild.exists()) {
+                if (!diff.childNodeDeleted(name, beforeChild)) {
+                    return false;
+                }
+            } else {
+                baseChildNodes.add(name);
+                if (afterChild != beforeChild) { // TODO: fastEquals?
+                    if (!diff.childNodeChanged(name, beforeChild, afterChild)) {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        for (ChildNodeEntry afterChild : state.getChildNodeEntries()) {
+            String name = afterChild.getName();
+            if (!baseChildNodes.contains(name)) {
+                if (!diff.childNodeAdded(name, afterChild.getNodeState())) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Compares the properties of {@code base} state with {@code this}
+     * state.
+     *
+     * @param state the head node state.
+     * @param base the base node state.
+     * @param diff the node state diff.
+     * @return {@code true} to continue the comparison, {@code false} to stop
+     */
+    public static boolean comparePropertiesAgainstBaseState(
+            NodeState state, NodeState base, NodeStateDiff diff) {
+        Set<String> baseProperties = new HashSet<String>();
+        for (PropertyState beforeProperty : base.getProperties()) {
+            String name = beforeProperty.getName();
+            PropertyState afterProperty = state.getProperty(name);
+            if (afterProperty == null) {
+                if (!diff.propertyDeleted(beforeProperty)) {
+                    return false;
+                }
+            } else {
+                baseProperties.add(name);
+                if (!beforeProperty.equals(afterProperty)) {
+                    if (!diff.propertyChanged(beforeProperty, afterProperty)) {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        for (PropertyState afterProperty : state.getProperties()) {
+            if (!baseProperties.contains(afterProperty.getName())) {
+                if (!diff.propertyAdded(afterProperty)) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public boolean hasProperty(String name) {
+        return getProperty(name) != null;
+    }
+
+    @Override
+    public boolean getBoolean(String name) {
+        return getBoolean(this, name);
+    }
+
+    @Override
+    public long getLong(String name) {
+        return getLong(this, name);
+    }
+
+    @Override
+    public String getString(String name) {
+        return getString(this, name);
+    }
+
+    @Override @CheckForNull
+    public String getName(@Nonnull String name) {
+        return getName(this, name);
+    }
+
+    @Override @Nonnull
+    public Iterable<String> getNames(@Nonnull String name) {
+        return getNames(this, name);
+    }
+
     @Override
     public PropertyState getProperty(String name) {
         for (PropertyState property : getProperties()) {
@@ -156,39 +259,7 @@ public abstract class AbstractNodeState 
      */
     @Override
     public boolean compareAgainstBaseState(NodeState base, NodeStateDiff diff) {
-        if (!comparePropertiesAgainstBaseState(base, diff)) {
-            return false;
-        }
-
-        Set<String> baseChildNodes = new HashSet<String>();
-        for (ChildNodeEntry beforeCNE : base.getChildNodeEntries()) {
-            String name = beforeCNE.getName();
-            NodeState beforeChild = beforeCNE.getNodeState();
-            NodeState afterChild = getChildNode(name);
-            if (!afterChild.exists()) {
-                if (!diff.childNodeDeleted(name, beforeChild)) {
-                    return false;
-                }
-            } else {
-                baseChildNodes.add(name);
-                if (afterChild != beforeChild) { // TODO: fastEquals?
-                    if (!diff.childNodeChanged(name, beforeChild, afterChild)) {
-                        return false;
-                    }
-                }
-            }
-        }
-
-        for (ChildNodeEntry afterChild : getChildNodeEntries()) {
-            String name = afterChild.getName();
-            if (!baseChildNodes.contains(name)) {
-                if (!diff.childNodeAdded(name, afterChild.getNodeState())) {
-                    return false;
-                }
-            }
-        }
-
-        return true;
+        return compareAgainstBaseState(this, base, diff);
     }
 
     /**
@@ -301,45 +372,6 @@ public abstract class AbstractNodeState 
         return 0;
     }
 
-    /**
-     * Compares the properties of {@code base} state with {@code this}
-     * state.
-     *
-     * @param base the base node state.
-     * @param diff the node state diff.
-     * @return {@code true} to continue the comparison, {@code false} to stop
-     */
-    protected boolean comparePropertiesAgainstBaseState(NodeState base,
-                                                     NodeStateDiff diff) {
-        Set<String> baseProperties = new HashSet<String>();
-        for (PropertyState beforeProperty : base.getProperties()) {
-            String name = beforeProperty.getName();
-            PropertyState afterProperty = getProperty(name);
-            if (afterProperty == null) {
-                if (!diff.propertyDeleted(beforeProperty)) {
-                    return false;
-                }
-            } else {
-                baseProperties.add(name);
-                if (!beforeProperty.equals(afterProperty)) {
-                    if (!diff.propertyChanged(beforeProperty, afterProperty)) {
-                        return false;
-                    }
-                }
-            }
-        }
-
-        for (PropertyState afterProperty : getProperties()) {
-            if (!baseProperties.contains(afterProperty.getName())) {
-                if (!diff.propertyAdded(afterProperty)) {
-                    return false;
-                }
-            }
-        }
-
-        return true;
-    }
-
     //-----------------------------------------------------------< private >--
 
     protected static long count(Iterable<?> iterable) {