Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2016/06/01 15:20:49 UTC

svn commit: r1746478 - in /jackrabbit/oak/trunk: oak-run/src/main/java/org/apache/jackrabbit/oak/run/ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/ oak-segme...

Author: alexparvulescu
Date: Wed Jun  1 15:20:49 2016
New Revision: 1746478

URL: http://svn.apache.org/viewvc?rev=1746478&view=rev
Log:
OAK-4279 Rework offline compaction
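
An end-to-end sketch of the new offline mode, distilled from the tests added in
this commit (the store path is a placeholder; note that setOffline() mutates the
options instance and reduces retainedGenerations to 1):

    SegmentGCOptions gcOptions = SegmentGCOptions.DEFAULT.setOffline();
    FileStore fileStore = FileStore.builder(new File("/path/to/segmentstore"))
            .withGCOptions(gcOptions)
            .build();
    try {
        // compact() takes the offline branch and runs the new Compactor
        fileStore.compact();
        // with a single retained generation, cleanup can reclaim the old segments
        fileStore.cleanup();
    } finally {
        fileStore.close();
    }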


Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentTarUtils.java Wed Jun  1 15:20:49 2016
@@ -32,6 +32,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.segment.SegmentGraph.writeSegmentGraph;
 import static org.apache.jackrabbit.oak.segment.SegmentNodeStateHelper.getTemplateId;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.DEFAULT;
 import static org.apache.jackrabbit.oak.segment.file.tooling.ConsistencyChecker.checkConsistency;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -86,6 +87,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentVersion;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStore.Builder;
 import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore;
 import org.apache.jackrabbit.oak.segment.file.JournalReader;
 import org.apache.jackrabbit.oak.segment.file.tooling.RevisionHistory;
@@ -178,9 +180,9 @@ final class SegmentTarUtils {
     }
 
     static void compact(File directory, boolean force) throws IOException {
-        FileStore store = openFileStore(directory.getAbsolutePath(), force);
+        FileStore store = newFileStoreBuilder(directory.getAbsolutePath(),
+                force).withGCOptions(DEFAULT.setOffline()).build();
         try {
-            boolean persistCM = Boolean.getBoolean("tar.PersistCompactionMap");
             System.out.println("Compacting " + directory);
             System.out.println("    before " + Arrays.toString(directory.list()));
             long sizeBefore = FileUtils.sizeOfDirectory(directory);
@@ -194,7 +196,8 @@ final class SegmentTarUtils {
         }
 
         System.out.println("    -> cleaning up");
-        store = openFileStore(directory.getAbsolutePath(), false);
+        store = newFileStoreBuilder(directory.getAbsolutePath(), force)
+                .withGCOptions(DEFAULT.setOffline()).build();
         try {
             for (File file : store.cleanup()) {
                 if (!file.exists() || file.delete()) {
@@ -643,10 +646,16 @@ final class SegmentTarUtils {
                 .buildReadOnly();
     }
 
-    private static FileStore openFileStore(String directory, boolean force) throws IOException {
+    private static Builder newFileStoreBuilder(String directory, boolean force)
+            throws IOException {
         return FileStore.builder(checkFileStoreVersionOrFail(directory, force))
                 .withCacheSize(TAR_SEGMENT_CACHE_SIZE)
-                .withMemoryMapping(TAR_STORAGE_MEMORY_MAPPED).build();
+                .withMemoryMapping(TAR_STORAGE_MEMORY_MAPPED);
+    }
+
+    private static FileStore openFileStore(String directory, boolean force)
+            throws IOException {
+        return newFileStoreBuilder(directory, force).build();
     }
 
     private static File checkFileStoreVersionOrFail(String path, boolean force) throws IOException {

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java?rev=1746478&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java Wed Jun  1 15:20:49 2016
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.hash.Hashing;
+
+/**
+ * Tool for compacting segments.
+ */
+public class Compactor {
+
+    /** Logger instance */
+    private static final Logger log = LoggerFactory.getLogger(Compactor.class);
+
+    private final SegmentReader reader;
+
+    private final BlobStore blobStore;
+
+    private final SegmentWriter writer;
+
+    /**
+     * Filters the nodes that will be included in the compaction map,
+     * enabling optimizations in the offline compaction case
+     */
+    private final Predicate<NodeState> includeInMap = new OfflineCompactionPredicate();
+
+    private final ProgressTracker progress = new ProgressTracker();
+
+    /**
+     * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted blob
+     * record identifiers. Used to de-duplicate copies of the same binary
+     * values.
+     */
+    private final Map<String, List<RecordId>> binaries = newHashMap();
+
+    /**
+     * Flag enabling a content-equality check before compacting a changed
+     * state on the childNodeChanged diff branch (used in the backup scenario)
+     */
+    private boolean contentEqualityCheck;
+
+    /**
+     * Allows the cancellation of the compaction process. If this
+     * {@code Supplier} returns {@code true}, this compactor will cancel
+     * compaction and return a partial {@code SegmentNodeState} containing the
+     * changes compacted before the cancellation.
+     */
+    private final Supplier<Boolean> cancel;
+
+    private static final int cacheSize;
+
+    static {
+        Integer ci = Integer.getInteger("compress-interval");
+        Integer size = Integer.getInteger("oak.segment.compaction.cacheSize");
+        if (size != null) {
+            cacheSize = size;
+        } else if (ci != null) {
+            log.warn("Deprecated argument 'compress-interval', please use 'oak.segment.compaction.cacheSize' instead.");
+            cacheSize = ci;
+        } else {
+            cacheSize = 100000;
+        }
+    }
+
+    private final RecordCache<RecordId> cache = RecordCache.<RecordId> factory(
+            cacheSize).get();
+
+    public Compactor(SegmentReader reader, SegmentWriter writer,
+            BlobStore blobStore, Supplier<Boolean> cancel) {
+        this.reader = reader;
+        this.writer = writer;
+        this.blobStore = blobStore;
+        this.cancel = cancel;
+    }
+
+    private SegmentNodeBuilder process(NodeState before, NodeState after,
+            NodeState onto) throws IOException {
+        SegmentNodeBuilder builder = new SegmentNodeBuilder(
+                writer.writeNode(onto), writer);
+        new CompactDiff(builder).diff(before, after);
+        return builder;
+    }
+
+    /**
+     * Compact the differences between a {@code before} and an {@code after}
+     * state on top of an {@code onto} state.
+     * 
+     * @param before
+     *            the before state
+     * @param after
+     *            the after state
+     * @param onto
+     *            the onto state
+     * @return the compacted state
+     */
+    public SegmentNodeState compact(NodeState before, NodeState after,
+            NodeState onto) throws IOException {
+        progress.start();
+        SegmentNodeState compacted = process(before, after, onto)
+                .getNodeState();
+        writer.flush();
+        progress.stop();
+        return compacted;
+    }
+
+    private class CompactDiff extends ApplyDiff {
+        private IOException exception;
+
+        /**
+         * Path of the node currently being processed, or null if trace
+         * logging was not enabled when the compaction call started. The null
+         * check therefore also tells whether a trace message is needed
+         */
+        private final String path;
+
+        CompactDiff(NodeBuilder builder) {
+            super(builder);
+            if (log.isTraceEnabled()) {
+                this.path = "/";
+            } else {
+                this.path = null;
+            }
+        }
+
+        private CompactDiff(NodeBuilder builder, String path, String childName) {
+            super(builder);
+            if (path != null) {
+                this.path = concat(path, childName);
+            } else {
+                this.path = null;
+            }
+        }
+
+        boolean diff(NodeState before, NodeState after) throws IOException {
+            boolean success = after.compareAgainstBaseState(before,
+                    new CancelableDiff(this, cancel));
+            if (exception != null) {
+                throw new IOException(exception);
+            }
+            return success;
+        }
+
+        @Override
+        public boolean propertyAdded(PropertyState after) {
+            if (path != null) {
+                log.trace("propertyAdded {}/{}", path, after.getName());
+            }
+            progress.onProperty();
+            return super.propertyAdded(compact(after));
+        }
+
+        @Override
+        public boolean propertyChanged(PropertyState before, PropertyState after) {
+            if (path != null) {
+                log.trace("propertyChanged {}/{}", path, after.getName());
+            }
+            progress.onProperty();
+            return super.propertyChanged(before, compact(after));
+        }
+
+        @Override
+        public boolean childNodeAdded(String name, NodeState after) {
+            if (path != null) {
+                log.trace("childNodeAdded {}/{}", path, name);
+            }
+
+            RecordId id = null;
+            if (after instanceof SegmentNodeState) {
+                id = ((SegmentNodeState) after).getRecordId();
+                RecordId compactedId = cache.get(id);
+                if (compactedId != null) {
+                    builder.setChildNode(name, new SegmentNodeState(reader,
+                            writer, compactedId));
+                    return true;
+                }
+            }
+
+            progress.onNode();
+            try {
+                NodeBuilder child = EMPTY_NODE.builder();
+                boolean success = new CompactDiff(child, path, name).diff(
+                        EMPTY_NODE, after);
+                if (success) {
+                    SegmentNodeState state = writer.writeNode(child
+                            .getNodeState());
+                    builder.setChildNode(name, state);
+                    if (id != null && includeInMap.apply(after)) {
+                        cache.put(id, state.getRecordId());
+                    }
+                }
+                return success;
+            } catch (IOException e) {
+                exception = e;
+                return false;
+            }
+        }
+
+        @Override
+        public boolean childNodeChanged(String name, NodeState before,
+                NodeState after) {
+            if (path != null) {
+                log.trace("childNodeChanged {}/{}", path, name);
+            }
+
+            RecordId id = null;
+            if (after instanceof SegmentNodeState) {
+                id = ((SegmentNodeState) after).getRecordId();
+                RecordId compactedId = cache.get(id);
+                if (compactedId != null) {
+                    builder.setChildNode(name, new SegmentNodeState(reader,
+                            writer, compactedId));
+                    return true;
+                }
+            }
+
+            if (contentEqualityCheck && before.equals(after)) {
+                return true;
+            }
+
+            progress.onNode();
+            try {
+                NodeBuilder child = builder.getChildNode(name);
+                boolean success = new CompactDiff(child, path, name).diff(
+                        before, after);
+                if (success) {
+                    RecordId compactedId = writer.writeNode(
+                            child.getNodeState()).getRecordId();
+                    if (id != null) {
+                        cache.put(id, compactedId);
+                    }
+                }
+                return success;
+            } catch (IOException e) {
+                exception = e;
+                return false;
+            }
+        }
+    }
+
+    private PropertyState compact(PropertyState property) {
+        String name = property.getName();
+        Type<?> type = property.getType();
+        if (type == BINARY) {
+            Blob blob = compact(property.getValue(Type.BINARY));
+            return BinaryPropertyState.binaryProperty(name, blob);
+        } else if (type == BINARIES) {
+            List<Blob> blobs = new ArrayList<Blob>();
+            for (Blob blob : property.getValue(BINARIES)) {
+                blobs.add(compact(blob));
+            }
+            return MultiBinaryPropertyState.binaryPropertyFromBlob(name, blobs);
+        } else {
+            Object value = property.getValue(type);
+            return PropertyStates.createProperty(name, value, type);
+        }
+    }
+
+    /**
+     * Compacts (and de-duplicates) the given blob.
+     * 
+     * @param blob
+     *            blob to be compacted
+     * @return compacted blob
+     */
+    private Blob compact(Blob blob) {
+        if (blob instanceof SegmentBlob) {
+            SegmentBlob sb = (SegmentBlob) blob;
+            try {
+                // Check if we've already cloned this specific record
+                RecordId id = sb.getRecordId();
+                RecordId compactedId = cache.get(id);
+                if (compactedId != null) {
+                    return new SegmentBlob(blobStore, compactedId);
+                }
+
+                progress.onBinary();
+
+                // if the blob is external, just clone it
+                if (sb.isExternal()) {
+                    SegmentBlob clone = writer.writeBlob(sb);
+                    cache.put(id, clone.getRecordId());
+                    return clone;
+                }
+                // if the blob is small enough to be inlined, just clone it
+                if (sb.length() < Segment.MEDIUM_LIMIT) {
+                    SegmentBlob clone = writer.writeBlob(blob);
+                    cache.put(id, clone.getRecordId());
+                    return clone;
+                }
+
+                // otherwise, check whether the exact same binary has already been cloned
+                String key = getBlobKey(blob);
+                List<RecordId> ids = binaries.get(key);
+                if (ids != null) {
+                    for (RecordId duplicateId : ids) {
+                        if (new SegmentBlob(blobStore, duplicateId).equals(sb)) {
+                            cache.put(id, duplicateId);
+                            return new SegmentBlob(blobStore, duplicateId);
+                        }
+                    }
+                }
+
+                // if not, clone the large blob and keep track of the result
+                sb = writer.writeBlob(blob);
+
+                cache.put(id, sb.getRecordId());
+                if (ids == null) {
+                    ids = newArrayList();
+                    binaries.put(key, ids);
+                }
+                ids.add(sb.getRecordId());
+
+                return sb;
+            } catch (IOException e) {
+                log.warn("Failed to compact a blob", e);
+                // fall through
+            }
+        }
+
+        // no way to compact this blob, so we'll just keep it as-is
+        return blob;
+    }
+
+    private static String getBlobKey(Blob blob) throws IOException {
+        InputStream stream = blob.getNewStream();
+        try {
+            byte[] buffer = new byte[SegmentWriter.BLOCK_SIZE];
+            int n = IOUtils.readFully(stream, buffer, 0, buffer.length);
+            return blob.length() + ":" + Hashing.sha1().hashBytes(buffer, 0, n);
+        } finally {
+            stream.close();
+        }
+    }
+
+    private static class ProgressTracker {
+        private final long logAt = Long.getLong("compaction-progress-log",
+                150000);
+
+        private long start = 0;
+
+        private long nodes = 0;
+        private long properties = 0;
+        private long binaries = 0;
+
+        void start() {
+            nodes = 0;
+            properties = 0;
+            binaries = 0;
+            start = System.currentTimeMillis();
+        }
+
+        void onNode() {
+            if (++nodes % logAt == 0) {
+                logProgress(start, false);
+                start = System.currentTimeMillis();
+            }
+        }
+
+        void onProperty() {
+            properties++;
+        }
+
+        void onBinary() {
+            binaries++;
+        }
+
+        void stop() {
+            logProgress(start, true);
+        }
+
+        private void logProgress(long start, boolean done) {
+            log.debug(
+                    "Compacted {} nodes, {} properties, {} binaries in {} ms.",
+                    nodes, properties, binaries, System.currentTimeMillis()
+                            - start);
+            if (done) {
+                log.info(
+                        "Finished compaction: {} nodes, {} properties, {} binaries.",
+                        nodes, properties, binaries);
+            }
+        }
+    }
+
+    private static class OfflineCompactionPredicate implements
+            Predicate<NodeState> {
+
+        /**
+         * Nodes over 64K in size will be included in the compaction map
+         */
+        private static final long offlineThreshold = 65536;
+
+        @Override
+        public boolean apply(NodeState state) {
+            if (state.getChildNodeCount(2) > 1) {
+                return true;
+            }
+            long count = 0;
+            for (PropertyState ps : state.getProperties()) {
+                Type<?> type = ps.getType();
+                for (int i = 0; i < ps.count(); i++) {
+                    long size = 0;
+                    if (type == BINARY || type == BINARIES) {
+                        Blob blob = ps.getValue(BINARY, i);
+                        if (blob instanceof SegmentBlob) {
+                            if (!((SegmentBlob) blob).isExternal()) {
+                                size += blob.length();
+                            }
+                        } else {
+                            size += blob.length();
+                        }
+                    } else {
+                        size = ps.size(i);
+                    }
+                    count += size;
+                    if (size >= offlineThreshold || count >= offlineThreshold) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+    }
+
+    public void setContentEqualityCheck(boolean contentEqualityCheck) {
+        this.contentEqualityCheck = contentEqualityCheck;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Compactor.java
------------------------------------------------------------------------------
    svn:eol-style = native
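
The new Compactor can also be driven directly, as CompactorTest below does. A
minimal sketch against a MemoryStore ("head" stands for whatever state is being
compacted; the writer id "c" and generation 1 are taken from the test):

    MemoryStore store = new MemoryStore();
    SegmentWriter writer = SegmentWriters.segmentWriter(store, LATEST_VERSION, "c", 1);
    Compactor compactor = new Compactor(store.getReader(), writer,
            store.getBlobStore(), Suppliers.ofInstance(false));
    // rebuild "head" onto an empty state; a constant-true supplier would cancel early
    SegmentNodeState compacted = compactor.compact(EMPTY_NODE, head, EMPTY_NODE);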

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed Jun  1 15:20:49 2016
@@ -638,6 +638,9 @@ public class SegmentWriter {
                 if (!isOldGeneration(segmentBlob.getRecordId())) {
                     return segmentBlob.getRecordId();
                 }
+                if (segmentBlob.isExternal()) {
+                    return writeBlobId(segmentBlob.getBlobId());
+                }
             }
 
             String reference = blob.getReference();

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java Wed Jun  1 15:20:49 2016
@@ -82,6 +82,8 @@ public class SegmentGCOptions {
 
     private int retainedGenerations = RETAINED_GENERATIONS_DEFAULT;
 
+    private boolean offline = false;
+
     public SegmentGCOptions(boolean paused, int memoryThreshold, int gainThreshold,
                             int retryCount, boolean forceAfterFail, int lockWaitTime) {
         this.paused = paused;
@@ -245,7 +247,8 @@ public class SegmentGCOptions {
                 ", retryCount=" + retryCount +
                 ", forceAfterFail=" + forceAfterFail +
                 ", lockWaitTime=" + lockWaitTime +
-                ", retainedGenerations=" + retainedGenerations + '}';
+                ", retainedGenerations=" + retainedGenerations +
+                ", offline=" + offline + "}";
     }
 
     /**
@@ -262,4 +265,18 @@ public class SegmentGCOptions {
         return availableDiskSpace > 0.25 * repositoryDiskSpace;
     }
 
+    public boolean isOffline() {
+        return offline;
+    }
+
+    /**
+     * Enables the offline compaction mode, allowing for certain optimizations,
+     * such as reducing the number of retained generations to 1.
+     * @return this instance
+     */
+    public SegmentGCOptions setOffline() {
+        this.offline = true;
+        this.retainedGenerations = 1;
+        return this;
+    }
 }
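
Note that setOffline() is a one-way switch on the options instance it is called
on: besides raising the offline flag it drops retainedGenerations to 1, which is
what later allows cleanup to reclaim everything but the current generation. A
minimal sketch (DEFAULT is the shared default options instance):

    SegmentGCOptions opts = SegmentGCOptions.DEFAULT.setOffline();
    assert opts.isOffline();   // offline flag set, retainedGenerations now 1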

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed Jun  1 15:20:49 2016
@@ -83,6 +83,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
 import org.apache.jackrabbit.oak.segment.SegmentCache;
 import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
+import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.SegmentId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
@@ -1133,10 +1134,17 @@ public class FileStore implements Segmen
         }
     }
 
-    private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState node,
+    private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
                                      Supplier<Boolean> cancel)
     throws IOException {
-        return segmentWriter.writeNode(node, bufferWriter, cancel);
+        if (gcOptions.isOffline()) {
+            // Offline compaction: rewrite the full head state with the new Compactor
+            SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, tracker, bufferWriter);
+            return new Compactor(segmentReader, writer, blobStore, cancel)
+                    .compact(EMPTY_NODE, head, EMPTY_NODE);
+        } else {
+            return segmentWriter.writeNode(head, bufferWriter, cancel);
+        }
     }
 
     private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Wed Jun  1 15:20:49 2016
@@ -169,10 +169,97 @@ public class CompactionAndCleanupIT {
         }
     }
 
+    @Test
+    public void offlineCompaction()
+    throws IOException, CommitFailedException {
+        SegmentGCOptions gcOptions = DEFAULT.setOffline();
+        FileStore fileStore = FileStore.builder(getFileStoreFolder())
+                .withMaxFileSize(1)
+                .withGCOptions(gcOptions)
+                .build();
+        SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+
+        try {
+            // 5MB blob
+            int blobSize = 5 * 1024 * 1024;
+
+            // Create ~2MB of data
+            NodeBuilder extra = nodeStore.getRoot().builder();
+            NodeBuilder content = extra.child("content");
+            for (int i = 0; i < 10000; i++) {
+                NodeBuilder c = content.child("c" + i);
+                for (int j = 0; j < 1000; j++) {
+                    c.setProperty("p" + i, "v" + i);
+                }
+            }
+            nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size1 = fileStore.size();
+            log.debug("File store size {}", byteCountToDisplaySize(size1));
+
+            // Create a property with 5 MB blob
+            NodeBuilder builder = nodeStore.getRoot().builder();
+            builder.setProperty("blob1", createBlob(nodeStore, blobSize));
+            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size2 = fileStore.size();
+            assertSize("1st blob added", size2, size1 + blobSize, size1 + blobSize + (blobSize / 100));
+
+            // Now remove the property. No gc yet -> size doesn't shrink
+            builder = nodeStore.getRoot().builder();
+            builder.removeProperty("blob1");
+            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size3 = fileStore.size();
+            assertSize("1st blob removed", size3, size2, size2 + 4096);
+
+            // 1st gc cycle -> 1st blob should get collected
+            fileStore.compact();
+            fileStore.cleanup();
+
+            long size4 = fileStore.size();
+            assertSize("1st gc", size4, size3 - blobSize - size1, size3
+                    - blobSize);
+
+            // Add another 5MB binary
+            builder = nodeStore.getRoot().builder();
+            builder.setProperty("blob2", createBlob(nodeStore, blobSize));
+            nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            fileStore.flush();
+
+            long size5 = fileStore.size();
+            assertSize("2nd blob added", size5, size4 + blobSize, size4 + blobSize + (blobSize / 100));
+
+            // 2nd gc cycle -> 2nd blob should *not* be collected
+            fileStore.compact();
+            fileStore.cleanup();
+
+            long size6 = fileStore.size();
+            assertSize("2nd gc", size6, size5 * 10/11, size5 * 10/9);
+
+            // 3rd gc cycle -> no significant change
+            fileStore.compact();
+            fileStore.cleanup();
+
+            long size7 = fileStore.size();
+            assertSize("3rd gc", size7, size6 * 10/11 , size6 * 10/9);
+
+            // No data loss
+            byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
+                    .getProperty("blob2").getValue(Type.BINARY).getNewStream());
+            assertEquals(blobSize, blob.length);
+        } finally {
+            fileStore.close();
+        }
+    }
+
     private static void assertSize(String info, long size, long lower, long upper) {
         log.debug("File Store {} size {}, expected in interval [{},{}]",
                 info, size, lower, upper);
-        assertTrue("File Store " + log + " size expected in interval " +
+        assertTrue("File Store " + info + " size expected in interval " +
                 "[" + (lower) + "," + (upper) + "] but was: " + (size),
                 size >= lower && size <= (upper));
     }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTest.java Wed Jun  1 15:20:49 2016
@@ -18,8 +18,13 @@
  */
 package org.apache.jackrabbit.oak.segment;
 
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 import java.io.IOException;
 
+import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.Oak;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
@@ -30,14 +35,51 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.Before;
+import org.junit.Test;
 
 public class CompactorTest {
 
-    private SegmentStore segmentStore;
+    private MemoryStore memoryStore;
 
     @Before
     public void openSegmentStore() throws IOException {
-        segmentStore = new MemoryStore();
+        memoryStore = new MemoryStore();
+    }
+
+    @Test
+    public void testCompactor() throws Exception {
+        NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build();
+        init(store);
+
+        SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1);
+        Compactor compactor = new Compactor(memoryStore.getReader(), writer,
+                memoryStore.getBlobStore(), Suppliers.ofInstance(false));
+        addTestContent(store, 0);
+
+        NodeState initial = store.getRoot();
+        SegmentNodeState after = compactor.compact(initial, store.getRoot(),
+                initial);
+        assertEquals(store.getRoot(), after);
+
+        addTestContent(store, 1);
+        after = compactor.compact(initial, store.getRoot(), initial);
+        assertEquals(store.getRoot(), after);
+    }
+
+    @Test
+    public void testCancel() throws Throwable {
+
+        // Create a Compactor that will cancel itself as soon as possible. The
+        // early cancellation is the reason why the returned SegmentNodeState
+        // doesn't have the child named "b".
+
+        NodeStore store = SegmentNodeStoreBuilders.builder(memoryStore).build();
+        SegmentWriter writer = SegmentWriters.segmentWriter(memoryStore, LATEST_VERSION, "c", 1);
+        Compactor compactor = new Compactor(memoryStore.getReader(), writer, memoryStore.getBlobStore(),
+                Suppliers.ofInstance(true));
+        SegmentNodeState sns = compactor.compact(store.getRoot(),
+                addChild(store.getRoot(), "b"), store.getRoot());
+        assertFalse(sns.hasChildNode("b"));
     }
 
     private NodeState addChild(NodeState current, String name) {

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java?rev=1746478&r1=1746477&r2=1746478&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ExternalBlobIT.java Wed Jun  1 15:20:49 2016
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.segmen
 
 import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK;
 import static org.apache.jackrabbit.oak.commons.FixturesHelper.getFixtures;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.DEFAULT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -36,6 +37,7 @@ import java.util.Random;
 import javax.annotation.Nonnull;
 
 import com.google.common.collect.Lists;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.core.data.FileDataStore;
@@ -45,6 +47,7 @@ import org.apache.jackrabbit.oak.api.Typ
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.FileBlob;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -159,11 +162,13 @@ public class ExternalBlobIT {
         if (store != null) {
             store.close();
         }
+        nodeStore = null;
     }
 
     protected SegmentNodeStore getNodeStore(BlobStore blobStore) throws IOException {
         if (nodeStore == null) {
-            store = FileStore.builder(getWorkDir()).withBlobStore(blobStore).withMaxFileSize(256).withMemoryMapping(false).build();
+            store = FileStore.builder(getWorkDir()).withBlobStore(blobStore)
+                    .withMaxFileSize(1).build();
             nodeStore = SegmentNodeStoreBuilders.builder(store).build();
         }
         return nodeStore;
@@ -256,4 +261,34 @@ public class ExternalBlobIT {
         assertEquals(size, ps.size());
         // assertEquals("{" + size + " bytes}", ps.toString());
     }
+
+    @Test
+    public void testOfflineCompaction() throws Exception {
+        FileDataStore fds = createFileDataStore();
+        DataStoreBlobStore dbs = new DataStoreBlobStore(fds);
+        nodeStore = getNodeStore(dbs);
+
+        int size = 2 * 1024 * 1024;
+        byte[] data2 = new byte[size];
+        new Random().nextBytes(data2);
+
+        Blob b = nodeStore.createBlob(new ByteArrayInputStream(data2));
+        NodeBuilder builder = nodeStore.getRoot().builder();
+        builder.child("hello").setProperty("world", b);
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        store.flush();
+
+        // blob went to the external store
+        assertTrue(store.size() < 10 * 1024);
+        close();
+
+        SegmentGCOptions gcOptions = DEFAULT.setOffline();
+        store = FileStore.builder(getWorkDir()).withMaxFileSize(1)
+                .withGCOptions(gcOptions).build();
+        assertTrue(store.size() < 10 * 1024);
+
+        store.compact();
+        store.cleanup();
+
+    }
 }