Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2018/11/20 10:41:16 UTC

svn commit: r1846986 - in /jackrabbit/oak/branches/1.8: ./ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment...

Author: mduerig
Date: Tue Nov 20 10:41:16 2018
New Revision: 1846986

URL: http://svn.apache.org/viewvc?rev=1846986&view=rev
Log:
OAK-7909: Backport and validate OAK-7867 to Oak 1.8
Merged r1845405

Modified:
    jackrabbit/oak/branches/1.8/   (props changed)
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
    jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java

Propchange: jackrabbit/oak/branches/1.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 20 10:41:16 2018
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1820660-1820661,1820729,1820734,1820859,1820861,1820878,1820888,1820947,1821027,1821130,1821140-1821141,1821178,1821237,1821240,1821249,1821258,1821325,1821358,1821361-1821362,1821370,1821375,1821393,1821477,1821487,1821516,1821617,1821663,1821665,1821668,1821681,1821847,1821975-1821983,1822121,1822201,1822207,1822527,1822642,1822723,1822808,1822850,1822934,1823135,1823163,1823169,1823172,1823655,1823669,1824196,1824198,1824253,1824255,1824896,1824962,1825065,1825362,1825381,1825442,1825448,1825466,1825470-1825471,1825475,1825523,1825525,1825561,1825619-1825621,1825651,1825654,1825992,1826079,1826090,1826096,1826216,1826237,1826338,1826516,1826532,1826551,1826560,1826638,1826640,1826730,1826833,1826932,1826957,1827423,1827472,1827486,1827816,1827977,1828349,1828439,1828502,1828529,1828948,1829527,1829534,1829546,1829569,1829587,1829665,1829854,1829864,1829978,1829985,1829987,1829998,1830019,1830048,1830160,1830171,1830197,1830209,1830239,1830347,1830748,1830911
 ,1830923,1831157-1831158,1831163,1831190,1831374,1831560,1831689,1832258,1832376,1832379,1832535,1833308,1833347,1833833,1834112,1834117,1834287,1834291,1834302,1834326,1834328,1834336,1834428,1834468,1834483,1834610,1834648-1834649,1834681,1834823,1834857-1834858,1835060,1835518,1835521,1835635,1835642,1835780,1835819,1836082,1836121,1836167-1836168,1836170-1836187,1836189-1836196,1836206,1836487,1836493,1837057,1837274,1837296,1837326,1837475,1837503,1837547,1837569,1837600,1837657,1837718,1837998,1838076,1838637,1839549,1839570,1839637,1839746,1840019,1840024,1840031,1840226,1840455,1840462,1840574,1841314,1841352,1842089,1842677,1843222,1843231,1843398,1843618,1843652,1843911,1844325,1844549,1844625,1844627,1844642,1844728,1844775,1844932,1845135,1845336,1846057
+/jackrabbit/oak/trunk:1820660-1820661,1820729,1820734,1820859,1820861,1820878,1820888,1820947,1821027,1821130,1821140-1821141,1821178,1821237,1821240,1821249,1821258,1821325,1821358,1821361-1821362,1821370,1821375,1821393,1821477,1821487,1821516,1821617,1821663,1821665,1821668,1821681,1821847,1821975-1821983,1822121,1822201,1822207,1822527,1822642,1822723,1822808,1822850,1822934,1823135,1823163,1823169,1823172,1823655,1823669,1824196,1824198,1824253,1824255,1824896,1824962,1825065,1825362,1825381,1825442,1825448,1825466,1825470-1825471,1825475,1825523,1825525,1825561,1825619-1825621,1825651,1825654,1825992,1826079,1826090,1826096,1826216,1826237,1826338,1826516,1826532,1826551,1826560,1826638,1826640,1826730,1826833,1826932,1826957,1827423,1827472,1827486,1827816,1827977,1828349,1828439,1828502,1828529,1828948,1829527,1829534,1829546,1829569,1829587,1829665,1829854,1829864,1829978,1829985,1829987,1829998,1830019,1830048,1830160,1830171,1830197,1830209,1830239,1830347,1830748,1830911
 ,1830923,1831157-1831158,1831163,1831190,1831374,1831560,1831689,1832258,1832376,1832379,1832535,1833308,1833347,1833833,1834112,1834117,1834287,1834291,1834302,1834326,1834328,1834336,1834428,1834468,1834483,1834610,1834648-1834649,1834681,1834823,1834857-1834858,1835060,1835518,1835521,1835635,1835642,1835780,1835819,1836082,1836121,1836167-1836168,1836170-1836187,1836189-1836196,1836206,1836487,1836493,1837057,1837274,1837296,1837326,1837475,1837503,1837547,1837569,1837600,1837657,1837718,1837998,1838076,1838637,1839549,1839570,1839637,1839746,1840019,1840024,1840031,1840226,1840455,1840462,1840574,1841314,1841352,1842089,1842677,1843222,1843231,1843398,1843618,1843652,1843911,1844325,1844549,1844625,1844627,1844642,1844728,1844775,1844932,1845135,1845336,1845405,1846057
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriter.java Tue Nov 20 10:41:16 2018
@@ -65,6 +65,7 @@ import org.apache.jackrabbit.oak.api.Blo
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
+import org.apache.jackrabbit.oak.segment.RecordWriters.RecordWriter;
 import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -140,97 +141,53 @@ public class DefaultSegmentWriter implem
 
     @Override
     @NotNull
-    public RecordId writeMap(@Nullable final MapRecord base,
-            @NotNull final Map<String, RecordId> changes
-    )
-            throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeMap(base, changes);
-            }
-        });
+    public RecordId writeMap(@Nullable final MapRecord base, @NotNull final Map<String, RecordId> changes)
+    throws IOException {
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeMap(base, changes);
     }
 
     @Override
     @NotNull
     public RecordId writeList(@NotNull final List<RecordId> list) throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeList(list);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeList(list);
     }
 
     @Override
     @NotNull
     public RecordId writeString(@NotNull final String string) throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeString(string);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeString(string);
     }
 
     @Override
     @NotNull
     public RecordId writeBlob(@NotNull final Blob blob) throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeBlob(blob);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeBlob(blob);
     }
 
     @Override
     @NotNull
     public RecordId writeBlock(@NotNull final byte[] bytes, final int offset, final int length)
             throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeBlock(bytes, offset, length);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeBlock(bytes, offset, length);
     }
 
     @Override
     @NotNull
     public RecordId writeStream(@NotNull final InputStream stream) throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeStream(stream);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeStream(stream);
     }
 
     @Override
     @NotNull
     public RecordId writeProperty(@NotNull final PropertyState state) throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeProperty(state);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeProperty(state);
     }
 
     @Override
@@ -239,13 +196,8 @@ public class DefaultSegmentWriter implem
             @NotNull final NodeState state,
             @Nullable final ByteBuffer stableIdBytes)
     throws IOException {
-        return writeOperationHandler.execute(new SegmentWriteOperation() {
-            @NotNull
-            @Override
-            public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException {
-                return with(writer).writeNode(state, stableIdBytes);
-            }
-        });
+        return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
+                .writeNode(state, stableIdBytes);
     }
 
     /**
@@ -254,28 +206,25 @@ public class DefaultSegmentWriter implem
      * as a separate argument (a poor man's monad). As such it is entirely
      * <em>not thread safe</em>.
      */
-    private abstract class SegmentWriteOperation implements WriteOperation {
-        private SegmentBufferWriter writer;
-
-        private Cache<String, RecordId> stringCache;
+    private class SegmentWriteOperation {
+        private final GCGeneration gcGeneration;
 
-        private Cache<Template, RecordId> templateCache;
+        private final Cache<String, RecordId> stringCache;
 
-        private Cache<String, RecordId> nodeCache;
+        private final Cache<Template, RecordId> templateCache;
 
-        @NotNull
-        @Override
-        public abstract RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException;
+        private final Cache<String, RecordId> nodeCache;
 
-        @NotNull
-        SegmentWriteOperation with(@NotNull SegmentBufferWriter writer) {
-            checkState(this.writer == null);
-            this.writer = writer;
-            int generation = writer.getGCGeneration().getGeneration();
+        SegmentWriteOperation(@NotNull GCGeneration gcGeneration) {
+            int generation = gcGeneration.getGeneration();
+            this.gcGeneration = gcGeneration;
             this.stringCache = cacheManager.getStringCache(generation);
             this.templateCache = cacheManager.getTemplateCache(generation);
             this.nodeCache = cacheManager.getNodeCache(generation);
-            return this;
+        }
+
+        private WriteOperation newWriteOperation(RecordWriter recordWriter) {
+            return writer -> recordWriter.write(writer, store);
         }
 
         private RecordId writeMap(@Nullable MapRecord base,
@@ -302,8 +251,10 @@ public class DefaultSegmentWriter implem
                         if (value.equals(entry.getValue())) {
                             return base.getRecordId();
                         } else {
-                            return RecordWriters.newMapBranchWriter(entry.getHash(), asList(entry.getKey(),
-                                    value, base.getRecordId())).write(writer, store);
+                            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                                RecordWriters.newMapBranchWriter(
+                                    entry.getHash(),
+                                    asList(entry.getKey(), value, base.getRecordId()))));
                         }
                     }
                 }
@@ -337,7 +288,8 @@ public class DefaultSegmentWriter implem
             checkElementIndex(size, MapRecord.MAX_SIZE);
             checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
             checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
-            return RecordWriters.newMapLeafWriter(level, entries).write(writer, store);
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newMapLeafWriter(level, entries)));
         }
 
         private RecordId writeMapBranch(int level, int size, MapRecord... buckets) throws IOException {
@@ -349,7 +301,8 @@ public class DefaultSegmentWriter implem
                     bucketIds.add(buckets[i].getRecordId());
                 }
             }
-            return RecordWriters.newMapBranchWriter(level, size, bitmap, bucketIds).write(writer, store);
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newMapBranchWriter(level, size, bitmap, bucketIds)));
         }
 
         private RecordId writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level)
@@ -359,7 +312,8 @@ public class DefaultSegmentWriter implem
                 if (base != null) {
                     return base.getRecordId();
                 } else if (level == 0) {
-                    return RecordWriters.newMapLeafWriter().write(writer, store);
+                    return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                        RecordWriters.newMapLeafWriter()));
                 } else {
                     return null;
                 }
@@ -468,7 +422,8 @@ public class DefaultSegmentWriter implem
 
         private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
             checkArgument(bucket.size() > 1);
-            return RecordWriters.newListBucketWriter(bucket).write(writer, store);
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newListBucketWriter(bucket)));
         }
 
         private List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
@@ -491,12 +446,14 @@ public class DefaultSegmentWriter implem
 
         private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
             long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
-            return RecordWriters.newValueWriter(blocks, len).write(writer, store);
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newValueWriter(blocks, len)));
         }
 
         private RecordId writeValueRecord(int length, byte... data) throws IOException {
             checkArgument(length < Segment.MEDIUM_LIMIT);
-            return RecordWriters.newValueWriter(length, data).write(writer, store);
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newValueWriter(length, data)));
         }
 
         /**
@@ -594,23 +551,22 @@ public class DefaultSegmentWriter implem
         private RecordId writeBlobId(String blobId) throws IOException {
             byte[] data = blobId.getBytes(UTF_8);
 
-            RecordId recordId;
-
             if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
-                recordId = RecordWriters.newBlobIdWriter(data).write(writer, store);
+                return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                    RecordWriters.newBlobIdWriter(data)));
             } else {
                 RecordId refId = writeString(blobId);
-                recordId = RecordWriters.newBlobIdWriter(refId).write(writer, store);
+                return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                    RecordWriters.newBlobIdWriter(refId)));
             }
-
-            return recordId;
         }
 
         private RecordId writeBlock(@NotNull byte[] bytes, int offset, int length)
                 throws IOException {
             checkNotNull(bytes);
             checkPositionIndexes(offset, offset + length, bytes.length);
-            return RecordWriters.newBlockWriter(bytes, offset, length).write(writer, store);
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newBlockWriter(bytes, offset, length)));
         }
 
         private RecordId writeStream(@NotNull InputStream stream) throws IOException {
@@ -710,9 +666,12 @@ public class DefaultSegmentWriter implem
             if (!type.isArray()) {
                 return valueIds.iterator().next();
             } else if (count == 0) {
-                return RecordWriters.newListWriter().write(writer, store);
+                return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                    RecordWriters.newListWriter()));
             } else {
-                return RecordWriters.newListWriter(count, writeList(valueIds)).write(writer, store);
+                RecordId lid = writeList(valueIds);
+                return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                    RecordWriters.newListWriter(count, lid)));
             }
         }
 
@@ -783,9 +742,10 @@ public class DefaultSegmentWriter implem
             checkState(propertyNames.length < (1 << 18));
             head |= propertyNames.length;
 
-            RecordId tid = RecordWriters.newTemplateWriter(ids, propertyNames,
-                    propertyTypes, head, primaryId, mixinIds, childNameId,
-                    propNamesId).write(writer, store);
+            RecordId tid = writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                RecordWriters.newTemplateWriter(
+                        ids, propertyNames, propertyTypes, head, primaryId, mixinIds,
+                        childNameId, propNamesId)));
             templateCache.put(template, tid);
             return tid;
         }
@@ -921,14 +881,18 @@ public class DefaultSegmentWriter implem
                 ids.add(writeList(pIds));
             }
 
-            RecordId stableId = null;
+            RecordId stableId;
             if (stableIdBytes != null) {
                 ByteBuffer buffer = stableIdBytes.duplicate();
                 byte[] bytes = new byte[buffer.remaining()];
                 buffer.get(bytes);
                 stableId = writeBlock(bytes, 0, bytes.length);
+            } else {
+                stableId = null;
             }
-            return newNodeStateWriter(stableId, ids).write(writer, store);
+
+            return writeOperationHandler.execute(gcGeneration, newWriteOperation(
+                newNodeStateWriter(stableId, ids)));
         }
 
         /**
@@ -988,7 +952,7 @@ public class DefaultSegmentWriter implem
         private boolean isOldGeneration(RecordId id) {
             try {
                 GCGeneration thatGen = id.getSegmentId().getGcGeneration();
-                GCGeneration thisGen = writer.getGCGeneration();
+                GCGeneration thisGen = gcGeneration;
                 if (thatGen.isCompacted()) {
                     // If the segment containing the base state is compacted it is
                    // only considered old if it is from an earlier full generation.

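The core of the DefaultSegmentWriter change: SegmentWriteOperation is no longer an anonymous WriteOperation handed to the handler via with(writer); it now captures the handler's current GC generation in its constructor and routes every record write through writeOperationHandler.execute(gcGeneration, newWriteOperation(...)). Below is a minimal, self-contained sketch of that pattern; Handler, Writer, the int generation and the String record ids are illustrative stand-ins, not Oak types.

    import java.util.function.Function;

    final class GenerationSketch {

        // Stand-in for WriteOperationHandler: exposes the current generation and
        // executes an operation against that generation.
        interface Handler {
            int getGCGeneration();
            String execute(int gcGeneration, Function<StringBuilder, String> op);
        }

        // Stand-in for DefaultSegmentWriter: the generation is captured once per
        // top-level write, and every low-level record write goes through the handler.
        static final class Writer {
            private final Handler handler;

            Writer(Handler handler) {
                this.handler = handler;
            }

            String writeString(String value) {
                int generation = handler.getGCGeneration();   // captured up front
                return handler.execute(generation, buffer -> {
                    buffer.append(value);                      // the actual record write
                    return "recordId(" + value + ")";
                });
            }
        }

        public static void main(String[] args) {
            StringBuilder segment = new StringBuilder();
            Handler handler = new Handler() {
                @Override
                public int getGCGeneration() {
                    return 1;
                }

                @Override
                public String execute(int gcGeneration, Function<StringBuilder, String> op) {
                    return op.apply(segment);
                }
            };
            System.out.println(new Writer(handler).writeString("hello"));   // recordId(hello)
        }
    }
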
Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java Tue Nov 20 10:41:16 2018
@@ -21,6 +21,8 @@ package org.apache.jackrabbit.oak.segmen
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
+
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.segment.WriterCacheManager.Empty;
@@ -171,7 +173,15 @@ public final class DefaultSegmentWriterB
                 new WriteOperationHandler() {
                     @NotNull
                     @Override
-                    public RecordId execute(@NotNull WriteOperation writeOperation) {
+                    public GCGeneration getGCGeneration() {
+                        throw new UnsupportedOperationException("Cannot write to read-only store");
+                    }
+
+                    @NotNull
+                    @Override
+                    public RecordId execute(@NotNull GCGeneration gcGeneration,
+                                            @NotNull WriteOperation writeOperation)
+                    throws IOException {
                         throw new UnsupportedOperationException("Cannot write to read-only store");
                     }
 

Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java Tue Nov 20 10:41:16 2018
@@ -156,12 +156,16 @@ public class SegmentBufferWriter impleme
 
     @NotNull
     @Override
-    public RecordId execute(@NotNull WriteOperation writeOperation) throws IOException {
+    public RecordId execute(@NotNull GCGeneration gcGeneration,
+                            @NotNull WriteOperation writeOperation)
+    throws IOException {
+        checkState(gcGeneration.equals(this.gcGeneration));
         return writeOperation.execute(this);
     }
 
+    @Override
     @NotNull
-    GCGeneration getGCGeneration() {
+    public GCGeneration getGCGeneration() {
         return gcGeneration;
     }
 

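SegmentBufferWriter now implements the widened WriteOperationHandler contract directly: getGCGeneration() becomes public, and execute() rejects any operation requested against a generation other than the writer's own (the added checkState). A stand-alone illustration of that guard follows; SingleGenerationWriter is a made-up stand-in, and only the equality check mirrors the patch.

    import java.util.function.Supplier;

    final class SingleGenerationWriter {

        private final int gcGeneration;

        SingleGenerationWriter(int gcGeneration) {
            this.gcGeneration = gcGeneration;
        }

        int getGCGeneration() {
            return gcGeneration;
        }

        // Mirrors the added guard: checkState(gcGeneration.equals(this.gcGeneration))
        String execute(int requestedGeneration, Supplier<String> op) {
            if (requestedGeneration != gcGeneration) {
                throw new IllegalStateException("writer is bound to generation " + gcGeneration
                        + " but generation " + requestedGeneration + " was requested");
            }
            return op.get();
        }

        public static void main(String[] args) {
            SingleGenerationWriter writer = new SingleGenerationWriter(1);
            System.out.println(writer.execute(1, () -> "ok"));   // matching generation: runs
            try {
                writer.execute(2, () -> "never runs");           // mismatch: fails fast
            } catch (IllegalStateException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }
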
Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java Tue Nov 20 10:41:16 2018
@@ -27,6 +27,7 @@ import static com.google.common.collect.
 import static java.lang.Thread.currentThread;
 
 import java.io.IOException;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,7 +40,7 @@ import org.jetbrains.annotations.NotNull
 
 /**
  * This {@link WriteOperationHandler} uses a pool of {@link SegmentBufferWriter}s,
- * which it passes to its {@link #execute(WriteOperation) execute} method.
+ * which it passes to its {@link #execute(GCGeneration, WriteOperation) execute} method.
  * <p>
  * Instances of this class are thread safe.
  */
@@ -99,12 +100,21 @@ public class SegmentBufferWriterPool imp
 
     @NotNull
     @Override
-    public RecordId execute(@NotNull WriteOperation writeOperation) throws IOException {
-        SegmentBufferWriter writer = borrowWriter(currentThread());
+    public GCGeneration getGCGeneration() {
+        return gcGeneration.get();
+    }
+
+    @NotNull
+    @Override
+    public RecordId execute(@NotNull GCGeneration gcGeneration,
+                            @NotNull WriteOperation writeOperation)
+    throws IOException {
+        SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(currentThread(), gcGeneration);
+        SegmentBufferWriter writer = borrowWriter(key);
         try {
             return writeOperation.execute(writer);
         } finally {
-            returnWriter(currentThread(), writer);
+            returnWriter(key, writer);
         }
     }
 
@@ -179,7 +189,7 @@ public class SegmentBufferWriterPool imp
             monitor.enterWhen(guard);
             return true;
         } catch (InterruptedException ignore) {
-            Thread.currentThread().interrupt();
+            currentThread().interrupt();
             return false;
         }
     }
@@ -197,14 +207,6 @@ public class SegmentBufferWriterPool imp
                 writer = new SegmentBufferWriter(
                         idProvider,
                         reader,
-                        getWriterId(wid),
-                        gcGeneration.get()
-                );
-            } else if (!writer.getGCGeneration().equals(gcGeneration.get())) {
-                disposedOldGen.add(writer);
-                writer = new SegmentBufferWriter(
-                        idProvider,
-                        reader,
                         getWriterId(wid),
                         gcGeneration.get()
                 );

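With the generation now supplied by each caller, SegmentBufferWriterPool no longer needs to detect and dispose of writers whose generation has fallen behind; it keys its pool on a (thread, generation) pair built with SimpleImmutableEntry, so writers for an older generation simply stop being handed out once callers pass a newer one. A rough sketch of that keying idea; WriterPoolSketch and the StringBuilder "writers" are stand-ins, not the Oak implementation.

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class WriterPoolSketch {

        // one buffer per (thread, generation) pair, as in the patched borrowWriter key
        private final Map<SimpleImmutableEntry<Thread, Integer>, StringBuilder> writers =
                new ConcurrentHashMap<>();

        StringBuilder borrow(int gcGeneration) {
            SimpleImmutableEntry<Thread, Integer> key =
                    new SimpleImmutableEntry<>(Thread.currentThread(), gcGeneration);
            return writers.computeIfAbsent(key, k -> new StringBuilder());
        }

        public static void main(String[] args) {
            WriterPoolSketch pool = new WriterPoolSketch();
            StringBuilder oldGen = pool.borrow(1);
            StringBuilder newGen = pool.borrow(2);          // "compaction" bumped the generation
            System.out.println(oldGen == pool.borrow(1));   // true: same thread and generation
            System.out.println(oldGen == newGen);           // false: new generation, new writer
        }
    }
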
Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/WriteOperationHandler.java Tue Nov 20 10:41:16 2018
@@ -23,6 +23,8 @@ import java.io.IOException;
 
 import org.jetbrains.annotations.NotNull;
 
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+
 /**
  * A {@code WriteOperationHandler} executes {@link WriteOperation
  * WriteOperation}s and as such serves as a bridge between a {@link
@@ -31,6 +33,12 @@ import org.jetbrains.annotations.NotNull
 interface WriteOperationHandler {
 
     /**
+     * @return the current {@code GCGeneration} of the store.
+     */
+    @NotNull
+    GCGeneration getGCGeneration();
+
+    /**
      * A {@code WriteOperation} encapsulates an operation on a {@link
      * SegmentWriter}. Executing it performs the actual act of persisting
      * changes to a {@link SegmentBufferWriter}.
@@ -50,12 +58,14 @@ interface WriteOperationHandler {
 
     /**
      * Execute the passed {@code writeOperation} by passing it a {@link SegmentBufferWriter}.
+     * @param gcGeneration    the {@code GCGeneration} the changes should be persisted with.
      * @param writeOperation  {@link WriteOperation} to execute
      * @return                {@code RecordId} that resulted from persisting the changes.
      * @throws IOException
      */
     @NotNull
-    RecordId execute(@NotNull WriteOperation writeOperation) throws IOException;
+    RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation writeOperation)
+    throws IOException;
 
     /**
      * Flush any pending changes on any {@link SegmentBufferWriter} managed by this instance.

Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java Tue Nov 20 10:41:16 2018
@@ -36,7 +36,6 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -50,11 +49,13 @@ public class SegmentBufferWriterPoolTest
 
     private final RecordId rootId = store.getRevisions().getHead();
 
+    private GCGeneration gcGeneration = GCGeneration.NULL;
+
     private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
             store.getSegmentIdProvider(),
             store.getReader(),
             "",
-            Suppliers.ofInstance(GCGeneration.NULL)
+            () -> gcGeneration
     );
 
     private final ExecutorService[] executors = new ExecutorService[] {
@@ -69,18 +70,19 @@ public class SegmentBufferWriterPoolTest
         }
     }
 
-    private Future<RecordId> execute(final WriteOperation op, int executor) {
+    private Future<RecordId> execute(GCGeneration gcGeneration, final WriteOperation op, int executor) {
         return executors[executor].submit(new Callable<RecordId>() {
             @Override
             public RecordId call() throws Exception {
-                return pool.execute(op);
+                return pool.execute(gcGeneration, op);
             }
         });
     }
 
     private WriteOperation createOp(final String key, final ConcurrentMap<String, SegmentBufferWriter> map) {
         return new WriteOperation() {
-            @NotNull @Override
+            @NotNull
+            @Override
             public RecordId execute(@NotNull SegmentBufferWriter writer) {
                 map.put(key, writer);
                 return rootId;
@@ -90,10 +92,11 @@ public class SegmentBufferWriterPoolTest
 
     @Test
     public void testThreadAffinity() throws IOException, ExecutionException, InterruptedException {
+        GCGeneration gen = pool.getGCGeneration();
         ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
-        Future<RecordId> res1 = execute(createOp("a", map1), 0);
-        Future<RecordId> res2 = execute(createOp("b", map1), 1);
-        Future<RecordId> res3 = execute(createOp("c", map1), 2);
+        Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
 
         // Give the tasks some time to complete
         sleepUninterruptibly(10, MILLISECONDS);
@@ -104,9 +107,9 @@ public class SegmentBufferWriterPoolTest
         assertEquals(3, map1.size());
 
         ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
-        Future<RecordId> res4 = execute(createOp("a", map2), 0);
-        Future<RecordId> res5 = execute(createOp("b", map2), 1);
-        Future<RecordId> res6 = execute(createOp("c", map2), 2);
+        Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
 
         // Give the tasks some time to complete
         sleepUninterruptibly(10, MILLISECONDS);
@@ -120,10 +123,11 @@ public class SegmentBufferWriterPoolTest
 
     @Test
     public void testFlush() throws ExecutionException, InterruptedException, IOException {
+        GCGeneration gen = pool.getGCGeneration();
         ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
-        Future<RecordId> res1 = execute(createOp("a", map1), 0);
-        Future<RecordId> res2 = execute(createOp("b", map1), 1);
-        Future<RecordId> res3 = execute(createOp("c", map1), 2);
+        Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
 
         // Give the tasks some time to complete
         sleepUninterruptibly(10, MILLISECONDS);
@@ -136,9 +140,9 @@ public class SegmentBufferWriterPoolTest
         pool.flush(store);
 
         ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
-        Future<RecordId> res4 = execute(createOp("a", map2), 0);
-        Future<RecordId> res5 = execute(createOp("b", map2), 1);
-        Future<RecordId> res6 = execute(createOp("c", map2), 2);
+        Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
 
         // Give the tasks some time to complete
         sleepUninterruptibly(10, MILLISECONDS);
@@ -151,10 +155,61 @@ public class SegmentBufferWriterPoolTest
     }
 
     @Test
+    public void testCompaction() throws ExecutionException, InterruptedException, IOException {
+        GCGeneration gen = pool.getGCGeneration();
+        ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
+        Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
+        Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
+        Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res1.get());
+        assertEquals(rootId, res2.get());
+        assertEquals(rootId, res3.get());
+        assertEquals(3, map1.size());
+
+        // Simulate compaction by increasing the global gc generation
+        gcGeneration = gcGeneration.nextFull();
+
+        // Write using previous generation
+        ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
+        Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
+        Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
+        Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res4.get());
+        assertEquals(rootId, res5.get());
+        assertEquals(rootId, res6.get());
+        assertEquals(3, map2.size());
+        assertEquals(map1, map2);
+
+        // Write using current generation
+        ConcurrentMap<String, SegmentBufferWriter> map3 = newConcurrentMap();
+        Future<RecordId> res7 = execute(gen.nextFull(), createOp("a", map3), 0);
+        Future<RecordId> res8 = execute(gen.nextFull(), createOp("b", map3), 1);
+        Future<RecordId> res9 = execute(gen.nextFull(), createOp("c", map3), 2);
+
+        // Give the tasks some time to complete
+        sleepUninterruptibly(10, MILLISECONDS);
+
+        assertEquals(rootId, res7.get());
+        assertEquals(rootId, res8.get());
+        assertEquals(rootId, res9.get());
+        assertEquals(3, map3.size());
+        assertTrue(intersection(newHashSet(map1.values()), newHashSet(map3.values())).isEmpty());
+    }
+
+    @Test
     public void testFlushBlocks() throws ExecutionException, InterruptedException {
-        Future<RecordId> res = execute(new WriteOperation() {
-            @NotNull
-            @Nullable @Override
+        GCGeneration gcGeneration = pool.getGCGeneration();
+        Future<RecordId> res = execute(gcGeneration, new WriteOperation() {
+            @Nullable
+            @Override
             public RecordId execute(@NotNull SegmentBufferWriter writer) {
                 try {
                     // This should deadlock as flush waits for this write

Modified: jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java?rev=1846986&r1=1846985&r2=1846986&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java (original)
+++ jackrabbit/oak/branches/1.8/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java Tue Nov 20 10:41:16 2018
@@ -61,7 +61,6 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.junit.Ignore;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Rule;
 import org.junit.Test;
@@ -289,7 +288,6 @@ public class FileStoreIT {
         }
     }
 
-    @Ignore("OAK-7867")
     @Test
     public void blockingBlob() throws Exception {