You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/10/07 09:02:41 UTC

svn commit: r1707189 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: mduerig
Date: Wed Oct  7 07:02:41 2015
New Revision: 1707189

URL: http://svn.apache.org/viewvc?rev=1707189&view=rev
Log:
OAK-3235: Deadlock when closing a concurrently used FileStore
Release lock before writing segment to the segment store

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1707189&r1=1707188&r2=1707189&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Oct  7 07:02:41 2015
@@ -225,6 +225,7 @@ public class Segment {
         this.refids = new SegmentId[SEGMENT_REFERENCE_LIMIT + 1];
         this.refids[0] = id;
         this.version = SegmentVersion.fromByte(buffer[3]);
+        this.id.setSegment(this);
     }
 
     SegmentVersion getSegmentVersion() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1707189&r1=1707188&r2=1707189&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Oct  7 07:02:41 2015
@@ -168,7 +168,6 @@ public class SegmentWriter {
         this.version = version;
         this.buffer = createNewBuffer(version);
         this.segment = new Segment(tracker, buffer);
-        segment.getSegmentId().setSegment(segment);
     }
 
     /**
@@ -176,75 +175,97 @@ public class SegmentWriter {
      * store. This is done automatically (called from prepare) when there is not
      * enough space for a record. It can also be called explicitly.
      */
-    public synchronized void flush() {
-        if (length > 0) {
-            int refcount = segment.getRefCount();
-
-            int rootcount = roots.size();
-            buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
-            buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
-
-            int blobrefcount = blobrefs.size();
-            buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
-            buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
-
-            length = align(
-                    refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
-                    16);
-
-            checkState(length <= buffer.length);
-
-            int pos = refcount * 16;
-            if (pos + length <= buffer.length) {
-                // the whole segment fits to the space *after* the referenced
-                // segment identifiers we've already written, so we can safely
-                // copy those bits ahead even if concurrent code is still
-                // reading from that part of the buffer
-                System.arraycopy(buffer, 0, buffer, buffer.length-length, pos);
-                pos += buffer.length - length;
-            } else {
-                // this might leave some empty space between the header and
-                // the record data, but this case only occurs when the
-                // segment is >252kB in size and the maximum overhead is <<4kB,
-                // which is acceptable
-                length = buffer.length;
-            }
-
-            for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) {
-                int offset = entry.getKey().getOffset();
-                buffer[pos++] = (byte) entry.getValue().ordinal();
-                buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
-                buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
-            }
-
-            for (RecordId blobref : blobrefs) {
-                int offset = blobref.getOffset();
-                buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
-                buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+    public void flush() {
+        // Id of the segment to be written in the file store. If the segment id
+        // is not null, a segment will be written outside of the synchronized block.
+        SegmentId segmentId = null;
+
+        // Buffer containing segment data, and offset and length to locate the
+        // segment data into the buffer. These variable will be initialized in
+        // the synchronized block.
+        byte[] segmentBuffer = null;
+        int segmentOffset = 0;
+        int segmentLength = 0;
+
+        synchronized (this) {
+            if (length > 0) {
+                int refcount = segment.getRefCount();
+
+                int rootcount = roots.size();
+                buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
+                buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
+
+                int blobrefcount = blobrefs.size();
+                buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
+                buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
+
+                length = align(
+                        refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
+                        16);
+
+                checkState(length <= buffer.length);
+
+                int pos = refcount * 16;
+                if (pos + length <= buffer.length) {
+                    // the whole segment fits to the space *after* the referenced
+                    // segment identifiers we've already written, so we can safely
+                    // copy those bits ahead even if concurrent code is still
+                    // reading from that part of the buffer
+                    System.arraycopy(buffer, 0, buffer, buffer.length - length, pos);
+                    pos += buffer.length - length;
+                } else {
+                    // this might leave some empty space between the header and
+                    // the record data, but this case only occurs when the
+                    // segment is >252kB in size and the maximum overhead is <<4kB,
+                    // which is acceptable
+                    length = buffer.length;
+                }
+
+                for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) {
+                    int offset = entry.getKey().getOffset();
+                    buffer[pos++] = (byte) entry.getValue().ordinal();
+                    buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
+                    buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+                }
+
+                for (RecordId blobref : blobrefs) {
+                    int offset = blobref.getOffset();
+                    buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
+                    buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+                }
+
+                segmentId = segment.getSegmentId();
+                segmentBuffer = buffer;
+                segmentOffset = buffer.length - length;
+                segmentLength = length;
+
+                buffer = createNewBuffer(version);
+                roots.clear();
+                blobrefs.clear();
+                length = 0;
+                position = buffer.length;
+                segment = new Segment(tracker, buffer);
             }
+        }
 
-            SegmentId id = segment.getSegmentId();
-            log.debug("Writing data segment {} ({} bytes)", id, length);
-            store.writeSegment(id, buffer, buffer.length - length, length);
+        if (segmentId != null) {
+            log.debug("Writing data segment {} ({} bytes)", segmentId, segmentLength);
+            store.writeSegment(segmentId, segmentBuffer, segmentOffset, segmentLength);
 
             // Keep this segment in memory as it's likely to be accessed soon
             ByteBuffer data;
-            if (buffer.length - length > 4096) {
-                data = ByteBuffer.allocate(length);
-                data.put(buffer, buffer.length - length, length);
+            if (segmentOffset > 4096) {
+                data = ByteBuffer.allocate(segmentLength);
+                data.put(segmentBuffer, segmentOffset, segmentLength);
                 data.rewind();
             } else {
-                data = ByteBuffer.wrap(buffer, buffer.length - length, length);
+                data = ByteBuffer.wrap(segmentBuffer, segmentOffset, segmentLength);
             }
-            tracker.setSegment(id, new Segment(tracker, id, data));
 
-            buffer = createNewBuffer(version);
-            roots.clear();
-            blobrefs.clear();
-            length = 0;
-            position = buffer.length;
-            segment = new Segment(tracker, buffer);
-            segment.getSegmentId().setSegment(segment);
+            // It is important to put the segment into the cache only *after* it has been
+            // written to the store since as soon as it is in the cache it becomes eligible
+            // for eviction, which might lead to SNFEs when it is not yet in the store at that point.
+            tracker.setSegment(segmentId, new Segment(tracker, segmentId, data));
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java?rev=1707189&r1=1707188&r2=1707189&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java Wed Oct  7 07:02:41 2015
@@ -99,7 +99,9 @@ public class CompactionAndCleanupIT {
 
         // really long time span, no binary cloning
 
-        FileStore fileStore = new FileStore(directory, 1);
+        FileStore fileStore = FileStore.newFileStore(directory)
+                .withMaxFileSize(1)
+                .create();
         final SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
         CompactionStrategy custom = new CompactionStrategy(false, false,
                 CLEAN_OLD, TimeUnit.HOURS.toMillis(1), (byte) 0) {