You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/04/02 07:20:58 UTC

svn commit: r1583882 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentTracker.java SegmentWriter.java file/FileStore.java file/TarWriter.java

Author: jukka
Date: Wed Apr  2 05:20:57 2014
New Revision: 1583882

URL: http://svn.apache.org/r1583882
Log:
OAK-631: SegmentMK: Implement garbage collection

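Improved flushing of the tar writer: TarWriter.flush() now syncs the file to disk without holding the writer lock, so concurrent reads and writes are no longer blocked during a flush, and the background flush thread waits at least 5 seconds (or twice the duration of the last flush) between flushes.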

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java Wed Apr  2 05:20:57 2014
@@ -100,6 +100,11 @@ public class SegmentTracker {
 
     Segment getSegment(SegmentId id) {
         Segment segment = store.readSegment(id);
+        setSegment(id, segment);
+        return segment;
+    }
+
+    void setSegment(SegmentId id, Segment segment) {
         id.setSegment(segment);
 
         LinkedList<Segment> segments = dataSegments;
@@ -117,11 +122,8 @@ public class SegmentTracker {
                 currentSize -= remove.getCacheSize();
             }
         }
-
-        return segment;
     }
 
-
     /**
      * Returns all segment identifiers that are currently referenced in memory.
      *

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Apr  2 05:20:57 2014
@@ -208,7 +208,15 @@ public class SegmentWriter {
             store.writeSegment(
                     segment.getSegmentId(),
                     buffer, buffer.length - length, length);
-            segment.getSegmentId().setSegment(null);
+
+            if (length < buffer.length / 2) {
+                byte[] data = new byte[length];
+                System.arraycopy(
+                        buffer, buffer.length - length, data, 0, length);
+                segment = new Segment(
+                        tracker, segment.getSegmentId(), ByteBuffer.wrap(data));
+            }
+            tracker.setSegment(segment.getSegmentId(), segment);
 
             buffer = createNewBuffer();
             roots.clear();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Apr  2 05:20:57 2014
@@ -25,6 +25,7 @@ import static com.google.common.collect.
 import static com.google.common.collect.Maps.newTreeMap;
 import static java.lang.String.format;
 import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
@@ -214,13 +215,16 @@ public class FileStore implements Segmen
                 try {
                     timeToClose.await(1, SECONDS);
                     while (timeToClose.getCount() > 0) {
+                        long start = System.nanoTime();
                         try {
                             flush();
                         } catch (IOException e) {
                             log.warn("Failed to flush the TarMK at" +
                                     directory, e);
                         }
-                        timeToClose.await(5, SECONDS);
+                        long time = SECONDS.convert(
+                                System.nanoTime() - start, NANOSECONDS);
+                        timeToClose.await(Math.max(5, 2 * time), SECONDS);
                     }
                 } catch (InterruptedException e) {
                     log.warn("TarMK flush thread interrupted");
@@ -298,8 +302,11 @@ public class FileStore implements Segmen
                 // avoid a deadlock with another thread flushing the writer
                 tracker.getWriter().flush();
 
+                // needs to happen outside the synchronization block below to
+                // prevent the flush from stopping concurrent reads and writes
+                writer.flush();
+
                 synchronized (this) {
-                    writer.flush();
                     journalFile.writeBytes(after + " root\n");
                     journalFile.getChannel().force(false);
                     persistedHead.set(after);
@@ -391,9 +398,22 @@ public class FileStore implements Segmen
                 return true;
             }
         }
+
         synchronized (this) {
-            return writer.containsEntry(msb, lsb);
+            if (writer.containsEntry(msb, lsb)) {
+                return true;
+            }
+        }
+
+        // the writer might have switched to a new file,
+        // so we need to re-check the readers
+        for (TarReader reader : readers) {
+            if (reader.containsEntry(msb, lsb)) {
+                return true;
+            }
         }
+
+        return false;
     }
 
     @Override
@@ -419,6 +439,8 @@ public class FileStore implements Segmen
             }
         }
 
+        // the writer might have switched to a new file,
+        // so we need to re-check the readers
         for (TarReader reader : readers) {
             try {
                 ByteBuffer buffer = reader.readEntry(msb, lsb);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java?rev=1583882&r1=1583881&r2=1583882&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java Wed Apr  2 05:20:57 2014
@@ -24,6 +24,7 @@ import static com.google.common.collect.
 import static com.google.common.collect.Sets.newHashSet;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
@@ -59,10 +60,35 @@ class TarWriter {
         }
     }
 
+    /**
+     * The file being written. This instance is also used as an additional
+     * synchronization point by {@link #flush()} and {@link #close()} to
+     * allow {@link #flush()} to work concurrently with normal reads and
+     * writes, but not with a concurrent {@link #close()}.
+     */
     private final File file;
 
+    /**
+     * File handle. Initialized lazily in
+     * {@link #writeEntry(long, long, byte[], int, int)} to avoid creating
+     * an extra empty file when just reading from the repository.
+     * Should only be accessed from synchronized code.
+     */
     private RandomAccessFile access = null;
 
+    /**
+     * Flag to indicate a closed writer. Accessing a closed writer is illegal.
+     * Should only be accessed from synchronized code.
+     */
+    private boolean closed = false;
+
+    /**
+     * Map of the entries that have already been written. Used by the
+     * {@link #containsEntry(long, long)} and {@link #readEntry(long, long)}
+     * methods to retrieve data from this file while it's still being written,
+     * and finally by the {@link #close()} method to generate the tar index.
+     * Should only be accessed from synchronized code;
+     */
     private final Map<UUID, TarEntry> index = newHashMap();
 
     TarWriter(File file) {
@@ -74,10 +100,12 @@ class TarWriter {
     }
 
     synchronized boolean containsEntry(long msb, long lsb) {
+        checkState(!closed);
         return index.containsKey(new UUID(msb, lsb));
     }
 
     synchronized ByteBuffer readEntry(long msb, long lsb) {
+        checkState(!closed);
         TarEntry entry = index.get(new UUID(msb, lsb));
         if (entry != null) {
             checkState(access != null); // implied by entry != null
@@ -118,6 +146,7 @@ class TarWriter {
     private synchronized long writeEntry(
             UUID uuid, byte[] header, byte[] data, int offset, int size)
             throws IOException {
+        checkState(!closed);
         if (access == null) {
             access = new RandomAccessFile(file, "rw");
         }
@@ -139,14 +168,55 @@ class TarWriter {
         return length;
     }
 
-    synchronized void flush() throws IOException {
-        if (access != null) {
-            access.getFD().sync();
+    /**
+     * Flushes the entries that have so far been written to the disk.
+     * This method is <em>not</em> synchronized to allow concurrent reads
+     * and writes to proceed while the file is being flushed. However,
+     * this method <em>is</em> carefully synchronized with {@link #close()}
+     * to prevent accidental flushing of an already closed file.
+     *
+     * @throws IOException if the tar file could not be flushed
+     */
+    void flush() throws IOException {
+        synchronized (file) {
+            FileDescriptor descriptor = null;
+
+            synchronized (this) {
+                if (access != null && !closed) {
+                    descriptor = access.getFD();
+                }
+            }
+
+            if (descriptor != null) {
+                descriptor.sync();
+            }
         }
     }
 
-    synchronized void close() throws IOException {
-        if (access != null) {
+    /**
+     * Closes this tar file.
+     *
+     * @throws IOException if the tar file could not be closed
+     */
+    void close() throws IOException {
+        // Mark this writer as closed. Note that we only need to synchronize
+        // this part, as no other synchronized methods should get invoked
+        // once close() has been initiated (see related checkState calls).
+        synchronized (this) {
+            checkState(!closed);
+            closed = true;
+        }
+
+        // If nothing was written to this file, then we're already done.
+        if (access == null) {
+            return;
+        }
+
+        // Complete the tar file by adding the index and the trailing two
+        // zero blocks. This code is synchronized on the file instance to
+        // ensure that no concurrent thread is still flushing the file when
+        // we close the file handle.
+        synchronized (file) {
             int indexSize = index.size() * 24 + 16;
             int padding = getPaddingSize(indexSize);
 
@@ -179,8 +249,6 @@ class TarWriter {
             access.write(ZERO_BYTES);
             access.write(ZERO_BYTES);
             access.close();
-
-            access = null;
         }
     }