You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/10 16:58:29 UTC
svn commit: r1549878 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Author: jukka
Date: Tue Dec 10 15:58:29 2013
New Revision: 1549878
URL: http://svn.apache.org/r1549878
Log:
OAK-593: Segment-based MK
Avoid deadlock between SegmentWriter and the FileStore flush thread
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1549878&r1=1549877&r2=1549878&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Tue Dec 10 15:58:29 2013
@@ -77,7 +77,7 @@ public class FileStore extends AbstractS
* The persisted head of the root journal, used to determine whether the
* latest {@link #head} value should be written to the disk.
*/
- private RecordId persistedHead = null;
+ private final AtomicReference<RecordId> persistedHead;
/**
* The background flush thread. Automatically flushes the TarMK state
@@ -132,22 +132,26 @@ public class FileStore extends AbstractS
journalFile = new RandomAccessFile(
new File(directory, JOURNAL_FILE_NAME), "rw");
+
+ RecordId id = null;
String line = journalFile.readLine();
while (line != null) {
int space = line.indexOf(' ');
if (space != -1) {
- persistedHead = RecordId.fromString(line.substring(0, space));
+ id = RecordId.fromString(line.substring(0, space));
}
line = journalFile.readLine();
}
- if (persistedHead != null) {
- head = new AtomicReference<RecordId>(persistedHead);
+ if (id != null) {
+ head = new AtomicReference<RecordId>(id);
+ persistedHead = new AtomicReference<RecordId>(id);
} else {
NodeBuilder builder = EMPTY_NODE.builder();
builder.setChildNode("root", initial);
head = new AtomicReference<RecordId>(
getWriter().writeNode(builder.getNodeState()).getRecordId());
+ persistedHead = new AtomicReference<RecordId>(null);
}
this.flushThread = new Thread(new Runnable() {
@@ -175,19 +179,27 @@ public class FileStore extends AbstractS
flushThread.start();
}
- public synchronized void flush() throws IOException {
- RecordId id = head.get();
- if (!id.equals(persistedHead)) {
- getWriter().flush();
- for (TarFile file : bulkFiles) {
- file.flush();
- }
- for (TarFile file : dataFiles) {
- file.flush();
- }
- journalFile.writeBytes(id + " root\n");
- journalFile.getChannel().force(false);
- persistedHead = id;
+ public void flush() throws IOException {
+ synchronized (persistedHead) {
+ RecordId before = persistedHead.get();
+ RecordId after = head.get();
+ if (!after.equals(before)) {
+ // needs to happen outside the synchronization block below to
+ // avoid a deadlock with another thread flushing the writer
+ getWriter().flush();
+
+ synchronized (this) {
+ for (TarFile file : bulkFiles) {
+ file.flush();
+ }
+ for (TarFile file : dataFiles) {
+ file.flush();
+ }
+ journalFile.writeBytes(after + " root\n");
+ journalFile.getChannel().force(false);
+ persistedHead.set(after);
+ }
+ }
}
}