Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/06/12 20:24:02 UTC

svn commit: r1602256 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: jukka
Date: Thu Jun 12 18:24:01 2014
New Revision: 1602256

URL: http://svn.apache.org/r1602256
Log:
OAK-1804: TarMK compaction

Automatically compress the compaction map data structure once every 100k entries.
Also only keep track of compacted nodes with at least two children.
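
In practice the map is now built incrementally and folded into its packed
form as it grows. A minimal usage sketch in Java, based on the API and test
code in the diff below (the record identifiers are placeholders, constructed
the same way as in the updated test):

    SegmentTracker factory = new MemoryStore().getTracker();
    RecordId before = new RecordId(factory.newDataSegmentId(), 0);
    RecordId after = new RecordId(factory.newDataSegmentId(), 0);

    CompactionMap map = new CompactionMap(100000); // compress every 100k puts
    map.put(before, after); // buffered in a hash map until the next compress()
    map.compress();         // folds buffered entries into the packed arrays
    assert map.wasCompactedTo(before, after);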

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1602256&r1=1602255&r2=1602256&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java Thu Jun 12 18:24:01 2014
@@ -16,13 +16,15 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.collect.Maps.newHashMap;
 import static com.google.common.collect.Maps.newTreeMap;
+import static com.google.common.collect.Sets.newTreeSet;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  * Immutable, space-optimized mapping of compacted record identifiers.
@@ -30,8 +32,12 @@ import java.util.Map.Entry;
  * without holding references to the {@link SegmentId} instances of the old,
  * compacted segments.
  * <p>
- * The data structure used by this class consists of three parts:
+ * The data structure used by this class consists of four parts:
  * <ol>
+ *   <li>The {@link #recent} map of recently compacted entries is maintained
+ *       while the compaction is in progress and new entries need to be added.
+ *       These entries are periodically compressed into the more
+ *       memory-efficient structure described below.
  *   <li>The {@link #msbs} and {@link #lsbs} arrays store the identifiers
  *       of all old, compacted segments. The identifiers are stored in
  *       increasing order, with the i'th identifier stored in the
@@ -60,85 +66,167 @@ import java.util.Map.Entry;
  */
 class CompactionMap {
 
-    private final long[] msbs;
-    private final long[] lsbs;
-    private final int[] entryIndex;
-
-    private final short[] beforeOffsets;
-    private final SegmentId[] afterSegmentIds;
-    private final short[] afterOffsets;
+    private final int compressInterval;
+    private final Map<RecordId, RecordId> recent = newHashMap();
 
-    CompactionMap() {
-        this(Collections.<RecordId, RecordId>emptyMap());
+    private long[] msbs = new long[0];
+    private long[] lsbs = new long[0];
+    private int[] entryIndex = new int[0];
+
+    private short[] beforeOffsets = new short[0];
+    private SegmentId[] afterSegmentIds = new SegmentId[0];
+    private short[] afterOffsets = new short[0];
+
+    CompactionMap(int compressInterval) {
+        this.compressInterval = compressInterval;
     }
 
     /**
-     * Creates a compaction map based from the given record identifiers.
+     * Checks whether the record with the given {@code before} identifier was
+     * compacted to a new record with the given {@code after} identifier.
+     *
+     * @param before before record identifier
+     * @param after after record identifier
+     * @return whether {@code before} was compacted to {@code after}
      */
-    CompactionMap(Map<RecordId, RecordId> compacted) {
-        Map<SegmentId, Map<Integer, RecordId>> mapping = newTreeMap();
-        for (Entry<RecordId, RecordId> entry : compacted.entrySet()) {
+    boolean wasCompactedTo(RecordId before, RecordId after) {
+        return after.equals(get(before));
+    }
+
+    public RecordId get(RecordId before) {
+        RecordId after = recent.get(before);
+        if (after != null) {
+            return after;
+        }
+
+        SegmentId segmentId = before.getSegmentId();
+        long msb = segmentId.getMostSignificantBits();
+        long lsb = segmentId.getLeastSignificantBits();
+        int offset = before.getOffset();
+
+        int entry = findEntry(msb, lsb);
+        if (entry != -1) {
+            int index = entryIndex[entry];
+            int limit = entryIndex[entry + 1];
+            for (int i = index; i < limit; i++) {
+                int o = (beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS;
+                if (o == offset) {
+                    // found it!
+                    return new RecordId(
+                            afterSegmentIds[i],
+                            (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS);
+                } else if (o > offset) {
+                    return null;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    void put(RecordId before, RecordId after) {
+        recent.put(before, after);
+        if (recent.size() >= compressInterval) {
+            compress();
+        }
+    }
+
+    void compress() {
+        Set<UUID> uuids = newTreeSet();
+
+        Map<UUID, Map<Integer, RecordId>> mapping = newTreeMap();
+        for (Entry<RecordId, RecordId> entry : recent.entrySet()) {
             RecordId before = entry.getKey();
+
             SegmentId id = before.getSegmentId();
-            Map<Integer, RecordId> map = mapping.get(id);
+            UUID uuid = new UUID(
+                    id.getMostSignificantBits(),
+                    id.getLeastSignificantBits());
+            uuids.add(uuid);
+
+            Map<Integer, RecordId> map = mapping.get(uuid);
             if (map == null) {
                 map = newTreeMap();
-                mapping.put(id, map);
+                mapping.put(uuid, map);
             }
             map.put(before.getOffset(), entry.getValue());
         }
 
-        SegmentId[] ids = mapping.keySet().toArray(new SegmentId[mapping.size()]);
-        Arrays.sort(ids);
+        for (int i = 0; i < msbs.length; i++) {
+            uuids.add(new UUID(msbs[i], lsbs[i]));
+        }
+
+        long[] newmsbs = new long[uuids.size()];
+        long[] newlsbs = new long[uuids.size()];
+        int[] newEntryIndex = new int[uuids.size() + 1];
+
+        int newEntries = beforeOffsets.length + recent.size();
+        short[] newBeforeOffsets = new short[newEntries];
+        SegmentId[] newAfterSegmentIds = new SegmentId[newEntries];
+        short[] newAfterOffsets = new short[newEntries];
+
+        int newIndex = 0;
+        int newEntry = 0;
+        int oldEntry = 0;
+        for (UUID uuid : uuids) {
+            newmsbs[newEntry] = uuid.getMostSignificantBits();
+            newlsbs[newEntry] = uuid.getLeastSignificantBits();
+
+            Map<Integer, RecordId> map = mapping.get(uuid);
+            if (map == null) {
+                map = newTreeMap();
+            }
 
-        this.msbs = new long[ids.length];
-        this.lsbs = new long[ids.length];
-        this.entryIndex = new int[ids.length + 1];
-
-        this.beforeOffsets = new short[compacted.size()];
-        this.afterSegmentIds = new SegmentId[compacted.size()];
-        this.afterOffsets = new short[compacted.size()];
-
-        int index = 0;
-        for (int i = 0; i < ids.length; i++) {
-            msbs[i] = ids[i].getMostSignificantBits();
-            lsbs[i] = ids[i].getLeastSignificantBits();
-            entryIndex[i] = index;
+            if (oldEntry < msbs.length
+                    && msbs[oldEntry] == newmsbs[newEntry]
+                    && lsbs[oldEntry] == newlsbs[newEntry]) {
+                int index = entryIndex[oldEntry];
+                int limit = entryIndex[oldEntry + 1];
+                for (int i = index; i < limit; i++) {
+                    map.put((beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS,
+                            new RecordId(
+                                    afterSegmentIds[i],
+                                    (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS));
+                }
+                oldEntry++;
+            }
 
-            Map<Integer, RecordId> map = mapping.get(ids[i]);
+            newEntryIndex[newEntry++] = newIndex;
             for (Entry<Integer, RecordId> entry : map.entrySet()) {
                 int key = entry.getKey();
                 RecordId id = entry.getValue();
-                beforeOffsets[index] = (short) (key >> RECORD_ALIGN_BITS);
-                afterSegmentIds[index] = id.getSegmentId();
-                afterOffsets[index] = (short) (id.getOffset() >> RECORD_ALIGN_BITS);
-                index++;
+                newBeforeOffsets[newIndex] = (short) (key >> RECORD_ALIGN_BITS);
+                newAfterSegmentIds[newIndex] = id.getSegmentId();
+                newAfterOffsets[newIndex] = (short) (id.getOffset() >> RECORD_ALIGN_BITS);
+                newIndex++;
             }
         }
-        entryIndex[ids.length] = index;
+
+        newEntryIndex[newEntry] = newIndex;
+
+        this.msbs = newmsbs;
+        this.lsbs = newlsbs;
+        this.entryIndex = newEntryIndex;
+
+        this.beforeOffsets = newBeforeOffsets;
+        this.afterSegmentIds = newAfterSegmentIds;
+        this.afterOffsets = newAfterOffsets;
+
+        recent.clear();
     }
 
     /**
-     * Checks whether the record with the given {@code before} identifier was
-     * compacted to a new record with the given {@code after} identifier.
+     * Finds the given segment identifier (UUID) within the list of
+     * identifiers of compacted segments tracked by this instance.
+     * Since the UUIDs are randomly generated and we keep the list
+     * sorted, we can use interpolation search to achieve
+     * {@code O(log log n)} lookup performance.
      *
-     * @param before before record identifier
-     * @param after after record identifier
-     * @return whether {@code before} was compacted to {@code after}
+     * @param msb most significant bits of the UUID
+     * @param lsb least significant bits of the UUID
+     * @return entry index, or {@code -1} if not found
      */
-    boolean wasCompactedTo(RecordId before, RecordId after) {
-        // this a copy of the TarReader#findEntry with tiny changes around the
-        // entry sizes
-
-        SegmentId segmentId = before.getSegmentId();
-        long msb = segmentId.getMostSignificantBits();
-        long lsb = segmentId.getLeastSignificantBits();
-        int offset = before.getOffset();
-
-        // The segment identifiers are randomly generated with uniform
-        // distribution, so we can use interpolation search to find the
-        // matching entry in the index. The average runtime is O(log log n).
-
+    private final int findEntry(long msb, long lsb) {
         int lowIndex = 0;
         int highIndex = msbs.length - 1;
 
@@ -153,7 +241,9 @@ class CompactionMap {
         while (lowIndex <= highIndex) {
             int guessIndex = lowIndex;
             float valueRange = highValue - lowValue;
-            if (valueRange >= 1) {
+            if (valueRange >= 1) { // no point in interpolating further
+                // Math.round() also prevents IndexOutOfBoundsExceptions
+                // caused by possible inaccuracy in the float computations.
                 guessIndex += Math.round(
                         (highIndex - lowIndex) * (targetValue - lowValue)
                         / valueRange);
@@ -176,28 +266,14 @@ class CompactionMap {
                     highIndex = guessIndex + 1;
                     highValue = m;
                 } else {
-                    // getting even closer...
-                    int index = entryIndex[guessIndex];
-                    int limit = entryIndex[guessIndex + 1];
-                    for (int i = index; i < limit; i++) {
-                        int o = (beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS;
-                        if (o < offset) {
-                            index++;
-                        } else if (o == offset) {
-                            // found it! now compare the value
-                            return afterSegmentIds[i] == after.getSegmentId()
-                                    && (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS == after.getOffset();
-                        } else {
-                            return false;
-                        }
-                    }
+                    // found it!
+                    return guessIndex;
                 }
             }
         }
 
         // not found
-        return false;
+        return -1;
     }
 
-
 }
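
The lookup that was previously inlined in wasCompactedTo() is now the
standalone findEntry() above, returning an index into the sorted msbs/lsbs
arrays instead of a boolean. For reference, here is the same interpolation
search over a single sorted key array; a simplified sketch, not the patched
method itself (which probes (msb, lsb) pairs):

    // Interpolation search over a sorted long[] of uniformly distributed
    // keys, e.g. the high bits of randomly generated UUIDs. Average cost
    // is O(log log n) probes instead of binary search's O(log n).
    static int findEntry(long[] keys, long target) {
        int lowIndex = 0;
        int highIndex = keys.length - 1;
        float lowValue = Long.MIN_VALUE;
        float highValue = Long.MAX_VALUE;
        float targetValue = target;
        while (lowIndex <= highIndex) {
            int guessIndex = lowIndex;
            float valueRange = highValue - lowValue;
            if (valueRange >= 1) { // interpolate only over a meaningful range
                guessIndex += Math.round(
                        (highIndex - lowIndex) * (targetValue - lowValue)
                        / valueRange);
            }
            long m = keys[guessIndex];
            if (m < target) {
                lowIndex = guessIndex + 1;
                lowValue = m;
            } else if (m > target) {
                highIndex = guessIndex - 1;
                highValue = m;
            } else {
                return guessIndex; // found it
            }
        }
        return -1; // not found
    }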

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1602256&r1=1602255&r2=1602256&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Thu Jun 12 18:24:01 2014
@@ -75,7 +75,8 @@ public class Compactor {
             after = builder.getNodeState();
         }
 
-        return new CompactionMap(compactor.compacted);
+        compactor.map.compress();
+        return compactor.map;
     }
 
     /**
@@ -88,13 +89,7 @@ public class Compactor {
 
     private final SegmentWriter writer;
 
-    /**
-     * Map from the identifiers of old records to the identifiers of their
-     * compacted copies. Used to prevent the compaction code from duplicating
-     * things like checkpoints that share most of their content with other
-     * subtrees.
-     */
-    private final Map<RecordId, RecordId> compacted = newHashMap();
+    private CompactionMap map = new CompactionMap(100000);
 
     /**
      * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted
@@ -134,7 +129,7 @@ public class Compactor {
             RecordId id = null;
             if (after instanceof SegmentNodeState) {
                 id = ((SegmentNodeState) after).getRecordId();
-                RecordId compactedId = compacted.get(id);
+                RecordId compactedId = map.get(id);
                 if (compactedId != null) {
                     builder.setChildNode(name, new SegmentNodeState(compactedId));
                     return true;
@@ -145,10 +140,10 @@ public class Compactor {
             boolean success = EmptyNodeState.compareAgainstEmptyState(
                     after, new CompactDiff(child));
 
-            if (success && id != null && child.getChildNodeCount(1) > 0) {
+            if (success && id != null && child.getChildNodeCount(2) > 1) {
                 RecordId compactedId =
                         writer.writeNode(child.getNodeState()).getRecordId();
-                compacted.put(id, compactedId);
+                map.put(id, compactedId);
             }
 
             return success;
@@ -160,7 +155,7 @@ public class Compactor {
             RecordId id = null;
             if (after instanceof SegmentNodeState) {
                 id = ((SegmentNodeState) after).getRecordId();
-                RecordId compactedId = compacted.get(id);
+                RecordId compactedId = map.get(id);
                 if (compactedId != null) {
                     builder.setChildNode(name, new SegmentNodeState(compactedId));
                     return true;
@@ -171,10 +166,10 @@ public class Compactor {
             boolean success = after.compareAgainstBaseState(
                     before, new CompactDiff(child));
 
-            if (success && id != null && child.getChildNodeCount(1) > 0) {
+            if (success && id != null && child.getChildNodeCount(2) > 1) {
                 RecordId compactedId =
                         writer.writeNode(child.getNodeState()).getRecordId();
-                compacted.put(id, compactedId);
+                map.put(id, compactedId);
             }
 
             return success;
@@ -218,7 +213,7 @@ public class Compactor {
 
                 // else check if we've already cloned this specific record
                 RecordId id = sb.getRecordId();
-                RecordId compactedId = compacted.get(id);
+                RecordId compactedId = map.get(id);
                 if (compactedId != null) {
                     return new SegmentBlob(compactedId);
                 }
@@ -236,7 +231,7 @@ public class Compactor {
 
                 // if not, clone the blob and keep track of the result
                 sb = sb.clone(writer);
-                compacted.put(id, sb.getRecordId());
+                map.put(id, sb.getRecordId());
                 if (ids == null) {
                     ids = newArrayList();
                     binaries.put(key, ids);
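
The repeated guard change, from child.getChildNodeCount(1) > 0 to
child.getChildNodeCount(2) > 1, is what implements the "at least two
children" rule from the log message: getChildNodeCount(max) may stop
counting once max children have been seen, so asking for at most two and
testing for more than one means "has two or more children" without forcing
a full count on large nodes. Expressed as a hypothetical helper (not part
of the patch):

    // True iff the node has at least two child nodes. The argument caps
    // how far the implementation needs to count, so this stays cheap even
    // for nodes with very many children.
    static boolean hasAtLeastTwoChildren(NodeBuilder child) {
        return child.getChildNodeCount(2) > 1;
    }

The idea being that nodes with fewer than two children are cheap to
re-compact if encountered again, so excluding them shrinks the map at
little cost.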

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1602256&r1=1602255&r2=1602256&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java Thu Jun 12 18:24:01 2014
@@ -72,7 +72,7 @@ public class SegmentTracker {
      * after compaction.
      */
     private final AtomicReference<CompactionMap> compactionMap =
-            new AtomicReference<CompactionMap>(new CompactionMap());
+            new AtomicReference<CompactionMap>(new CompactionMap(1));
 
     private final long cacheSize;
 

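The placeholder map here is created with a compress interval of 1, so any
entry put into it would be folded straight into the packed array form
rather than sitting in a temporary hash map. A compaction run replaces the
placeholder wholesale through the AtomicReference; a sketch of that publish
step (the method name is an assumption, not the tracker's actual API):

    // Hypothetical publisher: a single set() call makes the map produced
    // by a compaction run visible to all readers at once, so they see
    // either the old snapshot or the new one, never a mix.
    void setCompactionMap(CompactionMap compacted) {
        compactionMap.set(compacted);
    }
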
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java?rev=1602256&r1=1602255&r2=1602256&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java Thu Jun 12 18:24:01 2014
@@ -16,12 +16,12 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.collect.Maps.newHashMap;
 import static junit.framework.Assert.assertTrue;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.MAX_SEGMENT_SIZE;
 import static org.junit.Assert.assertFalse;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -31,32 +31,65 @@ import org.junit.Test;
 
 public class CompactionMapTest {
 
+    public static void main(String[] args) {
+        // check the memory use of really large mappings, 1M compacted
+        // segments with 10 records each.
+        Runtime runtime = Runtime.getRuntime();
+
+        System.gc();
+        System.out.println((runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024));
+
+        CompactionMap map = new CompactionMap(100000);
+        SegmentTracker factory = new MemoryStore().getTracker();
+        for (int i = 0; i < 1000000; i++) {
+            if (i % 1000 == 0) {
+                System.gc();
+                System.out.println(i + ": " + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + "MB");
+            }
+            SegmentId sid = factory.newDataSegmentId();
+            for (int j = 0; j < 10; j++) {
+                RecordId rid = new RecordId(sid, j << RECORD_ALIGN_BITS);
+                map.put(rid, rid);
+            }
+        }
+
+        System.gc();
+        System.out.println("final: " + (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024) + "MB");
+    }
+
     @Test
     public void testCompactionMap() {
         int maxSegments = 1000;
         int maxEntriesPerSegment = 10;
         int seed = new Random().nextInt();
+        Random r = new Random(seed);
 
         SegmentTracker factory = new MemoryStore().getTracker();
-        Map<RecordId, RecordId> map = new HashMap<RecordId, RecordId>();
+        CompactionMap map = new CompactionMap(r.nextInt(maxSegments / 2));
+        Map<RecordId, RecordId> entries = newHashMap();
 
-        Random r = new Random(seed);
         int segments = r.nextInt(maxSegments);
         for (int i = 0; i < segments; i++) {
             SegmentId id = factory.newDataSegmentId();
-            int entries = r.nextInt(maxEntriesPerSegment);
-            for (int j = 0; j < entries; j++) {
-                map.put(new RecordId(id, newValidOffset(r)),
-                        new RecordId(factory.newDataSegmentId(), newValidOffset(r)));
+            int n = r.nextInt(maxEntriesPerSegment);
+            for (int j = 0; j < n; j++) {
+                RecordId before = new RecordId(id, newValidOffset(r));
+                RecordId after = new RecordId(factory.newDataSegmentId(), newValidOffset(r));
+                entries.put(before, after);
+                map.put(before, after);
+                assertTrue("Failed with seed " + seed,
+                        map.wasCompactedTo(before, after));
+                assertFalse("Failed with seed " + seed,
+                        map.wasCompactedTo(after, before));
             }
         }
+        map.compress();
 
-        CompactionMap compaction = new CompactionMap(map);
-        for (Entry<RecordId, RecordId> e : map.entrySet()) {
+        for (Entry<RecordId, RecordId> entry : entries.entrySet()) {
             assertTrue("Failed with seed " + seed,
-                    compaction.wasCompactedTo(e.getKey(), e.getValue()));
+                    map.wasCompactedTo(entry.getKey(), entry.getValue()));
             assertFalse("Failed with seed " + seed,
-                    compaction.wasCompactedTo(e.getValue(), e.getKey()));
+                    map.wasCompactedTo(entry.getValue(), entry.getKey()));
         }
     }
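
A note on the short[] offsets used throughout the new CompactionMap code:
record offsets are always multiples of 1 << RECORD_ALIGN_BITS, and the
segment constants are defined so that MAX_SEGMENT_SIZE >> RECORD_ALIGN_BITS
fits in an unsigned 16-bit value (the test's newValidOffset() produces
exactly such offsets). A sketch of the packing behind the beforeOffsets and
afterOffsets arrays (helper names are illustrative):

    // Pack an aligned record offset into an unsigned 16-bit slot.
    static short pack(int offset) {
        return (short) (offset >> RECORD_ALIGN_BITS);
    }

    // Recover the original offset; the & 0xffff masks off the sign
    // extension that Java applies when widening a short back to int.
    static int unpack(short packed) {
        return (packed & 0xffff) << RECORD_ALIGN_BITS;
    }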