You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/09 17:23:49 UTC

svn commit: r1549605 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java

Author: jukka
Date: Mon Dec  9 16:23:48 2013
New Revision: 1549605

URL: http://svn.apache.org/r1549605
Log:
OAK-593: Segment-based MK

Adjust synchronization in SegmentWriter.flush() to avoid a potential deadlock with the FileStore flush thread.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1549605&r1=1549604&r2=1549605&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Mon Dec  9 16:23:48 2013
@@ -139,7 +139,7 @@ public class SegmentWriter {
                 new Segment(store, newBulkSegmentId(), ByteBuffer.allocate(0));
     }
 
-    private void writeSegmentHeader(ByteBuffer b) {
+    private synchronized void writeSegmentHeader(ByteBuffer b) {
         int p = b.position();
 
         b.put((byte) refids.size());
@@ -184,16 +184,34 @@ public class SegmentWriter {
         return dummySegment;
     }
 
-    public synchronized void flush() {
-        if (length > 0) {
-            length += align(3 + roots.size() * 3) + refids.size() * 16;
-
-            ByteBuffer b = ByteBuffer.wrap(
-                    buffer, buffer.length - length, length);
-            writeSegmentHeader(b);
+    /**
+     * Flushes the current segment to the underlying store, and starts
+     * writing a new segment. This method is carefully synchronized to avoid
+     * a deadlock with the flush thread of the underlying segment store.
+     * The internal variables ({@link #buffer}, etc.) of this class are
+     * read and updated within a synchronization block, but the actual
+     * writing of the segment is done outside of that block to allow
+     * concurrent writing of transient changes and to avoid a deadlock with
+     * the flush thread that calls this method when already holding the
+     * lock on the underlying segment store.
+     */
+    public void flush() {
+        int n;
+        byte[] b;
+        UUID id;
+
+        synchronized (this) {
+            if (length == 0) {
+                return; // do nothing if there is no content to flush
+            }
 
-            store.writeSegment(uuid, buffer, buffer.length - length, length);
+            // prepare the segment to be written
+            n = align(3 + roots.size() * 3) + refids.size() * 16 + length;
+            b = buffer;
+            id = uuid;
+            writeSegmentHeader(ByteBuffer.wrap(b, b.length - n, n));
 
+            // prepare a new segment for use by any new transient changes
             uuid = newDataSegmentId();
             buffer = new byte[MAX_SEGMENT_SIZE];
             refids.clear();
@@ -202,13 +220,27 @@ public class SegmentWriter {
             position = buffer.length;
             currentSegment = null;
         }
+
+        // must be outside the synchronization block to avoid a deadlock
+        store.writeSegment(id, b, b.length - n, n);
     }
 
     private RecordId prepare(RecordType type, int size) {
         return prepare(type, size, Collections.<RecordId>emptyList());
     }
 
-    private RecordId prepare(
+    /**
+     * Prepares the space for the record described in the given arguments.
+     * This method should be called as the first thing within a synchronization
+     * block that then proceeds to write the prepared record before leaving
+     * the synchronization.
+     *
+     * @param type type of the record
+     * @param size number of bytes in the record, excluding the record ids
+     * @param ids ids of the other records referenced by the record
+     * @return id of the record being created
+     */
+    private synchronized RecordId prepare(
             RecordType type, int size, Collection<RecordId> ids) {
         checkArgument(size >= 0);
         checkNotNull(ids);