You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/04/02 07:20:58 UTC
svn commit: r1583882 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentTracker.java SegmentWriter.java file/FileStore.java file/TarWriter.java
Author: jukka
Date: Wed Apr 2 05:20:57 2014
New Revision: 1583882
URL: http://svn.apache.org/r1583882
Log:
OAK-631: SegmentMK: Implement garbage collection
Improved flushing of the tar writer
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java Wed Apr 2 05:20:57 2014
@@ -100,6 +100,11 @@ public class SegmentTracker {
Segment getSegment(SegmentId id) {
Segment segment = store.readSegment(id);
+ setSegment(id, segment);
+ return segment;
+ }
+
+ void setSegment(SegmentId id, Segment segment) {
id.setSegment(segment);
LinkedList<Segment> segments = dataSegments;
@@ -117,11 +122,8 @@ public class SegmentTracker {
currentSize -= remove.getCacheSize();
}
}
-
- return segment;
}
-
/**
* Returns all segment identifiers that are currently referenced in memory.
*
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Apr 2 05:20:57 2014
@@ -208,7 +208,15 @@ public class SegmentWriter {
store.writeSegment(
segment.getSegmentId(),
buffer, buffer.length - length, length);
- segment.getSegmentId().setSegment(null);
+
+ if (length < buffer.length / 2) {
+ byte[] data = new byte[length];
+ System.arraycopy(
+ buffer, buffer.length - length, data, 0, length);
+ segment = new Segment(
+ tracker, segment.getSegmentId(), ByteBuffer.wrap(data));
+ }
+ tracker.setSegment(segment.getSegmentId(), segment);
buffer = createNewBuffer();
roots.clear();
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Apr 2 05:20:57 2014
@@ -25,6 +25,7 @@ import static com.google.common.collect.
import static com.google.common.collect.Maps.newTreeMap;
import static java.lang.String.format;
import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -214,13 +215,16 @@ public class FileStore implements Segmen
try {
timeToClose.await(1, SECONDS);
while (timeToClose.getCount() > 0) {
+ long start = System.nanoTime();
try {
flush();
} catch (IOException e) {
log.warn("Failed to flush the TarMK at" +
directory, e);
}
- timeToClose.await(5, SECONDS);
+ long time = SECONDS.convert(
+ System.nanoTime() - start, NANOSECONDS);
+ timeToClose.await(Math.max(5, 2 * time), SECONDS);
}
} catch (InterruptedException e) {
log.warn("TarMK flush thread interrupted");
@@ -298,8 +302,11 @@ public class FileStore implements Segmen
// avoid a deadlock with another thread flushing the writer
tracker.getWriter().flush();
+ // needs to happen outside the synchronization block below to
+ // prevent the flush from stopping concurrent reads and writes
+ writer.flush();
+
synchronized (this) {
- writer.flush();
journalFile.writeBytes(after + " root\n");
journalFile.getChannel().force(false);
persistedHead.set(after);
@@ -391,9 +398,22 @@ public class FileStore implements Segmen
return true;
}
}
+
synchronized (this) {
- return writer.containsEntry(msb, lsb);
+ if (writer.containsEntry(msb, lsb)) {
+ return true;
+ }
+ }
+
+ // the writer might have switched to a new file,
+ // so we need to re-check the readers
+ for (TarReader reader : readers) {
+ if (reader.containsEntry(msb, lsb)) {
+ return true;
+ }
}
+
+ return false;
}
@Override
@@ -419,6 +439,8 @@ public class FileStore implements Segmen
}
}
+ // the writer might have switched to a new file,
+ // so we need to re-check the readers
for (TarReader reader : readers) {
try {
ByteBuffer buffer = reader.readEntry(msb, lsb);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java Wed Apr 2 05:20:57 2014
@@ -24,6 +24,7 @@ import static com.google.common.collect.
import static com.google.common.collect.Sets.newHashSet;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
@@ -59,10 +60,35 @@ class TarWriter {
}
}
+ /**
+ * The file being written. This instance is also used as an additional
+ * synchronization point by {@link #flush()} and {@link #close()} to
+ * allow {@link #flush()} to work concurrently with normal reads and
+ * writes, but not with a concurrent {@link #close()}.
+ */
private final File file;
+ /**
+ * File handle. Initialized lazily in
+ * {@link #writeEntry(long, long, byte[], int, int)} to avoid creating
+ * an extra empty file when just reading from the repository.
+ * Should only be accessed from synchronized code.
+ */
private RandomAccessFile access = null;
+ /**
+ * Flag to indicate a closed writer. Accessing a closed writer is illegal.
+ * Should only be accessed from synchronized code.
+ */
+ private boolean closed = false;
+
+ /**
+ * Map of the entries that have already been written. Used by the
+ * {@link #containsEntry(long, long)} and {@link #readEntry(long, long)}
+ * methods to retrieve data from this file while it's still being written,
+ * and finally by the {@link #close()} method to generate the tar index.
+ * Should only be accessed from synchronized code;
+ */
private final Map<UUID, TarEntry> index = newHashMap();
TarWriter(File file) {
@@ -74,10 +100,12 @@ class TarWriter {
}
synchronized boolean containsEntry(long msb, long lsb) {
+ checkState(!closed);
return index.containsKey(new UUID(msb, lsb));
}
synchronized ByteBuffer readEntry(long msb, long lsb) {
+ checkState(!closed);
TarEntry entry = index.get(new UUID(msb, lsb));
if (entry != null) {
checkState(access != null); // implied by entry != null
@@ -118,6 +146,7 @@ class TarWriter {
private synchronized long writeEntry(
UUID uuid, byte[] header, byte[] data, int offset, int size)
throws IOException {
+ checkState(!closed);
if (access == null) {
access = new RandomAccessFile(file, "rw");
}
@@ -139,14 +168,55 @@ class TarWriter {
return length;
}
- synchronized void flush() throws IOException {
- if (access != null) {
- access.getFD().sync();
+ /**
+ * Flushes the entries that have so far been written to the disk.
+ * This method is <em>not</em> synchronized to allow concurrent reads
+ * and writes to proceed while the file is being flushed. However,
+ * this method <em>is</em> carefully synchronized with {@link #close()}
+ * to prevent accidental flushing of an already closed file.
+ *
+ * @throws IOException if the tar file could not be flushed
+ */
+ void flush() throws IOException {
+ synchronized (file) {
+ FileDescriptor descriptor = null;
+
+ synchronized (this) {
+ if (access != null && !closed) {
+ descriptor = access.getFD();
+ }
+ }
+
+ if (descriptor != null) {
+ descriptor.sync();
+ }
}
}
- synchronized void close() throws IOException {
- if (access != null) {
+ /**
+ * Closes this tar file.
+ *
+ * @throws IOException if the tar file could not be closed
+ */
+ void close() throws IOException {
+ // Mark this writer as closed. Note that we only need to synchronize
+ // this part, as no other synchronized methods should get invoked
+ // once close() has been initiated (see related checkState calls).
+ synchronized (this) {
+ checkState(!closed);
+ closed = true;
+ }
+
+ // If nothing was written to this file, then we're already done.
+ if (access == null) {
+ return;
+ }
+
+ // Complete the tar file by adding the index and the trailing two
+ // zero blocks. This code is synchronized on the file instance to
+ // ensure that no concurrent thread is still flushing the file when
+ // we close the file handle.
+ synchronized (file) {
int indexSize = index.size() * 24 + 16;
int padding = getPaddingSize(indexSize);
@@ -179,8 +249,6 @@ class TarWriter {
access.write(ZERO_BYTES);
access.write(ZERO_BYTES);
access.close();
-
- access = null;
}
}