You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/04/20 12:03:32 UTC

svn commit: r1740085 - in /jackrabbit/oak/trunk/oak-segment-next/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: mduerig
Date: Wed Apr 20 10:03:32 2016
New Revision: 1740085

URL: http://svn.apache.org/viewvc?rev=1740085&view=rev
Log:
OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
* Move SegmentBufferWriterPool to top level
* Allocate one SegmentBufferWriter per call to SegmentWriter
* Bind the GC generation to SegmentBufferWriter so that writers no longer need to be flushed on compaction
* Enable preCompactionReferences test as this one now passes

Added:
    jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java
Modified:
    jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java
    jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
    jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java

Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Apr 20 10:03:32 2016
@@ -23,7 +23,9 @@ import static com.google.common.base.Pre
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Maps.newConcurrentMap;
 import static java.lang.Boolean.getBoolean;
+import static java.lang.Integer.parseInt;
 import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentVersion.V_11;
 import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
 
@@ -36,6 +38,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 import javax.annotation.CheckForNull;
@@ -47,6 +50,8 @@ import org.apache.commons.io.HexDump;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.json.JsonObject;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
 
@@ -254,9 +259,10 @@ public class Segment {
         }
     }
 
-    Segment(SegmentTracker tracker, byte[] buffer) {
+    Segment(SegmentTracker tracker, byte[] buffer, String info) {
         this.tracker = checkNotNull(tracker);
         this.id = tracker.newDataSegmentId();
+        this.info = info;
         if (tracker.getStringCache() == null) {
             strings = newConcurrentMap();
             stringCache = null;
@@ -322,6 +328,8 @@ public class Segment {
                 << RECORD_ALIGN_BITS;
     }
 
+    private volatile String info;
+
     /**
      * Returns the segment meta data of this segment or {@code null} if none is present.
      * <p>
@@ -339,11 +347,10 @@ public class Segment {
      */
     @CheckForNull
     public String getSegmentInfo() {
-        if (getRootCount() == 0) {
-            return null;
-        } else {
-            return readString(getRootOffset(0));
+        if (info == null && getRefCount() != 0) {
+            info = readString(getRootOffset(0));
         }
+        return info;
     }
 
     SegmentId getRefId(int index) {
@@ -735,4 +742,28 @@ public class Segment {
         return string.toString();
     }
 
+    private volatile int gcGen = -1;
+
+    // FIXME OAK-3348 improve generation handling
+    public int getGcGen() {
+        if (gcGen < 0) {
+            if (isDataSegmentId(id.getLeastSignificantBits())) {
+                String info = getSegmentInfo();
+                if (info != null) {
+                    JsopTokenizer tokenizer = new JsopTokenizer(info);
+                    tokenizer.read('{');
+                    Map<String, String> properties = JsonObject.create(tokenizer).getProperties();
+                    int gen = parseInt(properties.get("gc"));
+                    if (properties.get("wid").contains("c-")) {
+                        gen++;
+                    }
+                    gcGen = gen;
+                    return gcGen;
+                }
+            }
+            gcGen = Integer.MAX_VALUE;
+            return gcGen;
+        }
+        return gcGen;
+    }
 }

Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java Wed Apr 20 10:03:32 2016
@@ -34,6 +34,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ID_BYTES;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.SEGMENT_REFERENCE_LIMIT;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.align;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -89,6 +90,8 @@ class SegmentBufferWriter {
 
     private final SegmentTracker tracker;
 
+    private final int generation;
+
     /**
      * The segment write buffer, filled from the end to the beginning
      * (see OAK-629).
@@ -117,10 +120,15 @@ class SegmentBufferWriter {
                 : wid);
 
         this.tracker = store.getTracker();
+        this.generation = tracker.getCompactionMap().getGeneration();
         this.buffer = createNewBuffer(version);
         newSegment(this.wid);
     }
 
+    int getGeneration() {
+        return generation;
+    }
+
     /**
      * Allocate a new segment and write the segment meta data.
      * The segment meta data is a string of the format {@code "{wid=W,sno=S,gc=G,t=T}"}
@@ -137,12 +145,11 @@ class SegmentBufferWriter {
      * @param wid  the writer id
      */
     private void newSegment(String wid) throws IOException {
-        this.segment = new Segment(tracker, buffer);
         String metaInfo = "{\"wid\":\"" + wid + '"' +
                 ",\"sno\":" + tracker.getNextSegmentNo() +
-                ",\"gc\":" + tracker.getCompactionMap().getGeneration() +
+                ",\"gc\":" + generation +
                 ",\"t\":" + currentTimeMillis() + "}";
-
+        this.segment = new Segment(tracker, buffer, metaInfo);
         byte[] data = metaInfo.getBytes(UTF_8);
         newValueWriter(data.length, data).write(this);
     }
@@ -198,7 +205,37 @@ class SegmentBufferWriter {
         buffer[position++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
     }
 
+    // FIXME OAK-3348 disable/remove this in production
+    private void checkGCGen(SegmentId id) {
+        try {
+            if (isDataSegmentId(id.getLeastSignificantBits())) {
+                if (id.getSegment().getGcGen() < generation && !isCompactionMap(id)) {
+                    LOG.warn("Detected reference from {} to segment {} from a previous gc generation.",
+                        info(this.segment), info(id.getSegment()), new Exception());
+                }
+            }
+        } catch (SegmentNotFoundException snfe) {
+            LOG.warn("Detected reference from {} to non existing segment {}",
+                info(this.segment), id, snfe);
+        }
+    }
+
+    private static boolean isCompactionMap(SegmentId id) {
+        String info = id.getSegment().getSegmentInfo();
+        return info != null && info.contains("cm-");
+    }
+
+    private static String info(Segment segment) {
+        String info = segment.getSegmentId().toString();
+        if (isDataSegmentId(segment.getSegmentId().getLeastSignificantBits())) {
+            info += (" " + segment.getSegmentInfo());
+        }
+        return info;
+    }
+
     private int getSegmentRef(SegmentId segmentId) {
+        checkGCGen(segmentId);
+
         int refCount = segment.getRefCount();
         if (refCount > SEGMENT_REFERENCE_LIMIT) {
             throw new SegmentOverflowException(

Added: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java?rev=1740085&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java (added)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java Wed Apr 20 10:03:32 2016
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * FIXME OAK-3348 document
+ */
+class SegmentBufferWriterPool {
+    private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+    private final Set<SegmentBufferWriter> borrowed = newHashSet();
+    private final Set<SegmentBufferWriter> disposed = newHashSet();
+    private final SegmentStore store;
+    private final SegmentVersion version;
+    private final String wid;
+
+    private short writerId = -1;
+
+    SegmentBufferWriterPool(SegmentStore store, SegmentVersion version, String wid) {
+        this.store = store;
+        this.version = version;
+        this.wid = wid;
+    }
+
+    void flush() throws IOException {
+        List<SegmentBufferWriter> toFlush = newArrayList();
+        synchronized (this) {
+            toFlush.addAll(writers.values());
+            toFlush.addAll(disposed);
+            writers.clear();
+            disposed.clear();
+            borrowed.clear();
+        }
+        // Call flush from outside a synchronized context to avoid
+        // deadlocks of that method calling SegmentStore.writeSegment
+        for (SegmentBufferWriter writer : toFlush) {
+            writer.flush();
+        }
+    }
+
+    synchronized SegmentBufferWriter borrowWriter(Object key) throws IOException {
+        SegmentBufferWriter writer = writers.remove(key);
+        if (writer == null) {
+            writer = new SegmentBufferWriter(store, version, getWriterId(wid));
+        } else if (writer.getGeneration() != store.getTracker().getCompactionMap().getGeneration()) {  // FIXME OAK-3348 improve generation tracking
+            disposed.add(writer);
+            writer = new SegmentBufferWriter(store, version, getWriterId(wid));
+        }
+        borrowed.add(writer);
+        return writer;
+    }
+
+    synchronized void returnWriter(Object key, SegmentBufferWriter writer) throws IOException {
+        if (borrowed.remove(writer)) {
+            writers.put(key, writer);
+        } else {
+            // Defer flushing this writer, as it was still borrowed when flush() was called.
+            disposed.add(writer);
+        }
+    }
+
+    private synchronized String getWriterId(String wid) {
+        if (++writerId > 9999) {
+            writerId = 0;
+        }
+        // Manual padding seems to be fastest here
+        if (writerId < 10) {
+            return wid + ".000" + writerId;
+        } else if (writerId < 100) {
+            return wid + ".00" + writerId;
+        } else if (writerId < 1000) {
+            return wid + ".0" + writerId;
+        } else {
+            return wid + "." + writerId;
+        }
+    }
+}

Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Apr 20 10:03:32 2016
@@ -29,10 +29,7 @@ import static com.google.common.collect.
 import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
 import static com.google.common.collect.Lists.partition;
 import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
 import static com.google.common.io.ByteStreams.read;
-import static com.google.common.io.Closeables.close;
-import static java.lang.String.valueOf;
 import static java.lang.Thread.currentThread;
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyMap;
@@ -66,15 +63,14 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import javax.jcr.PropertyType;
 
+import com.google.common.io.Closeables;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.RecordWriters.RecordWriter;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
@@ -84,14 +80,13 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Converts nodes, properties, and values to records, which are written to segments.
+ * FIXME OAK-3348 doc thread safety properties
  */
 public class SegmentWriter {
     private static final Logger LOG = LoggerFactory.getLogger(SegmentWriter.class);
 
     static final int BLOCK_SIZE = 1 << 12; // 4kB
 
-    private final SegmentBufferWriterPool segmentBufferWriterPool = new SegmentBufferWriterPool();
-
     private static final int STRING_RECORDS_CACHE_SIZE = Integer.getInteger(
             "oak.segment.writer.stringsCacheSize", 15000);
 
@@ -99,7 +94,7 @@ public class SegmentWriter {
      * Cache of recently stored string records, used to avoid storing duplicates
      * of frequently occurring data.
      */
-    final Map<String, RecordId> stringCache = newItemsCache(
+    private final Map<String, RecordId> stringCache = newItemsCache(
             STRING_RECORDS_CACHE_SIZE);
 
     private static final int TPL_RECORDS_CACHE_SIZE = Integer.getInteger(
@@ -109,10 +104,19 @@ public class SegmentWriter {
      * Cache of recently stored template records, used to avoid storing
      * duplicates of frequently occurring data.
      */
-    final Map<Template, RecordId> templateCache = newItemsCache(TPL_RECORDS_CACHE_SIZE);
+    private final Map<Template, RecordId> templateCache = newItemsCache(TPL_RECORDS_CACHE_SIZE);
+
+    private final SegmentStore store;
+
+    /**
+     * Version of the segment storage format.
+     */
+    private final SegmentVersion version;
+
+    private final SegmentBufferWriterPool segmentBufferWriterPool;
 
     private static final <T> Map<T, RecordId> newItemsCache(final int size) {
-        final boolean disabled = size <= 0;
+        final boolean disabled = true;  // FIXME OAK-3348 re-enable caches but make generation part of cache key to avoid backrefs
         final int safeSize = size <= 0 ? 0 : (int) (size * 1.2);
         return new LinkedHashMap<T, RecordId>(safeSize, 0.9f, true) {
             @Override
@@ -140,15 +144,6 @@ public class SegmentWriter {
         };
     }
 
-    private final SegmentStore store;
-
-    /**
-     * Version of the segment storage format.
-     */
-    private final SegmentVersion version;
-
-    private final String wid;
-
     /**
      * @param store     store to write to
      * @param version   segment version to write
@@ -157,744 +152,801 @@ public class SegmentWriter {
     public SegmentWriter(SegmentStore store, SegmentVersion version, String wid) {
         this.store = store;
         this.version = version;
-        this.wid = wid;
+        this.segmentBufferWriterPool = new SegmentBufferWriterPool(store, version, wid);
     }
 
     public void flush() throws IOException {
         segmentBufferWriterPool.flush();
     }
 
+    // FIXME OAK-3348 this is a hack and probably prone to races: replace by making the tracker allocate a new writer
     public void dropCache() {
         stringCache.clear();
         templateCache.clear();
     }
 
     MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) throws IOException {
-        if (base != null && base.isDiff()) {
-            Segment segment = base.getSegment();
-            RecordId key = segment.readRecordId(base.getOffset(8));
-            String name = readString(key);
-            if (!changes.containsKey(name)) {
-                changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
-            }
-            base = new MapRecord(segment.readRecordId(base.getOffset(8, 2)));
-        }
-
-        if (base != null && changes.size() == 1) {
-            Map.Entry<String, RecordId> change =
-                changes.entrySet().iterator().next();
-            RecordId value = change.getValue();
-            if (value != null) {
-                MapEntry entry = base.getEntry(change.getKey());
-                if (entry != null) {
-                    if (value.equals(entry.getValue())) {
-                        return base;
-                    } else {
-                        return writeRecord(newMapBranchWriter(entry.getHash(),
-                                asList(entry.getKey(), value, base.getRecordId())));
-                    }
-                }
-            }
+        Writer writer = new Writer();
+        try {
+            return writer.writeMap(base, changes);
+        } finally {
+            writer.close();
         }
+    }
 
-        List<MapEntry> entries = newArrayList();
-        for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
-            String key = entry.getKey();
+    public RecordId writeList(List<RecordId> list) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeList(list);
+        } finally {
+            writer.close();
+        }
+    }
 
-            RecordId keyId = null;
-            if (base != null) {
-                MapEntry e = base.getEntry(key);
-                if (e != null) {
-                    keyId = e.getKey();
-                }
-            }
-            if (keyId == null && entry.getValue() != null) {
-                keyId = writeString(key);
-            }
+    public RecordId writeString(String string) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeString(string);
+        } finally {
+            writer.close();
+        }
+    }
 
-            if (keyId != null) {
-                entries.add(new MapEntry(key, keyId, entry.getValue()));
-            }
+    SegmentBlob writeBlob(Blob blob) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeBlob(blob);
+        } finally {
+            writer.close();
         }
-        return writeMapBucket(base, entries, 0);
     }
 
-    private MapRecord writeMapLeaf(int level, Collection<MapEntry> entries) throws IOException {
-        checkNotNull(entries);
-        int size = entries.size();
-        checkElementIndex(size, MapRecord.MAX_SIZE);
-        checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
-        checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
-        return writeRecord(newMapLeafWriter(level, entries));
+    /**
+     * Writes a block record containing the given block of bytes.
+     *
+     * @param bytes source buffer
+     * @param offset offset within the source buffer
+     * @param length number of bytes to write
+     * @return block record identifier
+     */
+    RecordId writeBlock(byte[] bytes, int offset, int length) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeBlock(bytes, offset, length);
+        } finally {
+            writer.close();
+        }
     }
 
-    private MapRecord writeMapBranch(int level, int size, MapRecord[] buckets) throws IOException {
-        int bitmap = 0;
-        List<RecordId> bucketIds = newArrayListWithCapacity(buckets.length);
-        for (int i = 0; i < buckets.length; i++) {
-            if (buckets[i] != null) {
-                bitmap |= 1L << i;
-                bucketIds.add(buckets[i].getRecordId());
-            }
+    SegmentBlob writeExternalBlob(String blobId) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeExternalBlob(blobId);
+        } finally {
+            writer.close();
         }
-        return writeRecord(newMapBranchWriter(level, size, bitmap, bucketIds));
     }
 
-    private MapRecord writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level) throws IOException {
-        // when no changed entries, return the base map (if any) as-is
-        if (entries == null || entries.isEmpty()) {
-            if (base != null) {
-                return base;
-            } else if (level == 0) {
-                return writeRecord(newMapLeafWriter());
-            } else {
-                return null;
-            }
+    SegmentBlob writeLargeBlob(long length, List<RecordId> list) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeLargeBlob(length, list);
+        } finally {
+            writer.close();
         }
+    }
 
-        // when no base map was given, write a fresh new map
-        if (base == null) {
-            // use leaf records for small maps or the last map level
-            if (entries.size() <= BUCKETS_PER_LEVEL
-                    || level == MapRecord.MAX_NUMBER_OF_LEVELS) {
-                return writeMapLeaf(level, entries);
-            }
+    /**
+     * Writes a stream value record. The given stream is consumed <em>and closed</em> by
+     * this method.
+     *
+     * @param stream stream to be written
+     * @return blob for the passed {@code stream}
+     * @throws IOException if the input stream could not be read or the output could not be written
+     */
+    public SegmentBlob writeStream(InputStream stream) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeStream(stream);
+        } finally {
+            writer.close();
+        }
+    }
 
-            // write a large map by dividing the entries into buckets
-            MapRecord[] buckets = new MapRecord[BUCKETS_PER_LEVEL];
-            List<List<MapEntry>> changes = splitToBuckets(entries, level);
-            for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
-                buckets[i] = writeMapBucket(null, changes.get(i), level + 1);
-            }
+    public SegmentNodeState writeNode(NodeState state) throws IOException {
+        Writer writer = new Writer();
+        try {
+            return writer.writeNode(state);
+        } finally {
+            writer.close();
+        }
+    }
+
+    // FIXME OAK-3348 document: not thread safe
+    private final class Writer {
+        private final SegmentBufferWriter writer;
+        private final int key = currentThread().hashCode();
 
-            // combine the buckets into one big map
-            return writeMapBranch(level, entries.size(), buckets);
+        private Writer() throws IOException {
+            writer = segmentBufferWriterPool.borrowWriter(key);
         }
 
-        // if the base map is small, update in memory and write as a new map
-        if (base.isLeaf()) {
-            Map<String, MapEntry> map = newHashMap();
-            for (MapEntry entry : base.getEntries()) {
-                map.put(entry.getName(), entry);
-            }
-            for (MapEntry entry : entries) {
-                if (entry.getValue() != null) {
-                    map.put(entry.getName(), entry);
-                } else {
-                    map.remove(entry.getName());
+        private void close() throws IOException {
+            // Not implementing Closeable because we are not idempotent
+            segmentBufferWriterPool.returnWriter(key, writer);
+        }
+
+        private MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) throws IOException {
+            if (base != null && base.isDiff()) {
+                Segment segment = base.getSegment();
+                RecordId key = segment.readRecordId(base.getOffset(8));
+                String name = readString(key);
+                if (!changes.containsKey(name)) {
+                    changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
                 }
+                base = new MapRecord(segment.readRecordId(base.getOffset(8, 2)));
             }
-            return writeMapBucket(null, map.values(), level);
-        }
 
-        // finally, the if the base map is large, handle updates per bucket
-        int newSize = 0;
-        int newCount = 0;
-        MapRecord[] buckets = base.getBuckets();
-        List<List<MapEntry>> changes = splitToBuckets(entries, level);
-        for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
-            buckets[i] = writeMapBucket(buckets[i], changes.get(i), level + 1);
-            if (buckets[i] != null) {
-                newSize += buckets[i].size();
-                newCount++;
+            if (base != null && changes.size() == 1) {
+                Map.Entry<String, RecordId> change =
+                    changes.entrySet().iterator().next();
+                RecordId value = change.getValue();
+                if (value != null) {
+                    MapEntry entry = base.getEntry(change.getKey());
+                    if (entry != null) {
+                        if (value.equals(entry.getValue())) {
+                            return base;
+                        } else {
+                            return newMapBranchWriter(entry.getHash(), asList(entry.getKey(),
+                                value, base.getRecordId())).write(writer);
+                        }
+                    }
+                }
             }
-        }
 
-        // OAK-654: what if the updated map is smaller?
-        if (newSize > BUCKETS_PER_LEVEL) {
-            return writeMapBranch(level, newSize, buckets);
-        } else if (newCount <= 1) {
-            // up to one bucket contains entries, so return that as the new map
-            for (MapRecord bucket : buckets) {
-                if (bucket != null) {
-                    return bucket;
+            List<MapEntry> entries = newArrayList();
+            for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
+                String key = entry.getKey();
+
+                RecordId keyId = null;
+                if (base != null) {
+                    MapEntry e = base.getEntry(key);
+                    if (e != null) {
+                        keyId = e.getKey();
+                    }
+                }
+                if (keyId == null && entry.getValue() != null) {
+                    keyId = writeString(key);
+                }
+
+                if (keyId != null) {
+                    entries.add(new MapEntry(key, keyId, entry.getValue()));
                 }
             }
-            // no buckets remaining, return empty map
-            return writeMapBucket(null, null, level);
-        } else {
-            // combine all remaining entries into a leaf record
-            List<MapEntry> list = newArrayList();
-            for (MapRecord bucket : buckets) {
-                if (bucket != null) {
-                    addAll(list, bucket.getEntries());
+            return writeMapBucket(base, entries, 0);
+        }
+
+        private MapRecord writeMapLeaf(int level, Collection<MapEntry> entries) throws IOException {
+            checkNotNull(entries);
+            int size = entries.size();
+            checkElementIndex(size, MapRecord.MAX_SIZE);
+            checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
+            checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
+            return newMapLeafWriter(level, entries).write(writer);
+        }
+
+        private MapRecord writeMapBranch(int level, int size, MapRecord... buckets) throws IOException {
+            int bitmap = 0;
+            List<RecordId> bucketIds = newArrayListWithCapacity(buckets.length);
+            for (int i = 0; i < buckets.length; i++) {
+                if (buckets[i] != null) {
+                    bitmap |= 1L << i;
+                    bucketIds.add(buckets[i].getRecordId());
                 }
             }
-            return writeMapLeaf(level, list);
+            return newMapBranchWriter(level, size, bitmap, bucketIds).write(writer);
         }
-    }
 
-    /**
-     * Writes a list record containing the given list of record identifiers.
-     *
-     * @param list list of record identifiers
-     * @return list record identifier
-     */
-    public RecordId writeList(List<RecordId> list) throws IOException {
-        checkNotNull(list);
-        checkArgument(!list.isEmpty());
-        List<RecordId> thisLevel = list;
-        while (thisLevel.size() > 1) {
-            List<RecordId> nextLevel = newArrayList();
-            for (List<RecordId> bucket :
-                partition(thisLevel, ListRecord.LEVEL_SIZE)) {
-                if (bucket.size() > 1) {
-                    nextLevel.add(writeListBucket(bucket));
+        private MapRecord writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level)
+                throws IOException {
+            // when no changed entries, return the base map (if any) as-is
+            if (entries == null || entries.isEmpty()) {
+                if (base != null) {
+                    return base;
+                } else if (level == 0) {
+                    return newMapLeafWriter().write(writer);
                 } else {
-                    nextLevel.add(bucket.get(0));
+                    return null;
                 }
             }
-            thisLevel = nextLevel;
-        }
-        return thisLevel.iterator().next();
-    }
 
-    private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
-        checkArgument(bucket.size() > 1);
-        return writeRecord(newListBucketWriter(bucket));
-    }
+            // when no base map was given, write a fresh new map
+            if (base == null) {
+                // use leaf records for small maps or the last map level
+                if (entries.size() <= BUCKETS_PER_LEVEL
+                    || level == MapRecord.MAX_NUMBER_OF_LEVELS) {
+                    return writeMapLeaf(level, entries);
+                }
 
-    private static List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
-        List<MapEntry> empty = null;
-        int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
-        int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
+                // write a large map by dividing the entries into buckets
+                MapRecord[] buckets = new MapRecord[BUCKETS_PER_LEVEL];
+                List<List<MapEntry>> changes = splitToBuckets(entries, level);
+                for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
+                    buckets[i] = writeMapBucket(null, changes.get(i), level + 1);
+                }
 
-        List<List<MapEntry>> buckets =
-                newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, empty));
-        for (MapEntry entry : entries) {
-            int index = (entry.getHash() >> shift) & mask;
-            List<MapEntry> bucket = buckets.get(index);
-            if (bucket == null) {
-                bucket = newArrayList();
-                buckets.set(index, bucket);
+                // combine the buckets into one big map
+                return writeMapBranch(level, entries.size(), buckets);
             }
-            bucket.add(entry);
-        }
-        return buckets;
-    }
 
-    private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
-        long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
-        return writeRecord(newValueWriter(blocks, len));
-    }
+            // if the base map is small, update in memory and write as a new map
+            if (base.isLeaf()) {
+                Map<String, MapEntry> map = newHashMap();
+                for (MapEntry entry : base.getEntries()) {
+                    map.put(entry.getName(), entry);
+                }
+                for (MapEntry entry : entries) {
+                    if (entry.getValue() != null) {
+                        map.put(entry.getName(), entry);
+                    } else {
+                        map.remove(entry.getName());
+                    }
+                }
+                return writeMapBucket(null, map.values(), level);
+            }
 
-    private RecordId writeValueRecord(int length, byte[] data) throws IOException {
-        checkArgument(length < Segment.MEDIUM_LIMIT);
-        return writeRecord(newValueWriter(length, data));
-    }
+            // finally, if the base map is large, handle updates per bucket
+            int newSize = 0;
+            int newCount = 0;
+            MapRecord[] buckets = base.getBuckets();
+            List<List<MapEntry>> changes = splitToBuckets(entries, level);
+            for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
+                buckets[i] = writeMapBucket(buckets[i], changes.get(i), level + 1);
+                if (buckets[i] != null) {
+                    newSize += buckets[i].size();
+                    newCount++;
+                }
+            }
 
-    /**
-     * Writes a string value record.
-     *
-     * @param string string to be written
-     * @return value record identifier
-     */
-    public RecordId writeString(String string) throws IOException {
-        RecordId id = stringCache.get(string);
-        if (id != null) {
-            return id; // shortcut if the same string was recently stored
+            // OAK-654: what if the updated map is smaller?
+            if (newSize > BUCKETS_PER_LEVEL) {
+                return writeMapBranch(level, newSize, buckets);
+            } else if (newCount <= 1) {
+                // up to one bucket contains entries, so return that as the new map
+                for (MapRecord bucket : buckets) {
+                    if (bucket != null) {
+                        return bucket;
+                    }
+                }
+                // no buckets remaining, return empty map
+                return writeMapBucket(null, null, level);
+            } else {
+                // combine all remaining entries into a leaf record
+                List<MapEntry> list = newArrayList();
+                for (MapRecord bucket : buckets) {
+                    if (bucket != null) {
+                        addAll(list, bucket.getEntries());
+                    }
+                }
+                return writeMapLeaf(level, list);
+            }
         }
 
-        byte[] data = string.getBytes(UTF_8);
+        /**
+         * Writes the given ids as a list record, bucketing them level by level (up to
+         * {@code ListRecord.LEVEL_SIZE} per bucket); a single id is returned unchanged.
+         * @param list non-empty list of record identifiers
+         * @return identifier of the one record left at the top level
+         */
+        private RecordId writeList(List<RecordId> list) throws IOException {
+            checkNotNull(list);
+            checkArgument(!list.isEmpty());
+            List<RecordId> thisLevel = list;
+            while (thisLevel.size() > 1) {
+                List<RecordId> nextLevel = newArrayList();
+                for (List<RecordId> bucket :
+                    partition(thisLevel, ListRecord.LEVEL_SIZE)) {
+                    if (bucket.size() > 1) {
+                        nextLevel.add(writeListBucket(bucket));
+                    } else {
+                        nextLevel.add(bucket.get(0)); // lone leftover id is carried up as-is, no bucket written
+                    }
+                }
+                thisLevel = nextLevel;
+            }
+            return thisLevel.iterator().next();
+        }
 
-        if (data.length < Segment.MEDIUM_LIMIT) {
-            // only cache short strings to avoid excessive memory use
-            id = writeValueRecord(data.length, data);
-            stringCache.put(string, id);
-            return id;
+        private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
+            checkArgument(bucket.size() > 1);
+            return newListBucketWriter(bucket).write(writer);
         }
 
-        int pos = 0;
-        List<RecordId> blockIds = newArrayListWithExpectedSize(
-            data.length / BLOCK_SIZE + 1);
+        private List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
+            int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
+            int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
 
-        // write as many full bulk segments as possible
-        while (pos + MAX_SEGMENT_SIZE <= data.length) {
-            SegmentId bulkId = store.getTracker().newBulkSegmentId();
-            store.writeSegment(bulkId, data, pos, MAX_SEGMENT_SIZE);
-            for (int i = 0; i < MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
-                blockIds.add(new RecordId(bulkId, i));
+            List<List<MapEntry>> buckets =
+                newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, (List<MapEntry>) null));
+            for (MapEntry entry : entries) {
+                int index = (entry.getHash() >> shift) & mask;
+                List<MapEntry> bucket = buckets.get(index);
+                if (bucket == null) {
+                    bucket = newArrayList();
+                    buckets.set(index, bucket);
+                }
+                bucket.add(entry);
             }
-            pos += MAX_SEGMENT_SIZE;
+            return buckets;
         }
 
-        // inline the remaining data as block records
-        while (pos < data.length) {
-            int len = Math.min(BLOCK_SIZE, data.length - pos);
-            blockIds.add(writeBlock(data, pos, len));
-            pos += len;
+        private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
+            long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
+            return newValueWriter(blocks, len).write(writer);
         }
 
-        return writeValueRecord(data.length, writeList(blockIds));
-    }
-
-    public SegmentBlob writeBlob(Blob blob) throws IOException {
-        if (blob instanceof SegmentBlob
-            && store.containsSegment(((SegmentBlob) blob).getRecordId().getSegmentId())) {
-            return (SegmentBlob) blob;
+        private RecordId writeValueRecord(int length, byte... data) throws IOException {
+            checkArgument(length < Segment.MEDIUM_LIMIT);
+            return newValueWriter(length, data).write(writer);
         }
 
-        String reference = blob.getReference();
-        if (reference != null && store.getBlobStore() != null) {
-            String blobId = store.getBlobStore().getBlobId(reference);
-            if (blobId != null) {
-                RecordId id = writeBlobId(blobId);
-                return new SegmentBlob(id);
-            } else {
-                LOG.debug("No blob found for reference {}, inlining...", reference);
+        /**
+         * Writes a string value record.
+         *
+         * @param string string to be written
+         * @return value record identifier
+         */
+        private RecordId writeString(String string) throws IOException {
+            RecordId id = stringCache.get(string);
+            if (id != null) {
+                return id; // shortcut if the same string was recently stored
             }
-        }
 
-        return writeStream(blob.getNewStream());
-    }
+            byte[] data = string.getBytes(UTF_8);
 
-    /**
-     * Write a reference to an external blob. This method handles blob IDs of
-     * every length, but behaves differently for small and large blob IDs.
-     *
-     * @param blobId Blob ID.
-     * @return Record ID pointing to the written blob ID.
-     * @see Segment#BLOB_ID_SMALL_LIMIT
-     */
-    private RecordId writeBlobId(String blobId) throws IOException {
-        byte[] data = blobId.getBytes(UTF_8);
-        if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
-            return writeRecord(newBlobIdWriter(data));
-        } else {
-            return writeRecord(newBlobIdWriter(writeString(blobId)));
-        }
-    }
-
-    /**
-     * Writes a block record containing the given block of bytes.
-     *
-     * @param bytes source buffer
-     * @param offset offset within the source buffer
-     * @param length number of bytes to write
-     * @return block record identifier
-     */
-    RecordId writeBlock(byte[] bytes, int offset, int length) throws IOException {
-        checkNotNull(bytes);
-        checkPositionIndexes(offset, offset + length, bytes.length);
-        return writeRecord(newBlockWriter(bytes, offset, length));
-    }
+            if (data.length < Segment.MEDIUM_LIMIT) {
+                // only cache short strings to avoid excessive memory use
+                id = writeValueRecord(data.length, data);
+                stringCache.put(string, id);
+                return id;
+            }
 
-    SegmentBlob writeExternalBlob(String blobId) throws IOException {
-        RecordId id = writeBlobId(blobId);
-        return new SegmentBlob(id);
-    }
+            int pos = 0;
+            List<RecordId> blockIds = newArrayListWithExpectedSize(
+                data.length / BLOCK_SIZE + 1);
 
-    SegmentBlob writeLargeBlob(long length, List<RecordId> list) throws IOException {
-        RecordId id = writeValueRecord(length, writeList(list));
-        return new SegmentBlob(id);
-    }
+            // write as many full bulk segments as possible
+            while (pos + MAX_SEGMENT_SIZE <= data.length) {
+                SegmentId bulkId = store.getTracker().newBulkSegmentId();
+                store.writeSegment(bulkId, data, pos, MAX_SEGMENT_SIZE);
+                for (int i = 0; i < MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
+                    blockIds.add(new RecordId(bulkId, i));
+                }
+                pos += MAX_SEGMENT_SIZE;
+            }
 
-    /**
-     * Writes a stream value record. The given stream is consumed
-     * <em>and closed</em> by this method.
-     *
-     * @param stream stream to be written
-     * @return value record identifier
-     * @throws IOException if the stream could not be read
-     */
-    public SegmentBlob writeStream(InputStream stream) throws IOException {
-        boolean threw = true;
-        try {
-            RecordId id = SegmentStream.getRecordIdIfAvailable(stream, store);
-            if (id == null) {
-                id = internalWriteStream(stream);
+            // inline the remaining data as block records
+            while (pos < data.length) {
+                int len = Math.min(BLOCK_SIZE, data.length - pos);
+                blockIds.add(writeBlock(data, pos, len));
+                pos += len;
             }
-            threw = false;
-            return new SegmentBlob(id);
-        } finally {
-            close(stream, threw);
-        }
-    }
 
-    private RecordId internalWriteStream(InputStream stream)
-            throws IOException {
-        BlobStore blobStore = store.getBlobStore();
-        byte[] data = new byte[Segment.MEDIUM_LIMIT];
-        int n = read(stream, data, 0, data.length);
-
-        // Special case for short binaries (up to about 16kB):
-        // store them directly as small- or medium-sized value records
-        if (n < Segment.MEDIUM_LIMIT) {
-            return writeValueRecord(n, data);
-        } else if (blobStore != null) {
-            String blobId = blobStore.writeBlob(new SequenceInputStream(
-                    new ByteArrayInputStream(data, 0, n), stream));
-            return writeBlobId(blobId);
+            return writeValueRecord(data.length, writeList(blockIds));
         }
 
-        data = Arrays.copyOf(data, MAX_SEGMENT_SIZE);
-        n += read(stream, data, n, MAX_SEGMENT_SIZE - n);
-        long length = n;
-        List<RecordId> blockIds =
-                newArrayListWithExpectedSize(2 * n / BLOCK_SIZE);
+        private boolean hasSegment(Blob blob) {
+            return (blob instanceof SegmentBlob)
+                    && store.containsSegment(((Record) blob).getRecordId().getSegmentId());
+        }
 
-        // Write the data to bulk segments and collect the list of block ids
-        while (n != 0) {
-            SegmentId bulkId = store.getTracker().newBulkSegmentId();
-            int len = align(n, 1 << Segment.RECORD_ALIGN_BITS);
-            LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
-            store.writeSegment(bulkId, data, 0, len);
+        private SegmentBlob writeBlob(Blob blob) throws IOException {
+            if (hasSegment(blob)) {
+                SegmentBlob segmentBlob = (SegmentBlob) blob;
+                if (!isOldGen(segmentBlob.getRecordId())) {
+                    return segmentBlob;
+                }
+            }
 
-            for (int i = 0; i < n; i += BLOCK_SIZE) {
-                blockIds.add(new RecordId(bulkId, data.length - len + i));
+            String reference = blob.getReference();
+            if (reference != null && store.getBlobStore() != null) {
+                String blobId = store.getBlobStore().getBlobId(reference);
+                if (blobId != null) {
+                    RecordId id = writeBlobId(blobId);
+                    return new SegmentBlob(id);
+                } else {
+                    LOG.debug("No blob found for reference {}, inlining...", reference);
+                }
             }
 
-            n = read(stream, data, 0, data.length);
-            length += n;
+            return writeStream(blob.getNewStream());
         }
 
-        return writeValueRecord(length, writeList(blockIds));
-    }
+        /**
+         * Writes a reference to an external blob. IDs whose UTF-8 encoding is shorter
+         * than {@link Segment#BLOB_ID_SMALL_LIMIT} bytes are handed to the blob ID
+         * writer as raw bytes; longer IDs are first written as a string record and
+         * the blob ID writer is given the id of that string record instead.
+         * @param blobId Blob ID.
+         * @return Record ID pointing to the written blob ID.
+         */
+        private RecordId writeBlobId(String blobId) throws IOException {
+            byte[] data = blobId.getBytes(UTF_8);
+            if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
+                return newBlobIdWriter(data).write(writer);
+            } else {
+                return newBlobIdWriter(writeString(blobId)).write(writer);
+            }
+        }
 
-    public RecordId writeProperty(PropertyState state) throws IOException {
-        Map<String, RecordId> previousValues = emptyMap();
-        return writeProperty(state, previousValues);
-    }
+        private RecordId writeBlock(byte[] bytes, int offset, int length) throws IOException {
+            checkNotNull(bytes);
+            checkPositionIndexes(offset, offset + length, bytes.length);
+            return newBlockWriter(bytes, offset, length).write(writer);
+        }
 
-    private RecordId writeProperty(PropertyState state, Map<String, RecordId> previousValues) throws IOException {
-        Type<?> type = state.getType();
-        int count = state.count();
+        private SegmentBlob writeExternalBlob(String blobId) throws IOException {
+            RecordId id = writeBlobId(blobId);
+            return new SegmentBlob(id);
+        }
 
-        List<RecordId> valueIds = newArrayList();
-        for (int i = 0; i < count; i++) {
-            if (type.tag() == PropertyType.BINARY) {
-                try {
-                    SegmentBlob blob =
-                            writeBlob(state.getValue(BINARY, i));
-                    valueIds.add(blob.getRecordId());
-                } catch (IOException e) {
-                    throw new IllegalStateException("Unexpected IOException", e);
-                }
-            } else {
-                String value = state.getValue(STRING, i);
-                RecordId valueId = previousValues.get(value);
-                if (valueId == null) {
-                    valueId = writeString(value);
+        private SegmentBlob writeLargeBlob(long length, List<RecordId> list) throws IOException {
+            RecordId id = writeValueRecord(length, writeList(list));
+            return new SegmentBlob(id);
+        }
+
+        private SegmentBlob writeStream(InputStream stream) throws IOException {
+            boolean threw = true;
+            try {
+                RecordId id = SegmentStream.getRecordIdIfAvailable(stream, store);
+                if (id == null || isOldGen(id)) {
+                    id = internalWriteStream(stream);
                 }
-                valueIds.add(valueId);
+                threw = false;
+                return new SegmentBlob(id);
+            } finally {
+                Closeables.close(stream, threw);
             }
         }
 
-        if (!type.isArray()) {
-            return valueIds.iterator().next();
-        } else if (count == 0) {
-            return writeRecord(newListWriter());
-        } else {
-            return writeRecord(newListWriter(count, writeList(valueIds)));
-        }
-    }
+        private RecordId internalWriteStream(InputStream stream) throws IOException {
+            BlobStore blobStore = store.getBlobStore();
+            byte[] data = new byte[Segment.MEDIUM_LIMIT];
+            int n = read(stream, data, 0, data.length);
 
-    public RecordId writeTemplate(Template template) throws IOException {
-        checkNotNull(template);
+            // Special case for short binaries (up to about 16kB):
+            // store them directly as small- or medium-sized value records
+            if (n < Segment.MEDIUM_LIMIT) {
+                return writeValueRecord(n, data);
+            }
+            if (blobStore != null) {
+                String blobId = blobStore.writeBlob(new SequenceInputStream(
+                    new ByteArrayInputStream(data, 0, n), stream));
+                return writeBlobId(blobId);
+            }
 
-        RecordId id = templateCache.get(template);
-        if (id != null) {
-            return id; // shortcut if the same template was recently stored
-        }
+            data = Arrays.copyOf(data, MAX_SEGMENT_SIZE);
+            n += read(stream, data, n, MAX_SEGMENT_SIZE - n);
+            long length = n;
+            List<RecordId> blockIds =
+                newArrayListWithExpectedSize(2 * n / BLOCK_SIZE);
 
-        Collection<RecordId> ids = newArrayList();
-        int head = 0;
+            // Write the data to bulk segments and collect the list of block ids
+            while (n != 0) {
+                SegmentId bulkId = store.getTracker().newBulkSegmentId();
+                int len = align(n, 1 << Segment.RECORD_ALIGN_BITS);
+                LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
+                store.writeSegment(bulkId, data, 0, len);
 
-        RecordId primaryId = null;
-        PropertyState primaryType = template.getPrimaryType();
-        if (primaryType != null) {
-            head |= 1 << 31;
-            primaryId = writeString(primaryType.getValue(NAME));
-            ids.add(primaryId);
-        }
+                for (int i = 0; i < n; i += BLOCK_SIZE) {
+                    blockIds.add(new RecordId(bulkId, data.length - len + i));
+                }
 
-        List<RecordId> mixinIds = null;
-        PropertyState mixinTypes = template.getMixinTypes();
-        if (mixinTypes != null) {
-            head |= 1 << 30;
-            mixinIds = newArrayList();
-            for (String mixin : mixinTypes.getValue(NAMES)) {
-                mixinIds.add(writeString(mixin));
+                n = read(stream, data, 0, data.length);
+                length += n;
             }
-            ids.addAll(mixinIds);
-            checkState(mixinIds.size() < (1 << 10));
-            head |= mixinIds.size() << 18;
+
+            return writeValueRecord(length, writeList(blockIds));
         }
 
-        RecordId childNameId = null;
-        String childName = template.getChildName();
-        if (childName == Template.ZERO_CHILD_NODES) {
-            head |= 1 << 29;
-        } else if (childName == Template.MANY_CHILD_NODES) {
-            head |= 1 << 28;
-        } else {
-            childNameId = writeString(childName);
-            ids.add(childNameId);
+        private RecordId writeProperty(PropertyState state) throws IOException {
+            Map<String, RecordId> previousValues = emptyMap();
+            return writeProperty(state, previousValues);
         }
 
-        PropertyTemplate[] properties = template.getPropertyTemplates();
-        RecordId[] propertyNames = new RecordId[properties.length];
-        byte[] propertyTypes = new byte[properties.length];
-        for (int i = 0; i < properties.length; i++) {
-            // Note: if the property names are stored in more than 255 separate
-            // segments, this will not work.
-            propertyNames[i] = writeString(properties[i].getName());
-            Type<?> type = properties[i].getType();
-            if (type.isArray()) {
-                propertyTypes[i] = (byte) -type.tag();
-            } else {
-                propertyTypes[i] = (byte) type.tag();
+        private RecordId writeProperty(PropertyState state, Map<String, RecordId> previousValues)
+                throws IOException {
+            Type<?> type = state.getType();
+            int count = state.count();
+
+            List<RecordId> valueIds = newArrayList();
+            for (int i = 0; i < count; i++) {
+                if (type.tag() == PropertyType.BINARY) {
+                    try {
+                        SegmentBlob blob =
+                            writeBlob(state.getValue(BINARY, i));
+                        valueIds.add(blob.getRecordId());
+                    } catch (IOException e) {
+                        throw new IllegalStateException("Unexpected IOException", e);
+                    }
+                } else {
+                    String value = state.getValue(STRING, i);
+                    RecordId valueId = previousValues.get(value);
+                    if (valueId == null) {
+                        valueId = writeString(value);
+                    }
+                    valueIds.add(valueId);
+                }
             }
-        }
 
-        RecordId propNamesId = null;
-        if (version.onOrAfter(V_11)) {
-            if (propertyNames.length > 0) {
-                propNamesId = writeList(asList(propertyNames));
-                ids.add(propNamesId);
+            if (!type.isArray()) {
+                return valueIds.iterator().next();
+            } else if (count == 0) {
+                return newListWriter().write(writer);
+            } else {
+                return newListWriter(count, writeList(valueIds)).write(writer);
             }
-        } else {
-            ids.addAll(asList(propertyNames));
         }
 
-        checkState(propertyNames.length < (1 << 18));
-        head |= propertyNames.length;
+        private RecordId writeTemplate(Template template) throws IOException {
+            checkNotNull(template);
 
-        RecordId tid = writeRecord(newTemplateWriter(ids, propertyNames,
-                propertyTypes, head, primaryId, mixinIds, childNameId,
-                propNamesId, version));
-        templateCache.put(template, tid);
-        return tid;
-    }
+            RecordId id = templateCache.get(template);
+            if (id != null) {
+                return id; // shortcut if the same template was recently stored
+            }
 
-    public SegmentNodeState writeNode(NodeState state) throws IOException {
-        if (state instanceof SegmentNodeState) {
-            SegmentNodeState sns = uncompact((SegmentNodeState) state);
-            if (sns != state || store.containsSegment(
-                sns.getRecordId().getSegmentId())) {
-                return sns;
+            Collection<RecordId> ids = newArrayList();
+            int head = 0;
+
+            RecordId primaryId = null;
+            PropertyState primaryType = template.getPrimaryType();
+            if (primaryType != null) {
+                head |= 1 << 31;
+                primaryId = writeString(primaryType.getValue(NAME));
+                ids.add(primaryId);
             }
-        }
 
-        SegmentNodeState before = null;
-        Template beforeTemplate = null;
-        ModifiedNodeState after = null;
-        if (state instanceof ModifiedNodeState) {
-            after = (ModifiedNodeState) state;
-            NodeState base = after.getBaseState();
-            if (base instanceof SegmentNodeState) {
-                SegmentNodeState sns = uncompact((SegmentNodeState) base);
-                if (sns != base || store.containsSegment(
-                    sns.getRecordId().getSegmentId())) {
-                    before = sns;
-                    beforeTemplate = before.getTemplate();
+            List<RecordId> mixinIds = null;
+            PropertyState mixinTypes = template.getMixinTypes();
+            if (mixinTypes != null) {
+                head |= 1 << 30;
+                mixinIds = newArrayList();
+                for (String mixin : mixinTypes.getValue(NAMES)) {
+                    mixinIds.add(writeString(mixin));
                 }
+                ids.addAll(mixinIds);
+                checkState(mixinIds.size() < (1 << 10));
+                head |= mixinIds.size() << 18;
             }
-        }
 
-        Template template = new Template(state);
-        RecordId templateId;
-        if (before != null && template.equals(beforeTemplate)) {
-            templateId = before.getTemplateId();
-        } else {
-            templateId = writeTemplate(template);
-        }
-
-        List<RecordId> ids = newArrayList();
-        ids.add(templateId);
-
-        String childName = template.getChildName();
-        if (childName == Template.MANY_CHILD_NODES) {
-            MapRecord base;
-            Map<String, RecordId> childNodes;
-            if (before != null
-                && before.getChildNodeCount(2) > 1
-                && after.getChildNodeCount(2) > 1) {
-                base = before.getChildNodeMap();
-                childNodes = new ChildNodeCollectorDiff().diff(before, after);
-            } else {
-                base = null;
-                childNodes = newHashMap();
-                for (ChildNodeEntry entry : state.getChildNodeEntries()) {
-                    childNodes.put(
-                        entry.getName(),
-                        writeNode(entry.getNodeState()).getRecordId());
-                }
-            }
-            ids.add(writeMap(base, childNodes).getRecordId());
-        } else if (childName != Template.ZERO_CHILD_NODES) {
-            ids.add(writeNode(state.getChildNode(template.getChildName())).getRecordId());
-        }
-
-        List<RecordId> pIds = newArrayList();
-        for (PropertyTemplate pt : template.getPropertyTemplates()) {
-            String name = pt.getName();
-            PropertyState property = state.getProperty(name);
-
-            if (property instanceof SegmentPropertyState
-                && store.containsSegment(((SegmentPropertyState) property).getRecordId().getSegmentId())) {
-                pIds.add(((SegmentPropertyState) property).getRecordId());
-            } else if (before == null
-                || !store.containsSegment(before.getRecordId().getSegmentId())) {
-                pIds.add(writeProperty(property));
+            RecordId childNameId = null;
+            String childName = template.getChildName();
+            if (childName == Template.ZERO_CHILD_NODES) {
+                head |= 1 << 29;
+            } else if (childName == Template.MANY_CHILD_NODES) {
+                head |= 1 << 28;
             } else {
-                // reuse previously stored property, if possible
-                PropertyTemplate bt = beforeTemplate.getPropertyTemplate(name);
-                if (bt == null) {
-                    pIds.add(writeProperty(property)); // new property
+                childNameId = writeString(childName);
+                ids.add(childNameId);
+            }
+
+            PropertyTemplate[] properties = template.getPropertyTemplates();
+            RecordId[] propertyNames = new RecordId[properties.length];
+            byte[] propertyTypes = new byte[properties.length];
+            for (int i = 0; i < properties.length; i++) {
+                // Note: if the property names are stored in more than 255 separate
+                // segments, this will not work.
+                propertyNames[i] = writeString(properties[i].getName());
+                Type<?> type = properties[i].getType();
+                if (type.isArray()) {
+                    propertyTypes[i] = (byte) -type.tag();
                 } else {
-                    SegmentPropertyState bp = beforeTemplate.getProperty(
-                        before.getRecordId(), bt.getIndex());
-                    if (property.equals(bp)) {
-                        pIds.add(bp.getRecordId()); // no changes
-                    } else if (bp.isArray() && bp.getType() != BINARIES) {
-                        // reuse entries from the previous list
-                        pIds.add(writeProperty(property, bp.getValueRecords()));
-                    } else {
-                        pIds.add(writeProperty(property));
-                    }
+                    propertyTypes[i] = (byte) type.tag();
                 }
             }
-        }
 
-        if (!pIds.isEmpty()) {
+            RecordId propNamesId = null;
             if (version.onOrAfter(V_11)) {
-                ids.add(writeList(pIds));
+                if (propertyNames.length > 0) {
+                    propNamesId = writeList(asList(propertyNames));
+                    ids.add(propNamesId);
+                }
             } else {
-                ids.addAll(pIds);
+                ids.addAll(asList(propertyNames));
             }
-        }
-        return writeRecord(newNodeStateWriter(ids));
-    }
 
-    /**
-     * If the given node was compacted, return the compacted node, otherwise
-     * return the passed node. This is to avoid pointing to old nodes, if they
-     * have been compacted.
-     *
-     * @param state the node
-     * @return the compacted node (if it was compacted)
-     */
-    private SegmentNodeState uncompact(SegmentNodeState state) {
-        RecordId id = store.getTracker().getCompactionMap().get(state.getRecordId());
-        if (id != null) {
-            return new SegmentNodeState(id);
-        } else {
-            return state;
-        }
-    }
+            checkState(propertyNames.length < (1 << 18));
+            head |= propertyNames.length;
 
-    private <T> T writeRecord(RecordWriter<T> recordWriter) throws IOException {
-        SegmentBufferWriter writer = segmentBufferWriterPool.borrowWriter(currentThread());
-        try {
-            return recordWriter.write(writer);
-        } finally {
-            segmentBufferWriterPool.returnWriter(currentThread(), writer);
+            RecordId tid = newTemplateWriter(ids, propertyNames,
+                propertyTypes, head, primaryId, mixinIds, childNameId,
+                propNamesId, version).write(writer);
+            templateCache.put(template, tid);
+            return tid;
         }
-    }
 
-    private class SegmentBufferWriterPool {
-        private final Set<SegmentBufferWriter> borrowed = newHashSet();
-        private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+        // FIXME OAK-3348 defer compacted items are not in the compaction map -> performance regression
+        //        split compaction map into 1) id based equality and 2) cache (like string and template) for nodes
+        private SegmentNodeState writeNode(NodeState state) throws IOException {
+            if (state instanceof SegmentNodeState) {
+                SegmentNodeState sns = uncompact((SegmentNodeState) state);
+                if (sns != state || hasSegment(sns)) {
+                    if (!isOldGen(sns.getRecordId())) {
+                        return sns;
+                    }
+                }
+            }
 
-        private short writerId = -1;
+            SegmentNodeState before = null;
+            Template beforeTemplate = null;
+            ModifiedNodeState after = null;
+            if (state instanceof ModifiedNodeState) {
+                after = (ModifiedNodeState) state;
+                NodeState base = after.getBaseState();
+                if (base instanceof SegmentNodeState) {
+                    SegmentNodeState sns = uncompact((SegmentNodeState) base);
+                    if (sns != base || hasSegment(sns)) {
+                        if (!isOldGen(sns.getRecordId())) {
+                            before = sns;
+                            beforeTemplate = before.getTemplate();
+                        }
+                    }
+                }
+            }
 
-        public void flush() throws IOException {
-            List<SegmentBufferWriter> toFlush = newArrayList();
-            synchronized (this) {
-                toFlush.addAll(writers.values());
-                writers.clear();
-                borrowed.clear();
+            Template template = new Template(state);
+            RecordId templateId;
+            if (template.equals(beforeTemplate)) {
+                templateId = before.getTemplateId();
+            } else {
+                templateId = writeTemplate(template);
             }
-            // Call flush from outside a synchronized context to avoid
-            // deadlocks of that method calling SegmentStore.writeSegment
-            for (SegmentBufferWriter writer : toFlush) {
-                writer.flush();
+
+            List<RecordId> ids = newArrayList();
+            ids.add(templateId);
+
+            String childName = template.getChildName();
+            if (childName == Template.MANY_CHILD_NODES) {
+                MapRecord base;
+                Map<String, RecordId> childNodes;
+                if (before != null
+                    && before.getChildNodeCount(2) > 1
+                    && after.getChildNodeCount(2) > 1) {
+                    base = before.getChildNodeMap();
+                    childNodes = new ChildNodeCollectorDiff().diff(before, after);
+                } else {
+                    base = null;
+                    childNodes = newHashMap();
+                    for (ChildNodeEntry entry : state.getChildNodeEntries()) {
+                        childNodes.put(
+                            entry.getName(),
+                            writeNode(entry.getNodeState()).getRecordId());
+                    }
+                }
+                ids.add(writeMap(base, childNodes).getRecordId());
+            } else if (childName != Template.ZERO_CHILD_NODES) {
+                ids.add(writeNode(state.getChildNode(template.getChildName())).getRecordId());
+            }
+
+            List<RecordId> pIds = newArrayList();
+            for (PropertyTemplate pt : template.getPropertyTemplates()) {
+                String name = pt.getName();
+                PropertyState property = state.getProperty(name);
+
+                if (hasSegment(property)) {
+                    RecordId pid = ((Record) property).getRecordId();
+                    if (isOldGen(pid)) {
+                        pIds.add(writeProperty(property));
+                    } else {
+                        pIds.add(pid);
+                    }
+                } else if (before == null || !hasSegment(before)) {
+                    pIds.add(writeProperty(property));
+                } else {
+                    // reuse previously stored property, if possible
+                    PropertyTemplate bt = beforeTemplate.getPropertyTemplate(name);
+                    if (bt == null) {
+                        pIds.add(writeProperty(property)); // new property
+                    } else {
+                        SegmentPropertyState bp = beforeTemplate.getProperty(before.getRecordId(), bt.getIndex());
+                        if (property.equals(bp)) {
+                            pIds.add(bp.getRecordId()); // no changes
+                        } else if (bp.isArray() && bp.getType() != BINARIES) {
+                            // reuse entries from the previous list
+                            pIds.add(writeProperty(property, bp.getValueRecords()));
+                        } else {
+                            pIds.add(writeProperty(property));
+                        }
+                    }
+                }
             }
-        }
 
-        public synchronized SegmentBufferWriter borrowWriter(Object key) throws IOException {
-            SegmentBufferWriter writer = writers.remove(key);
-            if (writer == null) {
-                writer = new SegmentBufferWriter(store, version, wid + "." + getWriterId());
+            if (!pIds.isEmpty()) {
+                if (version.onOrAfter(V_11)) {
+                    ids.add(writeList(pIds));
+                } else {
+                    ids.addAll(pIds);
+                }
             }
-            borrowed.add(writer);
-            return writer;
+            return newNodeStateWriter(ids).write(writer);
         }
 
-        public void returnWriter(Object key, SegmentBufferWriter writer) throws IOException {
-            if (!tryReturn(key, writer)) {
-                // Delayed flush this writer as it was borrowed while flush() was called.
-                writer.flush();
-            }
+        private boolean hasSegment(SegmentNodeState node) {
+            return store.containsSegment(node.getRecordId().getSegmentId());
         }
 
-        private synchronized boolean tryReturn(Object key, SegmentBufferWriter writer) {
-            if (borrowed.remove(writer)) {
-                writers.put(key, writer);
-                return true;
-            } else {
-                return false;
-            }
+        private boolean hasSegment(PropertyState property) {
+            return (property instanceof SegmentPropertyState)
+                && store.containsSegment(((Record) property).getRecordId().getSegmentId());
         }
 
-        private synchronized String getWriterId() {
-            if (++writerId > 9999) {
-                writerId = 0;
-            }
-            // Manually padding seems to be fastest here
-            if (writerId < 10) {
-                return "000" + writerId;
-            } else if (writerId < 100) {
-                return "00" + writerId;
-            } else if (writerId < 1000) {
-                return "0" + writerId;
+        /**
+         * If the given node was compacted, return the compacted node, otherwise
+         * return the passed node. This is to avoid pointing to old nodes, if they
+         * have been compacted.
+         *
+         * @param state the node
+         * @return the compacted node (if it was compacted)
+         */
+        private SegmentNodeState uncompact(SegmentNodeState state) {
+            RecordId id = store.getTracker().getCompactionMap().get(state.getRecordId());
+            if (id != null) {
+                return new SegmentNodeState(id);
             } else {
-                return valueOf(writerId);
+                return state;
             }
         }
-    }
 
-    private class ChildNodeCollectorDiff extends DefaultNodeStateDiff {
-        private final Map<String, RecordId> childNodes = newHashMap();
-        private IOException exception;
+        private boolean isOldGen(RecordId id) {
+            int thatGen = id.getSegment().getGcGen();
+            int thisGen = writer.getGeneration();
+            return thatGen < thisGen;
+        }
 
-        @Override
-        public boolean childNodeAdded(String name, NodeState after) {
-            try {
-                childNodes.put(name, writeNode(after).getRecordId());
-            } catch (IOException e) {
-                exception = e;
-                return false;
+        private class ChildNodeCollectorDiff extends DefaultNodeStateDiff {
+            private final Map<String, RecordId> childNodes = newHashMap();
+            private IOException exception;
+
+            public Map<String, RecordId> diff(SegmentNodeState before, ModifiedNodeState after) throws IOException {
+                after.compareAgainstBaseState(before, this);
+                if (exception != null) {
+                    throw new IOException(exception);
+                } else {
+                    return childNodes;
+                }
             }
-            return true;
-        }
 
-        @Override
-        public boolean childNodeChanged(
-            String name, NodeState before, NodeState after) {
-            try {
-                childNodes.put(name, writeNode(after).getRecordId());
-            } catch (IOException e) {
-                exception = e;
-                return false;
+            @Override
+            public boolean childNodeAdded(String name, NodeState after) {
+                try {
+                    childNodes.put(name, writeNode(after).getRecordId());
+                } catch (IOException e) {
+                    exception = e;
+                    return false;
+                }
+                return true;
             }
-            return true;
-        }
 
-        @Override
-        public boolean childNodeDeleted(String name, NodeState before) {
-            childNodes.put(name, null);
-            return true;
-        }
+            @Override
+            public boolean childNodeChanged(
+                String name, NodeState before, NodeState after) {
+                try {
+                    childNodes.put(name, writeNode(after).getRecordId());
+                } catch (IOException e) {
+                    exception = e;
+                    return false;
+                }
+                return true;
+            }
 
-        public Map<String, RecordId> diff(SegmentNodeState before, ModifiedNodeState after) throws IOException {
-            after.compareAgainstBaseState(before, this);
-            if (exception != null) {
-                throw new IOException(exception);
-            } else {
-                return childNodes;
+            @Override
+            public boolean childNodeDeleted(String name, NodeState before) {
+                childNodes.put(name, null);
+                return true;
             }
         }
+
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Apr 20 10:03:32 2016
@@ -1484,7 +1484,6 @@ public class FileStore implements Segmen
                 // in an attempt to prevent new references to old pre-compacted
                 // content. TODO: There should be a cleaner way to do this. (implement GCMonitor!?)
                 tracker.getWriter().dropCache();
-                tracker.getWriter().flush();
 
                 CompactionMap cm = tracker.getCompactionMap();
                 gcMonitor.compacted(cm.getSegmentCounts(), cm.getRecordCounts(), cm.getEstimatedWeights());

Modified: jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java Wed Apr 20 10:03:32 2016
@@ -67,6 +67,8 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -89,7 +91,7 @@ public class CompactionAndCleanupIT {
     public static void assumptions() {
         assumeTrue(getFixtures().contains(SEGMENT_MK));
     }
-    
+
     @Test
     public void compactionNoBinaryClone() throws Exception {
         // 2MB data, 5MB blob
@@ -427,7 +429,6 @@ public class CompactionAndCleanupIT {
      * Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
      */
     @Test
-    @Ignore("OAK-3348")  // FIXME OAK-3348
     public void preCompactionReferences() throws IOException, CommitFailedException, InterruptedException {
         for (String ref : new String[] {"merge-before-compact", "merge-after-compact"}) {
             File repoDir = new File(getFileStoreFolder(), ref);

Modified: jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java Wed Apr 20 10:03:32 2016
@@ -22,6 +22,7 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Strings.repeat;
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Maps.newHashMap;
+import static java.io.File.createTempFile;
 import static junitx.framework.ComparableAssert.assertEquals;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 import static org.apache.jackrabbit.oak.api.Type.LONGS;
@@ -38,10 +39,8 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.segment.TestUtils.newRecordId;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +56,9 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentParser.MapInfo;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentParser.NodeInfo;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentParser.ValueInfo;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -67,6 +68,7 @@ import org.junit.runners.Parameterized;
 public class SegmentParserTest {
     private final SegmentVersion segmentVersion;
 
+    private File directory;
     private SegmentStore store;
     private SegmentWriter writer;
 
@@ -152,13 +154,21 @@ public class SegmentParserTest {
     }
 
     @Before
-    public void setup() {
-        store = mock(SegmentStore.class, withSettings().stubOnly());
-        SegmentTracker tracker = new SegmentTracker(store);
-        when(store.getTracker()).thenReturn(tracker);
+    public void setup() throws IOException {
+        directory = createTempFile(getClass().getSimpleName(), "dir", new File("target"));
+        directory.delete();
+        directory.mkdir();
+
+        store = FileStore.builder(directory).build();
         writer = new SegmentWriter(store, segmentVersion, "");
     }
 
+    @After
+    public void tearDown() {
+        store.close();
+        directory.delete();
+    }
+
     @Test
     public void emptyNode() throws IOException {
         SegmentNodeState node = writer.writeNode(EMPTY_NODE);
@@ -293,10 +303,10 @@ public class SegmentParserTest {
         assertEquals(456, size.get());
     }
 
-    private Map<String, RecordId> createMap(int size, Random rnd) {
+    private Map<String, RecordId> createMap(int size, Random rnd) throws IOException {
         Map<String, RecordId> map = newHashMap();
         for (int k = 0; k < size; k++) {
-            map.put("k" + k, newRecordId(store.getTracker(), rnd));
+            map.put("k" + k, writer.writeString("string" + rnd.nextLong()));
         }
         return map;
     }
@@ -443,7 +453,7 @@ public class SegmentParserTest {
         Random rnd = new Random();
         List<RecordId> list = newArrayListWithCapacity(count);
         for (int k = 0; k < count; k++) {
-            list.add(newRecordId(store.getTracker(), rnd));
+            list.add(writer.writeString("string " + rnd.nextLong()));
         }
         RecordId listId = writer.writeList(list);
         ListInfo listInfo = new TestParser("nonEmptyList"){