Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/10/07 09:02:41 UTC
svn commit: r1707189 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/
test/java/org/apache/jackrabbit/oak/plugins/segment/
Author: mduerig
Date: Wed Oct 7 07:02:41 2015
New Revision: 1707189
URL: http://svn.apache.org/viewvc?rev=1707189&view=rev
Log:
OAK-3235: Deadlock when closing a concurrently used FileStore
Release lock before writing segment to the segment store
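A minimal sketch of the locking pattern this fix applies: the writer's state is snapshotted while holding the monitor, and the potentially blocking write to the segment store happens only after the monitor has been released. The LockReleaseSketch and Store names below are placeholders for illustration, not Oak API.

    import java.io.IOException;

    class LockReleaseSketch {

        interface Store { void write(byte[] data) throws IOException; }

        private byte[] pending; // data accumulated while holding the monitor

        void flush(Store store) throws IOException {
            byte[] toWrite = null;
            synchronized (this) {
                if (pending != null) {
                    toWrite = pending; // snapshot the state under the lock
                    pending = null;    // reset the writer while still holding it
                }
            }
            // The blocking store write happens after the monitor has been
            // released, so a thread blocked inside the store (e.g. while the
            // FileStore is shutting down) can no longer deadlock against a
            // thread waiting for this writer's monitor.
            if (toWrite != null) {
                store.write(toWrite);
            }
        }
    }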
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1707189&r1=1707188&r2=1707189&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Oct 7 07:02:41 2015
@@ -225,6 +225,7 @@ public class Segment {
this.refids = new SegmentId[SEGMENT_REFERENCE_LIMIT + 1];
this.refids[0] = id;
this.version = SegmentVersion.fromByte(buffer[3]);
+ this.id.setSegment(this);
}
SegmentVersion getSegmentVersion() {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1707189&r1=1707188&r2=1707189&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Oct 7 07:02:41 2015
@@ -168,7 +168,6 @@ public class SegmentWriter {
this.version = version;
this.buffer = createNewBuffer(version);
this.segment = new Segment(tracker, buffer);
- segment.getSegmentId().setSegment(segment);
}
/**
@@ -176,75 +175,97 @@ public class SegmentWriter {
* store. This is done automatically (called from prepare) when there is not
* enough space for a record. It can also be called explicitly.
*/
- public synchronized void flush() {
- if (length > 0) {
- int refcount = segment.getRefCount();
-
- int rootcount = roots.size();
- buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
- buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
-
- int blobrefcount = blobrefs.size();
- buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
- buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
-
- length = align(
- refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
- 16);
-
- checkState(length <= buffer.length);
-
- int pos = refcount * 16;
- if (pos + length <= buffer.length) {
- // the whole segment fits to the space *after* the referenced
- // segment identifiers we've already written, so we can safely
- // copy those bits ahead even if concurrent code is still
- // reading from that part of the buffer
- System.arraycopy(buffer, 0, buffer, buffer.length-length, pos);
- pos += buffer.length - length;
- } else {
- // this might leave some empty space between the header and
- // the record data, but this case only occurs when the
- // segment is >252kB in size and the maximum overhead is <<4kB,
- // which is acceptable
- length = buffer.length;
- }
-
- for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) {
- int offset = entry.getKey().getOffset();
- buffer[pos++] = (byte) entry.getValue().ordinal();
- buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
- buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
- }
-
- for (RecordId blobref : blobrefs) {
- int offset = blobref.getOffset();
- buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
- buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+ public void flush() {
+ // Id of the segment to be written to the file store. If the segment id
+ // is not null, the segment will be written outside of the synchronized block.
+ SegmentId segmentId = null;
+
+ // Buffer containing the segment data, plus the offset and length locating the
+ // segment data within the buffer. These variables will be initialized in
+ // the synchronized block.
+ byte[] segmentBuffer = null;
+ int segmentOffset = 0;
+ int segmentLength = 0;
+
+ synchronized (this) {
+ if (length > 0) {
+ int refcount = segment.getRefCount();
+
+ int rootcount = roots.size();
+ buffer[Segment.ROOT_COUNT_OFFSET] = (byte) (rootcount >> 8);
+ buffer[Segment.ROOT_COUNT_OFFSET + 1] = (byte) rootcount;
+
+ int blobrefcount = blobrefs.size();
+ buffer[Segment.BLOBREF_COUNT_OFFSET] = (byte) (blobrefcount >> 8);
+ buffer[Segment.BLOBREF_COUNT_OFFSET + 1] = (byte) blobrefcount;
+
+ length = align(
+ refcount * 16 + rootcount * 3 + blobrefcount * 2 + length,
+ 16);
+
+ checkState(length <= buffer.length);
+
+ int pos = refcount * 16;
+ if (pos + length <= buffer.length) {
+ // the whole segment fits to the space *after* the referenced
+ // segment identifiers we've already written, so we can safely
+ // copy those bits ahead even if concurrent code is still
+ // reading from that part of the buffer
+ System.arraycopy(buffer, 0, buffer, buffer.length - length, pos);
+ pos += buffer.length - length;
+ } else {
+ // this might leave some empty space between the header and
+ // the record data, but this case only occurs when the
+ // segment is >252kB in size and the maximum overhead is <<4kB,
+ // which is acceptable
+ length = buffer.length;
+ }
+
+ for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) {
+ int offset = entry.getKey().getOffset();
+ buffer[pos++] = (byte) entry.getValue().ordinal();
+ buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
+ buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+ }
+
+ for (RecordId blobref : blobrefs) {
+ int offset = blobref.getOffset();
+ buffer[pos++] = (byte) (offset >> (8 + Segment.RECORD_ALIGN_BITS));
+ buffer[pos++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
+ }
+
+ segmentId = segment.getSegmentId();
+ segmentBuffer = buffer;
+ segmentOffset = buffer.length - length;
+ segmentLength = length;
+
+ buffer = createNewBuffer(version);
+ roots.clear();
+ blobrefs.clear();
+ length = 0;
+ position = buffer.length;
+ segment = new Segment(tracker, buffer);
}
+ }
- SegmentId id = segment.getSegmentId();
- log.debug("Writing data segment {} ({} bytes)", id, length);
- store.writeSegment(id, buffer, buffer.length - length, length);
+ if (segmentId != null) {
+ log.debug("Writing data segment {} ({} bytes)", segmentId, segmentLength);
+ store.writeSegment(segmentId, segmentBuffer, segmentOffset, segmentLength);
// Keep this segment in memory as it's likely to be accessed soon
ByteBuffer data;
- if (buffer.length - length > 4096) {
- data = ByteBuffer.allocate(length);
- data.put(buffer, buffer.length - length, length);
+ if (segmentOffset > 4096) {
+ data = ByteBuffer.allocate(segmentLength);
+ data.put(segmentBuffer, segmentOffset, segmentLength);
data.rewind();
} else {
- data = ByteBuffer.wrap(buffer, buffer.length - length, length);
+ data = ByteBuffer.wrap(segmentBuffer, segmentOffset, segmentLength);
}
- tracker.setSegment(id, new Segment(tracker, id, data));
- buffer = createNewBuffer(version);
- roots.clear();
- blobrefs.clear();
- length = 0;
- position = buffer.length;
- segment = new Segment(tracker, buffer);
- segment.getSegmentId().setSegment(segment);
+ // It is important to put the segment into the cache only *after* it has been
+ // written to the store: as soon as it is in the cache it becomes eligible
+ // for eviction, which might lead to SNFEs if it has not yet reached the store at that point.
+ tracker.setSegment(segmentId, new Segment(tracker, segmentId, data));
}
}
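A minimal sketch (with placeholder Store and Cache interfaces, not Oak API) of the ordering constraint noted in the new comment above: a segment is written to the backing store first and only then published to the in-memory cache, where it immediately becomes eligible for eviction and a later reload from the store.

    import java.util.UUID;

    class PersistThenCacheSketch {

        interface Store { void writeSegment(UUID id, byte[] data, int offset, int length); }
        interface Cache { void setSegment(UUID id, byte[] data); }

        static void flushSegment(Store store, Cache cache, UUID id, byte[] data) {
            // 1. Make the segment durable in the backing store.
            store.writeSegment(id, data, 0, data.length);
            // 2. Only now publish it to the cache; if it is evicted later,
            //    the subsequent reload will find it in the store.
            cache.setSegment(id, data);
        }
    }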
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java?rev=1707189&r1=1707188&r2=1707189&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java Wed Oct 7 07:02:41 2015
@@ -99,7 +99,9 @@ public class CompactionAndCleanupIT {
// really long time span, no binary cloning
- FileStore fileStore = new FileStore(directory, 1);
+ FileStore fileStore = FileStore.newFileStore(directory)
+ .withMaxFileSize(1)
+ .create();
final SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
CompactionStrategy custom = new CompactionStrategy(false, false,
CLEAN_OLD, TimeUnit.HOURS.toMillis(1), (byte) 0) {
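For reference, a minimal usage sketch of the builder-style construction the test now uses, based on the calls visible in the diff above. The package locations, the IOException declaration, and the reading of withMaxFileSize(1) as the maximum tar file size in MB (matching the old constructor's second argument) are assumptions, not confirmed by this commit.

    import java.io.File;
    import java.io.IOException;

    import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
    import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;

    class FileStoreBuilderSketch {

        static SegmentNodeStore openStore(File directory) throws IOException {
            // Build the FileStore with a 1 MB maximum file size, as in the test.
            FileStore fileStore = FileStore.newFileStore(directory)
                    .withMaxFileSize(1)
                    .create();
            return new SegmentNodeStore(fileStore);
        }
    }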