You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/12/10 16:58:29 UTC

svn commit: r1549878 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Author: jukka
Date: Tue Dec 10 15:58:29 2013
New Revision: 1549878

URL: http://svn.apache.org/r1549878
Log:
OAK-593: Segment-based MK

Avoid deadlock between SegmentWriter and the FileStore flush thread

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1549878&r1=1549877&r2=1549878&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Tue Dec 10 15:58:29 2013
@@ -77,7 +77,7 @@ public class FileStore extends AbstractS
      * The persisted head of the root journal, used to determine whether the
      * latest {@link #head} value should be written to the disk.
      */
-    private RecordId persistedHead = null;
+    private final AtomicReference<RecordId> persistedHead;
 
     /**
      * The background flush thread. Automatically flushes the TarMK state
@@ -132,22 +132,26 @@ public class FileStore extends AbstractS
 
         journalFile = new RandomAccessFile(
                 new File(directory, JOURNAL_FILE_NAME), "rw");
+
+        RecordId id = null;
         String line = journalFile.readLine();
         while (line != null) {
             int space = line.indexOf(' ');
             if (space != -1) {
-                persistedHead = RecordId.fromString(line.substring(0, space));
+                id = RecordId.fromString(line.substring(0, space));
             }
             line = journalFile.readLine();
         }
 
-        if (persistedHead != null) {
-            head = new AtomicReference<RecordId>(persistedHead);
+        if (id != null) {
+            head = new AtomicReference<RecordId>(id);
+            persistedHead = new AtomicReference<RecordId>(id);
         } else {
             NodeBuilder builder = EMPTY_NODE.builder();
             builder.setChildNode("root", initial);
             head = new AtomicReference<RecordId>(
                     getWriter().writeNode(builder.getNodeState()).getRecordId());
+            persistedHead = new AtomicReference<RecordId>(null);
         }
 
         this.flushThread = new Thread(new Runnable() {
@@ -175,19 +179,27 @@ public class FileStore extends AbstractS
         flushThread.start();
     }
 
-    public synchronized void flush() throws IOException {
-        RecordId id = head.get();
-        if (!id.equals(persistedHead)) {
-            getWriter().flush();
-            for (TarFile file : bulkFiles) {
-                file.flush();
-            }
-            for (TarFile file : dataFiles) {
-                file.flush();
-            }
-            journalFile.writeBytes(id + " root\n");
-            journalFile.getChannel().force(false);
-            persistedHead = id;
+    public void flush() throws IOException {
+        synchronized (persistedHead) {
+            RecordId before = persistedHead.get();
+            RecordId after = head.get();
+            if (!after.equals(before)) {
+                // needs to happen outside the synchronization block below to
+                // avoid a deadlock with another thread flushing the writer
+                getWriter().flush();
+
+                synchronized (this) {
+                    for (TarFile file : bulkFiles) {
+                        file.flush();
+                    }
+                    for (TarFile file : dataFiles) {
+                        file.flush();
+                    }
+                    journalFile.writeBytes(after + " root\n");
+                    journalFile.getChannel().force(false);
+                    persistedHead.set(after);
+                }
+            }
         }
     }