You are viewing a plain text version of this content. The canonical link for it is http://svn.apache.org/r1607077.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/07/01 15:44:55 UTC
svn commit: r1607077 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/f...
Author: jukka
Date: Tue Jul 1 13:44:55 2014
New Revision: 1607077
URL: http://svn.apache.org/r1607077
Log:
OAK-1932: TarMK compaction can create mixed segments
Allow a different SegmentWriter to be used with the compactor
Adjust compaction and cleanup code to make it easier to test and monitor
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java Tue Jul 1 13:44:55 2014
@@ -64,7 +64,7 @@ import java.util.UUID;
* average, the amortized size of each entry in this mapping is about
* {@code 20/n + 8} bytes, assuming compressed pointers.
*/
-class CompactionMap {
+public class CompactionMap {
private final int compressInterval;
private final Map<RecordId, RecordId> recent = newHashMap();
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Tue Jul 1 13:44:55 2014
@@ -52,33 +52,6 @@ public class Compactor {
/** Logger instance */
private static final Logger log = LoggerFactory.getLogger(Compactor.class);
- public static CompactionMap compact(SegmentStore store) {
- SegmentWriter writer = store.getTracker().getWriter();
- Compactor compactor = new Compactor(writer);
-
- log.debug("TarMK compaction");
-
- SegmentNodeBuilder builder = writer.writeNode(EMPTY_NODE).builder();
- SegmentNodeState before = store.getHead();
- EmptyNodeState.compareAgainstEmptyState(
- before, compactor.newCompactDiff(builder));
-
- SegmentNodeState after = builder.getNodeState();
- while (!store.setHead(before, after)) {
- // Some other concurrent changes have been made.
- // Rebase (and compact) those changes on top of the
- // compacted state before retrying to set the head.
- SegmentNodeState head = store.getHead();
- head.compareAgainstBaseState(
- before, compactor.newCompactDiff(builder));
- before = head;
- after = builder.getNodeState();
- }
-
- compactor.map.compress();
- return compactor.map;
- }
-
/**
* Locks down the RecordId persistence structure
*/
@@ -89,6 +62,8 @@ public class Compactor {
private final SegmentWriter writer;
+ private final SegmentNodeBuilder builder;
+
private CompactionMap map = new CompactionMap(100000);
/**
@@ -98,12 +73,19 @@ public class Compactor {
*/
private final Map<String, List<RecordId>> binaries = newHashMap();
- private Compactor(SegmentWriter writer) {
+ public Compactor(SegmentWriter writer) {
this.writer = writer;
+ this.builder = writer.writeNode(EMPTY_NODE).builder();
+ }
+
+ public SegmentNodeState compact(NodeState before, NodeState after) {
+ after.compareAgainstBaseState(before, new CompactDiff(builder));
+ return builder.getNodeState();
}
- private CompactDiff newCompactDiff(NodeBuilder builder) {
- return new CompactDiff(builder);
+ public CompactionMap getCompactionMap() {
+ map.compress();
+ return map;
}
private class CompactDiff extends ApplyDiff {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Tue Jul 1 13:44:55 2014
@@ -58,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -239,7 +240,6 @@ public class FileStore implements Segmen
timeToClose.await(1, SECONDS);
while (timeToClose.getCount() > 0) {
long start = System.nanoTime();
- compact();
try {
flush();
} catch (IOException e) {
@@ -335,7 +335,19 @@ public class FileStore implements Segmen
return dataFiles;
}
+ public synchronized long size() throws IOException {
+ long size = writeFile.length();
+ for (TarReader reader : readers) {
+ size += reader.size();
+ }
+ return size;
+ }
+
public void flush() throws IOException {
+ if (compactNeeded.getAndSet(false)) {
+ compact();
+ }
+
synchronized (persistedHead) {
RecordId before = persistedHead.get();
RecordId after = head.get();
@@ -356,39 +368,7 @@ public class FileStore implements Segmen
persistedHead.set(after);
if (cleanup) {
- long start = System.nanoTime();
-
- // Suggest to the JVM that now would be a good time
- // to clear stale weak references in the SegmentTracker
- System.gc();
-
- Set<UUID> ids = newHashSet();
- for (SegmentId id : tracker.getReferencedSegmentIds()) {
- ids.add(new UUID(
- id.getMostSignificantBits(),
- id.getLeastSignificantBits()));
- }
- writer.cleanup(ids);
-
- List<TarReader> list =
- newArrayListWithCapacity(readers.size());
- for (TarReader reader : readers) {
- TarReader cleaned = reader.cleanup(ids);
- if (cleaned == reader) {
- list.add(reader);
- } else {
- if (cleaned != null) {
- list.add(cleaned);
- }
- File file = reader.close();
- log.info("TarMK GC: Cleaned up file {}", file);
- toBeRemoved.addLast(file);
- }
- }
- readers = list;
-
- log.debug("TarMK GC: Completed in {}ms",
- MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+ cleanup();
}
}
@@ -405,15 +385,65 @@ public class FileStore implements Segmen
}
}
- public void compact() {
- if (compactNeeded.getAndSet(false)) {
- long start = System.nanoTime();
- tracker.getWriter().dropCache();
- tracker.setCompactionMap(Compactor.compact(this));
- log.info("TarMK Compaction: Completed in {}ms", MILLISECONDS
- .convert(System.nanoTime() - start, NANOSECONDS));
- cleanupNeeded.set(true);
+ synchronized void cleanup() throws IOException {
+ long start = System.nanoTime();
+ log.info("TarMK revision cleanup started");
+
+ // Suggest to the JVM that now would be a good time
+ // to clear stale weak references in the SegmentTracker
+ System.gc();
+
+ Set<UUID> ids = newHashSet();
+ for (SegmentId id : tracker.getReferencedSegmentIds()) {
+ ids.add(new UUID(
+ id.getMostSignificantBits(),
+ id.getLeastSignificantBits()));
+ }
+ writer.cleanup(ids);
+
+ List<TarReader> list =
+ newArrayListWithCapacity(readers.size());
+ for (TarReader reader : readers) {
+ TarReader cleaned = reader.cleanup(ids);
+ if (cleaned == reader) {
+ list.add(reader);
+ } else {
+ if (cleaned != null) {
+ list.add(cleaned);
+ }
+ File file = reader.close();
+ log.info("TarMK revision cleanup reclaiming {}", file.getName());
+ toBeRemoved.addLast(file);
+ }
}
+ readers = list;
+
+ log.info("TarMK revision cleanup completed in {}ms",
+ MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
+ }
+
+ public void compact() {
+ long start = System.nanoTime();
+ log.info("TarMK compaction started");
+
+ SegmentWriter writer = new SegmentWriter(this, tracker);
+ Compactor compactor = new Compactor(writer);
+
+ SegmentNodeState before = getHead();
+ SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
+ while (!setHead(before, after)) {
+ // Some other concurrent changes have been made.
+ // Rebase (and compact) those changes on top of the
+ // compacted state before retrying to set the head.
+ SegmentNodeState head = getHead();
+ after = compactor.compact(before, head);
+ before = head;
+ }
+ tracker.setCompactionMap(compactor.getCompactionMap());
+
+ log.info("TarMK compaction completed in {}ms", MILLISECONDS
+ .convert(System.nanoTime() - start, NANOSECONDS));
+ cleanupNeeded.set(true);
}
public synchronized Iterable<SegmentId> getSegmentIds() {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java Tue Jul 1 13:44:55 2014
@@ -481,6 +481,10 @@ class TarReader {
this.graph = loadGraph(file, access, index);
}
+ long size() {
+ return file.length();
+ }
+
Set<UUID> getUUIDs() {
Set<UUID> uuids = newHashSetWithExpectedSize(index.remaining() / 24);
int position = index.position();
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java Tue Jul 1 13:44:55 2014
@@ -31,8 +31,12 @@ import java.util.Map;
import java.util.Random;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.Compactor;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.junit.Before;
import org.junit.Test;
@@ -84,6 +88,88 @@ public class FileStoreTest {
}
@Test
+ public void testCompaction() throws IOException {
+ int largeBinarySize = 10 * 1024 * 1024;
+
+ FileStore store = new FileStore(directory, 1, false);
+ SegmentWriter writer = store.getTracker().getWriter();
+
+ SegmentNodeState base = store.getHead();
+ SegmentNodeBuilder builder = base.builder();
+ byte[] data = new byte[largeBinarySize];
+ new Random().nextBytes(data);
+ SegmentBlob blob = writer.writeStream(new ByteArrayInputStream(data));
+ builder.setProperty("foo", blob);
+ builder.getNodeState(); // write the blob reference to the segment
+ builder.setProperty("foo", "bar");
+ SegmentNodeState head = builder.getNodeState();
+ assertTrue(store.setHead(base, head));
+ assertEquals("bar", store.getHead().getString("foo"));
+ store.close();
+
+ // First simulate the case where during compaction a reference to the
+ // older segments is added to a segment that the compactor is writing
+ store = new FileStore(directory, 1, false);
+ head = store.getHead();
+ assertTrue(store.size() > largeBinarySize);
+ Compactor compactor = new Compactor(writer);
+ SegmentNodeState compacted =
+ compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+ builder = head.builder();
+ builder.setChildNode("old", head); // reference to pre-compacted state
+ builder.getNodeState();
+ assertTrue(store.setHead(head, compacted));
+ store.close();
+
+ // In this case the revision cleanup is unable to reclaim the old data
+ store = new FileStore(directory, 1, false);
+ assertTrue(store.size() > largeBinarySize);
+ store.cleanup();
+ assertTrue(store.size() > largeBinarySize);
+ store.close();
+
+ // Now we do the same thing, but let the compactor use a different
+ // SegmentWriter
+ store = new FileStore(directory, 1, false);
+ head = store.getHead();
+ assertTrue(store.size() > largeBinarySize);
+ writer = new SegmentWriter(store, store.getTracker());
+ compactor = new Compactor(writer);
+ compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+ builder = head.builder();
+ builder.setChildNode("old", head); // reference to pre-compacted state
+ builder.getNodeState();
+ writer.flush();
+ assertTrue(store.setHead(head, compacted));
+ store.close();
+
+ // Revision cleanup is still unable to reclaim extra space (OAK-1932)
+ store = new FileStore(directory, 1, false);
+ assertTrue(store.size() > largeBinarySize);
+ store.cleanup();
+ assertTrue(store.size() > largeBinarySize); // FIXME: should be <
+ store.close();
+
+ // Finally do the compaction without concurrent writes
+ store = new FileStore(directory, 1, false);
+ head = store.getHead();
+ assertTrue(store.size() > largeBinarySize);
+ writer = new SegmentWriter(store, store.getTracker());
+ compactor = new Compactor(writer);
+ compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
+ writer.flush();
+ assertTrue(store.setHead(head, compacted));
+ store.close();
+
+ // Now the revision cleanup is able to reclaim the extra space
+ store = new FileStore(directory, 1, false);
+ assertTrue(store.size() > largeBinarySize);
+ store.cleanup();
+ assertTrue(store.size() < largeBinarySize);
+ store.close();
+ }
+
+ @Test
public void testRecovery() throws IOException {
FileStore store = new FileStore(directory, 1, false);
store.flush(); // first 1kB
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1607077&r1=1607076&r2=1607077&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Tue Jul 1 13:44:55 2014
@@ -61,7 +61,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -216,7 +215,7 @@ public class Main {
System.out.println(" -> compacting");
FileStore store = new FileStore(directory, 256, false);
try {
- Compactor.compact(store);
+ store.compact();
} finally {
store.close();
}