Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/10/13 16:17:28 UTC

svn commit: r1708402 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Author: mduerig
Date: Tue Oct 13 14:17:28 2015
New Revision: 1708402

URL: http://svn.apache.org/viewvc?rev=1708402&view=rev
Log:
OAK-3330: FileStore lock contention with concurrent writers
Use a read/write lock instead of a single lock for all operations

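The change replaces FileStore's monitor lock with a java.util.concurrent.locks.ReentrantReadWriteLock: read-only accessors (size(), count(), estimateCompactionGain(), getSegmentIds(), and reads against the current TarWriter) share the read lock, while mutating operations (the journal update in flush(), cleanup(), close(), writeSegment(), setRevision()) take the exclusive write lock. A minimal sketch of that pattern follows; the names (ReadMostlyStore, append) are illustrative and not part of FileStore:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ReadMostlyStore {

        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        // Illustrative state guarded by the lock.
        private long size;

        // Readers share the read lock, so concurrent calls no longer
        // serialize the way they did under a synchronized method.
        long size() {
            lock.readLock().lock();
            try {
                return size;
            } finally {
                lock.readLock().unlock();
            }
        }

        // Writers take the exclusive write lock and block readers only
        // while actually mutating the guarded state.
        void append(long length) {
            lock.writeLock().lock();
            try {
                size += length;
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

As in the diff below, the lock is always released in a finally block so that an exception inside the guarded section cannot leave it held.
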
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1708402&r1=1708401&r2=1708402&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Tue Oct 13 14:17:28 2015
@@ -52,6 +52,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -199,6 +201,8 @@ public class FileStore implements Segmen
      */
     private volatile boolean shutdown;
 
+    private final ReadWriteLock fileStoreLock = new ReentrantReadWriteLock();
+
     /**
      * Create a new instance of a {@link Builder} for a file store.
      * @param directory  directory where the tar files are stored
@@ -666,12 +670,17 @@ public class FileStore implements Segmen
         return dataFiles;
     }
 
-    public synchronized long size() {
-        long size = writeFile.length();
-        for (TarReader reader : readers) {
-            size += reader.size();
+    public long size() {
+        fileStoreLock.readLock().lock();
+        try {
+            long size = writeFile.length();
+            for (TarReader reader : readers) {
+                size += reader.size();
+            }
+            return size;
+        } finally {
+            fileStoreLock.readLock().unlock();
         }
-        return size;
     }
 
     /**
@@ -679,15 +688,20 @@ public class FileStore implements Segmen
      *
      * @return number of segments
      */
-    private synchronized int count() {
-        int count = 0;
-        if (writer != null) {
-            count += writer.count();
-        }
-        for (TarReader reader : readers) {
-            count += reader.count();
+    private int count() {
+        fileStoreLock.readLock().lock();
+        try {
+            int count = 0;
+            if (writer != null) {
+                count += writer.count();
+            }
+            for (TarReader reader : readers) {
+                count += reader.count();
+            }
+            return count;
+        } finally {
+            fileStoreLock.readLock().unlock();
         }
-        return count;
     }
 
     /**
@@ -698,13 +712,16 @@ public class FileStore implements Segmen
      */
     CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) {
         CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop);
-        synchronized (this) {
+        fileStoreLock.readLock().lock();
+        try {
             for (TarReader reader : readers) {
                 reader.accept(estimate);
                 if (stop.get()) {
                     break;
                 }
             }
+        } finally {
+            fileStoreLock.readLock().unlock();
         }
         return estimate;
     }
@@ -724,11 +741,14 @@ public class FileStore implements Segmen
                 // prevent the flush from stopping concurrent reads and writes
                 writer.flush();
 
-                synchronized (this) {
+                fileStoreLock.writeLock().lock();
+                try {
                     log.debug("TarMK journal update {} -> {}", before, after);
                     journalFile.writeBytes(after.toString10() + " root\n");
                     journalFile.getChannel().force(false);
                     persistedHead.set(after);
+                } finally {
+                    fileStoreLock.writeLock().unlock();
                 }
 
                 // Needs to happen outside the synchronization block above to
@@ -773,7 +793,8 @@ public class FileStore implements Segmen
         Set<UUID> referencedIds = newHashSet();
         Map<TarReader, TarReader> cleaned = newLinkedHashMap();
 
-        synchronized (this) {
+        fileStoreLock.writeLock().lock();
+        try {
             gcMonitor.info("TarMK revision cleanup started. Current repository size {}",
                     humanReadableByteCount(initialSize));
 
@@ -791,6 +812,8 @@ public class FileStore implements Segmen
             for (TarReader reader : readers) {
                 cleaned.put(reader, reader);
             }
+        } finally {
+            fileStoreLock.writeLock().unlock();
         }
 
         // Do actual cleanup outside of the lock to prevent blocking
@@ -807,7 +830,8 @@ public class FileStore implements Segmen
         }
 
         List<TarReader> oldReaders = newArrayList();
-        synchronized (this) {
+        fileStoreLock.writeLock().lock();
+        try {
             // Replace current list of reader with the cleaned readers taking care not to lose
             // any new reader that might have come in through concurrent calls to newWriter()
             List<TarReader> newReaders = newArrayList();
@@ -825,6 +849,8 @@ public class FileStore implements Segmen
                 }
             }
             readers = newReaders;
+        } finally {
+            fileStoreLock.writeLock().unlock();
         }
 
         // Close old readers *after* setting readers to the new readers to avoid accessing
@@ -991,23 +1017,28 @@ public class FileStore implements Segmen
         });
     }
 
-    public synchronized Iterable<SegmentId> getSegmentIds() {
-        List<SegmentId> ids = newArrayList();
-        if (writer != null) {
-            for (UUID uuid : writer.getUUIDs()) {
-                ids.add(tracker.getSegmentId(
-                        uuid.getMostSignificantBits(),
-                        uuid.getLeastSignificantBits()));
+    public Iterable<SegmentId> getSegmentIds() {
+        fileStoreLock.readLock().lock();
+        try {
+            List<SegmentId> ids = newArrayList();
+            if (writer != null) {
+                for (UUID uuid : writer.getUUIDs()) {
+                    ids.add(tracker.getSegmentId(
+                            uuid.getMostSignificantBits(),
+                            uuid.getLeastSignificantBits()));
+                }
             }
-        }
-        for (TarReader reader : readers) {
-            for (UUID uuid : reader.getUUIDs()) {
-                ids.add(tracker.getSegmentId(
-                        uuid.getMostSignificantBits(),
-                        uuid.getLeastSignificantBits()));
+            for (TarReader reader : readers) {
+                for (UUID uuid : reader.getUUIDs()) {
+                    ids.add(tracker.getSegmentId(
+                            uuid.getMostSignificantBits(),
+                            uuid.getLeastSignificantBits()));
+                }
             }
+            return ids;
+        } finally {
+            fileStoreLock.readLock().unlock();
         }
-        return ids;
     }
 
     @Override
@@ -1040,7 +1071,8 @@ public class FileStore implements Segmen
         try {
             flush();
             tracker.getWriter().dropCache();
-            synchronized (this) {
+            fileStoreLock.writeLock().lock();
+            try {
                 closeAndLogOnFail(writer);
 
                 List<TarReader> list = readers;
@@ -1054,6 +1086,8 @@ public class FileStore implements Segmen
                 }
                 closeAndLogOnFail(lockFile);
                 closeAndLogOnFail(journalFile);
+            } finally {
+                fileStoreLock.writeLock().unlock();
             }
         } catch (IOException e) {
             throw new RuntimeException(
@@ -1084,10 +1118,13 @@ public class FileStore implements Segmen
         }
 
         if (writer != null) {
-            synchronized (this) {
+            fileStoreLock.readLock().lock();
+            try {
                 if (writer.containsEntry(msb, lsb)) {
                     return true;
                 }
+            } finally {
+                fileStoreLock.readLock().unlock();
             }
         }
 
@@ -1126,7 +1163,8 @@ public class FileStore implements Segmen
         }
 
         if (writer != null) {
-            synchronized (this) {
+            fileStoreLock.readLock().lock();
+            try {
                 try {
                     ByteBuffer buffer = writer.readEntry(msb, lsb);
                     if (buffer != null) {
@@ -1135,6 +1173,8 @@ public class FileStore implements Segmen
                 } catch (IOException e) {
                     log.warn("Failed to read from tar file " + writer, e);
                 }
+            } finally {
+                fileStoreLock.readLock().unlock();
             }
         }
 
@@ -1162,8 +1202,8 @@ public class FileStore implements Segmen
     }
 
     @Override
-    public synchronized void writeSegment(
-            SegmentId id, byte[] data, int offset, int length) {
+    public void writeSegment(SegmentId id, byte[] data, int offset, int length) {
+        fileStoreLock.writeLock().lock();
         try {
             long size = writer.writeEntry(
                     id.getMostSignificantBits(),
@@ -1175,6 +1215,8 @@ public class FileStore implements Segmen
             approximateSize.addAndGet(TarWriter.BLOCK_SIZE + length + TarWriter.getPaddingSize(length));
         } catch (IOException e) {
             throw new RuntimeException(e);
+        } finally {
+            fileStoreLock.writeLock().unlock();
         }
     }
 
@@ -1249,10 +1291,15 @@ public class FileStore implements Segmen
         return this;
     }
 
-    private synchronized void setRevision(String rootRevision) {
-        RecordId id = RecordId.fromString(tracker, rootRevision);
-        head.set(id);
-        persistedHead.set(id);
+    private void setRevision(String rootRevision) {
+        fileStoreLock.writeLock().lock();
+        try {
+            RecordId id = RecordId.fromString(tracker, rootRevision);
+            head.set(id);
+            persistedHead.set(id);
+        } finally {
+            fileStoreLock.writeLock().unlock();
+        }
     }
 
     private void checkDiskSpace() {
@@ -1291,7 +1338,7 @@ public class FileStore implements Segmen
          *
          * @param revision
          */
-        public synchronized void setRevision(String revision) {
+        public void setRevision(String revision) {
             super.setRevision(revision);
         }
 
@@ -1301,7 +1348,7 @@ public class FileStore implements Segmen
         }
 
         @Override
-        public synchronized void writeSegment(SegmentId id, byte[] data,
+        public void writeSegment(SegmentId id, byte[] data,
                 int offset, int length) {
             throw new UnsupportedOperationException("Read Only Store");
         }
@@ -1313,7 +1360,7 @@ public class FileStore implements Segmen
         public void flush() { /* nop */ }
 
         @Override
-        public synchronized LinkedList<File> cleanup() {
+        public LinkedList<File> cleanup() {
             throw new UnsupportedOperationException("Read Only Store");
         }
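
The read/write split above also shows where the commit deliberately keeps work outside the lock: TarWriter.flush() runs before the write lock is taken so it does not stop concurrent reads and writes, and the actual TAR file cleanup happens between the two write-locked sections for the same reason. The short demo below only illustrates the ReentrantReadWriteLock semantics the change relies on (shared read lock, exclusive write lock); the class and thread names are illustrative and not part of Oak:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReadWriteLockDemo {

        public static void main(String[] args) throws InterruptedException {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

            Runnable reader = () -> {
                lock.readLock().lock();
                try {
                    // Both reader threads may be inside this section at the same time.
                    System.out.println(Thread.currentThread().getName() + " holds the read lock");
                } finally {
                    lock.readLock().unlock();
                }
            };

            Thread r1 = new Thread(reader, "reader-1");
            Thread r2 = new Thread(reader, "reader-2");
            r1.start();
            r2.start();
            r1.join();
            r2.join();

            // The write lock is exclusive: it is granted only once no thread holds the read lock.
            lock.writeLock().lock();
            try {
                System.out.println("writer holds the exclusive write lock");
            } finally {
                lock.writeLock().unlock();
            }
        }
    }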