You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/09 17:23:49 UTC
svn commit: r1549605 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Author: jukka
Date: Mon Dec 9 16:23:48 2013
New Revision: 1549605
URL: http://svn.apache.org/r1549605
Log:
OAK-593: Segment-based MK
Adjust synchronization in SegmentWriter.flush() to avoid a potential deadlock with the FileStore flush thread.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1549605&r1=1549604&r2=1549605&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Mon Dec 9 16:23:48 2013
@@ -139,7 +139,7 @@ public class SegmentWriter {
new Segment(store, newBulkSegmentId(), ByteBuffer.allocate(0));
}
- private void writeSegmentHeader(ByteBuffer b) {
+ private synchronized void writeSegmentHeader(ByteBuffer b) {
int p = b.position();
b.put((byte) refids.size());
@@ -184,16 +184,34 @@ public class SegmentWriter {
return dummySegment;
}
- public synchronized void flush() {
- if (length > 0) {
- length += align(3 + roots.size() * 3) + refids.size() * 16;
-
- ByteBuffer b = ByteBuffer.wrap(
- buffer, buffer.length - length, length);
- writeSegmentHeader(b);
+ /**
+ * Flushes the current segment to the underlying store, and starts
+ * writing a new segment. This method is carefully synchronized to avoid
+ * a deadlock with the flush thread of the underlying segment store.
+ * The internal variables ({@link #buffer}, etc.) of this class are
+ * read and updated within a synchronization block, but the actual
+ * writing of the segment is done outside of that block to allow
+ * concurrent writing of transient changes and to avoid a deadlock with
+ * the flush thread that calls this method when already holding the
+ * lock on the underlying segment store.
+ */
+ public void flush() {
+ int n;
+ byte[] b;
+ UUID id;
+
+ synchronized (this) {
+ if (length == 0) {
+ return; // do nothing if there is no content to flush
+ }
- store.writeSegment(uuid, buffer, buffer.length - length, length);
+ // prepare the segment to be written
+ n = align(3 + roots.size() * 3) + refids.size() * 16 + length;
+ b = buffer;
+ id = uuid;
+ writeSegmentHeader(ByteBuffer.wrap(b, b.length - n, n));
+ // prepare a new segment for use by any new transient changes
uuid = newDataSegmentId();
buffer = new byte[MAX_SEGMENT_SIZE];
refids.clear();
@@ -202,13 +220,27 @@ public class SegmentWriter {
position = buffer.length;
currentSegment = null;
}
+
+ // must be outside the synchronization block to avoid a deadlock
+ store.writeSegment(id, b, b.length - n, n);
}
private RecordId prepare(RecordType type, int size) {
return prepare(type, size, Collections.<RecordId>emptyList());
}
- private RecordId prepare(
+ /**
+ * Prepares the space for the record described in the given arguments.
+ * This method should be called as the first thing within a synchronized
+ * block that then proceeds to write the prepared record before leaving
+ * that block.
+ *
+ * @param type type of the record
+ * @param size number of bytes in the record, excluding the record ids
+ * @param ids ids of the other records referenced by the record
+ * @return id of the record being created
+ */
+ private synchronized RecordId prepare(
RecordType type, int size, Collection<RecordId> ids) {
checkArgument(size >= 0);
checkNotNull(ids);