Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/05/30 15:39:04 UTC

svn commit: r1598595 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java

Author: jukka
Date: Fri May 30 13:39:03 2014
New Revision: 1598595

URL: http://svn.apache.org/r1598595
Log:
OAK-1804: TarMK compaction

Add de-duplication of binaries.
Minor cleanups.
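
De-duplication keys each binary by an approximate content hash and reuses an
already compacted record whenever an equal blob is found under the same key.
A minimal standalone sketch of that lookup (hypothetical names, not the actual
Compactor code) could look like this:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: blobs that map to the same key are still compared for exact
    // equality before an existing record is reused.
    class BinaryDedupIndex<K, B> {

        private final Map<K, List<B>> byKey = new HashMap<K, List<B>>();

        /** Returns an equal, already stored blob if one exists, otherwise null. */
        B findExisting(K key, B blob) {
            List<B> candidates = byKey.get(key);
            if (candidates != null) {
                for (B candidate : candidates) {
                    if (candidate.equals(blob)) {
                        return candidate;
                    }
                }
            }
            return null;
        }

        /** Records a newly written blob under its key for later reuse. */
        void put(K key, B blob) {
            List<B> blobs = byKey.get(key);
            if (blobs == null) {
                blobs = new ArrayList<B>();
                byKey.put(key, blobs);
            }
            blobs.add(blob);
        }
    }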

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1598595&r1=1598594&r2=1598595&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Fri May 30 13:39:03 2014
@@ -16,12 +16,14 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@ import java.util.Map;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState;
 import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
 import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState;
@@ -39,7 +42,7 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.minlog.Log;
+import com.google.common.hash.Hashing;
 
 /**
  * Tool for compacting segments.
@@ -49,31 +52,16 @@ public class Compactor {
     /** Logger instance */
     private static final Logger log = LoggerFactory.getLogger(Compactor.class);
 
-    private final SegmentStore store;
+    public static void compact(SegmentStore store) {
+        SegmentWriter writer = store.getTracker().getWriter();
+        Compactor compactor = new Compactor(writer);
 
-    private final SegmentWriter writer;
-
-    /**
-     * Map from the identifiers of old records to the identifiers of their
-     * compacted copies. Used to prevent the compaction code from duplicating
-     * things like checkpoints that share most of their content with other
-     * subtrees.
-     */
-    private final Map<RecordId, RecordId> compacted = newHashMap();
-
-    public Compactor(SegmentStore store) {
-        this.store = store;
-        this.writer = store.getTracker().getWriter();
-    }
-
-    public void compact() throws IOException {
         log.debug("TarMK compaction");
 
         SegmentNodeBuilder builder = writer.writeNode(EMPTY_NODE).builder();
         SegmentNodeState before = store.getHead();
         EmptyNodeState.compareAgainstEmptyState(
-                before, new CompactDiff(builder));
-        System.out.println(compacted.size() + " nodes compacted");
+                before, compactor.newCompactDiff(builder));
 
         SegmentNodeState after = builder.getNodeState();
         while (!store.setHead(before, after)) {
@@ -81,13 +69,38 @@ public class Compactor {
             // Rebase (and compact) those changes on top of the
             // compacted state before retrying to set the head.
             SegmentNodeState head = store.getHead();
-            head.compareAgainstBaseState(before, new CompactDiff(builder));
-            System.out.println(compacted.size() + " nodes compacted");
+            head.compareAgainstBaseState(
+                    before, compactor.newCompactDiff(builder));
             before = head;
             after = builder.getNodeState();
         }
     }
 
+    private final SegmentWriter writer;
+
+    /**
+     * Map from the identifiers of old records to the identifiers of their
+     * compacted copies. Used to prevent the compaction code from duplicating
+     * things like checkpoints that share most of their content with other
+     * subtrees.
+     */
+    private final Map<RecordId, RecordId> compacted = newHashMap();
+
+    /**
+     * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted
+     * blob record identifiers. Used to de-duplicate copies of the same
+     * binary values.
+     */
+    private final Map<String, List<RecordId>> binaries = newHashMap();
+
+    private Compactor(SegmentWriter writer) {
+        this.writer = writer;
+    }
+
+    private CompactDiff newCompactDiff(NodeBuilder builder) {
+        return new CompactDiff(builder);
+    }
+
     private class CompactDiff extends ApplyDiff {
 
         CompactDiff(NodeBuilder builder) {
@@ -177,28 +190,68 @@ public class Compactor {
         }
     }
 
+    /**
+     * Compacts (and de-duplicates) the given blob.
+     *
+     * @param blob blob to be compacted
+     * @return compacted blob
+     */
     private Blob compact(Blob blob) {
+        // first check if we've already cloned this record
         if (blob instanceof SegmentBlob) {
             SegmentBlob sb = (SegmentBlob) blob;
-            RecordId id = sb.getRecordId();
+            RecordId id = compacted.get(sb.getRecordId());
+            if (id != null) {
+                return new SegmentBlob(id);
+            }
+        }
 
-            // first check if we've already cloned this blob
-            RecordId compactedId = compacted.get(id);
-            if (compactedId != null) {
-                return new SegmentBlob(compactedId);
+        try {
+            // then check whether the exact same binary has been cloned
+            String key = getBlobKey(blob);
+            List<RecordId> ids = binaries.get(key);
+            if (ids != null) {
+                for (RecordId id : ids) {
+                    if (new SegmentBlob(id).equals(blob)) {
+                        return new SegmentBlob(id);
+                    }
+                }
             }
 
-            // if not, clone it and keep track of the resulting id
-            try {
+            // if not, try to clone the blob and keep track of the result
+            if (blob instanceof SegmentBlob) {
+                SegmentBlob sb = (SegmentBlob) blob;
+                RecordId id = sb.getRecordId();
+
                 sb = sb.clone(writer);
+
                 compacted.put(id, sb.getRecordId());
+                if (ids == null) {
+                    ids = newArrayList();
+                    binaries.put(key, ids);
+                }
+                ids.add(sb.getRecordId());
+
                 return sb;
-            } catch (IOException e) {
-                Log.warn("Failed to clone a binary value", e);
-                // fall through
             }
+        } catch (IOException e) {
+            log.warn("Failed to compcat a blob", e);
+            // fall through
         }
+
+        // no way to compact this blob, so we'll just keep it as-is
         return blob;
     }
 
+    private String getBlobKey(Blob blob) throws IOException {
+        InputStream stream = blob.getNewStream();
+        try {
+            byte[] buffer = new byte[SegmentWriter.BLOCK_SIZE];
+            int n = IOUtils.readFully(stream, buffer, 0, buffer.length);
+            return blob.length() + ":" + Hashing.sha1().hashBytes(buffer, 0, n);
+        } finally {
+            stream.close();
+        }
+    }
+
 }
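
The blob key above pairs the blob length with a SHA-1 hash of the first block
of content, so only the leading bytes of a binary have to be read to group
likely duplicates; exact equality is still verified before a record id is
reused. A standalone sketch under those assumptions (hard-coded 4096-byte
block size, hypothetical class and method names, not the Oak code itself):

    import java.io.IOException;
    import java.io.InputStream;

    import com.google.common.hash.Hashing;

    // Sketch: approximate blob key = "<length>:<sha1 of the first block>".
    // Different blobs can share a key, so callers must still verify blob
    // equality before reusing a compacted record.
    final class BlobKeys {

        private static final int BLOCK_SIZE = 4096; // assumed block size

        static String blobKey(long length, InputStream stream) throws IOException {
            try {
                byte[] buffer = new byte[BLOCK_SIZE];
                int n = 0;
                while (n < buffer.length) {
                    int read = stream.read(buffer, n, buffer.length - n);
                    if (read == -1) {
                        break; // blob is shorter than one block
                    }
                    n += read;
                }
                return length + ":" + Hashing.sha1().hashBytes(buffer, 0, n);
            } finally {
                stream.close();
            }
        }

    }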

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java?rev=1598595&r1=1598594&r2=1598595&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Main.java Fri May 30 13:39:03 2014
@@ -192,7 +192,7 @@ public class Main {
             System.out.println("    -> compacting");
             FileStore store = new FileStore(directory, 256, false);
             try {
-                new Compactor(store).compact();
+                Compactor.compact(store);
             } finally {
                 store.close();
             }