You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2016/06/01 15:20:49 UTC
svn commit: r1746478 - in /jackrabbit/oak/trunk:
oak-run/src/main/java/org/apache/jackrabbit/oak/run/
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/
oak-segme...
Author: alexparvulescu
Date: Wed Jun 1 15:20:49 2016
New Revision: 1746478
URL: http://svn.apache.org/viewvc?rev=1746478&view=rev
Log:
OAK-4279 Rework offline compaction
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java (with props)
Modified:
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java Wed Jun 1 15:20:49 2016
@@ -32,6 +32,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.segment.SegmentGraph.writeSegmentGraph;
import static org.apache.jackrabbit.oak.segment.SegmentNodeStateHelper.getTemplateId;
import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.DEFAULT;
import static org.apache.jackrabbit.oak.segment.file.tooling.ConsistencyChecker.checkConsistency;
import static org.slf4j.LoggerFactory.getLogger;
@@ -86,6 +87,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentVersion;
import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStore.Builder;
import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore;
import org.apache.jackrabbit.oak.segment.file.JournalReader;
import org.apache.jackrabbit.oak.segment.file.tooling.RevisionHistory;
@@ -178,9 +180,9 @@ final class SegmentTarUtils {
}
static void compact(File directory, boolean force) throws IOException {
- FileStore store = openFileStore(directory.getAbsolutePath(), force);
+ FileStore store = newFileStoreBuilder(directory.getAbsolutePath(),
+ force).withGCOptions(DEFAULT.setOffline()).build();
try {
- boolean persistCM = Boolean.getBoolean("tar.PersistCompactionMap");
System.out.println("Compacting " + directory);
System.out.println(" before " + Arrays.toString(directory.list()));
long sizeBefore = FileUtils.sizeOfDirectory(directory);
@@ -194,7 +196,8 @@ final class SegmentTarUtils {
}
System.out.println(" -> cleaning up");
- store = openFileStore(directory.getAbsolutePath(), false);
+ store = newFileStoreBuilder(directory.getAbsolutePath(), force)
+ .withGCOptions(DEFAULT.setOffline()).build();
try {
for (File file : store.cleanup()) {
if (!file.exists() || file.delete()) {
@@ -643,10 +646,16 @@ final class SegmentTarUtils {
.buildReadOnly();
}
- private static FileStore openFileStore(String directory, boolean force) throws IOException {
+ private static Builder newFileStoreBuilder(String directory, boolean force)
+ throws IOException {
return FileStore.builder(checkFileStoreVersionOrFail(directory, force))
.withCacheSize(TAR_SEGMENT_CACHE_SIZE)
- .withMemoryMapping(TAR_STORAGE_MEMORY_MAPPED).build();
+ .withMemoryMapping(TAR_STORAGE_MEMORY_MAPPED);
+ }
+
+ private static FileStore openFileStore(String directory, boolean force)
+ throws IOException {
+ return newFileStoreBuilder(directory, force).build();
}
private static File checkFileStoreVersionOrFail(String path, boolean force) throws IOException {
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java?rev=1746478&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java Wed Jun 1 15:20:49 2016
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.hash.Hashing;
+
+/**
+ * Tool for compacting segments.
+ */
+public class Compactor {
+
+ /** Logger instance */
+ private static final Logger log = LoggerFactory.getLogger(Compactor.class);
+
+ private final SegmentReader reader;
+
+ private final BlobStore blobStore;
+
+ private final SegmentWriter writer;
+
+ /**
+ * Filters nodes that will be included in the compaction map, allowing for
+ * optimization in case of an offline compaction
+ */
+ private final Predicate<NodeState> includeInMap = new OfflineCompactionPredicate();
+
+ private final ProgressTracker progress = new ProgressTracker();
+
+ /**
+ * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted blob
+ * record identifiers. Used to de-duplicate copies of the same binary
+ * values.
+ */
+ private final Map<String, List<RecordId>> binaries = newHashMap();
+
+ /**
+ * Flag to use content equality verification before actually compacting the
+ * state, on the childNodeChanged diff branch (Used in Backup scenario)
+ */
+ private boolean contentEqualityCheck;
+
+ /**
+ * Allows the cancellation of the compaction process. If this
+ * {@code Supplier} returns {@code true}, this compactor will cancel
+ * compaction and return a partial {@code SegmentNodeState} containing the
+ * changes compacted before the cancellation.
+ */
+ private final Supplier<Boolean> cancel;
+
+ private static final int cacheSize;
+
+ // Resolves the compaction cache size once, at class-load time. The
+ // 'oak.segment.compaction.cacheSize' system property takes precedence, the
+ // deprecated 'compress-interval' property is honoured next (with a
+ // warning), and 100000 is used when neither is set.
+ static {
+ Integer configured = Integer.getInteger("oak.segment.compaction.cacheSize");
+ Integer legacy = Integer.getInteger("compress-interval");
+ if (configured == null && legacy == null) {
+ cacheSize = 100000;
+ } else if (configured == null) {
+ log.warn("Deprecated argument 'compress-interval', please use 'oak.segment.compaction.cacheSize' instead.");
+ cacheSize = legacy;
+ } else {
+ cacheSize = configured;
+ }
+ }
+
+ private final RecordCache<RecordId> cache = RecordCache.<RecordId> factory(
+ cacheSize).get();
+
+ /**
+ * Creates a compactor.
+ *
+ * @param reader
+ * reader handed to the {@code SegmentNodeState} instances created
+ * for already compacted records
+ * @param writer
+ * writer receiving the compacted nodes and blobs
+ * @param blobStore
+ * blob store handed to the {@code SegmentBlob} instances created
+ * for already compacted blobs
+ * @param cancel
+ * supplier consulted while diffing; see the {@code cancel} field
+ */
+ public Compactor(SegmentReader reader, SegmentWriter writer,
+ BlobStore blobStore, Supplier<Boolean> cancel) {
+ this.cancel = cancel;
+ this.blobStore = blobStore;
+ this.writer = writer;
+ this.reader = reader;
+ }
+
+ /**
+ * Writes {@code onto} through the writer, wraps the written state in a
+ * builder and applies the compacted differences between {@code before}
+ * and {@code after} to that builder.
+ */
+ private SegmentNodeBuilder process(NodeState before, NodeState after,
+ NodeState onto) throws IOException {
+ SegmentNodeState base = writer.writeNode(onto);
+ SegmentNodeBuilder result = new SegmentNodeBuilder(base, writer);
+ CompactDiff rootDiff = new CompactDiff(result);
+ rootDiff.diff(before, after);
+ return result;
+ }
+
+ /**
+ * Compacts the changes leading from {@code before} to {@code after} and
+ * applies them on top of {@code onto}. The writer is flushed before the
+ * result is returned, and progress tracking is started and stopped around
+ * the operation.
+ *
+ * @param before
+ * the before state
+ * @param after
+ * the after state
+ * @param onto
+ * the onto state
+ * @return the compacted state
+ * @throws IOException
+ * if writing or diffing fails
+ */
+ public SegmentNodeState compact(NodeState before, NodeState after,
+ NodeState onto) throws IOException {
+ progress.start();
+ SegmentNodeBuilder builder = process(before, after, onto);
+ SegmentNodeState compacted = builder.getNodeState();
+ writer.flush();
+ progress.stop();
+ return compacted;
+ }
+
+ private class CompactDiff extends ApplyDiff {
+ private IOException exception;
+
+ /**
+ * Current processed path, or null if the trace log is not enabled at
+ * the beginning of the compaction call. The null check will also be
+ * used to verify if a trace log will be needed or not
+ */
+ private final String path;
+
+ /**
+ * Creates the top-level diff. The path is tracked (starting at "/") only
+ * when trace logging is enabled at construction time; otherwise it stays
+ * null.
+ */
+ CompactDiff(NodeBuilder builder) {
+ super(builder);
+ this.path = log.isTraceEnabled() ? "/" : null;
+ }
+
+ /**
+ * Creates a diff for the child {@code childName}. The child path is only
+ * built when the parent {@code path} is non-null, so a null path is
+ * propagated down the tree.
+ */
+ private CompactDiff(NodeBuilder builder, String path, String childName) {
+ super(builder);
+ this.path = (path == null) ? null : concat(path, childName);
+ }
+
+ /**
+ * Compares {@code after} against {@code before}, routing every reported
+ * change through this diff wrapped in a {@code CancelableDiff} driven by
+ * the compactor's {@code cancel} supplier.
+ *
+ * @param before
+ * the base state
+ * @param after
+ * the state whose changes are compacted
+ * @return the result of the comparison
+ * @throws IOException
+ * the failure recorded by one of the child node callbacks,
+ * rethrown unchanged
+ */
+ boolean diff(NodeState before, NodeState after) throws IOException {
+ boolean success = after.compareAgainstBaseState(before,
+ new CancelableDiff(this, cancel));
+ if (exception != null) {
+ // Rethrow the recorded failure itself. Wrapping it in a new
+ // IOException would add one wrapper per tree level, because each
+ // parent CompactDiff catches and records what its child threw.
+ throw exception;
+ }
+ return success;
+ }
+
+ /**
+ * Counts the property and applies its compacted form to the builder.
+ */
+ @Override
+ public boolean propertyAdded(PropertyState after) {
+ boolean tracing = path != null;
+ if (tracing) {
+ log.trace("propertyAdded {}/{}", path, after.getName());
+ }
+ progress.onProperty();
+ PropertyState compacted = compact(after);
+ return super.propertyAdded(compacted);
+ }
+
+ /**
+ * Counts the property and applies the compacted form of its new value to
+ * the builder.
+ */
+ @Override
+ public boolean propertyChanged(PropertyState before, PropertyState after) {
+ boolean tracing = path != null;
+ if (tracing) {
+ log.trace("propertyChanged {}/{}", path, after.getName());
+ }
+ progress.onProperty();
+ PropertyState compacted = compact(after);
+ return super.propertyChanged(before, compacted);
+ }
+
+ /**
+ * Compacts a child node that exists only in the after state.
+ * <p>
+ * If the child is a {@code SegmentNodeState} whose record id already has
+ * a compacted counterpart in the cache, that counterpart is attached to
+ * the builder and nothing is rewritten. Otherwise the child is diffed
+ * against the empty node into a fresh builder, written through the
+ * writer and attached under {@code name}.
+ *
+ * @return {@code true} on a cache hit, otherwise the result of the
+ * nested diff; {@code false} when an {@code IOException} was
+ * caught (it is recorded in {@code exception})
+ */
+ @Override
+ public boolean childNodeAdded(String name, NodeState after) {
+ if (path != null) {
+ log.trace("childNodeAdded {}/{}", path, name);
+ }
+
+ // Reuse an earlier compaction result for the same record, if any
+ RecordId id = null;
+ if (after instanceof SegmentNodeState) {
+ id = ((SegmentNodeState) after).getRecordId();
+ RecordId compactedId = cache.get(id);
+ if (compactedId != null) {
+ builder.setChildNode(name, new SegmentNodeState(reader,
+ writer, compactedId));
+ return true;
+ }
+ }
+
+ progress.onNode();
+ try {
+ // Build the compacted child from scratch, starting at the empty node
+ NodeBuilder child = EMPTY_NODE.builder();
+ boolean success = new CompactDiff(child, path, name).diff(
+ EMPTY_NODE, after);
+ if (success) {
+ SegmentNodeState state = writer.writeNode(child
+ .getNodeState());
+ builder.setChildNode(name, state);
+ // Only nodes accepted by the includeInMap predicate are remembered
+ if (id != null && includeInMap.apply(after)) {
+ cache.put(id, state.getRecordId());
+ }
+ }
+ return success;
+ } catch (IOException e) {
+ // Callback cannot throw a checked exception: record it for diff()
+ exception = e;
+ return false;
+ }
+ }
+
+ /**
+ * Compacts a child node that exists in both the before and after states.
+ * <p>
+ * A cached compaction result for the after state's record id is reused
+ * when available. When the content equality check is enabled and both
+ * states are equal, the child is left untouched. Otherwise the changes
+ * are diffed into the builder's existing child and the result is written
+ * through the writer.
+ *
+ * @return {@code true} on a cache hit or an equal child, otherwise the
+ * result of the nested diff; {@code false} when an
+ * {@code IOException} was caught (it is recorded in
+ * {@code exception})
+ */
+ @Override
+ public boolean childNodeChanged(String name, NodeState before,
+ NodeState after) {
+ if (path != null) {
+ log.trace("childNodeChanged {}/{}", path, name);
+ }
+
+ // Reuse an earlier compaction result for the same record, if any
+ RecordId id = null;
+ if (after instanceof SegmentNodeState) {
+ id = ((SegmentNodeState) after).getRecordId();
+ RecordId compactedId = cache.get(id);
+ if (compactedId != null) {
+ builder.setChildNode(name, new SegmentNodeState(reader,
+ writer, compactedId));
+ return true;
+ }
+ }
+
+ // Optional shortcut, see the contentEqualityCheck field
+ if (contentEqualityCheck && before.equals(after)) {
+ return true;
+ }
+
+ progress.onNode();
+ try {
+ // The nested diff applies its changes to the builder's own child;
+ // presumably that is why the written state is not re-attached
+ // here - TODO confirm against NodeBuilder semantics
+ NodeBuilder child = builder.getChildNode(name);
+ boolean success = new CompactDiff(child, path, name).diff(
+ before, after);
+ if (success) {
+ RecordId compactedId = writer.writeNode(
+ child.getNodeState()).getRecordId();
+ // Unlike childNodeAdded, no includeInMap filter is applied here
+ if (id != null) {
+ cache.put(id, compactedId);
+ }
+ }
+ return success;
+ } catch (IOException e) {
+ // Callback cannot throw a checked exception: record it for diff()
+ exception = e;
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Returns a copy of the given property. Binary values (single or
+ * multi-valued) are run through blob compaction; any other property is
+ * re-created from its name, value and type.
+ *
+ * @param property
+ * property to be compacted
+ * @return compacted property
+ */
+ private PropertyState compact(PropertyState property) {
+ String name = property.getName();
+ Type<?> type = property.getType();
+ if (type == BINARY) {
+ Blob compacted = compact(property.getValue(Type.BINARY));
+ return BinaryPropertyState.binaryProperty(name, compacted);
+ }
+ if (type == BINARIES) {
+ List<Blob> compacted = new ArrayList<Blob>();
+ for (Blob original : property.getValue(BINARIES)) {
+ compacted.add(compact(original));
+ }
+ return MultiBinaryPropertyState.binaryPropertyFromBlob(name, compacted);
+ }
+ Object value = property.getValue(type);
+ return PropertyStates.createProperty(name, value, type);
+ }
+
+ /**
+ * Compacts (and de-duplicates) the given blob.
+ * <p>
+ * Only {@code SegmentBlob} instances are rewritten; any other blob is
+ * returned unchanged. A blob whose record id is already in the cache is
+ * served from there. External blobs and blobs shorter than
+ * {@code Segment.MEDIUM_LIMIT} are written again directly. Larger blobs
+ * are first looked up among the blobs written so far (by blob key, then
+ * by equality) and only written when no equal blob is found. If an
+ * {@code IOException} occurs, a warning is logged and the original blob
+ * is returned.
+ *
+ * @param blob
+ * blob to be compacted
+ * @return compacted blob, or {@code blob} itself if it could not be
+ * compacted
+ */
+ private Blob compact(Blob blob) {
+ if (blob instanceof SegmentBlob) {
+ SegmentBlob sb = (SegmentBlob) blob;
+ try {
+ // Check if we've already cloned this specific record
+ RecordId id = sb.getRecordId();
+ RecordId compactedId = cache.get(id);
+ if (compactedId != null) {
+ return new SegmentBlob(blobStore, compactedId);
+ }
+
+ // Counted only on a cache miss
+ progress.onBinary();
+
+ // if the blob is external, just clone it
+ if (sb.isExternal()) {
+ SegmentBlob clone = writer.writeBlob(sb);
+ cache.put(id, clone.getRecordId());
+ return clone;
+ }
+ // if the blob is inlined, just clone it
+ if (sb.length() < Segment.MEDIUM_LIMIT) {
+ SegmentBlob clone = writer.writeBlob(blob);
+ cache.put(id, clone.getRecordId());
+ return clone;
+ }
+
+ // alternatively look if the exact same binary has been cloned;
+ // the key only narrows the candidates, equals() decides
+ String key = getBlobKey(blob);
+ List<RecordId> ids = binaries.get(key);
+ if (ids != null) {
+ for (RecordId duplicateId : ids) {
+ if (new SegmentBlob(blobStore, duplicateId).equals(sb)) {
+ cache.put(id, duplicateId);
+ return new SegmentBlob(blobStore, duplicateId);
+ }
+ }
+ }
+
+ // if not, clone the large blob and keep track of the result
+ sb = writer.writeBlob(blob);
+
+ cache.put(id, sb.getRecordId());
+ if (ids == null) {
+ ids = newArrayList();
+ binaries.put(key, ids);
+ }
+ ids.add(sb.getRecordId());
+
+ return sb;
+ } catch (IOException e) {
+ log.warn("Failed to compact a blob", e);
+ // fall through
+ }
+ }
+
+ // no way to compact this blob, so we'll just keep it as-is
+ return blob;
+ }
+
+ /**
+ * Builds the lookup key used to find candidate duplicates of a blob: the
+ * blob length, a colon, and the SHA-1 hash of the bytes read into a
+ * buffer of {@code SegmentWriter.BLOCK_SIZE} bytes from the start of the
+ * blob's stream.
+ *
+ * @param blob
+ * blob to compute the key for
+ * @return the blob key
+ * @throws IOException
+ * if the blob stream cannot be read or closed
+ */
+ private static String getBlobKey(Blob blob) throws IOException {
+ InputStream in = blob.getNewStream();
+ try {
+ byte[] head = new byte[SegmentWriter.BLOCK_SIZE];
+ int read = IOUtils.readFully(in, head, 0, head.length);
+ long length = blob.length();
+ String digest = Hashing.sha1().hashBytes(head, 0, read).toString();
+ return length + ":" + digest;
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Counts the nodes, properties and binaries handled during a compaction
+ * run and logs the counters: at debug level every {@code logAt} nodes,
+ * and at info level once the run is stopped.
+ */
+ private static class ProgressTracker {
+ /**
+ * Number of nodes between two progress log entries, read from the
+ * 'compaction-progress-log' system property (default 150000). A value
+ * of 0 disables the periodic entries.
+ */
+ private final long logAt = Long.getLong("compaction-progress-log",
+ 150000);
+
+ /** Start time (ms) of the current logging interval */
+ private long start = 0;
+
+ private long nodes = 0;
+ private long properties = 0;
+ private long binaries = 0;
+
+ /** Resets all counters and starts the clock. */
+ void start() {
+ nodes = 0;
+ properties = 0;
+ binaries = 0;
+ start = System.currentTimeMillis();
+ }
+
+ /** Counts one node and logs progress every {@code logAt} nodes. */
+ void onNode() {
+ nodes++;
+ // Guard the modulo: a 'compaction-progress-log' value of 0 would
+ // otherwise abort compaction with an ArithmeticException.
+ if (logAt != 0 && nodes % logAt == 0) {
+ logProgress(start, false);
+ start = System.currentTimeMillis();
+ }
+ }
+
+ /** Counts one property. */
+ void onProperty() {
+ properties++;
+ }
+
+ /** Counts one binary. */
+ void onBinary() {
+ binaries++;
+ }
+
+ /** Logs the final counters. */
+ void stop() {
+ logProgress(start, true);
+ }
+
+ /**
+ * Logs the counters at debug level together with the time elapsed
+ * since {@code start}; when {@code done} is set, a summary is logged
+ * at info level as well.
+ */
+ private void logProgress(long start, boolean done) {
+ log.debug(
+ "Compacted {} nodes, {} properties, {} binaries in {} ms.",
+ nodes, properties, binaries, System.currentTimeMillis()
+ - start);
+ if (done) {
+ log.info(
+ "Finished compaction: {} nodes, {} properties, {} binaries.",
+ nodes, properties, binaries);
+ }
+ }
+ }
+
+ /**
+ * Accepts a node when it has more than one child node, or when one of
+ * its property values, or the running total of the values seen so far,
+ * reaches the 64K threshold. External segment blobs count as size 0.
+ */
+ private static class OfflineCompactionPredicate implements
+ Predicate<NodeState> {
+
+ /**
+ * Size limit (64K) at which a node is accepted
+ */
+ private static final long offlineThreshold = 65536;
+
+ @Override
+ public boolean apply(NodeState state) {
+ boolean multipleChildren = state.getChildNodeCount(2) > 1;
+ if (multipleChildren) {
+ return true;
+ }
+ long total = 0;
+ for (PropertyState property : state.getProperties()) {
+ Type<?> type = property.getType();
+ boolean binary = type == BINARY || type == BINARIES;
+ int index = 0;
+ while (index < property.count()) {
+ long size = binary ? binarySize(property, index)
+ : property.size(index);
+ total += size;
+ if (size >= offlineThreshold || total >= offlineThreshold) {
+ return true;
+ }
+ index++;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Size of the binary value at {@code index}: 0 for an external
+ * segment blob, the blob length otherwise.
+ */
+ private static long binarySize(PropertyState property, int index) {
+ Blob blob = property.getValue(BINARY, index);
+ boolean external = blob instanceof SegmentBlob
+ && ((SegmentBlob) blob).isExternal();
+ return external ? 0 : blob.length();
+ }
+ }
+
+ /**
+ * Turns the content equality check on or off. When enabled, a changed
+ * child node whose before and after states are equal is not compacted
+ * again.
+ *
+ * @param contentEqualityCheck
+ * {@code true} to enable the check
+ */
+ public void setContentEqualityCheck(boolean contentEqualityCheck) {
+ this.contentEqualityCheck = contentEqualityCheck;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed Jun 1 15:20:49 2016
@@ -638,6 +638,9 @@ public class SegmentWriter {
if (!isOldGeneration(segmentBlob.getRecordId())) {
return segmentBlob.getRecordId();
}
+ if (segmentBlob.isExternal()) {
+ return writeBlobId(segmentBlob.getBlobId());
+ }
}
String reference = blob.getReference();
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java Wed Jun 1 15:20:49 2016
@@ -82,6 +82,8 @@ public class SegmentGCOptions {
private int retainedGenerations = RETAINED_GENERATIONS_DEFAULT;
+ private boolean offline = false;
+
public SegmentGCOptions(boolean paused, int memoryThreshold, int gainThreshold,
int retryCount, boolean forceAfterFail, int lockWaitTime) {
this.paused = paused;
@@ -245,7 +247,8 @@ public class SegmentGCOptions {
", retryCount=" + retryCount +
", forceAfterFail=" + forceAfterFail +
", lockWaitTime=" + lockWaitTime +
- ", retainedGenerations=" + retainedGenerations + '}';
+ ", retainedGenerations=" + retainedGenerations +
+ ", offline=" + offline + "}";
}
/**
@@ -262,4 +265,18 @@ public class SegmentGCOptions {
return availableDiskSpace > 0.25 * repositoryDiskSpace;
}
+ /**
+ * @return {@code true} if the offline compaction flag is set on these
+ * options
+ */
+ public boolean isOffline() {
+ return offline;
+ }
+
+ /**
+ * Enables the offline compaction mode, allowing for certain optimizations,
+ * like reducing the retained generations to 1. This modifies the instance
+ * in place: the offline flag is set and the retained generations are
+ * overwritten.
+ * <p>
+ * NOTE(review): callers in this change invoke this on the statically
+ * imported {@code DEFAULT} options, so the change is presumably visible to
+ * every later user of that instance - confirm this is intended.
+ * @return this instance
+ */
+ public SegmentGCOptions setOffline() {
+ this.offline = true;
+ this.retainedGenerations = 1;
+ return this;
+ }
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed Jun 1 15:20:49 2016
@@ -83,6 +83,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
import org.apache.jackrabbit.oak.segment.SegmentCache;
import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
+import org.apache.jackrabbit.oak.segment.Compactor;
import org.apache.jackrabbit.oak.segment.SegmentId;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
@@ -1133,10 +1134,17 @@ public class FileStore implements Segmen
}
}
- private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState node,
+ private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
Supplier<Boolean> cancel)
throws IOException {
- return segmentWriter.writeNode(node, bufferWriter, cancel);
+ if (gcOptions.isOffline()) {
+ // Capital C to indicate offline compaction
+ SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, tracker, bufferWriter);
+ return new Compactor(segmentReader, writer, blobStore, cancel)
+ .compact(EMPTY_NODE, head, EMPTY_NODE);
+ } else {
+ return segmentWriter.writeNode(head, bufferWriter, cancel);
+ }
}
private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Wed Jun 1 15:20:49 2016
@@ -169,10 +169,97 @@ public class CompactionAndCleanupIT {
}
}
+ @Test
+ public void offlineCompaction()
+ throws IOException, CommitFailedException {
+ SegmentGCOptions gcOptions = DEFAULT.setOffline();
+ FileStore fileStore = FileStore.builder(getFileStoreFolder())
+ .withMaxFileSize(1)
+ .withGCOptions(gcOptions)
+ .build();
+ SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+
+ try {
+ // 5MB blob
+ int blobSize = 5 * 1024 * 1024;
+
+ // Create ~2MB of data
+ NodeBuilder extra = nodeStore.getRoot().builder();
+ NodeBuilder content = extra.child("content");
+ for (int i = 0; i < 10000; i++) {
+ NodeBuilder c = content.child("c" + i);
+ for (int j = 0; j < 1000; j++) {
+ c.setProperty("p" + i, "v" + i);
+ }
+ }
+ nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fileStore.flush();
+
+ long size1 = fileStore.size();
+ log.debug("File store size {}", byteCountToDisplaySize(size1));
+
+ // Create a property with 5 MB blob
+ NodeBuilder builder = nodeStore.getRoot().builder();
+ builder.setProperty("blob1", createBlob(nodeStore, blobSize));
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fileStore.flush();
+
+ long size2 = fileStore.size();
+ assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100));
+
+ // Now remove the property. No gc yet -> size doesn't shrink
+ builder = nodeStore.getRoot().builder();
+ builder.removeProperty("blob1");
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fileStore.flush();
+
+ long size3 = fileStore.size();
+ assertSize("1st blob removed", size3, size2, size2 + 4096);
+
+ // 1st gc cycle -> 1st blob should get collected
+ fileStore.compact();
+ fileStore.cleanup();
+
+ long size4 = fileStore.size();
+ assertSize("1st gc", size4, size3 - blobSize - size1, size3
+ - blobSize);
+
+ // Add another 5MB binary
+ builder = nodeStore.getRoot().builder();
+ builder.setProperty("blob2", createBlob(nodeStore, blobSize));
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fileStore.flush();
+
+ long size5 = fileStore.size();
+ assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100));
+
+ // 2nd gc cycle -> 2nd blob should *not* be collected
+ fileStore.compact();
+ fileStore.cleanup();
+
+ long size6 = fileStore.size();
+ assertSize("2nd gc", size6, size5 * 10/11, size5 * 10/9);
+
+ // 3rd gc cycle -> no significant change
+ fileStore.compact();
+ fileStore.cleanup();
+
+ long size7 = fileStore.size();
+ assertSize("3rd gc", size7, size6 * 10/11 , size6 * 10/9);
+
+ // No data loss
+ byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
+ .getProperty("blob2").getValue(Type.BINARY).getNewStream());
+ assertEquals(blobSize, blob.length);
+ } finally {
+ fileStore.close();
+ }
+ }
+
private static void assertSize(String info, long size, long lower, long upper) {
log.debug("File Store {} size {}, expected in interval [{},{}]",
info, size, lower, upper);
- assertTrue("File Store " + log + " size expected in interval " +
+ assertTrue("File Store " + info + " size expected in interval " +
"[" + (lower) + "," + (upper) + "] but was: " + (size),
size >= lower && size <= (upper));
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java Wed Jun 1 15:20:49 2016
@@ -18,8 +18,13 @@
*/
package org.apache.jackrabbit.oak.segment;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
import java.io.IOException;
+import com.google.common.base.Suppliers;
import org.apache.jackrabbit.oak.Oak;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -30,14 +35,51 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.Before;
+import org.junit.Test;
public class CompactorTest {
- private SegmentStore segmentStore;
+ private MemoryStore memoryStore;
@Before
public void openSegmentStore() throws IOException {
- segmentStore = new MemoryStore();
+ memoryStore = new MemoryStore();
+ }
+
+ @Test
+ public void testCompactor() throws Exception {
+ NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build();
+ init(store);
+
+ SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1);
+ Compactor compactor = new Compactor(memoryStore.getReader(), writer,
+ memoryStore.getBlobStore(), Suppliers.ofInstance(false));
+ addTestContent(store, 0);
+
+ NodeState initial = store.getRoot();
+ SegmentNodeState after = compactor.compact(initial, store.getRoot(),
+ initial);
+ assertEquals(store.getRoot(), after);
+
+ addTestContent(store, 1);
+ after = compactor.compact(initial, store.getRoot(), initial);
+ assertEquals(store.getRoot(), after);
+ }
+
+ @Test
+ public void testCancel() throws Throwable {
+
+ // Create a Compactor that will cancel itself as soon as possible. The
+ // early cancellation is the reason why the returned SegmentNodeState
+ // doesn't have the child named "b".
+
+ NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build();
+ SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1);
+ Compactor compactor = new Compactor(memoryStore.getReader(), writer, memoryStore.getBlobStore(),
+ Suppliers.ofInstance(true));
+ SegmentNodeState sns = compactor.compact(store.getRoot(),
+ addChild(store.getRoot(), "b"), store.getRoot());
+ assertFalse(sns.hasChildNode("b"));
}
private NodeState addChild(NodeState current, String name) {
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java Wed Jun 1 15:20:49 2016
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.segmen
import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK;
import static org.apache.jackrabbit.oak.commons.FixturesHelper.getFixtures;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -36,6 +37,7 @@ import java.util.Random;
import javax.annotation.Nonnull;
import com.google.common.collect.Lists;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.FileDataStore;
@@ -45,6 +47,7 @@ import org.apache.jackrabbit.oak.api.Typ
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.FileBlob;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -159,11 +162,13 @@ public class ExternalBlobIT {
if (store != null) {
store.close();
}
+ nodeStore = null;
}
protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws IOException {
if (nodeStore == null) {
- store = FileStore.builder(getWorkDir()).withBlobStore(blobStore).withMaxFileSize(256).withMemoryMapping(false).build();
+ store = FileStore.builder(getWorkDir()).withBlobStore(blobStore)
+ .withMaxFileSize(1).build();
nodeStore = SegmentNodeStoreBuilders.builder(store).build();
}
return nodeStore;
@@ -256,4 +261,34 @@ public class ExternalBlobIT {
assertEquals(size, ps.size());
// assertEquals("{" + size + " bytes}", ps.toString());
}
+
+ @Test
+ public void testOfflineCompaction() throws Exception {
+ FileDataStore fds = createFileDataStore();
+ DataStoreBlobStore dbs = new DataStoreBlobStore(fds);
+ nodeStore = getNodeStore(dbs);
+
+ int size = 2 * 1024 * 1024;
+ byte[] data2 = new byte[size];
+ new Random().nextBytes(data2);
+
+ Blob b = nodeStore.createBlob(new ByteArrayInputStream(data2));
+ NodeBuilder builder = nodeStore.getRoot().builder();
+ builder.child("hello").setProperty("world", b);
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ store.flush();
+
+ // blob went to the external store
+ assertTrue(store.size() < 10 * 1024);
+ close();
+
+ SegmentGCOptions gcOptions = DEFAULT.setOffline();
+ store = FileStore.builder(getWorkDir()).withMaxFileSize(1)
+ .withGCOptions(gcOptions).build();
+ assertTrue(store.size() < 10 * 1024);
+
+ store.compact();
+ store.cleanup();
+
+ }
}