You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/07/01 15:44:55 UTC

svn commit: r1607077 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/f...

Author: jukka
Date: Tue Jul  1 13:44:55 2014
New Revision: 1607077

URL: http://svn.apache.org/r1607077
Log:
OAK-1932: TarMK compaction can create mixed segments

Allow a different SegmentWriter to be used with the compactor
Adjust compaction and cleanup code to make it easier to test and monitor

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java Tue Jul  1 13:44:55 2014
@@ -64,7 +64,7 @@ import java.util.UUID;
  * average, the amortized size of each entry in this mapping is about
  * {@code 20/n + 8} bytes, assuming compressed pointers.
  */
-class CompactionMap {
+public class CompactionMap {
 
     private final int compressInterval;
     private final Map<RecordId, RecordId> recent = newHashMap();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Tue Jul  1 13:44:55 2014
@@ -52,33 +52,6 @@ public class Compactor {
     /** Logger instance */
     private static final Logger log = LoggerFactory.getLogger(Compactor.class);
 
-    public static CompactionMap compact(SegmentStore store) {
-        SegmentWriter writer = store.getTracker().getWriter();
-        Compactor compactor = new Compactor(writer);
-
-        log.debug("TarMK compaction");
-
-        SegmentNodeBuilder builder = writer.writeNode(EMPTY_NODE).builder();
-        SegmentNodeState before = store.getHead();
-        EmptyNodeState.compareAgainstEmptyState(
-                before, compactor.newCompactDiff(builder));
-
-        SegmentNodeState after = builder.getNodeState();
-        while (!store.setHead(before, after)) {
-            // Some other concurrent changes have been made.
-            // Rebase (and compact) those changes on top of the
-            // compacted state before retrying to set the head.
-            SegmentNodeState head = store.getHead();
-            head.compareAgainstBaseState(
-                    before, compactor.newCompactDiff(builder));
-            before = head;
-            after = builder.getNodeState();
-        }
-
-        compactor.map.compress();
-        return compactor.map;
-    }
-
     /**
      * Locks down the RecordId persistence structure
      */
@@ -89,6 +62,8 @@ public class Compactor {
 
     private final SegmentWriter writer;
 
+    private final SegmentNodeBuilder builder;
+
     private CompactionMap map = new CompactionMap(100000);
 
     /**
@@ -98,12 +73,19 @@ public class Compactor {
      */
     private final Map<String, List<RecordId>> binaries = newHashMap();
 
-    private Compactor(SegmentWriter writer) {
+    public Compactor(SegmentWriter writer) {
         this.writer = writer;
+        this.builder = writer.writeNode(EMPTY_NODE).builder();
+    }
+
+    public SegmentNodeState compact(NodeState before, NodeState after) {
+        after.compareAgainstBaseState(before, new CompactDiff(builder));
+        return builder.getNodeState();
     }
 
-    private CompactDiff newCompactDiff(NodeBuilder builder) {
-        return new CompactDiff(builder);
+    public CompactionMap getCompactionMap() {
+        map.compress();
+        return map;
     }
 
     private class CompactDiff extends ApplyDiff {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Tue Jul  1 13:44:55 2014
@@ -58,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -239,7 +240,6 @@ public class FileStore implements Segmen
                     timeToClose.await(1, SECONDS);
                     while (timeToClose.getCount() > 0) {
                         long start = System.nanoTime();
-                        compact();
                         try {
                             flush();
                         } catch (IOException e) {
@@ -335,7 +335,19 @@ public class FileStore implements Segmen
         return dataFiles;
     }
 
+    public synchronized long size() throws IOException {
+        long size = writeFile.length();
+        for (TarReader reader : readers) {
+            size += reader.size();
+        }
+        return size;
+    }
+
     public void flush() throws IOException {
+        if (compactNeeded.getAndSet(false)) {
+            compact();
+        }
+
         synchronized (persistedHead) {
             RecordId before = persistedHead.get();
             RecordId after = head.get();
@@ -356,39 +368,7 @@ public class FileStore implements Segmen
                     persistedHead.set(after);
 
                     if (cleanup) {
-                        long start = System.nanoTime();
-
-                        // Suggest to the JVM that now would be a good time
-                        // to clear stale weak references in the SegmentTracker
-                        System.gc();
-
-                        Set<UUID> ids = newHashSet();
-                        for (SegmentId id : tracker.getReferencedSegmentIds()) {
-                            ids.add(new UUID(
-                                    id.getMostSignificantBits(),
-                                    id.getLeastSignificantBits()));
-                        }
-                        writer.cleanup(ids);
-
-                        List<TarReader> list =
-                                newArrayListWithCapacity(readers.size());
-                        for (TarReader reader : readers) {
-                            TarReader cleaned = reader.cleanup(ids);
-                            if (cleaned == reader) {
-                                list.add(reader);
-                            } else {
-                                if (cleaned != null) {
-                                    list.add(cleaned);
-                                }
-                                File file = reader.close();
-                                log.info("TarMK GC: Cleaned up file {}", file);
-                                toBeRemoved.addLast(file);
-                            }
-                        }
-                        readers = list;
-
-                        log.debug("TarMK GC: Completed in {}ms",
-                                MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+                        cleanup();
                     }
                 }
 
@@ -405,15 +385,65 @@ public class FileStore implements Segmen
         }
     }
 
-    public void compact() {
-        if (compactNeeded.getAndSet(false)) {
-            long start = System.nanoTime();
-            tracker.getWriter().dropCache();
-            tracker.setCompactionMap(Compactor.compact(this));
-            log.info("TarMK Compaction: Completed in {}ms", MILLISECONDS
-                    .convert(System.nanoTime() - start, NANOSECONDS));
-            cleanupNeeded.set(true);
+    synchronized void cleanup() throws IOException {
+        long start = System.nanoTime();
+        log.info("TarMK revision cleanup started");
+
+        // Suggest to the JVM that now would be a good time
+        // to clear stale weak references in the SegmentTracker
+        System.gc();
+
+        Set<UUID> ids = newHashSet();
+        for (SegmentId id : tracker.getReferencedSegmentIds()) {
+            ids.add(new UUID(
+                    id.getMostSignificantBits(),
+                    id.getLeastSignificantBits()));
+        }
+        writer.cleanup(ids);
+
+        List<TarReader> list =
+                newArrayListWithCapacity(readers.size());
+        for (TarReader reader : readers) {
+            TarReader cleaned = reader.cleanup(ids);
+            if (cleaned == reader) {
+                list.add(reader);
+            } else {
+                if (cleaned != null) {
+                    list.add(cleaned);
+                }
+                File file = reader.close();
+                log.info("TarMK revision cleanup reclaiming {}", file.getName());
+                toBeRemoved.addLast(file);
+            }
         }
+        readers = list;
+
+        log.info("TarMK revision cleanup completed in {}ms",
+                MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+    }
+
+    public void compact() {
+        long start = System.nanoTime();
+        log.info("TarMK compaction started");
+
+        SegmentWriter writer = new SegmentWriter(this, tracker);
+        Compactor compactor = new Compactor(writer);
+
+        SegmentNodeState before = getHead();
+        SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
+        while (!setHead(before, after)) {
+            // Some other concurrent changes have been made.
+            // Rebase (and compact) those changes on top of the
+            // compacted state before retrying to set the head.
+            SegmentNodeState head = getHead();
+            after = compactor.compact(before, head);
+            before = head;
+        }
+        tracker.setCompactionMap(compactor.getCompactionMap());
+
+        log.info("TarMK compaction completed in {}ms", MILLISECONDS
+                .convert(System.nanoTime() - start, NANOSECONDS));
+        cleanupNeeded.set(true);
     }
 
     public synchronized Iterable<SegmentId> getSegmentIds() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java Tue Jul  1 13:44:55 2014
@@ -481,6 +481,10 @@ class TarReader {
         this.graph = loadGraph(file, access, index);
     }
 
+    long size() {
+        return file.length();
+    }
+
     Set<UUID> getUUIDs() {
         Set<UUID> uuids = newHashSetWithExpectedSize(index.remaining() / 24);
         int position = index.position();

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java Tue Jul  1 13:44:55 2014
@@ -31,8 +31,12 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.Compactor;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,6 +88,88 @@ public class FileStoreTest {
     }
 
     @Test
+    public void testCompaction() throws IOException {
+        int largeBinarySize = 10 * 1024 * 1024;
+
+        FileStore store = new FileStore(directory, 1, false);
+        SegmentWriter writer = store.getTracker().getWriter();
+
+        SegmentNodeState base = store.getHead();
+        SegmentNodeBuilder builder = base.builder();
+        byte[] data = new byte[largeBinarySize];
+        new Random().nextBytes(data);
+        SegmentBlob blob = writer.writeStream(new ByteArrayInputStream(data));
+        builder.setProperty("foo", blob);
+        builder.getNodeState(); // write the blob reference to the segment
+        builder.setProperty("foo", "bar");
+        SegmentNodeState head = builder.getNodeState();
+        assertTrue(store.setHead(base, head));
+        assertEquals("bar", store.getHead().getString("foo"));
+        store.close();
+
+        // First simulate the case where during compaction a reference to the
+        // older segments is added to a segment that the compactor is writing
+        store = new FileStore(directory, 1, false);
+        head = store.getHead();
+        assertTrue(store.size() > largeBinarySize);
+        Compactor compactor = new Compactor(writer);
+        SegmentNodeState compacted =
+                compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+        builder = head.builder();
+        builder.setChildNode("old", head); // reference to pre-compacted state
+        builder.getNodeState();
+        assertTrue(store.setHead(head, compacted));
+        store.close();
+
+        // In this case the revision cleanup is unable to reclaim the old data
+        store = new FileStore(directory, 1, false);
+        assertTrue(store.size() > largeBinarySize);
+        store.cleanup();
+        assertTrue(store.size() > largeBinarySize);
+        store.close();
+
+        // Now we do the same thing, but let the compactor use a different
+        // SegmentWriter
+        store = new FileStore(directory, 1, false);
+        head = store.getHead();
+        assertTrue(store.size() > largeBinarySize);
+        writer = new SegmentWriter(store, store.getTracker());
+        compactor = new Compactor(writer);
+        compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+        builder = head.builder();
+        builder.setChildNode("old", head); // reference to pre-compacted state
+        builder.getNodeState();
+        writer.flush();
+        assertTrue(store.setHead(head, compacted));
+        store.close();
+
+        // Revision cleanup is still unable to reclaim extra space (OAK-1932)
+        store = new FileStore(directory, 1, false);
+        assertTrue(store.size() > largeBinarySize);
+        store.cleanup();
+        assertTrue(store.size() > largeBinarySize); // FIXME: should be <
+        store.close();
+
+        // Finally do the compaction without concurrent writes
+        store = new FileStore(directory, 1, false);
+        head = store.getHead();
+        assertTrue(store.size() > largeBinarySize);
+        writer = new SegmentWriter(store, store.getTracker());
+        compactor = new Compactor(writer);
+        compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+        writer.flush();
+        assertTrue(store.setHead(head, compacted));
+        store.close();
+
+        // Now the revision cleanup is able to reclaim the extra space
+        store = new FileStore(directory, 1, false);
+        assertTrue(store.size() > largeBinarySize);
+        store.cleanup();
+        assertTrue(store.size() < largeBinarySize);
+        store.close();
+    }
+
+    @Test
     public void testRecovery() throws IOException {
         FileStore store = new FileStore(directory, 1, false);
         store.flush(); // first 1kB

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Tue Jul  1 13:44:55 2014
@@ -61,7 +61,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -216,7 +215,7 @@ public class Main {
             System.out.println("    -> compacting");
             FileStore store = new FileStore(directory, 256, false);
             try {
-                Compactor.compact(store);
+                store.compact();
             } finally {
                 store.close();
             }