You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/10/13 16:17:28 UTC
svn commit: r1708402 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Author: mduerig
Date: Tue Oct 13 14:17:28 2015
New Revision: 1708402
URL: http://svn.apache.org/viewvc?rev=1708402&view=rev
Log:
OAK-3330: FileStore lock contention with concurrent writers
Use read/write lock instead of a single lock for all operations
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1708402&r1=1708401&r2=1708402&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Tue Oct 13 14:17:28 2015
@@ -52,6 +52,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -199,6 +201,8 @@ public class FileStore implements Segmen
*/
private volatile boolean shutdown;
+ private final ReadWriteLock fileStoreLock = new ReentrantReadWriteLock();
+
/**
* Create a new instance of a {@link Builder} for a file store.
* @param directory directory where the tar files are stored
@@ -666,12 +670,17 @@ public class FileStore implements Segmen
return dataFiles;
}
- public synchronized long size() {
- long size = writeFile.length();
- for (TarReader reader : readers) {
- size += reader.size();
+ public long size() {
+ fileStoreLock.readLock().lock();
+ try {
+ long size = writeFile.length();
+ for (TarReader reader : readers) {
+ size += reader.size();
+ }
+ return size;
+ } finally {
+ fileStoreLock.readLock().unlock();
}
- return size;
}
/**
@@ -679,15 +688,20 @@ public class FileStore implements Segmen
*
* @return number of segments
*/
- private synchronized int count() {
- int count = 0;
- if (writer != null) {
- count += writer.count();
- }
- for (TarReader reader : readers) {
- count += reader.count();
+ private int count() {
+ fileStoreLock.readLock().lock();
+ try {
+ int count = 0;
+ if (writer != null) {
+ count += writer.count();
+ }
+ for (TarReader reader : readers) {
+ count += reader.count();
+ }
+ return count;
+ } finally {
+ fileStoreLock.readLock().unlock();
}
- return count;
}
/**
@@ -698,13 +712,16 @@ public class FileStore implements Segmen
*/
CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) {
CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop);
- synchronized (this) {
+ fileStoreLock.readLock().lock();
+ try {
for (TarReader reader : readers) {
reader.accept(estimate);
if (stop.get()) {
break;
}
}
+ } finally {
+ fileStoreLock.readLock().unlock();
}
return estimate;
}
@@ -724,11 +741,14 @@ public class FileStore implements Segmen
// prevent the flush from stopping concurrent reads and writes
writer.flush();
- synchronized (this) {
+ fileStoreLock.writeLock().lock();
+ try {
log.debug("TarMK journal update {} -> {}", before, after);
journalFile.writeBytes(after.toString10() + " root\n");
journalFile.getChannel().force(false);
persistedHead.set(after);
+ } finally {
+ fileStoreLock.writeLock().unlock();
}
// Needs to happen outside the synchronization block above to
@@ -773,7 +793,8 @@ public class FileStore implements Segmen
Set<UUID> referencedIds = newHashSet();
Map<TarReader, TarReader> cleaned = newLinkedHashMap();
- synchronized (this) {
+ fileStoreLock.writeLock().lock();
+ try {
gcMonitor.info("TarMK revision cleanup started. Current repository size {}",
humanReadableByteCount(initialSize));
@@ -791,6 +812,8 @@ public class FileStore implements Segmen
for (TarReader reader : readers) {
cleaned.put(reader, reader);
}
+ } finally {
+ fileStoreLock.writeLock().unlock();
}
// Do actual cleanup outside of the lock to prevent blocking
@@ -807,7 +830,8 @@ public class FileStore implements Segmen
}
List<TarReader> oldReaders = newArrayList();
- synchronized (this) {
+ fileStoreLock.writeLock().lock();
+ try {
// Replace current list of reader with the cleaned readers taking care not to lose
// any new reader that might have come in through concurrent calls to newWriter()
List<TarReader> newReaders = newArrayList();
@@ -825,6 +849,8 @@ public class FileStore implements Segmen
}
}
readers = newReaders;
+ } finally {
+ fileStoreLock.writeLock().unlock();
}
// Close old readers *after* setting readers to the new readers to avoid accessing
@@ -991,23 +1017,28 @@ public class FileStore implements Segmen
});
}
- public synchronized Iterable<SegmentId> getSegmentIds() {
- List<SegmentId> ids = newArrayList();
- if (writer != null) {
- for (UUID uuid : writer.getUUIDs()) {
- ids.add(tracker.getSegmentId(
- uuid.getMostSignificantBits(),
- uuid.getLeastSignificantBits()));
+ public Iterable<SegmentId> getSegmentIds() {
+ fileStoreLock.readLock().lock();
+ try {
+ List<SegmentId> ids = newArrayList();
+ if (writer != null) {
+ for (UUID uuid : writer.getUUIDs()) {
+ ids.add(tracker.getSegmentId(
+ uuid.getMostSignificantBits(),
+ uuid.getLeastSignificantBits()));
+ }
}
- }
- for (TarReader reader : readers) {
- for (UUID uuid : reader.getUUIDs()) {
- ids.add(tracker.getSegmentId(
- uuid.getMostSignificantBits(),
- uuid.getLeastSignificantBits()));
+ for (TarReader reader : readers) {
+ for (UUID uuid : reader.getUUIDs()) {
+ ids.add(tracker.getSegmentId(
+ uuid.getMostSignificantBits(),
+ uuid.getLeastSignificantBits()));
+ }
}
+ return ids;
+ } finally {
+ fileStoreLock.readLock().unlock();
}
- return ids;
}
@Override
@@ -1040,7 +1071,8 @@ public class FileStore implements Segmen
try {
flush();
tracker.getWriter().dropCache();
- synchronized (this) {
+ fileStoreLock.writeLock().lock();
+ try {
closeAndLogOnFail(writer);
List<TarReader> list = readers;
@@ -1054,6 +1086,8 @@ public class FileStore implements Segmen
}
closeAndLogOnFail(lockFile);
closeAndLogOnFail(journalFile);
+ } finally {
+ fileStoreLock.writeLock().unlock();
}
} catch (IOException e) {
throw new RuntimeException(
@@ -1084,10 +1118,13 @@ public class FileStore implements Segmen
}
if (writer != null) {
- synchronized (this) {
+ fileStoreLock.readLock().lock();
+ try {
if (writer.containsEntry(msb, lsb)) {
return true;
}
+ } finally {
+ fileStoreLock.readLock().unlock();
}
}
@@ -1126,7 +1163,8 @@ public class FileStore implements Segmen
}
if (writer != null) {
- synchronized (this) {
+ fileStoreLock.readLock().lock();
+ try {
try {
ByteBuffer buffer = writer.readEntry(msb, lsb);
if (buffer != null) {
@@ -1135,6 +1173,8 @@ public class FileStore implements Segmen
} catch (IOException e) {
log.warn("Failed to read from tar file " + writer, e);
}
+ } finally {
+ fileStoreLock.readLock().unlock();
}
}
@@ -1162,8 +1202,8 @@ public class FileStore implements Segmen
}
@Override
- public synchronized void writeSegment(
- SegmentId id, byte[] data, int offset, int length) {
+ public void writeSegment(SegmentId id, byte[] data, int offset, int length) {
+ fileStoreLock.writeLock().lock();
try {
long size = writer.writeEntry(
id.getMostSignificantBits(),
@@ -1175,6 +1215,8 @@ public class FileStore implements Segmen
approximateSize.addAndGet(TarWriter.BLOCK_SIZE + length + TarWriter.getPaddingSize(length));
} catch (IOException e) {
throw new RuntimeException(e);
+ } finally {
+ fileStoreLock.writeLock().unlock();
}
}
@@ -1249,10 +1291,15 @@ public class FileStore implements Segmen
return this;
}
- private synchronized void setRevision(String rootRevision) {
- RecordId id = RecordId.fromString(tracker, rootRevision);
- head.set(id);
- persistedHead.set(id);
+ private void setRevision(String rootRevision) {
+ fileStoreLock.writeLock().lock();
+ try {
+ RecordId id = RecordId.fromString(tracker, rootRevision);
+ head.set(id);
+ persistedHead.set(id);
+ } finally {
+ fileStoreLock.writeLock().unlock();
+ }
}
private void checkDiskSpace() {
@@ -1291,7 +1338,7 @@ public class FileStore implements Segmen
*
* @param revision
*/
- public synchronized void setRevision(String revision) {
+ public void setRevision(String revision) {
super.setRevision(revision);
}
@@ -1301,7 +1348,7 @@ public class FileStore implements Segmen
}
@Override
- public synchronized void writeSegment(SegmentId id, byte[] data,
+ public void writeSegment(SegmentId id, byte[] data,
int offset, int length) {
throw new UnsupportedOperationException("Read Only Store");
}
@@ -1313,7 +1360,7 @@ public class FileStore implements Segmen
public void flush() { /* nop */ }
@Override
- public synchronized LinkedList<File> cleanup() {
+ public LinkedList<File> cleanup() {
throw new UnsupportedOperationException("Read Only Store");
}