You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/06/01 07:48:52 UTC

svn commit: r1746410 [2/4] - in /jackrabbit/oak/trunk: oak-run/src/main/java/org/apache/jackrabbit/oak/checkpoint/ oak-run/src/main/java/org/apache/jackrabbit/oak/console/ oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/ oak-run/src/main/java/...

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java Wed Jun  1 07:48:51 2016
@@ -62,7 +62,7 @@ import org.apache.jackrabbit.oak.plugins
  */
 public class SegmentPropertyState extends Record implements PropertyState {
     @Nonnull
-    private final SegmentStore store;
+    private final SegmentReader reader;
 
     @Nonnull
     private final String name;
@@ -70,17 +70,17 @@ public class SegmentPropertyState extend
     @Nonnull
     private final Type<?> type;
 
-    SegmentPropertyState(@Nonnull SegmentStore store, @Nonnull RecordId id,
+    SegmentPropertyState(@Nonnull SegmentReader reader, @Nonnull RecordId id,
                          @Nonnull String name, @Nonnull Type<?> type) {
         super(id);
-        this.store = checkNotNull(store);
+        this.reader = checkNotNull(reader);
         this.name = checkNotNull(name);
         this.type = checkNotNull(type);
     }
 
-    SegmentPropertyState(@Nonnull SegmentStore store, @Nonnull RecordId id,
+    SegmentPropertyState(@Nonnull SegmentReader reader, @Nonnull RecordId id,
                          @Nonnull PropertyTemplate template) {
-        this(store, id, template.getName(), template.getType());
+        this(reader, id, template.getName(), template.getType());
     }
 
     private ListRecord getValueList(Segment segment) {
@@ -106,7 +106,7 @@ public class SegmentPropertyState extend
         ListRecord values = getValueList(segment);
         for (int i = 0; i < values.size(); i++) {
             RecordId valueId = values.getEntry(i);
-            String value = store.getReader().readString(valueId);
+            String value = reader.readString(valueId);
             map.put(value, valueId);
         }
 
@@ -186,10 +186,10 @@ public class SegmentPropertyState extend
     @SuppressWarnings("unchecked")
     private <T> T getValue(RecordId id, Type<T> type) {
         if (type == BINARY) {
-            return (T) new SegmentBlob(store, id); // load binaries lazily
+            return (T) reader.readBlob(id); // load binaries lazily
         }
 
-        String value = store.getReader().readString(id);
+        String value = reader.readString(id);
         if (type == STRING || type == URI || type == DATE
                 || type == NAME || type == PATH
                 || type == REFERENCE || type == WEAKREFERENCE) {
@@ -222,7 +222,7 @@ public class SegmentPropertyState extend
         RecordId entry = values.getEntry(index);
 
         if (getType().equals(BINARY) || getType().equals(BINARIES)) {
-            return new SegmentBlob(store, entry).length();
+            return reader.readBlob(entry).length();
         }
 
         return getSegment().readLength(entry);

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java Wed Jun  1 07:48:51 2016
@@ -21,8 +21,6 @@ package org.apache.jackrabbit.oak.segmen
 
 import javax.annotation.Nonnull;
 
-import org.apache.jackrabbit.oak.cache.CacheStats;
-
 public interface SegmentReader {
     @Nonnull
     String readString(@Nonnull RecordId id);
@@ -33,7 +31,18 @@ public interface SegmentReader {
     @Nonnull
     Template readTemplate(@Nonnull RecordId id);
 
-    // FIXME OAK-4373 remove from this interface
     @Nonnull
-    CacheStats getStringCacheStats();
+    SegmentNodeState readNode(@Nonnull RecordId id);
+
+    @Nonnull
+    SegmentNodeState readHeadState();
+
+    @Nonnull
+    SegmentPropertyState readProperty(
+            @Nonnull RecordId id,
+            @Nonnull PropertyTemplate template);
+
+    @Nonnull
+    SegmentBlob readBlob(@Nonnull RecordId id);
+
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java Wed Jun  1 07:48:51 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Supplier;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.http.HttpStore;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+
+public final class SegmentReaders {
+    private SegmentReaders() {}
+
+    @Nonnull
+    public static SegmentReader segmentReader(@Nonnull FileStore store, long stringCacheMB) {
+        return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB);
+    }
+
+    private static Supplier<SegmentWriter> getWriter(final FileStore store) {
+        return new Supplier<SegmentWriter>() {
+            @Override
+            public SegmentWriter get() {
+                return store.getWriter();
+            }
+        };
+    }
+
+    @Nonnull
+    public static SegmentReader segmentReader(@Nonnull MemoryStore store, long stringCacheMB) {
+        return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB);
+    }
+
+    private static Supplier<SegmentWriter> getWriter(final MemoryStore store) {
+        return new Supplier<SegmentWriter>() {
+            @Override
+            public SegmentWriter get() {
+                return store.getWriter();
+            }
+        };
+    }
+
+    @Nonnull
+    public static SegmentReader segmentReader(@Nonnull HttpStore store, long stringCacheMB) {
+        return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB);
+    }
+
+    private static Supplier<SegmentWriter> getWriter(final HttpStore store) {
+        return new Supplier<SegmentWriter>() {
+            @Override
+            public SegmentWriter get() {
+                return store.getWriter();
+            }
+        };
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java Wed Jun  1 07:48:51 2016
@@ -18,38 +18,14 @@
  */
 package org.apache.jackrabbit.oak.segment;
 
-import java.io.Closeable;
 import java.io.IOException;
 
-import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-
 /**
  * The backend storage interface used by the segment node store.
  */
-public interface SegmentStore extends Closeable {
-
-    @Nonnull
-    SegmentTracker getTracker();
-
-    @Nonnull
-    SegmentWriter getWriter();
-
-    @Nonnull
-    SegmentReader getReader();
-
-    /**
-     * Returns the head state.
-     *
-     * @return head state
-     */
-    @Nonnull
-    SegmentNodeState getHead();
-
-    boolean setHead(SegmentNodeState base, SegmentNodeState head);
+public interface SegmentStore {
 
     /**
      * Checks whether the identified segment exists in this store.
@@ -78,25 +54,4 @@ public interface SegmentStore extends Cl
      */
     void writeSegment(SegmentId id, byte[] bytes, int offset, int length) throws IOException;
 
-    void close();
-
-    /**
-     * Read a blob from external storage.
-     *
-     * @param reference blob reference
-     * @return external blob
-     */
-    Blob readBlob(String reference);
-
-    /**
-     * Returns the external BlobStore (if configured) with this store
-     */
-    @CheckForNull
-    BlobStore getBlobStore();
-
-    /**
-     * Triggers removal of segments that are no longer referenceable.
-     */
-    void gc();
-
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed Jun  1 07:48:51 2016
@@ -80,9 +80,6 @@ import org.slf4j.LoggerFactory;
  * {@code WriteOperationHandler} passed to the constructor is thread safe.
  */
 public class SegmentWriter {
-// FIXME OAK-4102: Break cyclic dependency of FileStore and SegmentTracker
-// Improve the way how SegmentWriter instances are created. (OAK-4102)
-
     private static final Logger LOG = LoggerFactory.getLogger(SegmentWriter.class);
 
     static final int BLOCK_SIZE = 1 << 12; // 4kB
@@ -94,6 +91,15 @@ public class SegmentWriter {
     private final SegmentStore store;
 
     @Nonnull
+    private final SegmentReader reader;
+
+    @CheckForNull
+    private final BlobStore blobStore;
+
+    @Nonnull
+    private final SegmentTracker tracker;
+
+    @Nonnull
     private final WriteOperationHandler writeOperationHandler;
 
     /**
@@ -101,11 +107,20 @@ public class SegmentWriter {
      * pointed out in the class comment.
      *
      * @param store      store to write to
+     * @param reader     segment reader for the {@code store}
+     * @param blobStore  the blob store or {@code null} for inlined blobs
+     * @param tracker    segment tracker for {@code store}
      * @param writeOperationHandler  handler for write operations.
      */
     public SegmentWriter(@Nonnull SegmentStore store,
+                         @Nonnull SegmentReader reader,
+                         @Nullable BlobStore blobStore,
+                         @Nonnull SegmentTracker tracker,
                          @Nonnull WriteOperationHandler writeOperationHandler) {
         this.store = checkNotNull(store);
+        this.reader = checkNotNull(reader);
+        this.blobStore = blobStore;
+        this.tracker = checkNotNull(tracker);
         this.writeOperationHandler = checkNotNull(writeOperationHandler);
         this.cacheManager = new WriterCacheManager();
     }
@@ -138,7 +153,7 @@ public class SegmentWriter {
                 return with(writer).writeMap(base, changes);
             }
         });
-        return new MapRecord(store, mapId);
+        return new MapRecord(reader, mapId);
     }
 
     /**
@@ -187,7 +202,7 @@ public class SegmentWriter {
                 return with(writer).writeBlob(blob);
             }
         });
-        return new SegmentBlob(store, blobId);
+        return new SegmentBlob(blobStore, blobId);
     }
 
     /**
@@ -225,11 +240,11 @@ public class SegmentWriter {
                 return with(writer).writeStream(stream);
             }
         });
-        return new SegmentBlob(store, blobId);
+        return new SegmentBlob(blobStore, blobId);
     }
 
     /**
-     * Write a propery.
+     * Write a property.
      * @param state  the property to write
      * @return       the property state written
      * @throws IOException
@@ -243,7 +258,7 @@ public class SegmentWriter {
                 return with(writer).writeProperty(state);
             }
         });
-        return new SegmentPropertyState(store, id, state.getName(), state.getType());
+        return new SegmentPropertyState(reader, id, state.getName(), state.getType());
     }
 
     /**
@@ -260,7 +275,7 @@ public class SegmentWriter {
                 return with(writer).writeNode(state, 0);
             }
         });
-        return new SegmentNodeState(store, nodeId);
+        return new SegmentNodeState(reader, this, nodeId);
     }
 
     /**
@@ -287,7 +302,7 @@ public class SegmentWriter {
                 }
             });
             writeOperationHandler.flush();
-            return new SegmentNodeState(store, nodeId);
+            return new SegmentNodeState(reader, this, nodeId);
         } catch (SegmentWriteOperation.CancelledWriteException ignore) {
             return null;
         }
@@ -344,11 +359,11 @@ public class SegmentWriter {
             if (base != null && base.isDiff()) {
                 Segment segment = base.getSegment();
                 RecordId key = segment.readRecordId(base.getOffset(8));
-                String name = store.getReader().readString(key);
+                String name = reader.readString(key);
                 if (!changes.containsKey(name)) {
                     changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
                 }
-                base = new MapRecord(store, segment.readRecordId(base.getOffset(8, 2)));
+                base = new MapRecord(reader, segment.readRecordId(base.getOffset(8, 2)));
             }
 
             if (base != null && changes.size() == 1) {
@@ -384,7 +399,7 @@ public class SegmentWriter {
                 }
 
                 if (keyId != null) {
-                    entries.add(new MapEntry(store, key, keyId, entry.getValue()));
+                    entries.add(new MapEntry(reader, key, keyId, entry.getValue()));
                 }
             }
             return writeMapBucket(base, entries, 0);
@@ -497,7 +512,7 @@ public class SegmentWriter {
         }
 
         private MapRecord mapRecordOrNull(RecordId id) {
-            return id == null ? null : new MapRecord(store, id);
+            return id == null ? null : new MapRecord(reader, id);
         }
 
         /**
@@ -585,7 +600,7 @@ public class SegmentWriter {
 
             // write as many full bulk segments as possible
             while (pos + Segment.MAX_SEGMENT_SIZE <= data.length) {
-                SegmentId bulkId = store.getTracker().newBulkSegmentId();
+                SegmentId bulkId = tracker.newBulkSegmentId();
                 store.writeSegment(bulkId, data, pos, Segment.MAX_SEGMENT_SIZE);
                 for (int i = 0; i < Segment.MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
                     blockIds.add(new RecordId(bulkId, i));
@@ -626,8 +641,8 @@ public class SegmentWriter {
             }
 
             String reference = blob.getReference();
-            if (reference != null && store.getBlobStore() != null) {
-                String blobId = store.getBlobStore().getBlobId(reference);
+            if (reference != null && blobStore != null) {
+                String blobId = blobStore.getBlobId(reference);
                 if (blobId != null) {
                     return writeBlobId(blobId);
                 } else {
@@ -697,7 +712,6 @@ public class SegmentWriter {
                 return writeValueRecord(n, data);
             }
 
-            BlobStore blobStore = store.getBlobStore();
             if (blobStore != null) {
                 String blobId = blobStore.writeBlob(new SequenceInputStream(
                     new ByteArrayInputStream(data, 0, n), stream));
@@ -712,7 +726,7 @@ public class SegmentWriter {
 
             // Write the data to bulk segments and collect the list of block ids
             while (n != 0) {
-                SegmentId bulkId = store.getTracker().newBulkSegmentId();
+                SegmentId bulkId = tracker.newBulkSegmentId();
                 int len = Segment.align(n, 1 << Segment.RECORD_ALIGN_BITS);
                 LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
                 store.writeSegment(bulkId, data, 0, len);
@@ -893,7 +907,7 @@ public class SegmentWriter {
             }
 
             List<RecordId> ids = newArrayList();
-            Template template = new Template(store, state);
+            Template template = new Template(reader, state);
             if (template.equals(beforeTemplate)) {
                 ids.add(before.getTemplateId());
             } else {

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java Wed Jun  1 07:48:51 2016
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Supplier;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.http.HttpStore;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+
+public final class SegmentWriters {
+    private SegmentWriters() {}
+
+    @Nonnull
+    public static SegmentWriter pooledSegmentWriter(@Nonnull FileStore store,
+                                                    @Nonnull SegmentVersion version,
+                                                    @Nonnull String name,
+                                                    @Nonnull Supplier<Integer> generation) {
+        return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+                new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation));
+    }
+
+    @Nonnull
+    public static SegmentWriter pooledSegmentWriter(@Nonnull MemoryStore store,
+                                                    @Nonnull SegmentVersion version,
+                                                    @Nonnull String name,
+                                                    @Nonnull Supplier<Integer> generation) {
+        return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+                new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation));
+    }
+
+    @Nonnull
+    public static SegmentWriter pooledSegmentWriter(@Nonnull HttpStore store,
+                                                    @Nonnull SegmentVersion version,
+                                                    @Nonnull String name,
+                                                    @Nonnull Supplier<Integer> generation) {
+        return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+                new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation));
+    }
+
+    @Nonnull
+    public static SegmentWriter segmentWriter(@Nonnull FileStore store,
+                                              @Nonnull SegmentVersion version,
+                                              @Nonnull String name,
+                                              int generation) {
+        return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+                new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation));
+    }
+
+    @Nonnull
+    public static SegmentWriter segmentWriter(@Nonnull MemoryStore store,
+                                              @Nonnull SegmentVersion version,
+                                              @Nonnull String name,
+                                              int generation) {
+        return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+                new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation));
+    }
+
+    @Nonnull
+    public static SegmentWriter segmentWriter(@Nonnull HttpStore store,
+                                              @Nonnull SegmentVersion version,
+                                              @Nonnull String name,
+                                              int generation) {
+        return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+                new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation));
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java Wed Jun  1 07:48:51 2016
@@ -59,7 +59,7 @@ public class Template {
     static final String MANY_CHILD_NODES = "";
 
     @Nonnull
-    private final SegmentStore store;
+    private final SegmentReader reader;
 
     /**
      * The {@code jcr:primaryType} property, if present as a single-valued
@@ -90,12 +90,12 @@ public class Template {
     @CheckForNull
     private final String childName;
 
-    Template(@Nonnull SegmentStore store,
+    Template(@Nonnull SegmentReader reader,
              @Nullable PropertyState primaryType,
              @Nullable PropertyState mixinTypes,
              @Nullable  PropertyTemplate[] properties,
              @Nullable String childName) {
-        this.store = store;
+        this.reader = checkNotNull(reader);
         this.primaryType = primaryType;
         this.mixinTypes = mixinTypes;
         if (properties != null) {
@@ -107,8 +107,9 @@ public class Template {
         this.childName = childName;
     }
 
-    Template(@Nonnull SegmentStore store, @Nonnull NodeState state) {
-        this.store = store;
+    Template(@Nonnull SegmentReader reader, @Nonnull NodeState state) {
+        this.reader = checkNotNull(reader);
+        checkNotNull(state);
         PropertyState primary = null;
         PropertyState mixins = null;
         List<PropertyTemplate> templates = Lists.newArrayList();
@@ -198,7 +199,7 @@ public class Template {
         RecordId lid = segment.readRecordId(offset);
         ListRecord props = new ListRecord(lid, properties.length);
         RecordId rid = props.getEntry(index);
-        return new SegmentPropertyState(store, rid, properties[index]);
+        return reader.readProperty(rid, properties[index]);
     }
 
     MapRecord getChildNodeMap(RecordId recordId) {
@@ -206,7 +207,7 @@ public class Template {
         Segment segment = recordId.getSegment();
         int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES;
         RecordId childNodesId = segment.readRecordId(offset);
-        return store.getReader().readMap(childNodesId);
+        return reader.readMap(childNodesId);
     }
 
     public NodeState getChildNode(String name, RecordId recordId) {
@@ -224,7 +225,7 @@ public class Template {
             Segment segment = recordId.getSegment();
             int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES;
             RecordId childNodeId = segment.readRecordId(offset);
-            return new SegmentNodeState(store, childNodeId);
+            return reader.readNode(childNodeId);
         } else {
             return MISSING_NODE;
         }
@@ -241,7 +242,7 @@ public class Template {
             int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES;
             RecordId childNodeId = segment.readRecordId(offset);
             return Collections.singletonList(new MemoryChildNodeEntry(
-                    childName, new SegmentNodeState(store, childNodeId)));
+                    childName, reader.readNode(childNodeId)));
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed Jun  1 07:48:51 2016
@@ -33,10 +33,15 @@ import static java.util.Collections.empt
 import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
 import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
-import static org.apache.jackrabbit.oak.segment.SegmentReaderImpl.DEFAULT_STRING_CACHE_MB;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.segmentWriter;
+import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
 
 import java.io.Closeable;
 import java.io.File;
@@ -54,30 +59,28 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.cache.CacheStats;
-import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
 import org.apache.jackrabbit.oak.segment.SegmentCache;
 import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
 import org.apache.jackrabbit.oak.segment.SegmentId;
@@ -85,7 +88,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentReaderImpl;
+import org.apache.jackrabbit.oak.segment.SegmentReaders;
 import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentVersion;
@@ -102,9 +105,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The storage implementation for tar files.
  */
-public class FileStore implements SegmentStore {
-
-    /** Logger instance */
+public class FileStore implements SegmentStore, Closeable {
     private static final Logger log = LoggerFactory.getLogger(FileStore.class);
 
     private static final int MB = 1024 * 1024;
@@ -114,8 +115,6 @@ public class FileStore implements Segmen
 
     private static final String FILE_NAME_FORMAT = "data%05d%s.tar";
 
-    private static final String JOURNAL_FILE_NAME = "journal.log";
-
     private static final String LOCK_FILE_NAME = "repo.lock";
 
     /**
@@ -149,24 +148,14 @@ public class FileStore implements Segmen
 
     private volatile File writeFile;
 
-    private volatile TarWriter writer;
-
-    private final RandomAccessFile journalFile;
+    private volatile TarWriter tarWriter;
 
     private final RandomAccessFile lockFile;
 
     private final FileLock lock;
 
-    /**
-     * The latest head state.
-     */
-    private final AtomicReference<RecordId> head;
-
-    /**
-     * The persisted head of the root journal, used to determine whether the
-     * latest {@link #head} value should be written to the disk.
-     */
-    private final AtomicReference<RecordId> persistedHead;
+    @Nonnull
+    private final TarRevisions revisions;
 
     /**
      * The background flush thread. Automatically flushes the TarMK state
@@ -254,8 +243,6 @@ public class FileStore implements Segmen
 
         private BlobStore blobStore;   // null ->  store blobs inline
 
-        private NodeState root = EMPTY_NODE;
-
         private int maxFileSize = 256;
 
         private int cacheSize;   // 0 -> DEFAULT_MEMORY_CACHE_SIZE
@@ -274,6 +261,8 @@ public class FileStore implements Segmen
             this.directory = directory;
         }
 
+        private TarRevisions revisions;
+
         /**
          * Specify the {@link BlobStore}.
          * @param blobStore
@@ -286,17 +275,6 @@ public class FileStore implements Segmen
         }
 
         /**
-         * Specify the initial root node state for the file store
-         * @param root
-         * @return this instance
-         */
-        @Nonnull
-        public Builder withRoot(@Nonnull NodeState root) {
-            this.root = checkNotNull(root);
-            return this;
-        }
-
-        /**
          * Maximal size of the generated tar files in MB.
          * @param maxFileSize
          * @return this instance
@@ -384,7 +362,7 @@ public class FileStore implements Segmen
 
         @Nonnull
         public Builder withGCOptions(SegmentGCOptions gcOptions) {
-            this.gcOptions = gcOptions;
+            this.gcOptions = checkNotNull(gcOptions);
             return this;
         }
 
@@ -407,34 +385,49 @@ public class FileStore implements Segmen
          */
         @Nonnull
         public FileStore build() throws IOException {
-            return new FileStore(this, false);
+            directory.mkdir();
+            revisions = new TarRevisions(false, directory);
+            FileStore store = new FileStore(this, false);
+            revisions.bind(store, store.getTracker(), initialNode(store));
+            return store;
         }
 
+        @Nonnull
         public ReadOnlyStore buildReadOnly() throws IOException {
-            return new ReadOnlyStore(this);
+            checkState(directory.exists() && directory.isDirectory());
+            revisions = new TarRevisions(true, directory);
+            ReadOnlyStore store = new ReadOnlyStore(this);
+            revisions.bind(store, store.getTracker(), initialNode(store));
+            return store;
         }
 
+        @Nonnull
+        private static Supplier<RecordId> initialNode(final FileStore store) {
+            return new Supplier<RecordId>() {
+                @Override
+                public RecordId get() {
+                    try {
+                        SegmentWriter writer = segmentWriter(store, LATEST_VERSION, "init", 0);
+                        NodeBuilder builder = EMPTY_NODE.builder();
+                        builder.setChildNode("root", EMPTY_NODE);
+                        SegmentNodeState node = writer.writeNode(builder.getNodeState());
+                        writer.flush();
+                        return node.getRecordId();
+                    } catch (IOException e) {
+                        String msg = "Failed to write initial node";
+                        log.error(msg, e);
+                        throw new IllegalStateException(msg, e);
+                    }
+                }
+            };
+        }
     }
 
     private FileStore(Builder builder, boolean readOnly) throws IOException {
         this.version = builder.version;
-
-        if (readOnly) {
-            checkNotNull(builder.directory);
-            checkState(builder.directory.exists() && builder.directory.isDirectory());
-        } else {
-            checkNotNull(builder.directory).mkdirs();
-        }
-
-        // FIXME OAK-4102: Break cyclic dependency of FileStore and SegmentTracker
-        // SegmentTracker and FileStore have a cyclic dependency, which we should
-        // try to break. Here we pass along a not fully initialised instances of the
-        // FileStore to the SegmentTracker, which in turn is in later invoked to write
-        // the initial node state. Notably before this instance is fully initialised!
-        // Once consequence of this is that we cannot reliably determine the current
-        // GC generation while writing the initial head state. See further below.
-
         this.tracker = new SegmentTracker(this);
+        this.revisions = builder.revisions;
+        this.blobStore = builder.blobStore;
 
         // FIXME OAK-4373 refactor cache size configurations
         if (builder.cacheSize < 0) {
@@ -445,34 +438,24 @@ public class FileStore implements Segmen
             this.segmentCache = new SegmentCache(DEFAULT_STRING_CACHE_MB);
         }
         if (builder.cacheSize < 0) {
-            this.segmentReader = new SegmentReaderImpl(this, 0);
+            this.segmentReader = SegmentReaders.segmentReader(this, 0);
         } else if (builder.cacheSize > 0) {
-            this.segmentReader = new SegmentReaderImpl(this, builder.cacheSize);
+            this.segmentReader = SegmentReaders.segmentReader(this, builder.cacheSize);
         } else {
-            this.segmentReader = new SegmentReaderImpl(this);
+            this.segmentReader = SegmentReaders.segmentReader(this, DEFAULT_STRING_CACHE_MB);
         }
-        this.segmentWriter = new SegmentWriter(this,
-                new SegmentBufferWriterPool(this, version, "sys", new Supplier<Integer>() {
+        this.segmentWriter = pooledSegmentWriter(this, version, "sys", new Supplier<Integer>() {
                     @Override
                     public Integer get() {
                         return getGcGeneration();
                     }
-                }));
-        this.blobStore = builder.blobStore;
+                });
         this.directory = builder.directory;
         this.maxFileSize = builder.maxFileSize * MB;
         this.memoryMapping = builder.memoryMapping;
         this.gcMonitor = builder.gcMonitor;
         this.gcOptions = builder.gcOptions;
 
-        if (readOnly) {
-            journalFile = new RandomAccessFile(new File(directory,
-                    JOURNAL_FILE_NAME), "r");
-        } else {
-            journalFile = new RandomAccessFile(new File(directory,
-                    JOURNAL_FILE_NAME), "rw");
-        }
-
         Map<Integer, Map<Character, File>> map = collectFiles(directory);
         this.readers = newArrayListWithCapacity(map.size());
         Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
@@ -501,52 +484,17 @@ public class FileStore implements Segmen
             }
             this.writeFile = new File(directory, String.format(
                     FILE_NAME_FORMAT, writeNumber, "a"));
-            this.writer = new TarWriter(writeFile, stats);
-        }
-
-        RecordId id = null;
-        try (JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME))) {
-            Iterator<String> heads = journalReader.iterator();
-            while (id == null && heads.hasNext()) {
-                String head = heads.next();
-                try {
-                    RecordId last = RecordId.fromString(tracker, head);
-                    SegmentId segmentId = last.getSegmentId();
-                    if (containsSegment(
-                            segmentId.getMostSignificantBits(),
-                            segmentId.getLeastSignificantBits())) {
-                        id = last;
-                    } else {
-                        log.warn("Unable to access revision {}, rewinding...", last);
-                    }
-                } catch (IllegalArgumentException ignore) {
-                    log.warn("Skipping invalid record id {}", head);
-                }
-            }
+            this.tarWriter = new TarWriter(writeFile, stats);
         }
 
-        journalFile.seek(journalFile.length());
-
         if (!readOnly) {
-            lockFile = new RandomAccessFile(
-                    new File(directory, LOCK_FILE_NAME), "rw");
+            lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw");
             lock = lockFile.getChannel().lock();
         } else {
             lockFile = null;
             lock = null;
         }
 
-        if (id != null) {
-            head = new AtomicReference<>(id);
-            persistedHead = new AtomicReference<>(id);
-        } else {
-            NodeBuilder nodeBuilder = EMPTY_NODE.builder();
-            nodeBuilder.setChildNode("root", builder.root);
-            head = new AtomicReference<>(writeNode(
-                builder.root, segmentWriter, new SegmentBufferWriter(this, version, "init", 0)));
-            persistedHead = new AtomicReference<>(null);
-        }
-
         // FIXME OAK-3468 Replace BackgroundThread with Scheduler
         // Externalise these background operations
         if (!readOnly) {
@@ -601,19 +549,8 @@ public class FileStore implements Segmen
         log.debug("TarMK readers {}", this.readers);
     }
 
-    @Nonnull
-    private static RecordId writeNode(NodeState root, SegmentWriter writer,
-                                      SegmentBufferWriter bufferWriter)
-    throws IOException {
-        NodeBuilder nodeBuilder = EMPTY_NODE.builder();
-        nodeBuilder.setChildNode("root", root);
-        SegmentNodeState node = writer.writeNode(nodeBuilder.getNodeState(), bufferWriter, Suppliers.ofInstance(false));
-        assert node != null;
-        return node.getRecordId();
-    }
-
     private int getGcGeneration() {
-        return head.get().getSegment().getGcGeneration();
+        return revisions.getHead().getSegment().getGcGeneration();
     }
 
     @Nonnull
@@ -621,6 +558,12 @@ public class FileStore implements Segmen
         return segmentCache.getCacheStats();
     }
 
+    // FIXME OAK-4373 move access to the cache stats to the segment reader and avoid casting to implementation
+    @Nonnull
+    public CacheStats getStringCacheStats() {
+        return ((CachingSegmentReader)segmentReader).getStringCacheStats();
+    }
+
     public void maybeCompact(boolean cleanup) throws IOException {
         gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
 
@@ -768,7 +711,7 @@ public class FileStore implements Segmen
         return dataFiles;
     }
 
-    public long size() {
+    public final long size() {
         fileStoreLock.readLock().lock();
         try {
             long size = writeFile != null ? writeFile.length() : 0;
@@ -799,8 +742,8 @@ public class FileStore implements Segmen
         fileStoreLock.readLock().lock();
         try {
             int count = 0;
-            if (writer != null) {
-                count += writer.count();
+            if (tarWriter != null) {
+                count += tarWriter.count();
             }
             for (TarReader reader : readers) {
                 count += reader.count();
@@ -818,7 +761,7 @@ public class FileStore implements Segmen
      * @return compaction gain estimate
      */
     CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) {
-        CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop);
+        CompactionGainEstimate estimate = new CompactionGainEstimate(segmentReader.readHeadState(), count(), stop);
         fileStoreLock.readLock().lock();
         try {
             for (TarReader reader : readers) {
@@ -838,52 +781,32 @@ public class FileStore implements Segmen
     }
 
     public void flush() throws IOException {
-        flush(cleanupNeeded.getAndSet(false));
-    }
-
-    public void flush(boolean cleanup) throws IOException {
-        synchronized (persistedHead) {
-            RecordId before = persistedHead.get();
-            RecordId after = head.get();
-
-            if (cleanup || !after.equals(before)) {
-                segmentWriter.flush();
-
+        revisions.flush(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
                 // FIXME OAK-4291: FileStore.flush prone to races leading to corruption
                 // There is a small windows that could lead to a corrupted store:
                 // if we crash right after setting the persisted head but before any delay-flushed
                 // SegmentBufferWriter instance flushes (see SegmentBufferWriterPool.returnWriter())
                 // then that data is lost although it might be referenced from the persisted head already.
-
                 // Need a test case. Possible fix: return a future from flush() and set the persisted head
                 // in the completion handler.
-                writer.flush();
-
-                fileStoreLock.writeLock().lock();
-                try {
-                    log.debug("TarMK journal update {} -> {}", before, after);
-                    journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis()+"\n");
-                    journalFile.getChannel().force(false);
-                    persistedHead.set(after);
-                } finally {
-                    fileStoreLock.writeLock().unlock();
-                }
-
-                if (cleanup) {
-                    // Explicitly give up reference to the previous root state
-                    // otherwise they could block cleanup. See OAK-3347
-                    before = null;
-                    after = null;
-                    pendingRemove.addAll(cleanup());
-                }
+                segmentWriter.flush();
+                tarWriter.flush();
+                return null;
             }
+        });
 
-            // remove all obsolete tar generations
+        if (cleanupNeeded.getAndSet(false)) {
+            pendingRemove.addAll(cleanup());
+        }
+
+        // remove all obsolete tar generations
+        synchronized (pendingRemove) {
             Iterator<File> iterator = pendingRemove.iterator();
             while (iterator.hasNext()) {
                 File file = iterator.next();
-                log.debug("TarMK GC: Attempting to remove old file {}",
-                        file);
+                log.debug("TarMK GC: Attempting to remove old file {}", file);
                 if (!file.exists() || file.delete()) {
                     log.debug("TarMK GC: Removed old file {}", file);
                     iterator.remove();
@@ -1103,7 +1026,7 @@ public class FileStore implements Segmen
         gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
         Stopwatch watch = Stopwatch.createStarted();
 
-        SegmentNodeState before = getHead();
+        SegmentNodeState before = segmentReader.readHeadState();
         long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
                 .getChildNodeCount(Long.MAX_VALUE);
         if (existing > 1) {
@@ -1116,7 +1039,7 @@ public class FileStore implements Segmen
 
         final int newGeneration = getGcGeneration() + 1;
         SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
-                this, version, "c", newGeneration);
+                this, tracker, segmentReader, version, "c", newGeneration);
         Supplier<Boolean> cancel = newCancelCompactionCondition();
         SegmentNodeState after = compact(bufferWriter, before, cancel);
         if (after == null) {
@@ -1130,13 +1053,14 @@ public class FileStore implements Segmen
         try {
             int cycles = 0;
             boolean success = false;
-            while (cycles++ < gcOptions.getRetryCount() && !(success = setHead(before, after))) {
+            while (cycles++ < gcOptions.getRetryCount() &&
+                    !(success = revisions.setHead(before.getRecordId(), after.getRecordId()))) {
                 // Some other concurrent changes have been made.
                 // Rebase (and compact) those changes on top of the
                 // compacted state before retrying to set the head.
                 gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
                     "Compacting these commits. Cycle {}", GC_COUNT, cycles);
-                SegmentNodeState head = getHead();
+                SegmentNodeState head = segmentReader.readHeadState();
                 after = compact(bufferWriter, head, cancel);
                 if (after == null) {
                     gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
@@ -1215,32 +1139,38 @@ public class FileStore implements Segmen
         return segmentWriter.writeNode(node, bufferWriter, cancel);
     }
 
-    private boolean forceCompact(SegmentBufferWriter bufferWriter, Supplier<Boolean> cancel)
-    throws InterruptedException, IOException {
-        if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) {
-            try {
-                SegmentNodeState head = getHead();
-                SegmentNodeState after = compact(bufferWriter, head, cancel);
-                if (after == null) {
-                    gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
-                    return false;
-                } else {
-                    return setHead(head, after);
+    private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
+                                 @Nonnull final Supplier<Boolean> cancel)
+    throws InterruptedException {
+        return revisions.
+            setHead(new Function<RecordId, RecordId>() {
+                @Nullable
+                @Override
+                public RecordId apply(RecordId base) {
+                    try {
+                        SegmentNodeState after = compact(bufferWriter,
+                                segmentReader.readNode(base), cancel);
+                        if (after == null) {
+                            gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+                            return null;
+                        } else {
+                            return after.getRecordId();
+                        }
+                    } catch (IOException e) {
+                        gcMonitor.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
+                        return null;
+                    }
                 }
-            } finally {
-                rwLock.writeLock().unlock();
-            }
-        } else {
-            return false;
-        }
+            },
+            timeout(gcOptions.getLockWaitTime(), SECONDS));
     }
 
     public Iterable<SegmentId> getSegmentIds() {
         fileStoreLock.readLock().lock();
         try {
             List<SegmentId> ids = newArrayList();
-            if (writer != null) {
-                for (UUID uuid : writer.getUUIDs()) {
+            if (tarWriter != null) {
+                for (UUID uuid : tarWriter.getUUIDs()) {
                     ids.add(tracker.getSegmentId(
                             uuid.getMostSignificantBits(),
                             uuid.getLeastSignificantBits()));
@@ -1259,43 +1189,24 @@ public class FileStore implements Segmen
         }
     }
 
-    @Override
     @Nonnull
     public SegmentTracker getTracker() {
         return tracker;
     }
 
-    @Override
     @Nonnull
     public SegmentWriter getWriter() {
         return segmentWriter;
     }
 
-    @Override
     @Nonnull
     public SegmentReader getReader() {
         return segmentReader;
     }
 
-    @Override
-    public SegmentNodeState getHead() {
-        return new SegmentNodeState(this, head.get());
-    }
-
-    // FIXME OAK-4015: Expedite commits from the compactor
-    // use a lock that can expedite important commits like compaction and checkpoints.
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
-    @Override
-    public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
-        rwLock.readLock().lock();
-        try {
-            RecordId id = this.head.get();
-            return id.equals(base.getRecordId())
-                && this.head.compareAndSet(id, head.getRecordId());
-        } finally {
-            rwLock.readLock().unlock();
-        }
+    @Nonnull
+    public TarRevisions getRevisions() {
+        return revisions;
     }
 
     @Override
@@ -1310,12 +1221,13 @@ public class FileStore implements Segmen
         closeAndLogOnFail(diskSpaceThread);
         try {
             flush();
+            revisions.close();
             // FIXME OAK-4291: FileStore.flush prone to races leading to corruption
             // Replace this with a way to "close" the underlying SegmentBufferWriter(s)
             // tracker.getWriter().dropCache();
             fileStoreLock.writeLock().lock();
             try {
-                closeAndLogOnFail(writer);
+                closeAndLogOnFail(tarWriter);
 
                 List<TarReader> list = readers;
                 readers = newArrayList();
@@ -1327,7 +1239,6 @@ public class FileStore implements Segmen
                     lock.release();
                 }
                 closeAndLogOnFail(lockFile);
-                closeAndLogOnFail(journalFile);
             } finally {
                 fileStoreLock.writeLock().unlock();
             }
@@ -1355,10 +1266,10 @@ public class FileStore implements Segmen
             }
         }
 
-        if (writer != null) {
+        if (tarWriter != null) {
             fileStoreLock.readLock().lock();
             try {
-                if (writer.containsEntry(msb, lsb)) {
+                if (tarWriter.containsEntry(msb, lsb)) {
                     return true;
                 }
             } finally {
@@ -1398,23 +1309,23 @@ public class FileStore implements Segmen
 
                             ByteBuffer buffer = reader.readEntry(msb, lsb);
                             if (buffer != null) {
-                                return new Segment(FileStore.this, id, buffer);
+                                return new Segment(tracker, segmentReader, id, buffer);
                             }
                         } catch (IOException e) {
                             log.warn("Failed to read from tar file {}", reader, e);
                         }
                     }
 
-                    if (writer != null) {
+                    if (tarWriter != null) {
                         fileStoreLock.readLock().lock();
                         try {
                             try {
-                                ByteBuffer buffer = writer.readEntry(msb, lsb);
+                                ByteBuffer buffer = tarWriter.readEntry(msb, lsb);
                                 if (buffer != null) {
-                                    return new Segment(FileStore.this, id, buffer);
+                                    return new Segment(tracker, segmentReader, id, buffer);
                                 }
                             } catch (IOException e) {
-                                log.warn("Failed to read from tar file {}", writer, e);
+                                log.warn("Failed to read from tar file {}", tarWriter, e);
                             }
                         } finally {
                             fileStoreLock.readLock().unlock();
@@ -1434,7 +1345,7 @@ public class FileStore implements Segmen
 
                             ByteBuffer buffer = reader.readEntry(msb, lsb);
                             if (buffer != null) {
-                                return new Segment(FileStore.this, id, buffer);
+                                return new Segment(tracker, segmentReader, id, buffer);
                             }
                         } catch (IOException e) {
                             log.warn("Failed to read from tar file {}", reader, e);
@@ -1456,7 +1367,7 @@ public class FileStore implements Segmen
         fileStoreLock.writeLock().lock();
         try {
             int generation = Segment.getGcGeneration(wrap(buffer, offset, length), id.asUUID());
-            long size = writer.writeEntry(
+            long size = tarWriter.writeEntry(
                     id.getMostSignificantBits(),
                     id.getLeastSignificantBits(),
                     buffer, offset, length, generation);
@@ -1478,7 +1389,7 @@ public class FileStore implements Segmen
             } else {
                 data = ByteBuffer.wrap(buffer, offset, length);
             }
-            segmentCache.putSegment(new Segment(this, id, data));
+            segmentCache.putSegment(new Segment(tracker, segmentReader, id, data));
         }
     }
 
@@ -1488,8 +1399,8 @@ public class FileStore implements Segmen
      * @throws IOException
      */
     private void newWriter() throws IOException {
-        if (writer.isDirty()) {
-            writer.close();
+        if (tarWriter.isDirty()) {
+            tarWriter.close();
 
             List<TarReader> list =
                     newArrayListWithCapacity(1 + readers.size());
@@ -1501,25 +1412,21 @@ public class FileStore implements Segmen
             writeFile = new File(
                     directory,
                     String.format(FILE_NAME_FORMAT, writeNumber, "a"));
-            writer = new TarWriter(writeFile, stats);
+            tarWriter = new TarWriter(writeFile, stats);
         }
     }
 
-    @Override
-    public Blob readBlob(String blobId) {
-        if (blobStore != null) {
-            return new BlobStoreBlob(blobStore, blobId);
-        }
-        throw new IllegalStateException("Attempt to read external blob with blobId [" + blobId + "] " +
-                "without specifying BlobStore");
-    }
-
-    @Override
+    /**
+     * @return  the external BlobStore (if configured) with this store, {@code null} otherwise.
+     */
+    @CheckForNull
     public BlobStore getBlobStore() {
         return blobStore;
     }
 
-    @Override
+    /**
+     * Trigger a garbage collection cycle
+     */
     public void gc() {
         compactionThread.trigger();
     }
@@ -1552,9 +1459,7 @@ public class FileStore implements Segmen
     private void setRevision(String rootRevision) {
         fileStoreLock.writeLock().lock();
         try {
-            RecordId id = RecordId.fromString(tracker, rootRevision);
-            head.set(id);
-            persistedHead.set(id);
+            revisions.setHeadId(RecordId.fromString(tracker, rootRevision));
         } finally {
             fileStoreLock.writeLock().unlock();
         }
@@ -1637,11 +1542,6 @@ public class FileStore implements Segmen
         }
 
         @Override
-        public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
-            throw new UnsupportedOperationException("Read Only Store");
-        }
-
-        @Override
         public void writeSegment(SegmentId id, byte[] data,
                 int offset, int length) {
             throw new UnsupportedOperationException("Read Only Store");

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java Wed Jun  1 07:48:51 2016
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.propagate;
+import static com.google.common.base.Throwables.propagateIfInstanceOf;
+import static java.lang.Long.MAX_VALUE;
+import static java.util.concurrent.TimeUnit.DAYS;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.SegmentTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TarRevisions implements Revisions, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(TarRevisions.class);
+
+    public static final String JOURNAL_FILE_NAME = "journal.log";
+
+    private final boolean readOnly;
+
+    /**
+     * The latest head state.
+     */
+    @Nonnull
+    private final AtomicReference<RecordId> head;
+
+    @Nonnull
+    private final File directory;
+
+    @Nonnull
+    private final RandomAccessFile journalFile;
+
+    /**
+     * The persisted head of the root journal, used to determine whether the
+     * latest {@link #head} value should be written to disk.
+     */
+    @Nonnull
+    private final AtomicReference<RecordId> persistedHead;
+
+    // FIXME OAK-4015: Expedite commits from the compactor
+    // use a lock that can expedite important commits like compaction and checkpoints.
+    @Nonnull
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    private static class TimeOutOption implements Option {
+        private final long time;
+
+        @Nonnull
+        private final TimeUnit unit;
+
+        TimeOutOption(long time, @Nonnull TimeUnit unit) {
+            this.time = time;
+            this.unit = unit;
+        }
+
+        @Nonnull
+        public static TimeOutOption from(@CheckForNull Option option) {
+            if (option instanceof TimeOutOption) {
+                return (TimeOutOption) option;
+            } else {
+                throw new IllegalArgumentException("Invalid option " + option);
+            }
+        }
+    }
+
+    public static final Option INFINITY = new TimeOutOption(MAX_VALUE, DAYS);
+
+    public static Option timeout(long time, TimeUnit unit) {
+        return new TimeOutOption(time, unit);
+    }
+
+    public TarRevisions(boolean readOnly, @Nonnull File directory)
+    throws IOException {
+        this.readOnly = readOnly;
+        this.directory = checkNotNull(directory);
+        this.journalFile = new RandomAccessFile(new File(directory, JOURNAL_FILE_NAME),
+                readOnly ? "r" : "rw");
+        this.journalFile.seek(journalFile.length());
+        this.head = new AtomicReference<>(null);
+        this.persistedHead = new AtomicReference<>(null);
+    }
+
+    synchronized void bind(@Nonnull SegmentStore store,
+                           @Nonnull SegmentTracker tracker,
+                           @Nonnull Supplier<RecordId> writeInitialNode)
+    throws IOException {
+        if (head.get() == null) {
+            RecordId persistedId = null;
+            try (JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME))) {
+                Iterator<String> entries = journalReader.iterator();
+                while (persistedId == null && entries.hasNext()) {
+                    String entry = entries.next();
+                    try {
+                        RecordId id = RecordId.fromString(tracker, entry);
+                        if (store.containsSegment(id.getSegmentId())) {
+                            persistedId = id;
+                        } else {
+                            LOG.warn("Unable to access revision {}, rewinding...", id);
+                        }
+                    } catch (IllegalArgumentException ignore) {
+                        LOG.warn("Skipping invalid record id {}", entry);
+                    }
+                }
+            }
+
+            if (persistedId == null) {
+                head.set(writeInitialNode.get());
+            } else {
+                persistedHead.set(persistedId);
+                head.set(persistedId);
+            }
+        }
+    }
+
+    private void checkBound() {
+        checkState(head.get() != null, "Revisions not bound to a store");
+    }
+
+    private final Lock flushLock = new ReentrantLock();
+
+    public void flush(@Nonnull Callable<Void> persisted) throws IOException {
+        checkBound();
+        if (flushLock.tryLock()) {
+            try {
+                RecordId before = persistedHead.get();
+                RecordId after = getHead();
+                if (!after.equals(before)) {
+                    persisted.call();
+
+                    LOG.debug("TarMK journal update {} -> {}", before, after);
+                    journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis() + "\n");
+                    journalFile.getChannel().force(false);
+                    persistedHead.set(after);
+                }
+            } catch (Exception e) {
+                propagateIfInstanceOf(e, IOException.class);
+                propagate(e);
+            } finally {
+                flushLock.unlock();
+            }
+        }
+    }
+
+    @Nonnull
+    @Override
+    public RecordId getHead() {
+        checkBound();
+        return head.get();
+    }
+
+    @Override
+    public boolean setHead(
+            @Nonnull RecordId base,
+            @Nonnull RecordId head,
+            @Nonnull Option... options) {
+        checkBound();
+        rwLock.readLock().lock();
+        try {
+            RecordId id = this.head.get();
+            return id.equals(base) && this.head.compareAndSet(id, head);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean setHead(
+            @Nonnull Function<RecordId, RecordId> newHead,
+            @Nonnull Option... options)
+    throws InterruptedException {
+        checkBound();
+        TimeOutOption timeout = getTimeout(options);
+        if (rwLock.writeLock().tryLock(timeout.time, timeout.unit)) {
+            try {
+                RecordId after = newHead.apply(getHead());
+                if (after != null) {
+                    head.set(after);
+                    return true;
+                } else {
+                    return false;
+                }
+            } finally {
+                rwLock.writeLock().unlock();
+            }
+        } else {
+            return false;
+        }
+    }
+
+    @Nonnull
+    private static TimeOutOption getTimeout(@Nonnull Option[] options) {
+        if (options.length == 0) {
+            return TimeOutOption.from(INFINITY);
+        } else if (options.length == 1) {
+            return TimeOutOption.from(options[0]);
+        } else {
+            throw new IllegalArgumentException("Expected zero or one options, got " + options.length);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        journalFile.close();
+    }
+
+    void setHeadId(@Nonnull RecordId headId) {
+        checkState(readOnly, "Cannot set revision on a writable store");
+        head.set(headId);
+        persistedHead.set(headId);
+    }
+}

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java Wed Jun  1 07:48:51 2016
@@ -38,7 +38,7 @@ import org.apache.jackrabbit.oak.api.Blo
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.segment.SegmentBlob;
-import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore;
 import org.apache.jackrabbit.oak.segment.file.JournalReader;
@@ -145,7 +145,7 @@ public class ConsistencyChecker {
     private String checkPath(String path, long binLen) {
         try {
             print("Checking {}", path);
-            NodeState root = SegmentNodeStore.builder(store).build().getRoot();
+            NodeState root = SegmentNodeStoreBuilders.builder(store).build().getRoot();
             String parentPath = getParentPath(path);
             String name = getName(path);
             NodeState parent = getNode(root, parentPath);
@@ -173,7 +173,8 @@ public class ConsistencyChecker {
             store.setRevision(revision);
             nodeCount = 0;
             propertyCount = 0;
-            String result = traverse(SegmentNodeStore.builder(store).build().getRoot(), "/", true, binLen);
+            String result = traverse(SegmentNodeStoreBuilders.builder(store).build()
+                    .getRoot(), "/", true, binLen);
             print("Traversed {} nodes and {} properties", nodeCount, propertyCount);
             return result;
         } catch (RuntimeException e) {

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java Wed Jun  1 07:48:51 2016
@@ -81,7 +81,7 @@ public class RevisionHistory {
                 @Nullable @Override
                 public HistoryElement apply(String revision) {
                     store.setRevision(revision);
-                    NodeState node = getNode(store.getHead(), path);
+                    NodeState node = getNode(store.getReader().readHeadState(), path);
                     return new HistoryElement(revision, node);
                 }
         });

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java Wed Jun  1 07:48:51 2016
@@ -18,13 +18,12 @@
  */
 package org.apache.jackrabbit.oak.segment.http;
 
-import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter;
 
-import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -34,16 +33,14 @@ import java.nio.ByteBuffer;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import com.google.common.base.Suppliers;
 import com.google.common.io.ByteStreams;
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
 import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
 import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentReaderImpl;
+import org.apache.jackrabbit.oak.segment.SegmentReaders;
 import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
@@ -55,11 +52,14 @@ public class HttpStore implements Segmen
     private final SegmentTracker tracker = new SegmentTracker(this);
 
     @Nonnull
-    private final SegmentWriter segmentWriter = new SegmentWriter(this,
-            new SegmentBufferWriterPool(this, LATEST_VERSION, "sys"));
+    private final HttpStoreRevisions revisions = new HttpStoreRevisions(this);
 
     @Nonnull
-    private final SegmentReader segmentReader = new SegmentReaderImpl(this);
+    private final SegmentReader segmentReader = SegmentReaders.segmentReader(this, DEFAULT_STRING_CACHE_MB);
+
+    @Nonnull
+    private final SegmentWriter segmentWriter = pooledSegmentWriter(this,
+            LATEST_VERSION, "sys", Suppliers.ofInstance(0));
 
     private final URL base;
 
@@ -72,30 +72,32 @@ public class HttpStore implements Segmen
         this.base = base;
     }
 
-    @Override
     @Nonnull
     public SegmentTracker getTracker() {
         return tracker;
     }
 
-    @Override
     @Nonnull
     public SegmentWriter getWriter() {
         return segmentWriter;
     }
 
-    @Override
     @Nonnull
     public SegmentReader getReader() {
         return segmentReader;
     }
 
+    @Nonnull
+    public Revisions getRevisions() {
+        return revisions;
+    }
+
     /**
      * Builds a simple URLConnection. This method can be extended to add
      * authorization headers if needed.
      * 
      */
-    protected URLConnection get(String fragment) throws MalformedURLException,
+    URLConnection get(String fragment) throws MalformedURLException,
             IOException {
         final URL url;
         if (fragment == null) {
@@ -107,33 +109,6 @@ public class HttpStore implements Segmen
     }
 
     @Override
-    public SegmentNodeState getHead() {
-        try {
-            URLConnection connection = get(null);
-            InputStream stream = connection.getInputStream();
-            try {
-                BufferedReader reader = new BufferedReader(
-                        new InputStreamReader(stream, UTF_8));
-                return new SegmentNodeState(this, RecordId.fromString(tracker, reader.readLine()));
-            } finally {
-                stream.close();
-            }
-        } catch (IllegalArgumentException e) {
-            throw new IllegalStateException(e);
-        } catch (MalformedURLException e) {
-            throw new IllegalStateException(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
-        // TODO throw new UnsupportedOperationException();
-        return true;
-    }
-
-    @Override
     // FIXME OAK-4396: HttpStore.containsSegment throws SNFE instead of returning false for non existing segments
     public boolean containsSegment(SegmentId id) {
         return id.sameStore(this) || readSegment(id) != null;
@@ -147,7 +122,7 @@ public class HttpStore implements Segmen
             InputStream stream = connection.getInputStream();
             try {
                 byte[] data = ByteStreams.toByteArray(stream);
-                return new Segment(this, id, ByteBuffer.wrap(data));
+                return new Segment(tracker, segmentReader, id, ByteBuffer.wrap(data));
             } finally {
                 stream.close();
             }
@@ -176,23 +151,12 @@ public class HttpStore implements Segmen
         }
     }
 
-    @Override
-    public void close() {
-    }
-
-    @Override @CheckForNull
-    public Blob readBlob(String reference) {
-        return null;
-    }
-
-    @Override @CheckForNull
+    /**
+     * @return  always {@code null}: this store does not have an external BlobStore
+     */
+    @CheckForNull
     public BlobStore getBlobStore() {
         return null;
     }
 
-    @Override
-    public void gc() {
-        // TODO: distributed gc
-    }
-
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java Wed Jun  1 07:48:51 2016
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.http;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URLConnection;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+
+public class HttpStoreRevisions implements Revisions {
+
+    @Nonnull
+    private final HttpStore store;
+
+    public HttpStoreRevisions(@Nonnull HttpStore store) {
+        this.store = store;
+    }
+
+    @Nonnull
+    @Override
+    public RecordId getHead() {
+        try {
+            URLConnection connection = store.get(null);
+            try (
+                InputStream stream = connection.getInputStream();
+                BufferedReader reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
+            ) {
+                return RecordId.fromString(store.getTracker(), reader.readLine());
+            }
+        } catch (IllegalArgumentException | MalformedURLException e) {
+            throw new IllegalStateException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean setHead(
+            @Nonnull RecordId base, @Nonnull RecordId head,
+            @Nonnull Option... options) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setHead(
+            @Nonnull Function<RecordId, RecordId> newHead,
+            @Nonnull Option... options) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+}

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java Wed Jun  1 07:48:51 2016
@@ -18,30 +18,28 @@
  */
 package org.apache.jackrabbit.oak.segment.memory;
 
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import com.google.common.base.Suppliers;
 import com.google.common.collect.Maps;
-import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.segment.Revisions;
 import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
 import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentReaderImpl;
+import org.apache.jackrabbit.oak.segment.SegmentReaders;
 import org.apache.jackrabbit.oak.segment.SegmentStore;
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
 
 /**
  * A store used for in-memory operations.
@@ -52,60 +50,44 @@ public class MemoryStore implements Segm
     private final SegmentTracker tracker = new SegmentTracker(this);
 
     @Nonnull
-    private final SegmentWriter segmentWriter = new SegmentWriter(this,
-            new SegmentBufferWriterPool(this, LATEST_VERSION, "sys"));
+    private final MemoryStoreRevisions revisions;
 
     @Nonnull
-    private final SegmentReader segmentReader = new SegmentReaderImpl(this, 16);
+    private final SegmentReader segmentReader;
 
-    private SegmentNodeState head;
+    @Nonnull
+    private final SegmentWriter segmentWriter;
 
     private final ConcurrentMap<SegmentId, Segment> segments =
             Maps.newConcurrentMap();
 
-    public MemoryStore(NodeState root) throws IOException {
-        NodeBuilder builder = EMPTY_NODE.builder();
-        builder.setChildNode("root", root);
-
-        this.head = segmentWriter.writeNode(builder.getNodeState());
-        segmentWriter.flush();
-    }
-
     public MemoryStore() throws IOException {
-        this(EMPTY_NODE);
+        this.revisions = new MemoryStoreRevisions();
+        this.segmentReader = SegmentReaders.segmentReader(this, 16);
+        this.segmentWriter = pooledSegmentWriter(this,
+                LATEST_VERSION, "sys", Suppliers.ofInstance(0));
+        revisions.bind(this);
+        segmentWriter.flush();
     }
 
-    @Override
     @Nonnull
     public SegmentTracker getTracker() {
         return tracker;
     }
 
-    @Override
     @Nonnull
     public SegmentWriter getWriter() {
         return segmentWriter;
     }
 
-    @Override
     @Nonnull
     public SegmentReader getReader() {
         return segmentReader;
     }
 
-    @Override
-    public synchronized SegmentNodeState getHead() {
-        return head;
-    }
-
-    @Override
-    public synchronized boolean setHead(SegmentNodeState base, SegmentNodeState head) {
-        if (this.head.getRecordId().equals(base.getRecordId())) {
-            this.head = head;
-            return true;
-        } else {
-            return false;
-        }
+    @Nonnull
+    public Revisions getRevisions() {
+        return revisions;
     }
 
     @Override
@@ -128,27 +110,20 @@ public class MemoryStore implements Segm
         ByteBuffer buffer = ByteBuffer.allocate(length);
         buffer.put(data, offset, length);
         buffer.rewind();
-        Segment segment = new Segment(this, id, buffer);
+        Segment segment = new Segment(tracker, segmentReader, id, buffer);
         if (segments.putIfAbsent(id, segment) != null) {
             throw new IOException("Segment override: " + id);
         }
     }
 
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public Blob readBlob(String reference) {
-        return null;
-    }
-
-    @Override
+    /**
+     * @return  always {@code null}: this store does not have an external BlobStore
+     */
+    @CheckForNull
     public BlobStore getBlobStore() {
         return null;
     }
 
-    @Override
     public void gc() {
         System.gc();
         segments.keySet().retainAll(tracker.getReferencedSegmentIds());

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java Wed Jun  1 07:48:51 2016
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.memory;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+
+public class MemoryStoreRevisions implements Revisions {
+    private RecordId head;
+
+    public void bind(MemoryStore store) throws IOException {
+        if (head == null) {
+            NodeBuilder builder = EMPTY_NODE.builder();
+            builder.setChildNode("root", EMPTY_NODE);
+            head = store.getWriter().writeNode(builder.getNodeState()).getRecordId();
+            store.getWriter().flush();
+        }
+    }
+
+    private void checkBound() {
+        checkState(head != null, "Revisions not bound to a store");
+    }
+
+    @Nonnull
+    @Override
+    public synchronized RecordId getHead() {
+        checkBound();
+        return head;
+    }
+
+    @Override
+    public synchronized boolean setHead(
+            @Nonnull RecordId base, @Nonnull RecordId head,
+            @Nonnull Option... options) {
+        checkBound();
+        if (this.head.equals(base)) {
+            this.head = head;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean setHead(
+            @Nonnull Function<RecordId, RecordId> newHead,
+            @Nonnull Option... options) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+}