Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/12/15 13:36:36 UTC

svn commit: r1645637 [1/2] - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/ main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java...

Author: mduerig
Date: Mon Dec 15 12:36:36 2014
New Revision: 1645637

URL: http://svn.apache.org/r1645637
Log:
OAK-2192: Concurrent commit during compaction results in mixed segments
- patch v10 (see OAK-2192)
- Avoid deadlock in SegmentNodeStoreService
- Use correct constant values for the compaction strategy in SegmentNodeStoreService

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java
      - copied, changed from r1645616, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/HeavyWriteIT.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java
Removed:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java Mon Dec 15 12:36:36 2014
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.collect.Maps.newHashMap;
 import static com.google.common.collect.Maps.newTreeMap;
 import static com.google.common.collect.Sets.newTreeSet;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS;
 
 import java.util.Map;
@@ -67,18 +68,27 @@ import java.util.UUID;
 public class CompactionMap {
 
     private final int compressInterval;
-    private final Map<RecordId, RecordId> recent = newHashMap();
+    private final SegmentTracker tracker;
+
+    private Map<RecordId, RecordId> recent = newHashMap();
 
     private long[] msbs = new long[0];
     private long[] lsbs = new long[0];
-    private int[] entryIndex = new int[0];
-
     private short[] beforeOffsets = new short[0];
-    private SegmentId[] afterSegmentIds = new SegmentId[0];
+
+    private int[] entryIndex = new int[0];
     private short[] afterOffsets = new short[0];
 
-    CompactionMap(int compressInterval) {
+    private int[] afterSegmentIds = new int[0];
+    private long[] amsbs = new long[0];
+    private long[] alsbs = new long[0];
+
+    private long prevWeight;
+    private CompactionMap prev;
+
+    CompactionMap(int compressInterval, SegmentTracker tracker) {
         this.compressInterval = compressInterval;
+        this.tracker = tracker;
     }
 
     /**
@@ -90,7 +100,37 @@ public class CompactionMap {
      * @return whether {@code before} was compacted to {@code after}
      */
     boolean wasCompactedTo(RecordId before, RecordId after) {
-        return after.equals(get(before));
+        return recursiveWasCompactedTo(before, after);
+    }
+
+    /**
+     * Given a record, cycles down the chain of {@code prev} maps to identify its compacted version.
+     * 
+     * @param before before record identifier
+     * @param after after record identifier
+     * @return whether {@code before} was compacted to {@code after}
+     */
+    private boolean recursiveWasCompactedTo(RecordId before,
+            RecordId after) {
+        RecordId potentialAfter = recursiveGet(this, before);
+        if (potentialAfter == null) {
+            return false;
+        }
+        if (after.equals(potentialAfter)) {
+            return true;
+        }
+        return recursiveWasCompactedTo(potentialAfter, after);
+    }
+
+    private static RecordId recursiveGet(CompactionMap map, RecordId before) {
+        RecordId after = map.get(before);
+        if (after != null) {
+            return after;
+        }
+        if (map.prev != null) {
+            return recursiveGet(map.prev, before);
+        }
+        return null;
     }
 
     /**
@@ -100,10 +140,21 @@ public class CompactionMap {
      * @param id segment identifier
      * @return whether the identified segment was compacted
      */
-    boolean wasCompacted(SegmentId id) {
+    public boolean wasCompacted(UUID id) {
         long msb = id.getMostSignificantBits();
         long lsb = id.getLeastSignificantBits();
-        return findEntry(msb, lsb) != -1;
+        return wasCompacted(this, msb, lsb);
+    }
+
+    private static boolean wasCompacted(CompactionMap map, long msb, long lsb) {
+        int find = map.findEntry(msb, lsb);
+        if (find != -1) {
+            return true;
+        }
+        if (map.prev != null) {
+            return wasCompacted(map.prev, msb, lsb);
+        }
+        return false;
     }
 
     public RecordId get(RecordId before) {
@@ -112,6 +163,11 @@ public class CompactionMap {
             return after;
         }
 
+        //empty map
+        if (msbs.length == 0) {
+            return null;
+        }
+
         SegmentId segmentId = before.getSegmentId();
         long msb = segmentId.getMostSignificantBits();
         long lsb = segmentId.getLeastSignificantBits();
@@ -122,12 +178,10 @@ public class CompactionMap {
             int index = entryIndex[entry];
             int limit = entryIndex[entry + 1];
             for (int i = index; i < limit; i++) {
-                int o = (beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS;
+                int o = decode(beforeOffsets[i]);
                 if (o == offset) {
                     // found it!
-                    return new RecordId(
-                            afterSegmentIds[i],
-                            (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS);
+                    return new RecordId(asSegmentId(i), decode(afterOffsets[i]));
                 } else if (o > offset) {
                     return null;
                 }
@@ -137,12 +191,32 @@ public class CompactionMap {
         return null;
     }
 
+    private static int decode(short offset) {
+        return (offset & 0xffff) << RECORD_ALIGN_BITS;
+    }
+
+    private static short encode(int offset) {
+        return (short) (offset >> RECORD_ALIGN_BITS);
+    }
+
+    private SegmentId asSegmentId(int index) {
+        int idx = afterSegmentIds[index];
+        return new SegmentId(tracker, amsbs[idx], alsbs[idx]);
+    }
+
+    private static UUID asUUID(SegmentId id) {
+        return new UUID(id.getMostSignificantBits(),
+                id.getLeastSignificantBits());
+    }
+
     /**
      * Adds a new entry to the compaction map. Overwriting a previously
      * added entry is not supported.
      */
     void put(RecordId before, RecordId after) {
-        assert get(before) == null;
+        if (get(before) != null) {
+            throw new IllegalArgumentException();
+        }
         recent.put(before, after);
         if (recent.size() >= compressInterval) {
             compress();
@@ -150,6 +224,10 @@ public class CompactionMap {
     }
 
     void compress() {
+        if (recent.isEmpty()) {
+            // noop
+            return;
+        }
         Set<UUID> uuids = newTreeSet();
 
         Map<UUID, Map<Integer, RecordId>> mapping = newTreeMap();
@@ -180,9 +258,11 @@ public class CompactionMap {
 
         int newEntries = beforeOffsets.length + recent.size();
         short[] newBeforeOffsets = new short[newEntries];
-        SegmentId[] newAfterSegmentIds = new SegmentId[newEntries];
         short[] newAfterOffsets = new short[newEntries];
 
+        int[] newAfterSegmentIds = new int[newEntries];
+        Map<UUID, Integer> newAfterSegments = newHashMap();
+
         int newIndex = 0;
         int newEntry = 0;
         int oldEntry = 0;
@@ -190,9 +270,10 @@ public class CompactionMap {
             newmsbs[newEntry] = uuid.getMostSignificantBits();
             newlsbs[newEntry] = uuid.getLeastSignificantBits();
 
-            Map<Integer, RecordId> map = mapping.get(uuid);
-            if (map == null) {
-                map = newTreeMap();
+            // offset -> record
+            Map<Integer, RecordId> newsegment = mapping.get(uuid);
+            if (newsegment == null) {
+                newsegment = newTreeMap();
             }
 
             if (oldEntry < msbs.length
@@ -201,21 +282,29 @@ public class CompactionMap {
                 int index = entryIndex[oldEntry];
                 int limit = entryIndex[oldEntry + 1];
                 for (int i = index; i < limit; i++) {
-                    map.put((beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS,
-                            new RecordId(
-                                    afterSegmentIds[i],
-                                    (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS));
+                    newsegment.put(decode(beforeOffsets[i]), new RecordId(
+                            asSegmentId(i), decode(afterOffsets[i])));
                 }
                 oldEntry++;
             }
 
             newEntryIndex[newEntry++] = newIndex;
-            for (Entry<Integer, RecordId> entry : map.entrySet()) {
+            for (Entry<Integer, RecordId> entry : newsegment.entrySet()) {
                 int key = entry.getKey();
                 RecordId id = entry.getValue();
-                newBeforeOffsets[newIndex] = (short) (key >> RECORD_ALIGN_BITS);
-                newAfterSegmentIds[newIndex] = id.getSegmentId();
-                newAfterOffsets[newIndex] = (short) (id.getOffset() >> RECORD_ALIGN_BITS);
+                newBeforeOffsets[newIndex] = encode(key);
+                newAfterOffsets[newIndex] = encode(id.getOffset());
+
+                UUID aUUID = asUUID(id.getSegmentId());
+                int aSIdx = -1;
+                if (newAfterSegments.containsKey(aUUID)) {
+                    aSIdx = newAfterSegments.get(aUUID);
+                } else {
+                    aSIdx = newAfterSegments.size();
+                    newAfterSegments.put(aUUID, aSIdx);
+                }
+                newAfterSegmentIds[newIndex] = aSIdx;
+
                 newIndex++;
             }
         }
@@ -227,10 +316,19 @@ public class CompactionMap {
         this.entryIndex = newEntryIndex;
 
         this.beforeOffsets = newBeforeOffsets;
-        this.afterSegmentIds = newAfterSegmentIds;
         this.afterOffsets = newAfterOffsets;
 
-        recent.clear();
+        this.afterSegmentIds = newAfterSegmentIds;
+        this.amsbs = new long[newAfterSegments.size()];
+        this.alsbs = new long[newAfterSegments.size()];
+        for (Entry<UUID, Integer> entry : newAfterSegments.entrySet()) {
+            this.amsbs[entry.getValue()] = entry.getKey()
+                    .getMostSignificantBits();
+            this.alsbs[entry.getValue()] = entry.getKey()
+                    .getLeastSignificantBits();
+        }
+
+        recent = newHashMap();
     }
 
     /**
@@ -294,4 +392,77 @@ public class CompactionMap {
         return -1;
     }
 
+    /**
+     * TODO: merge the 2 maps (assume that 'prev' is bigger than the current map
+     * as it contains the entire history, but don't change any values as it
+     * might still be in use by other threads)
+     */
+    void merge(CompactionMap prev) {
+        this.prev = prev;
+        this.prevWeight = prev.getEstimatedWeight();
+    }
+
+    public String getCompactionStats() {
+        StringBuilder sb = new StringBuilder();
+        CompactionMap cm = this;
+        while (cm != null) {
+            sb.append("[");
+            sb.append(getCompactionStats(cm));
+            sb.append("], ");
+            cm = cm.prev;
+        }
+        return sb.toString();
+    }
+
+    private static String getCompactionStats(CompactionMap cm) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Estimated Weight: ");
+        sb.append(humanReadableByteCount(getEstimatedWeight(cm)));
+        sb.append(", Records: ");
+        sb.append(cm.afterOffsets.length);
+        sb.append(", Segments: ");
+        sb.append(cm.amsbs.length);
+        return sb.toString();
+    }
+
+    public long getEstimatedWeight() {
+        long total = 0;
+        CompactionMap cm = this;
+        while (cm != null) {
+            total += getEstimatedWeight(cm);
+            cm = cm.prev;
+        }
+        return total;
+    }
+
+    public long getLastMergeWeight() {
+        return this.prevWeight;
+    }
+
+    private static long getEstimatedWeight(CompactionMap cm) {
+        // estimation of the object including empty 'recent' map
+        long total = 168;
+
+        // msbs
+        total += 24 + cm.msbs.length * 8;
+        // lsbs
+        total += 24 + cm.lsbs.length * 8;
+        // beforeOffsets
+        total += 24 + cm.beforeOffsets.length * 2;
+
+        // entryIndex
+        total += 24 + cm.entryIndex.length * 4;
+        // afterOffsets
+        total += 24 + cm.afterOffsets.length * 2;
+
+        // afterSegmentIds
+        total += 24 + cm.afterSegmentIds.length * 4;
+        // amsbs
+        total += 24 + cm.amsbs.length * 8;
+        // alsbs
+        total += 24 + cm.alsbs.length * 8;
+
+        return total;
+    }
+
 }

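A note on the encode/decode helpers introduced above: record offsets are always multiples of 1 << RECORD_ALIGN_BITS (four-byte alignment), so dropping the alignment bits lets an offset fit into an unsigned short, halving the per-entry footprint. A minimal, self-contained sketch of the round trip (the constant is duplicated here purely for illustration):

    public class OffsetPackingSketch {

        // Mirrors Segment.RECORD_ALIGN_BITS: records align at four-byte boundaries.
        static final int RECORD_ALIGN_BITS = 2;

        // Drop the two alignment bits so the offset fits into 16 bits.
        static short encode(int offset) {
            return (short) (offset >> RECORD_ALIGN_BITS);
        }

        // The 0xffff mask undoes the sign extension of the (signed) short.
        static int decode(short packed) {
            return (packed & 0xffff) << RECORD_ALIGN_BITS;
        }

        public static void main(String[] args) {
            int offset = 262140; // largest encodable offset: 65535 * 4
            System.out.println(decode(encode(offset)) == offset); // prints true
        }
    }
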
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Mon Dec 15 12:36:36 2014
@@ -52,11 +52,6 @@ public class Compactor {
     private static final Logger log = LoggerFactory.getLogger(Compactor.class);
 
     /**
-     * over 64K in size, not will be included in the compaction map
-     */
-    private static final long threshold = 65536;
-
-    /**
      * Locks down the RecordId persistence structure
      */
     static long[] recordAsKey(RecordId r) {
@@ -66,7 +61,7 @@ public class Compactor {
 
     private final SegmentWriter writer;
 
-    private CompactionMap map = new CompactionMap(100000);
+    private final CompactionMap map;
 
     /**
      * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted
@@ -87,6 +82,7 @@ public class Compactor {
 
     public Compactor(SegmentWriter writer, boolean cloneBinaries) {
         this.writer = writer;
+        this.map = new CompactionMap(100000, writer.getTracker());
         this.cloneBinaries = cloneBinaries;
     }
 
@@ -98,7 +94,9 @@ public class Compactor {
     }
 
     public SegmentNodeState compact(NodeState before, NodeState after) {
-        return process(before, after).getNodeState();
+        SegmentNodeState compacted = process(before, after).getNodeState();
+        writer.flush();
+        return compacted;
     }
 
     public CompactionMap getCompactionMap() {
@@ -117,7 +115,6 @@ public class Compactor {
             return super.propertyAdded(compact(after));
         }
 
-
         @Override
         public boolean propertyChanged(
                 PropertyState before, PropertyState after) {
@@ -143,7 +140,7 @@ public class Compactor {
             if (success) {
                 SegmentNodeState state = writer.writeNode(child.getNodeState());
                 builder.setChildNode(name, state);
-                if (id != null && includeInMap(state)) {
+                if (id != null) {
                     map.put(id, state.getRecordId());
                 }
             }
@@ -151,23 +148,6 @@ public class Compactor {
             return success;
         }
 
-        private boolean includeInMap(SegmentNodeState state) {
-            if (state.getChildNodeCount(2) > 1) {
-                return true;
-            }
-            long count = 0;
-            for (PropertyState ps : state.getProperties()) {
-                for (int i = 0; i < ps.count(); i++) {
-                    long size = ps.size(i);
-                    count += size;
-                    if (size >= threshold || count >= threshold) {
-                        return true;
-                    }
-                }
-            }
-            return false;
-        }
-
         @Override
         public boolean childNodeChanged(
                 String name, NodeState before, NodeState after) {
@@ -182,13 +162,15 @@ public class Compactor {
             }
 
             NodeBuilder child = builder.getChildNode(name);
-            boolean success = after.compareAgainstBaseState(
-                    before, new CompactDiff(child));
+            boolean success = after.compareAgainstBaseState(before,
+                    new CompactDiff(child));
 
-            if (success && id != null && child.getChildNodeCount(2) > 1) {
-                RecordId compactedId =
-                        writer.writeNode(child.getNodeState()).getRecordId();
-                map.put(id, compactedId);
+            if (success) {
+                RecordId compactedId = writer.writeNode(child.getNodeState())
+                        .getRecordId();
+                if (id != null) {
+                    map.put(id, compactedId);
+                }
             }
 
             return success;
@@ -225,11 +207,6 @@ public class Compactor {
             SegmentBlob sb = (SegmentBlob) blob;
 
             try {
-                // if the blob is inlined or external, just clone it
-                if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) {
-                    return sb.clone(writer, cloneBinaries);
-                }
-
                 // else check if we've already cloned this specific record
                 RecordId id = sb.getRecordId();
                 RecordId compactedId = map.get(id);
@@ -237,6 +214,13 @@ public class Compactor {
                     return new SegmentBlob(compactedId);
                 }
 
+                // if the blob is inlined or external, just clone it
+                if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) {
+                    SegmentBlob clone = sb.clone(writer, cloneBinaries);
+                    map.put(id, clone.getRecordId());
+                    return clone;
+                }
+
                 // alternatively look if the exact same binary has been cloned
                 String key = getBlobKey(blob);
                 List<RecordId> ids = binaries.get(key);
@@ -269,7 +253,7 @@ public class Compactor {
         return blob;
     }
 
-    private String getBlobKey(Blob blob) throws IOException {
+    private static String getBlobKey(Blob blob) throws IOException {
         InputStream stream = blob.getNewStream();
         try {
             byte[] buffer = new byte[SegmentWriter.BLOCK_SIZE];

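The reordering in compact(Blob) above means the compaction map is consulted before any cloning, and inline/external clones are now recorded too, so each blob record is cloned at most once. A hedged sketch of that look-up-then-record pattern, with plain strings standing in for RecordId and SegmentBlob:

    import java.util.HashMap;
    import java.util.Map;

    public class CloneOnceSketch {

        private final Map<String, String> map = new HashMap<String, String>();

        // Return the previously recorded clone if present; otherwise clone and record.
        String compact(String recordId) {
            String compacted = map.get(recordId);
            if (compacted == null) {
                compacted = doClone(recordId); // stand-in for sb.clone(writer, cloneBinaries)
                map.put(recordId, compacted);  // later references reuse this result
            }
            return compacted;
        }

        private String doClone(String recordId) {
            return "compacted:" + recordId;
        }
    }
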
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Mon Dec 15 12:36:36 2014
@@ -69,7 +69,7 @@ public class Segment {
      * The number of bytes (or bits of address space) to use for the
      * alignment boundary of segment records.
      */
-    static final int RECORD_ALIGN_BITS = 2; // align at the four-byte boundary
+    public static final int RECORD_ALIGN_BITS = 2; // align at the four-byte boundary
 
     /**
      * Maximum segment size. Record identifiers are stored as three-byte
@@ -193,8 +193,10 @@ public class Segment {
             if (!id.isDataSegmentId()) {
                 type = "bulk";
             }
+            long delta = System.currentTimeMillis() - id.getCreationTime();
             throw new IllegalStateException("RefId '" + index
-                    + "' doesn't exist in " + type + " segment " + id);
+                    + "' doesn't exist in " + type + " segment " + id
+                    + ". Creation date delta is " + delta + " ms.");
         }
         SegmentId refid = refids[index];
         if (refid == null) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java Mon Dec 15 12:36:36 2014
@@ -46,6 +46,8 @@ public class SegmentId implements Compar
 
     private final long lsb;
 
+    private final long creationTime;
+
     /**
      * A reference to the segment object, if it is available in memory. It is
      * used for fast lookup. The segment tracker will set or reset this field.
@@ -53,15 +55,17 @@ public class SegmentId implements Compar
     // TODO: possibly we could remove the volatile
     private volatile Segment segment;
 
-    public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) {
+    private SegmentId(SegmentTracker tracker, long msb, long lsb,
+            Segment segment, long creationTime) {
         this.tracker = tracker;
         this.msb = msb;
         this.lsb = lsb;
         this.segment = segment;
+        this.creationTime = creationTime;
     }
 
     public SegmentId(SegmentTracker tracker, long msb, long lsb) {
-        this(tracker, msb, lsb, null);
+        this(tracker, msb, lsb, null, System.currentTimeMillis());
     }
 
     /**
@@ -79,7 +83,7 @@ public class SegmentId implements Compar
      * @return {@code true} for a bulk segment, {@code false} otherwise
      */
     public boolean isBulkSegmentId() {
-        return (lsb >>> 60) == 0xBL; 
+        return (lsb >>> 60) == 0xBL;
     }
 
     public boolean equals(long msb, long lsb) {
@@ -117,18 +121,22 @@ public class SegmentId implements Compar
         return tracker;
     }
 
-    //--------------------------------------------------------< Comparable >--
+    public long getCreationTime() {
+        return creationTime;
+    }
+
+    // --------------------------------------------------------< Comparable >--
 
     @Override
     public int compareTo(SegmentId that) {
         int d = Long.valueOf(this.msb).compareTo(Long.valueOf(that.msb));
         if (d == 0) {
-            d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb)); 
+            d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb));
         }
         return d;
     }
 
-    //------------------------------------------------------------< Object >--
+    // ------------------------------------------------------------< Object >--
 
     @Override
     public String toString() {
@@ -137,7 +145,13 @@ public class SegmentId implements Compar
 
     @Override
     public boolean equals(Object object) {
-        return this == object;
+        if (this == object) {
+            return true;
+        } else if (object instanceof SegmentId) {
+            SegmentId that = (SegmentId) object;
+            return msb == that.msb && lsb == that.lsb;
+        }
+        return false;
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java Mon Dec 15 12:36:36 2014
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+
 /**
  * Hash table of weak references to segment identifiers.
  */
@@ -143,4 +145,24 @@ public class SegmentIdTable {
         return ((int) lsb) & (references.size() - 1);
     }
 
+    synchronized void clearSegmentIdTables(CompactionStrategy strategy) {
+        int size = references.size();
+        boolean dirty = false;
+        for (int i = 0; i < size; i++) {
+            WeakReference<SegmentId> reference = references.get(i);
+            if (reference != null) {
+                SegmentId id = reference.get();
+                if (id != null) {
+                    if (strategy.canRemove(id)) {
+                        reference.clear();
+                        references.set(i, null);
+                        dirty = true;
+                    }
+                }
+            }
+        }
+        if (dirty) {
+            refresh();
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Mon Dec 15 12:36:36 2014
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -105,6 +106,17 @@ public class SegmentNodeStore implements
         this.maximumBackoff = max;
     }
 
+    boolean locked(Callable<Boolean> c) throws Exception {
+        if (commitSemaphore.tryAcquire()) {
+            try {
+                return c.call();
+            } finally {
+                commitSemaphore.release();
+            }
+        }
+        return false;
+    }
+
     /**
      * Refreshes the head state. Should only be called while holding a
      * permit from the {@link #commitSemaphore}.

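The new locked(Callable) helper is the deadlock fix mentioned in the log: the compaction strategy's compacted() callback (see SegmentNodeStoreService below) routes the head swap through it, and tryAcquire, unlike a blocking acquire, can never deadlock against an in-flight commit; it simply reports failure. A self-contained sketch of the pattern:

    import java.util.concurrent.Callable;
    import java.util.concurrent.Semaphore;

    public class LockedSketch {

        private final Semaphore commitSemaphore = new Semaphore(1);

        // Run the callable only while holding the commit permit; never block.
        boolean locked(Callable<Boolean> c) throws Exception {
            if (commitSemaphore.tryAcquire()) {
                try {
                    return c.call();
                } finally {
                    commitSemaphore.release();
                }
            }
            return false; // a concurrent commit holds the permit; caller may retry later
        }

        public static void main(String[] args) throws Exception {
            LockedSketch sketch = new LockedSketch();
            boolean done = sketch.locked(new Callable<Boolean>() {
                @Override
                public Boolean call() {
                    return true; // stand-in for setting the compacted head state
                }
            });
            System.out.println(done); // prints true: no commit was in progress
        }
    }
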
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Mon Dec 15 12:36:36 2014
@@ -18,6 +18,12 @@ package org.apache.jackrabbit.oak.plugin
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean;
+import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLEANUP_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLONE_BINARIES_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.PAUSE_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT;
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 
 import java.io.Closeable;
@@ -25,6 +31,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.felix.scr.annotations.Activate;
@@ -32,6 +39,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.ReferencePolicy;
@@ -42,6 +50,10 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -81,11 +93,27 @@ public class SegmentNodeStoreService ext
     @Property(description="Cache size (MB)", intValue=256)
     public static final String CACHE = "cache";
 
-    @Property(description = "TarMK compaction paused flag", boolValue = true)
+    @Property(description = "TarMK compaction clone binaries flag", boolValue = CLONE_BINARIES_DEFAULT)
+    public static final String COMPACTION_CLONE_BINARIES = "compaction.cloneBinaries";
+
+    @Property(options = {
+            @PropertyOption(name = "CLEAN_ALL", value = "CLEAN_ALL"),
+            @PropertyOption(name = "CLEAN_NONE", value = "CLEAN_NONE"),
+            @PropertyOption(name = "CLEAN_OLD", value = "CLEAN_OLD") }, value = "CLEAN_OLD")
+    public static final String COMPACTION_CLEANUP = "compaction.cleanup";
+
+    @Property(description = "TarMK compaction strategy timestamp older (ms)", longValue = TIMESTAMP_DEFAULT)
+    public static final String COMPACTION_CLEANUP_TIMESTAMP = "compaction.cleanup.timestamp";
+
+    @Property(description = "TarMK compaction available memory multiplier needed to run compaction", byteValue = MEMORY_THRESHOLD_DEFAULT)
+    public static final String COMPACTION_MEMORY_THRESHOLD = "compaction.memoryThreshold";
+
+    @Property(description = "TarMK compaction paused flag", boolValue = PAUSE_DEFAULT)
     public static final String PAUSE_COMPACTION = "pauseCompaction";
 
     @Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false)
     public static final String STANDBY = "standby";
+
     /**
      * Boolean value indicating a blobStore is to be used
      */
@@ -112,6 +140,7 @@ public class SegmentNodeStoreService ext
     private Registration checkpointRegistration;
     private Registration revisionGCRegistration;
     private Registration blobGCRegistration;
+    private Registration compactionStrategyRegistration;
     private WhiteboardExecutor executor;
     private boolean customBlobStore;
 
@@ -126,20 +155,33 @@ public class SegmentNodeStoreService ext
         this.context = context;
         this.customBlobStore = Boolean.parseBoolean(lookup(context, CUSTOM_BLOB_STORE));
 
-        if(blobStore == null && customBlobStore){
+        if (blobStore == null && customBlobStore) {
             log.info("BlobStore use enabled. SegmentNodeStore would be initialized when BlobStore would be available");
-        }else{
+        } else {
             registerNodeStore();
         }
     }
 
-    public synchronized void registerNodeStore()
-            throws IOException {
-        if(context == null){
-            log.info("Component still not activated. Ignoring the initialization call");
-            return;
+    public void registerNodeStore() throws IOException {
+        if (registerSegmentStore()) {
+            Dictionary<String, String> props = new Hashtable<String, String>();
+            props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
+
+            boolean standby = toBoolean(lookup(context, STANDBY), false);
+            providerRegistration = context.getBundleContext().registerService(
+                    SegmentStoreProvider.class.getName(), this, props);
+            if (!standby) {
+                storeRegistration = context.getBundleContext().registerService(
+                        NodeStore.class.getName(), this, props);
+            }
         }
+    }
 
+    public synchronized boolean registerSegmentStore() throws IOException {
+        if (context == null) {
+            log.info("Component still not activated. Ignoring the initialization call");
+            return false;
+        }
         Dictionary<?, ?> properties = context.getProperties();
         name = String.valueOf(properties.get(NAME));
 
@@ -161,31 +203,49 @@ public class SegmentNodeStoreService ext
             size = System.getProperty(SIZE, "256");
         }
 
-        boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION), true);
+        boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION),
+                PAUSE_DEFAULT);
+        boolean cloneBinaries = toBoolean(
+                lookup(context, COMPACTION_CLONE_BINARIES),
+                CLONE_BINARIES_DEFAULT);
+        long cleanupTs = toLong(lookup(context, COMPACTION_CLEANUP_TIMESTAMP),
+                TIMESTAMP_DEFAULT);
+        String cleanup = lookup(context, COMPACTION_CLEANUP);
+        if (cleanup == null) {
+            cleanup = CLEANUP_DEFAULT.toString();
+        }
+
+        String memoryThresholdS = lookup(context, COMPACTION_MEMORY_THRESHOLD);
+        byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT;
+        if (memoryThresholdS != null) {
+            memoryThreshold = Byte.valueOf(memoryThresholdS);
+        }
+        CompactionStrategy compactionStrategy = new CompactionStrategy(
+                pauseCompaction, cloneBinaries, CleanupType.valueOf(cleanup), cleanupTs,
+                memoryThreshold) {
+            @Override
+            public boolean compacted(Callable<Boolean> setHead) throws Exception {
+                // Need to guard against concurrent commits to avoid
+                // mixed segments. See OAK-2192.
+                return delegate.locked(setHead);
+            }
+        };
+
         boolean memoryMapping = "64".equals(mode);
+        int cacheSize = Integer.parseInt(size);
         if (customBlobStore) {
             log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore);
-            store = new FileStore(blobStore, new File(directory),
-                    Integer.parseInt(size), memoryMapping)
-                    .setPauseCompaction(pauseCompaction);
+            store = new FileStore(blobStore, new File(directory), cacheSize,
+                    memoryMapping).setCompactionStrategy(compactionStrategy);
         } else {
-            store = new FileStore(new File(directory), Integer.parseInt(size),
-                    memoryMapping).setPauseCompaction(pauseCompaction);
+            store = new FileStore(new File(directory), cacheSize, memoryMapping)
+                    .setCompactionStrategy(compactionStrategy);
         }
 
         delegate = new SegmentNodeStore(store);
         observerTracker = new ObserverTracker(delegate);
         observerTracker.start(context.getBundleContext());
 
-        Dictionary<String, String> props = new Hashtable<String, String>();
-        props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
-
-        boolean standby = toBoolean(lookup(context, STANDBY), false);
-        providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, props);
-        if (!standby) {
-            storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
-        }
-
         OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
         executor = new WhiteboardExecutor();
         executor.start(whiteboard);
@@ -218,7 +278,14 @@ public class SegmentNodeStoreService ext
                     BlobGCMBean.TYPE, "Segment node store blob garbage collection");
         }
 
+        compactionStrategyRegistration = registerMBean(whiteboard,
+                CompactionStrategyMBean.class,
+                new DefaultCompactionStrategyMBean(compactionStrategy),
+                CompactionStrategyMBean.TYPE,
+                "Segment node store compaction strategy settings");
+
         log.info("SegmentNodeStore initialized");
+        return true;
     }
 
     private static String lookup(ComponentContext context, String property) {
@@ -273,6 +340,10 @@ public class SegmentNodeStoreService ext
             blobGCRegistration.unregister();
             blobGCRegistration = null;
         }
+        if (compactionStrategyRegistration != null) {
+            compactionStrategyRegistration.unregister();
+            compactionStrategyRegistration = null;
+        }
         if (executor != null) {
             executor.stop();
             executor = null;

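For operators, the compaction settings above are ordinary OSGi configuration properties of this component. A hypothetical sketch of the keys and their compile-time defaults (the keys are the constants defined above; the Dictionary framing is illustrative only):

    import java.util.Dictionary;
    import java.util.Hashtable;

    public class CompactionConfigSketch {

        // Defaults taken from CompactionStrategy's *_DEFAULT constants.
        public static Dictionary<String, Object> defaults() {
            Dictionary<String, Object> props = new Hashtable<String, Object>();
            props.put("pauseCompaction", Boolean.TRUE);           // PAUSE_DEFAULT
            props.put("compaction.cloneBinaries", Boolean.FALSE); // CLONE_BINARIES_DEFAULT
            props.put("compaction.cleanup", "CLEAN_OLD");         // CLEANUP_DEFAULT
            props.put("compaction.cleanup.timestamp", Long.valueOf(1000 * 60 * 5)); // TIMESTAMP_DEFAULT
            props.put("compaction.memoryThreshold", Byte.valueOf((byte) 5)); // MEMORY_THRESHOLD_DEFAULT
            return props;
        }
    }
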
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java Mon Dec 15 12:36:36 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,8 +74,7 @@ public class SegmentTracker {
      * identifiers and identifiers of the corresponding records
      * after compaction.
      */
-    private final AtomicReference<CompactionMap> compactionMap =
-            new AtomicReference<CompactionMap>(new CompactionMap(1));
+    private final AtomicReference<CompactionMap> compactionMap;
 
     private final long cacheSize;
 
@@ -101,6 +101,8 @@ public class SegmentTracker {
         this.store = store;
         this.writer = new SegmentWriter(store, this);
         this.cacheSize = cacheSizeMB * MB;
+        this.compactionMap = new AtomicReference<CompactionMap>(
+                new CompactionMap(1, this));
     }
 
     public SegmentTracker(SegmentStore store) {
@@ -116,9 +118,16 @@ public class SegmentTracker {
     }
 
     Segment getSegment(SegmentId id) {
-        Segment segment = store.readSegment(id);
-        setSegment(id, segment);
-        return segment;
+        try {
+            Segment segment = store.readSegment(id);
+            setSegment(id, segment);
+            return segment;
+        } catch (SegmentNotFoundException snfe) {
+            long delta = System.currentTimeMillis() - id.getCreationTime();
+            log.error("Segment not found: {}. Creation date delta is {} ms.",
+                    id, delta);
+            throw snfe;
+        }
     }
 
     void setSegment(SegmentId id, Segment segment) {
@@ -157,11 +166,12 @@ public class SegmentTracker {
     }
 
     public void setCompactionMap(CompactionMap compaction) {
+        compaction.merge(compactionMap.get());
         compactionMap.set(compaction);
     }
 
     @Nonnull
-    CompactionMap getCompactionMap() {
+    public CompactionMap getCompactionMap() {
         return compactionMap.get();
     }
 
@@ -234,4 +244,10 @@ public class SegmentTracker {
         return getSegmentId(msb, lsb);
     }
 
+    public synchronized void clearSegmentIdTables(CompactionStrategy strategy) {
+        for (int i = 0; i < tables.length; i++) {
+            tables[i].clearSegmentIdTables(strategy);
+        }
+    }
+
 }

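setCompactionMap() above now merges the incoming map with its predecessor instead of replacing it, which is what keeps mappings from earlier compaction cycles resolvable via the prev chain walked by recursiveGet() in CompactionMap. A generic sketch of that fall-back chain, assuming nothing beyond the JDK:

    import java.util.HashMap;
    import java.util.Map;

    public class ChainedMapSketch<K, V> {

        private final Map<K, V> entries = new HashMap<K, V>();
        private ChainedMapSketch<K, V> prev; // older generation, never mutated again

        void merge(ChainedMapSketch<K, V> previous) {
            this.prev = previous;
        }

        void put(K key, V value) {
            entries.put(key, value);
        }

        // Look in the current generation first, then walk down the prev chain.
        V get(K key) {
            V value = entries.get(key);
            if (value != null) {
                return value;
            }
            return prev == null ? null : prev.get(key);
        }
    }
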
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Mon Dec 15 12:36:36 2014
@@ -657,7 +657,7 @@ public class SegmentWriter {
         return thisLevel.iterator().next();
     }
 
-    public MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) {
+    MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) {
         if (base != null && base.isDiff()) {
             Segment segment = base.getSegment();
             RecordId key = segment.readRecordId(base.getOffset(8));
@@ -1094,7 +1094,7 @@ public class SegmentWriter {
                     && store.containsSegment(((SegmentPropertyState) property).getRecordId().getSegmentId())) {
                 ids.add(((SegmentPropertyState) property).getRecordId());
             } else if (!(before instanceof SegmentNodeState)
-                    || store.containsSegment(((SegmentNodeState) before).getRecordId().getSegmentId())) {
+                    || store.containsSegment(before.getRecordId().getSegmentId())) {
                 ids.add(writeProperty(property));
             } else {
                 // reuse previously stored property, if possible
@@ -1125,4 +1125,8 @@ public class SegmentWriter {
         }
     }
 
+    public SegmentTracker getTracker() {
+        return tracker;
+    }
+
 }

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java?rev=1645637&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java Mon Dec 15 12:36:36 2014
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
+
+import java.util.concurrent.Callable;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+
+public class CompactionStrategy {
+
+    public enum CleanupType {
+
+        /**
+         * {@code CLEAN_ALL} <em>must</em> be used in conjunction with {@code cloneBinaries},
+         * otherwise segments can go away ({@code SegmentNotFoundException}).
+         * <p>
+         * Pros: best compaction results
+         * <p>
+         * Cons: larger repo size <em>during</em> compaction (2x). High chances that a currently
+         * running diff (e.g. observation) fails with {@code SegmentNotFoundException}.
+         */
+        CLEAN_ALL,
+
+        CLEAN_NONE,
+
+        /**
+         * {@code CLEAN_OLD} with {@code cloneBinaries}
+         * <p>
+         * Pros: better compaction results
+         * <p>
+         * Cons: larger repo size {@code during} compaction (2x). {@code SegmentNotFoundException}
+         * with insufficiently large values for {@code olderThan}.
+         * <p>
+         * {@code CLEAN_OLD} without {@code cloneBinaries}
+         * <p>
+         * Pros: smaller repo size during compaction (1x + size of
+         * data-segments).
+         * <p>
+         * Cons: weakest compaction results. {@code SegmentNotFoundException} with
+         * insufficiently large values for {@code olderThan}.
+         */
+        CLEAN_OLD
+    }
+
+    public static final boolean PAUSE_DEFAULT = true;
+
+    public static final boolean CLONE_BINARIES_DEFAULT = false;
+
+    public static final CleanupType CLEANUP_DEFAULT = CleanupType.CLEAN_OLD;
+
+    public static final long TIMESTAMP_DEFAULT = 1000 * 60 * 5;
+
+    public static final byte MEMORY_THRESHOLD_DEFAULT = 5;
+
+    private boolean paused;
+
+    private boolean cloneBinaries;
+
+    @Nonnull
+    private CleanupType cleanupType;
+
+    /**
+     * Anything with a lifetime longer than this value (in ms) will be removed.
+     * A value of 0 removes everything older than the compaction start (like
+     * CLEAN_ALL), while a very large value effectively acts like CLEAN_NONE.
+     * 
+     */
+    private long olderThan;
+
+    private byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT;
+
+    private CompactionMap compactionMap;
+
+    private long compactionStart = currentTimeMillis();
+
+    public CompactionStrategy(boolean paused,
+            boolean cloneBinaries, @Nonnull CleanupType cleanupType, long olderThan, byte memoryThreshold) {
+        checkArgument(olderThan >= 0);
+        this.paused = paused;
+        this.cloneBinaries = cloneBinaries;
+        this.cleanupType = checkNotNull(cleanupType);
+        this.olderThan = olderThan;
+        this.memoryThreshold = memoryThreshold;
+    }
+
+    public boolean canRemove(SegmentId id) {
+        switch (cleanupType) {
+            case CLEAN_ALL:
+                return true;
+            case CLEAN_NONE:
+                return false;
+            case CLEAN_OLD:
+                return compactionStart - id.getCreationTime() > olderThan;
+        }
+        return false;
+    }
+
+    public boolean cloneBinaries() {
+        return cloneBinaries;
+    }
+
+    public boolean isPaused() {
+        return paused;
+    }
+
+    public void setPaused(boolean paused) {
+        this.paused = paused;
+    }
+
+    public void setCloneBinaries(boolean cloneBinaries) {
+        this.cloneBinaries = cloneBinaries;
+    }
+
+    public void setCleanupType(@Nonnull CleanupType cleanupType) {
+        this.cleanupType = checkNotNull(cleanupType);
+    }
+
+    public void setOlderThan(long olderThan) {
+        checkArgument(olderThan >= 0);
+        this.olderThan = olderThan;
+    }
+
+    public void setCompactionMap(@Nonnull CompactionMap compactionMap) {
+        this.compactionMap = checkNotNull(compactionMap);
+    }
+
+    String getCleanupType() {
+        return cleanupType.toString();
+    }
+
+    long getOlderThan() {
+        return olderThan;
+    }
+
+    @CheckForNull
+    public CompactionMap getCompactionMap() {
+        return this.compactionMap;
+    }
+
+    @Override
+    public String toString() {
+        return "CompactionStrategy [pauseCompaction=" + paused
+                + ", cloneBinaries=" + cloneBinaries + ", cleanup=" + cleanupType
+                + ", olderThan=" + olderThan + ']';
+    }
+
+    public void setCompactionStart(long ms) {
+        this.compactionStart = ms;
+    }
+
+    public byte getMemoryThreshold() {
+        return memoryThreshold;
+    }
+
+    public void setMemoryThreshold(byte memoryThreshold) {
+        this.memoryThreshold = memoryThreshold;
+    }
+
+    public boolean compacted(@Nonnull Callable<Boolean> setHead) throws Exception {
+        return checkNotNull(setHead).call();
+    }
+}

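Taken together, the strategy is a value object plus two hooks: canRemove() drives cleanup (see SegmentIdTable.clearSegmentIdTables above) and compacted() lets the node store guard the head swap. A hedged usage sketch mirroring the anonymous subclass in SegmentNodeStoreService:

    import java.util.concurrent.Callable;

    import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
    import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;

    public class StrategySketch {

        public static CompactionStrategy newDefaultStrategy() {
            return new CompactionStrategy(
                    CompactionStrategy.PAUSE_DEFAULT,
                    CompactionStrategy.CLONE_BINARIES_DEFAULT,
                    CleanupType.CLEAN_OLD,
                    CompactionStrategy.TIMESTAMP_DEFAULT,
                    CompactionStrategy.MEMORY_THRESHOLD_DEFAULT) {
                @Override
                public boolean compacted(Callable<Boolean> setHead) throws Exception {
                    // A real implementation routes this through SegmentNodeStore.locked()
                    // so the head swap cannot interleave with a commit (OAK-2192).
                    return super.compacted(setHead);
                }
            };
        }
    }
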
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java?rev=1645637&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java Mon Dec 15 12:36:36 2014
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.compaction;
+
+public interface CompactionStrategyMBean {
+
+    String TYPE = "CompactionStrategy";
+
+    boolean isCloneBinaries();
+
+    void setCloneBinaries(boolean cloneBinaries);
+
+    boolean isPausedCompaction();
+
+    void setPausedCompaction(boolean pausedCompaction);
+
+    String getCleanupStrategy();
+
+    void setCleanupStrategy(String cleanup);
+
+    long getOlderThan();
+
+    void setOlderThan(long olderThan);
+
+    byte getMemoryThreshold();
+
+    void setMemoryThreshold(byte memory);
+
+    String getCompactionMapStats();
+}

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java?rev=1645637&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java Mon Dec 15 12:36:36 2014
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.compaction;
+
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
+
+public class DefaultCompactionStrategyMBean implements CompactionStrategyMBean {
+
+    private final CompactionStrategy strategy;
+
+    public DefaultCompactionStrategyMBean(CompactionStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public boolean isCloneBinaries() {
+        return strategy.cloneBinaries();
+    }
+
+    @Override
+    public void setCloneBinaries(boolean cloneBinaries) {
+        strategy.setCloneBinaries(cloneBinaries);
+    }
+
+    @Override
+    public boolean isPausedCompaction() {
+        return strategy.isPaused();
+    }
+
+    @Override
+    public void setPausedCompaction(boolean pausedCompaction) {
+        strategy.setPaused(pausedCompaction);
+    }
+
+    @Override
+    public String getCleanupStrategy() {
+        return strategy.getCleanupType();
+    }
+
+    @Override
+    public void setCleanupStrategy(String cleanup) {
+        strategy.setCleanupType(CleanupType.valueOf(cleanup));
+    }
+
+    @Override
+    public long getOlderThan() {
+        return strategy.getOlderThan();
+    }
+
+    @Override
+    public void setOlderThan(long olderThan) {
+        strategy.setOlderThan(olderThan);
+    }
+
+    @Override
+    public byte getMemoryThreshold() {
+        return strategy.getMemoryThreshold();
+    }
+
+    @Override
+    public void setMemoryThreshold(byte memory) {
+        strategy.setMemoryThreshold(memory);
+    }
+
+    @Override
+    public String getCompactionMapStats() {
+        CompactionMap cm = strategy.getCompactionMap();
+        if (cm != null) {
+            return cm.getCompactionStats();
+        }
+        return "";
+    }
+}

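DefaultCompactionStrategyMBean is a thin delegate, so exposing it only takes registering it with an MBean server. Note that direct registration would violate the standard MBean naming convention (the interface would have to be called DefaultCompactionStrategyMBeanMBean), so a StandardMBean wrapper is needed. A sketch under that assumption; Oak itself presumably registers the bean through its own service infrastructure rather than the platform server:

    import java.lang.management.ManagementFactory;
    import javax.management.ObjectName;
    import javax.management.StandardMBean;
    import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
    import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean;
    import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean;

    public final class CompactionJmx {
        public static ObjectName register(CompactionStrategy strategy) throws Exception {
            ObjectName name = new ObjectName(
                    "org.apache.jackrabbit.oak:type=" + CompactionStrategyMBean.TYPE);
            StandardMBean bean = new StandardMBean(
                    new DefaultCompactionStrategyMBean(strategy),
                    CompactionStrategyMBean.class);
            ManagementFactory.getPlatformMBeanServer().registerMBean(bean, name);
            return name;
        }
    }
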
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Mon Dec 15 12:36:36 2014
@@ -26,10 +26,10 @@ import static com.google.common.collect.
 import static java.lang.String.format;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT;
 
 import java.io.File;
 import java.io.IOException;
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -53,6 +54,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
 import org.apache.jackrabbit.oak.plugins.segment.Compactor;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
@@ -63,6 +65,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -86,7 +89,7 @@ public class FileStore implements Segmen
 
     private static final String JOURNAL_FILE_NAME = "journal.log";
 
-    private static final boolean MEMORY_MAPPING_DEFAULT =
+    static final boolean MEMORY_MAPPING_DEFAULT =
             "64".equals(System.getProperty("sun.arch.data.model", "32"));
 
     private final SegmentTracker tracker;
@@ -134,17 +137,15 @@ public class FileStore implements Segmen
      */
     private final BackgroundThread compactionThread;
 
+    private CompactionStrategy compactionStrategy = new CompactionStrategy(
+            true, false, CLEAN_NONE, 0, MEMORY_THRESHOLD_DEFAULT);
+
     /**
      * Flag to request revision cleanup during the next flush.
      */
     private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
 
     /**
-     * Flag to set the compaction on pause.
-     */
-    private volatile boolean pauseCompaction = true;
-
-    /**
      * List of old tar file generations that are waiting to be removed. They can
      * not be removed immediately, because they first need to be closed, and the
      * JVM needs to release the memory mapped file references.
@@ -257,39 +258,68 @@ public class FileStore implements Segmen
                 new Runnable() {
                     @Override
                     public void run() {
-                        log.info("TarMK compaction started");
-                        Stopwatch watch = Stopwatch.createStarted();
-                        CompactionGainEstimate estimate = estimateCompactionGain();
-                        long gain = estimate.estimateCompactionGain();
-                        if (gain >= 10) {
-                            log.info(
-                                    "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction",
-                                    watch, gain,
-                                    estimate.getReachableSize(),
-                                    estimate.getTotalSize(),
-                                    humanReadableByteCount(estimate.getReachableSize()),
-                                    humanReadableByteCount(estimate.getTotalSize()));
-                            if (!pauseCompaction) {
-                                compact();
-                            } else {
-                                log.info("TarMK compaction paused");
-                            }
-                        } else {
-                            log.info(
-                                    "Estimated compaction in {}ms, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now",
-                                    watch, gain,
-                                    estimate.getReachableSize(),
-                                    estimate.getTotalSize(),
-                                    humanReadableByteCount(estimate.getReachableSize()),
-                                    humanReadableByteCount(estimate.getTotalSize()));
-                        }
-                        cleanupNeeded.set(true);
+                        maybeCompact(true);
                     }
                 });
 
         log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
     }
 
+    public boolean maybeCompact(boolean cleanup) {
+        log.info("TarMK compaction started");
+
+        Runtime runtime = Runtime.getRuntime();
+        long avail = runtime.totalMemory() - runtime.freeMemory();
+        long delta = 0;
+        if (compactionStrategy.getCompactionMap() != null) {
+            delta = compactionStrategy.getCompactionMap().getLastMergeWeight();
+        }
+        long needed = delta * compactionStrategy.getMemoryThreshold();
+        if (needed >= avail) {
+            log.info(
+                    "Not enough available memory {}, needed {}, last merge delta {}, so skipping compaction for now",
+                    humanReadableByteCount(avail),
+                    humanReadableByteCount(needed),
+                    humanReadableByteCount(delta));
+            if (cleanup) {
+                cleanupNeeded.set(true);
+            }
+            return false;
+        }
+
+        Stopwatch watch = Stopwatch.createStarted();
+        compactionStrategy.setCompactionStart(System.currentTimeMillis());
+        boolean compacted = false;
+
+        CompactionGainEstimate estimate = estimateCompactionGain();
+        long gain = estimate.estimateCompactionGain();
+        if (gain >= 10) {
+            log.info(
+                    "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction",
+                    watch, gain, estimate.getReachableSize(),
+                    estimate.getTotalSize(),
+                    humanReadableByteCount(estimate.getReachableSize()),
+                    humanReadableByteCount(estimate.getTotalSize()));
+            if (!compactionStrategy.isPaused()) {
+                compact();
+                compacted = true;
+            } else {
+                log.info("TarMK compaction paused");
+            }
+        } else {
+            log.info(
+                    "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now",
+                    watch, gain, estimate.getReachableSize(),
+                    estimate.getTotalSize(),
+                    humanReadableByteCount(estimate.getReachableSize()),
+                    humanReadableByteCount(estimate.getTotalSize()));
+        }
+        if (cleanup) {
+            cleanupNeeded.set(true);
+        }
+        return compacted;
+    }
+
     static Map<Integer, Map<Character, File>> collectFiles(File directory)
             throws IOException {
         Map<Integer, Map<Character, File>> dataFiles = newHashMap();
@@ -461,10 +491,10 @@ public class FileStore implements Segmen
         }
         writer.cleanup(ids);
 
-        List<TarReader> list =
-                newArrayListWithCapacity(readers.size());
+        CompactionMap cm = tracker.getCompactionMap();
+        List<TarReader> list = newArrayListWithCapacity(readers.size());
         for (TarReader reader : readers) {
-            TarReader cleaned = reader.cleanup(ids);
+            TarReader cleaned = reader.cleanup(ids, cm);
             if (cleaned == reader) {
                 list.add(reader);
             } else {
@@ -490,12 +520,11 @@ public class FileStore implements Segmen
      * reference to them).
      */
     public void compact() {
-        long start = System.nanoTime();
-        log.info("TarMK compaction running");
+        log.info("TarMK compaction running, strategy={}", compactionStrategy);
 
+        long start = System.currentTimeMillis();
         SegmentWriter writer = new SegmentWriter(this, tracker);
-        Compactor compactor = new Compactor(writer);
-
+        final Compactor compactor = new Compactor(writer, compactionStrategy.cloneBinaries());
         SegmentNodeState before = getHead();
         long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
                 .getChildNodeCount(Long.MAX_VALUE);
@@ -506,26 +535,22 @@ public class FileStore implements Segmen
         }
 
         SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
-        writer.flush();
-        while (!setHead(before, after)) {
-            // Some other concurrent changes have been made.
-            // Rebase (and compact) those changes on top of the
-            // compacted state before retrying to set the head.
-            SegmentNodeState head = getHead();
-            after = compactor.compact(before, head);
-            before = head;
-            writer.flush();
-        }
-        tracker.setCompactionMap(compactor.getCompactionMap());
-
-        // Drop the SegmentWriter caches and flush any existing state
-        // in an attempt to prevent new references to old pre-compacted
-        // content. TODO: There should be a cleaner way to do this.
-        tracker.getWriter().dropCache();
-        tracker.getWriter().flush();
 
-        log.info("TarMK compaction completed in {}ms", MILLISECONDS
-                .convert(System.nanoTime() - start, NANOSECONDS));
+        Callable<Boolean> setHead = new SetHead(before, after, compactor);
+        try {
+            while (!compactionStrategy.compacted(setHead)) {
+                // Some other concurrent changes have been made.
+                // Rebase (and compact) those changes on top of the
+                // compacted state before retrying to set the head.
+                SegmentNodeState head = getHead();
+                after = compactor.compact(after, head);
+                setHead = new SetHead(head, after, compactor);
+            }
+            log.info("TarMK compaction completed in {}ms",
+                    System.currentTimeMillis() - start);
+        } catch (Exception e) {
+            log.error("Error while running TarMK compaction", e);
+        }
     }
 
     public synchronized Iterable<SegmentId> getSegmentIds() {
@@ -741,8 +766,8 @@ public class FileStore implements Segmen
         return emptyMap();
     }
 
-    public FileStore setPauseCompaction(boolean pauseCompaction) {
-        this.pauseCompaction = pauseCompaction;
+    public FileStore setCompactionStrategy(CompactionStrategy strategy) {
+        this.compactionStrategy = strategy;
         return this;
     }
 
@@ -810,4 +835,37 @@ public class FileStore implements Segmen
 
     }
 
+    private class SetHead implements Callable<Boolean> {
+        private final SegmentNodeState before;
+        private final SegmentNodeState after;
+        private final Compactor compactor;
+
+        public SetHead(SegmentNodeState before, SegmentNodeState after, Compactor compactor) {
+            this.before = before;
+            this.after = after;
+            this.compactor = compactor;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            // When used in conjunction with the SegmentNodeStore, this method
+            // needs to be called while holding the commit semaphore, as doing
+            // otherwise might result in mixed segments. See OAK-2192.
+            if (setHead(before, after)) {
+                CompactionMap cm = compactor.getCompactionMap();
+                tracker.setCompactionMap(cm);
+                compactionStrategy.setCompactionMap(cm);
+
+                // Drop the SegmentWriter caches and flush any existing state
+                // in an attempt to prevent new references to old pre-compacted
+                // content. TODO: There should be a cleaner way to do this.
+                tracker.getWriter().dropCache();
+                tracker.getWriter().flush();
+                tracker.clearSegmentIdTables(compactionStrategy);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
 }

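With the pause flag folded into CompactionStrategy, configuring the TarMK now goes through a single object, and maybeCompact() puts a memory gate (the weight of the last compaction map merge times the memory threshold) in front of the gain estimation. A sketch of wiring this up, assuming the constructor argument order used in the default above (paused, cloneBinaries, cleanupType, olderThan, memoryThreshold):

    import java.io.File;
    import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
    import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
    import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
    import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_OLD;

    public class CompactionSetup {
        public static void main(String[] args) throws Exception {
            FileStore store = new FileStore(new File("segmentstore"), 256);
            // not paused, no binary cloning, clean segments older than an hour;
            // memoryThreshold 5 feeds the maybeCompact() gate above
            store.setCompactionStrategy(new CompactionStrategy(
                    false, false, CLEAN_OLD, 60 * 60 * 1000, (byte) 5));
            SegmentNodeStore nodeStore = new SegmentNodeStore(store);

            // same gating logic the background thread uses; true if compaction ran
            boolean compacted = store.maybeCompact(true);
            System.out.println("compacted: " + compacted);
            store.close();
        }
    }
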
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java Mon Dec 15 12:36:36 2014
@@ -46,6 +46,7 @@ import java.util.regex.Pattern;
 import java.util.zip.CRC32;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,7 +148,7 @@ class TarReader {
         if (reader != null) {
             return reader;
         } else {
-            throw new IOException("Failed to open recoved tar file " + file);
+            throw new IOException("Failed to open recovered tar file " + file);
         }
     }
 
@@ -616,12 +617,15 @@ class TarReader {
      * @return this (if the file is kept as is), or the new generation file, or
      *         null if the file is fully garbage
      */
-    synchronized TarReader cleanup(Set<UUID> referencedIds) throws IOException {
+    synchronized TarReader cleanup(Set<UUID> referencedIds, CompactionMap cm) throws IOException {
+        if (referencedIds.isEmpty()) {
+            return null;
+        }
+
         Map<UUID, List<UUID>> graph = null;
         if (this.graph != null) {
             graph = parseGraph();
         }
-
         TarEntry[] sorted = new TarEntry[index.remaining() / 24];
         int position = index.position();
         for (int i = 0; position < index.limit(); i++) {
@@ -643,16 +647,27 @@ class TarReader {
                 // this segment is not referenced anywhere
                 sorted[i] = null;
             } else {
-                size += getEntrySize(entry.size());
-                count += 1;
-
                 if (isDataSegmentId(entry.lsb())) {
+                    size += getEntrySize(entry.size());
+                    count += 1;
+
                     // this is a referenced data segment, so follow the graph
                     if (graph != null) {
                         List<UUID> refids = graph.get(
                                 new UUID(entry.msb(), entry.lsb()));
                         if (refids != null) {
-                            referencedIds.addAll(refids);
+                            for (UUID r : refids) {
+                                if (isDataSegmentId(r.getLeastSignificantBits())) {
+                                    referencedIds.add(r);
+                                } else {
+                                    if (cm != null && cm.wasCompacted(id)) {
+                                        // skip bulk compacted segment
+                                        // references
+                                    } else {
+                                        referencedIds.add(r);
+                                    }
+                                }
+                            }
                         }
                     } else {
                         // a pre-compiled graph is not available, so read the
@@ -664,11 +679,27 @@ class TarReader {
                         int refcount = segment.get(pos + REF_COUNT_OFFSET) & 0xff;
                         int refend = pos + 16 * (refcount + 1);
                         for (int refpos = pos + 16; refpos < refend; refpos += 16) {
-                            referencedIds.add(new UUID(
-                                    segment.getLong(refpos),
-                                    segment.getLong(refpos + 8)));
+                            UUID r = new UUID(segment.getLong(refpos),
+                                    segment.getLong(refpos + 8));
+                            if (isDataSegmentId(r.getLeastSignificantBits())) {
+                                referencedIds.add(r);
+                            } else {
+                                if (cm != null && cm.wasCompacted(id)) {
+                                    // skip bulk compacted segment references
+                                } else {
+                                    referencedIds.add(r);
+                                }
+                            }
                         }
                     }
+                } else {
+                    // bulk segment: drop it if it was already compacted
+                    if (cm != null && cm.wasCompacted(id)) {
+                        sorted[i] = null;
+                    } else {
+                        size += getEntrySize(entry.size());
+                        count += 1;
+                    }
                 }
             }
         }

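The new cleanup logic hinges on telling data segments from bulk segments by UUID alone: references into bulk segments from segments that were already compacted can be dropped, while data segment references are always kept. A minimal sketch of that distinction, assuming the usual TarMK convention of encoding the segment type in the top nibble of the UUID's least significant bits (0xA for data, 0xB for bulk):

    import java.util.UUID;

    public final class SegmentIdCheck {
        // Assumed convention: the top four bits of the lsb carry the type.
        static boolean isDataSegmentId(long lsb) {
            return (lsb >>> 60) == 0xAL;
        }

        public static void main(String[] args) {
            UUID data = new UUID(0L, 0xAL << 60);
            UUID bulk = new UUID(0L, 0xBL << 60);
            System.out.println(isDataSegmentId(data.getLeastSignificantBits())); // true
            System.out.println(isDataSegmentId(bulk.getLeastSignificantBits())); // false
        }
    }
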
Copied: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java (from r1645616, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java?p2=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java&r1=1645616&r2=1645637&rev=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java Mon Dec 15 12:36:36 2014
@@ -17,9 +17,13 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.plugins.segment.file;
+package org.apache.jackrabbit.oak.plugins.segment;
 
 import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
+import static org.apache.commons.io.FileUtils.deleteDirectory;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_OLD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -32,17 +36,17 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.io.FileUtils;
+import com.google.common.io.ByteStreams;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentPropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -68,79 +72,113 @@ public class CompactionAndCleanupTest {
 
     @Test
     @Ignore("OAK-2045")
-    public void compactionAndWeakReferenceMagic() throws Exception{
+    public void compactionAndWeakReferenceMagic() throws Exception {
         final int MB = 1024 * 1024;
         final int blobSize = 5 * MB;
 
-        FileStore fileStore = new FileStore(directory, 1, 1, false);
+        final int dataNodes = 10000;
+
+        // really long time span, no binary cloning
+        CompactionStrategy custom = new CompactionStrategy(false,
+                false, CLEAN_OLD, TimeUnit.HOURS.toMillis(1), (byte) 0);
+
+        FileStore fileStore = new FileStore(directory, 1);
         SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
+        fileStore.setCompactionStrategy(custom);
+
+        // 1a. Create a bunch of data
+        NodeBuilder extra = nodeStore.getRoot().builder();
+        NodeBuilder content = extra.child("content");
+        for (int i = 0; i < dataNodes; i++) {
+            NodeBuilder c = content.child("c" + i);
+            for (int j = 0; j < 1000; j++) {
+                c.setProperty("p" + j, "v" + j);
+            }
+        }
+        nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        // ----
 
-        //1. Create a property with 5 MB blob
+        final long dataSize = fileStore.size();
+        System.out.printf("File store dataSize %s%n",
+                byteCountToDisplaySize(dataSize));
+
+        // 1. Create a property with 5 MB blob
         NodeBuilder builder = nodeStore.getRoot().builder();
         builder.setProperty("a1", createBlob(nodeStore, blobSize));
         builder.setProperty("b", "foo");
 
-        //Keep a reference to this nodeState to simulate long
-        //running session
-        NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
-        System.out.printf("File store pre removal %d%n", mb(fileStore.size()));
-        assertEquals(mb(fileStore.size()), mb(blobSize));
-
+        // Keep a reference to this nodeState to simulate a long
+        // running session
+        NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE,
+                CommitInfo.EMPTY);
+
+        System.out.printf("File store pre removal %s expecting %s %n",
+                byteCountToDisplaySize(fileStore.size()),
+                byteCountToDisplaySize(blobSize + dataSize));
+        assertEquals(mb(blobSize + dataSize), mb(fileStore.size()));
 
-        //2. Now remove the property
+        // 2. Now remove the property
         builder = nodeStore.getRoot().builder();
         builder.removeProperty("a1");
         nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
 
-        //Size remains same
-        System.out.printf("File store pre compaction %d%n", mb(fileStore.size()));
-        assertEquals(mb(fileStore.size()), mb(blobSize));
-
-        //3. Compact
-        fileStore.compact();
-
-        //Size still remains same
-        System.out.printf("File store post compaction %d%n", mb(fileStore.size()));
-        assertEquals(mb(fileStore.size()), mb(blobSize));
+        // Size remains same
+        System.out.printf("File store pre compaction %s expecting %s%n",
+                byteCountToDisplaySize(fileStore.size()),
+                byteCountToDisplaySize(blobSize + dataSize));
+        assertEquals(mb(blobSize + dataSize), mb(fileStore.size()));
+
+        // 3. Compact
+        assertTrue(fileStore.maybeCompact(false));
+        // fileStore.cleanup();
+
+        // Size still remains same
+        System.out.printf("File store post compaction %s expecting %s%n",
+                byteCountToDisplaySize(fileStore.size()),
+                byteCountToDisplaySize(blobSize + dataSize));
+        assertEquals("File store post compaction size",
+                mb(blobSize + dataSize), mb(fileStore.size()));
 
-        //4. Add some more property to flush the current TarWriter
+        // 4. Add one more property to flush the current TarWriter
         builder = nodeStore.getRoot().builder();
         builder.setProperty("a2", createBlob(nodeStore, blobSize));
         nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
 
-        //Size is double
-        System.out.printf("File store pre cleanup %d%n", mb(fileStore.size()));
-        assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
-
-        //5. Cleanup
-        cleanup(fileStore);
-
-        //Size is still double. Deleted space not reclaimed
-        System.out.printf("File store post cleanup %d%n", mb(fileStore.size()));
-        assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
-
-        //6. Null out any hard reference
-        ns1 = null;
-        builder = null;
-        cleanup(fileStore);
-
-        //Size should not come back to 5 and deleted data
-        //space reclaimed
-        System.out.printf("File store post cleanup and nullification %d%n", mb(fileStore.size()));
-        assertEquals(mb(fileStore.size()), mb(blobSize));
+        // Size is double
+        System.out.printf("File store pre cleanup %s expecting %s%n",
+                byteCountToDisplaySize(fileStore.size()),
+                byteCountToDisplaySize(2 * blobSize + dataSize));
+        assertEquals(mb(2 * blobSize + dataSize), mb(fileStore.size()));
+
+        // 5. Cleanup
+        assertTrue(fileStore.maybeCompact(false));
+        fileStore.cleanup();
+
+        System.out.printf(
+                "File store post cleanup %s expecting between [%s,%s]%n",
+                byteCountToDisplaySize(fileStore.size()),
+                byteCountToDisplaySize(blobSize + dataSize),
+                byteCountToDisplaySize(blobSize + 2 * dataSize));
+
+        // dataSize == 0: fileStore.size() == blobSize
+        // dataSize > 0:  fileStore.size() in [blobSize + dataSize,
+        //                                     blobSize + 2 * dataSize]
+        assertTrue(mb(fileStore.size()) >= mb(blobSize + dataSize)
+                && mb(fileStore.size()) <= mb(blobSize + 2 * dataSize));
+
+        // refresh the timestamp reference to simulate a long wait time
+        custom.setOlderThan(0);
+        assertFalse(fileStore.maybeCompact(false));
+
+        // no data loss happened
+        byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
+                .getProperty("a2").getValue(Type.BINARY).getNewStream());
+        assertEquals(blobSize, blob.length);
     }
 
     @After
     public void cleanDir() throws IOException {
-        FileUtils.deleteDirectory(directory);
-    }
-
-    private static void cleanup(FileStore fileStore) throws IOException {
-        fileStore.getTracker().setCompactionMap(new Compactor(null).getCompactionMap());
-        fileStore.getTracker().getWriter().dropCache();
-
-        fileStore.cleanup();
+        deleteDirectory(directory);
     }
 
     private static Blob createBlob(NodeStore nodeStore, int size) throws IOException {
@@ -154,50 +192,15 @@ public class CompactionAndCleanupTest {
     }
 
     @Test
-    public void testGainEstimator() throws Exception {
-        final int MB = 1024 * 1024;
-        final int blobSize = 2 * MB;
-
-        FileStore fileStore = new FileStore(directory, 2, false);
-        SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
-
-        // 1. Create some blob properties
-        NodeBuilder builder = nodeStore.getRoot().builder();
-
-        NodeBuilder c1 = builder.child("c1");
-        c1.setProperty("a", createBlob(nodeStore, blobSize));
-        c1.setProperty("b", "foo");
-
-        NodeBuilder c2 = builder.child("c2");
-        c2.setProperty("a", createBlob(nodeStore, blobSize));
-        c2.setProperty("b", "foo");
-
-        NodeBuilder c3 = builder.child("c3");
-        c3.setProperty("a", createBlob(nodeStore, blobSize));
-        c3.setProperty("b", "foo");
-        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
-        // 2. Now remove the property
-        builder = nodeStore.getRoot().builder();
-        builder.child("c1").remove();
-        builder.child("c2").remove();
-        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
-        fileStore.flush();
-        try {
-            // should be at 66%
-            assertTrue(fileStore.estimateCompactionGain()
-                    .estimateCompactionGain() > 60);
-        } finally {
-            fileStore.close();
-        }
-    }
-
-    @Ignore("OAK-2192")  // FIXME OAK-2192
-    @Test
     public void testMixedSegments() throws Exception {
         FileStore store = new FileStore(directory, 2, false);
         final SegmentNodeStore nodeStore = new SegmentNodeStore(store);
+        store.setCompactionStrategy(new CompactionStrategy(true, false, CLEAN_NONE, 0, (byte) 5) {
+            @Override
+            public boolean compacted(Callable<Boolean> setHead) throws Exception {
+                return nodeStore.locked(setHead);
+            }
+        });
 
         NodeBuilder root = nodeStore.getRoot().builder();
         createNodes(root.setChildNode("test"), 10, 3);
@@ -276,6 +279,4 @@ public class CompactionAndCleanupTest {
             builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString());
         }
     }
-
-
 }
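
The overridden compacted() in the test above is the heart of the OAK-2192 fix: setting the compacted head must happen while no concurrent commit can slip in. A minimal sketch of what SegmentNodeStore.locked(Callable) presumably does, assuming commitSemaphore is the store's single-permit commit semaphore (not the committed implementation):

    import java.util.concurrent.Callable;
    import java.util.concurrent.Semaphore;

    public class CommitLock {
        private final Semaphore commitSemaphore = new Semaphore(1);

        // Run setHead while holding the commit lock, so no commit can
        // interleave with the head swap and produce mixed segments.
        public boolean locked(Callable<Boolean> setHead) throws Exception {
            commitSemaphore.acquire();
            try {
                return setHead.call();
            } finally {
                commitSemaphore.release();
            }
        }
    }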