You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/12/15 13:36:36 UTC
svn commit: r1645637 [1/2] - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/
main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/
main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java...
Author: mduerig
Date: Mon Dec 15 12:36:36 2014
New Revision: 1645637
URL: http://svn.apache.org/r1645637
Log:
OAK-2192: Concurrent commit during compaction results in mixed segments
- patch v10 (see OAK-2192)
- Avoid deadlock in SegmentNodeStoreService
- Use correct constant values for the compaction strategy in SegmentNodeStoreService
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java
- copied, changed from r1645616, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/HeavyWriteIT.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java
Removed:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMapTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CompactionMap.java Mon Dec 15 12:36:36 2014
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Maps.newTreeMap;
import static com.google.common.collect.Sets.newTreeSet;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ALIGN_BITS;
import java.util.Map;
@@ -67,18 +68,27 @@ import java.util.UUID;
public class CompactionMap {
private final int compressInterval;
- private final Map<RecordId, RecordId> recent = newHashMap();
+ private final SegmentTracker tracker;
+
+ private Map<RecordId, RecordId> recent = newHashMap();
private long[] msbs = new long[0];
private long[] lsbs = new long[0];
- private int[] entryIndex = new int[0];
-
private short[] beforeOffsets = new short[0];
- private SegmentId[] afterSegmentIds = new SegmentId[0];
+
+ private int[] entryIndex = new int[0];
private short[] afterOffsets = new short[0];
- CompactionMap(int compressInterval) {
+ private int[] afterSegmentIds = new int[0];
+ private long[] amsbs = new long[0];
+ private long[] alsbs = new long[0];
+
+ private long prevWeight;
+ private CompactionMap prev;
+
+ CompactionMap(int compressInterval, SegmentTracker tracker) {
this.compressInterval = compressInterval;
+ this.tracker = tracker;
}
/**
@@ -90,7 +100,37 @@ public class CompactionMap {
* @return whether {@code before} was compacted to {@code after}
*/
boolean wasCompactedTo(RecordId before, RecordId after) {
- return after.equals(get(before));
+ return recursiveWasCompactedTo(before, after);
+ }
+
+ /**
+ * Given a record and a map I need to cycle down the #prev line to identify the compacted version.
+ *
+ * @param before before record identifier
+ * @param after after record identifier
+ * @return whether {@code before} was compacted to {@code after}
+ */
+ private boolean recursiveWasCompactedTo(RecordId before,
+ RecordId after) {
+ RecordId potentialAfter = recursiveGet(this, before);
+ if (potentialAfter == null) {
+ return false;
+ }
+ if (after.equals(potentialAfter)) {
+ return true;
+ }
+ return recursiveWasCompactedTo(potentialAfter, after);
+ }
+
+ private static RecordId recursiveGet(CompactionMap map, RecordId before) {
+ RecordId after = map.get(before);
+ if (after != null) {
+ return after;
+ }
+ if (map.prev != null) {
+ return recursiveGet(map.prev, before);
+ }
+ return null;
}
/**
@@ -100,10 +140,21 @@ public class CompactionMap {
* @param id segment identifier
* @return whether the identified segment was compacted
*/
- boolean wasCompacted(SegmentId id) {
+ public boolean wasCompacted(UUID id) {
long msb = id.getMostSignificantBits();
long lsb = id.getLeastSignificantBits();
- return findEntry(msb, lsb) != -1;
+ return wasCompacted(this, msb, lsb);
+ }
+
+ private static boolean wasCompacted(CompactionMap map, long msb, long lsb) {
+ int find = map.findEntry(msb, lsb);
+ if (find != -1) {
+ return true;
+ }
+ if (map.prev != null) {
+ return wasCompacted(map.prev, msb, lsb);
+ }
+ return false;
}
public RecordId get(RecordId before) {
@@ -112,6 +163,11 @@ public class CompactionMap {
return after;
}
+ //empty map
+ if (msbs.length == 0) {
+ return null;
+ }
+
SegmentId segmentId = before.getSegmentId();
long msb = segmentId.getMostSignificantBits();
long lsb = segmentId.getLeastSignificantBits();
@@ -122,12 +178,10 @@ public class CompactionMap {
int index = entryIndex[entry];
int limit = entryIndex[entry + 1];
for (int i = index; i < limit; i++) {
- int o = (beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS;
+ int o = decode(beforeOffsets[i]);
if (o == offset) {
// found it!
- return new RecordId(
- afterSegmentIds[i],
- (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS);
+ return new RecordId(asSegmentId(i), decode(afterOffsets[i]));
} else if (o > offset) {
return null;
}
@@ -137,12 +191,32 @@ public class CompactionMap {
return null;
}
+ private static int decode(short offset) {
+ return (offset & 0xffff) << RECORD_ALIGN_BITS;
+ }
+
+ private static short encode(int offset) {
+ return (short) (offset >> RECORD_ALIGN_BITS);
+ }
+
+ private SegmentId asSegmentId(int index) {
+ int idx = afterSegmentIds[index];
+ return new SegmentId(tracker, amsbs[idx], alsbs[idx]);
+ }
+
+ private static UUID asUUID(SegmentId id) {
+ return new UUID(id.getMostSignificantBits(),
+ id.getLeastSignificantBits());
+ }
+
/**
* Adds a new entry to the compaction map. Overwriting a previously
* added entry is not supported.
*/
void put(RecordId before, RecordId after) {
- assert get(before) == null;
+ if (get(before) != null) {
+ throw new IllegalArgumentException();
+ }
recent.put(before, after);
if (recent.size() >= compressInterval) {
compress();
@@ -150,6 +224,10 @@ public class CompactionMap {
}
void compress() {
+ if (recent.isEmpty()) {
+ // noop
+ return;
+ }
Set<UUID> uuids = newTreeSet();
Map<UUID, Map<Integer, RecordId>> mapping = newTreeMap();
@@ -180,9 +258,11 @@ public class CompactionMap {
int newEntries = beforeOffsets.length + recent.size();
short[] newBeforeOffsets = new short[newEntries];
- SegmentId[] newAfterSegmentIds = new SegmentId[newEntries];
short[] newAfterOffsets = new short[newEntries];
+ int[] newAfterSegmentIds = new int[newEntries];
+ Map<UUID, Integer> newAfterSegments = newHashMap();
+
int newIndex = 0;
int newEntry = 0;
int oldEntry = 0;
@@ -190,9 +270,10 @@ public class CompactionMap {
newmsbs[newEntry] = uuid.getMostSignificantBits();
newlsbs[newEntry] = uuid.getLeastSignificantBits();
- Map<Integer, RecordId> map = mapping.get(uuid);
- if (map == null) {
- map = newTreeMap();
+ // offset -> record
+ Map<Integer, RecordId> newsegment = mapping.get(uuid);
+ if (newsegment == null) {
+ newsegment = newTreeMap();
}
if (oldEntry < msbs.length
@@ -201,21 +282,29 @@ public class CompactionMap {
int index = entryIndex[oldEntry];
int limit = entryIndex[oldEntry + 1];
for (int i = index; i < limit; i++) {
- map.put((beforeOffsets[i] & 0xffff) << RECORD_ALIGN_BITS,
- new RecordId(
- afterSegmentIds[i],
- (afterOffsets[i] & 0xffff) << RECORD_ALIGN_BITS));
+ newsegment.put(decode(beforeOffsets[i]), new RecordId(
+ asSegmentId(i), decode(afterOffsets[i])));
}
oldEntry++;
}
newEntryIndex[newEntry++] = newIndex;
- for (Entry<Integer, RecordId> entry : map.entrySet()) {
+ for (Entry<Integer, RecordId> entry : newsegment.entrySet()) {
int key = entry.getKey();
RecordId id = entry.getValue();
- newBeforeOffsets[newIndex] = (short) (key >> RECORD_ALIGN_BITS);
- newAfterSegmentIds[newIndex] = id.getSegmentId();
- newAfterOffsets[newIndex] = (short) (id.getOffset() >> RECORD_ALIGN_BITS);
+ newBeforeOffsets[newIndex] = encode(key);
+ newAfterOffsets[newIndex] = encode(id.getOffset());
+
+ UUID aUUID = asUUID(id.getSegmentId());
+ int aSIdx = -1;
+ if (newAfterSegments.containsKey(aUUID)) {
+ aSIdx = newAfterSegments.get(aUUID);
+ } else {
+ aSIdx = newAfterSegments.size();
+ newAfterSegments.put(aUUID, aSIdx);
+ }
+ newAfterSegmentIds[newIndex] = aSIdx;
+
newIndex++;
}
}
@@ -227,10 +316,19 @@ public class CompactionMap {
this.entryIndex = newEntryIndex;
this.beforeOffsets = newBeforeOffsets;
- this.afterSegmentIds = newAfterSegmentIds;
this.afterOffsets = newAfterOffsets;
- recent.clear();
+ this.afterSegmentIds = newAfterSegmentIds;
+ this.amsbs = new long[newAfterSegments.size()];
+ this.alsbs = new long[newAfterSegments.size()];
+ for (Entry<UUID, Integer> entry : newAfterSegments.entrySet()) {
+ this.amsbs[entry.getValue()] = entry.getKey()
+ .getMostSignificantBits();
+ this.alsbs[entry.getValue()] = entry.getKey()
+ .getLeastSignificantBits();
+ }
+
+ recent = newHashMap();
}
/**
@@ -294,4 +392,77 @@ public class CompactionMap {
return -1;
}
+ /**
+ * TODO: merge the 2 maps (assume that 'prev' is bigger than the current map
+ * as it contains the entire history, but don't change any values as it
+ * might still be in use by other threads)
+ */
+ void merge(CompactionMap prev) {
+ this.prev = prev;
+ this.prevWeight = prev.getEstimatedWeight();
+ }
+
+ public String getCompactionStats() {
+ StringBuilder sb = new StringBuilder();
+ CompactionMap cm = this;
+ while (cm != null) {
+ sb.append("[");
+ sb.append(getCompactionStats(cm));
+ sb.append("], ");
+ cm = cm.prev;
+ }
+ return sb.toString();
+ }
+
+ private static String getCompactionStats(CompactionMap cm) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Estimated Weight: ");
+ sb.append(humanReadableByteCount(getEstimatedWeight(cm)));
+ sb.append(", Records: ");
+ sb.append(cm.afterOffsets.length);
+ sb.append(", Segments: ");
+ sb.append(cm.amsbs.length);
+ return sb.toString();
+ }
+
+ public long getEstimatedWeight() {
+ long total = 0;
+ CompactionMap cm = this;
+ while (cm != null) {
+ total += getEstimatedWeight(cm);
+ cm = cm.prev;
+ }
+ return total;
+ }
+
+ public long getLastMergeWeight() {
+ return this.prevWeight;
+ }
+
+ private static long getEstimatedWeight(CompactionMap cm) {
+ // estimation of the object including empty 'recent' map
+ long total = 168;
+
+ // msbs
+ total += 24 + cm.msbs.length * 8;
+ // lsbs
+ total += 24 + cm.lsbs.length * 8;
+ // beforeOffsets
+ total += 24 + cm.beforeOffsets.length * 2;
+
+ // entryIndex
+ total += 24 + cm.entryIndex.length * 4;
+ // afterOffsets
+ total += 24 + cm.afterOffsets.length * 2;
+
+ // afterSegmentIds
+ total += 24 + cm.afterSegmentIds.length * 4;
+ // amsbs
+ total += 24 + cm.amsbs.length * 8;
+ // alsbs
+ total += 24 + cm.alsbs.length * 8;
+
+ return total;
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Mon Dec 15 12:36:36 2014
@@ -52,11 +52,6 @@ public class Compactor {
private static final Logger log = LoggerFactory.getLogger(Compactor.class);
/**
- * over 64K in size, not will be included in the compaction map
- */
- private static final long threshold = 65536;
-
- /**
* Locks down the RecordId persistence structure
*/
static long[] recordAsKey(RecordId r) {
@@ -66,7 +61,7 @@ public class Compactor {
private final SegmentWriter writer;
- private CompactionMap map = new CompactionMap(100000);
+ private final CompactionMap map;
/**
* Map from {@link #getBlobKey(Blob) blob keys} to matching compacted
@@ -87,6 +82,7 @@ public class Compactor {
public Compactor(SegmentWriter writer, boolean cloneBinaries) {
this.writer = writer;
+ this.map = new CompactionMap(100000, writer.getTracker());
this.cloneBinaries = cloneBinaries;
}
@@ -98,7 +94,9 @@ public class Compactor {
}
public SegmentNodeState compact(NodeState before, NodeState after) {
- return process(before, after).getNodeState();
+ SegmentNodeState compacted = process(before, after).getNodeState();
+ writer.flush();
+ return compacted;
}
public CompactionMap getCompactionMap() {
@@ -117,7 +115,6 @@ public class Compactor {
return super.propertyAdded(compact(after));
}
-
@Override
public boolean propertyChanged(
PropertyState before, PropertyState after) {
@@ -143,7 +140,7 @@ public class Compactor {
if (success) {
SegmentNodeState state = writer.writeNode(child.getNodeState());
builder.setChildNode(name, state);
- if (id != null && includeInMap(state)) {
+ if (id != null) {
map.put(id, state.getRecordId());
}
}
@@ -151,23 +148,6 @@ public class Compactor {
return success;
}
- private boolean includeInMap(SegmentNodeState state) {
- if (state.getChildNodeCount(2) > 1) {
- return true;
- }
- long count = 0;
- for (PropertyState ps : state.getProperties()) {
- for (int i = 0; i < ps.count(); i++) {
- long size = ps.size(i);
- count += size;
- if (size >= threshold || count >= threshold) {
- return true;
- }
- }
- }
- return false;
- }
-
@Override
public boolean childNodeChanged(
String name, NodeState before, NodeState after) {
@@ -182,13 +162,15 @@ public class Compactor {
}
NodeBuilder child = builder.getChildNode(name);
- boolean success = after.compareAgainstBaseState(
- before, new CompactDiff(child));
+ boolean success = after.compareAgainstBaseState(before,
+ new CompactDiff(child));
- if (success && id != null && child.getChildNodeCount(2) > 1) {
- RecordId compactedId =
- writer.writeNode(child.getNodeState()).getRecordId();
- map.put(id, compactedId);
+ if (success) {
+ RecordId compactedId = writer.writeNode(child.getNodeState())
+ .getRecordId();
+ if (id != null) {
+ map.put(id, compactedId);
+ }
}
return success;
@@ -225,11 +207,6 @@ public class Compactor {
SegmentBlob sb = (SegmentBlob) blob;
try {
- // if the blob is inlined or external, just clone it
- if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) {
- return sb.clone(writer, cloneBinaries);
- }
-
// else check if we've already cloned this specific record
RecordId id = sb.getRecordId();
RecordId compactedId = map.get(id);
@@ -237,6 +214,13 @@ public class Compactor {
return new SegmentBlob(compactedId);
}
+ // if the blob is inlined or external, just clone it
+ if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) {
+ SegmentBlob clone = sb.clone(writer, cloneBinaries);
+ map.put(id, clone.getRecordId());
+ return clone;
+ }
+
// alternatively look if the exact same binary has been cloned
String key = getBlobKey(blob);
List<RecordId> ids = binaries.get(key);
@@ -269,7 +253,7 @@ public class Compactor {
return blob;
}
- private String getBlobKey(Blob blob) throws IOException {
+ private static String getBlobKey(Blob blob) throws IOException {
InputStream stream = blob.getNewStream();
try {
byte[] buffer = new byte[SegmentWriter.BLOCK_SIZE];
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Mon Dec 15 12:36:36 2014
@@ -69,7 +69,7 @@ public class Segment {
* The number of bytes (or bits of address space) to use for the
* alignment boundary of segment records.
*/
- static final int RECORD_ALIGN_BITS = 2; // align at the four-byte boundary
+ public static final int RECORD_ALIGN_BITS = 2; // align at the four-byte boundary
/**
* Maximum segment size. Record identifiers are stored as three-byte
@@ -193,8 +193,10 @@ public class Segment {
if (!id.isDataSegmentId()) {
type = "bulk";
}
+ long delta = System.currentTimeMillis() - id.getCreationTime();
throw new IllegalStateException("RefId '" + index
- + "' doesn't exist in " + type + " segment " + id);
+ + "' doesn't exist in " + type + " segment " + id
+ + ". Creation date delta is " + delta + " ms.");
}
SegmentId refid = refids[index];
if (refid == null) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentId.java Mon Dec 15 12:36:36 2014
@@ -46,6 +46,8 @@ public class SegmentId implements Compar
private final long lsb;
+ private final long creationTime;
+
/**
* A reference to the segment object, if it is available in memory. It is
* used for fast lookup. The segment tracker will set or reset this field.
@@ -53,15 +55,17 @@ public class SegmentId implements Compar
// TODO: possibly we could remove the volatile
private volatile Segment segment;
- public SegmentId(SegmentTracker tracker, long msb, long lsb, Segment segment) {
+ private SegmentId(SegmentTracker tracker, long msb, long lsb,
+ Segment segment, long creationTime) {
this.tracker = tracker;
this.msb = msb;
this.lsb = lsb;
this.segment = segment;
+ this.creationTime = creationTime;
}
public SegmentId(SegmentTracker tracker, long msb, long lsb) {
- this(tracker, msb, lsb, null);
+ this(tracker, msb, lsb, null, System.currentTimeMillis());
}
/**
@@ -79,7 +83,7 @@ public class SegmentId implements Compar
* @return {@code true} for a bulk segment, {@code false} otherwise
*/
public boolean isBulkSegmentId() {
- return (lsb >>> 60) == 0xBL;
+ return (lsb >>> 60) == 0xBL;
}
public boolean equals(long msb, long lsb) {
@@ -117,18 +121,22 @@ public class SegmentId implements Compar
return tracker;
}
- //--------------------------------------------------------< Comparable >--
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ // --------------------------------------------------------< Comparable >--
@Override
public int compareTo(SegmentId that) {
int d = Long.valueOf(this.msb).compareTo(Long.valueOf(that.msb));
if (d == 0) {
- d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb));
+ d = Long.valueOf(this.lsb).compareTo(Long.valueOf(that.lsb));
}
return d;
}
- //------------------------------------------------------------< Object >--
+ // ------------------------------------------------------------< Object >--
@Override
public String toString() {
@@ -137,7 +145,13 @@ public class SegmentId implements Compar
@Override
public boolean equals(Object object) {
- return this == object;
+ if (this == object) {
+ return true;
+ } else if (object instanceof SegmentId) {
+ SegmentId that = (SegmentId) object;
+ return msb == that.msb && lsb == that.lsb;
+ }
+ return false;
}
@Override
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentIdTable.java Mon Dec 15 12:36:36 2014
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+
/**
* Hash table of weak references to segment identifiers.
*/
@@ -143,4 +145,24 @@ public class SegmentIdTable {
return ((int) lsb) & (references.size() - 1);
}
+ synchronized void clearSegmentIdTables(CompactionStrategy strategy) {
+ int size = references.size();
+ boolean dirty = false;
+ for (int i = 0; i < size; i++) {
+ WeakReference<SegmentId> reference = references.get(i);
+ if (reference != null) {
+ SegmentId id = reference.get();
+ if (id != null) {
+ if (strategy.canRemove(id)) {
+ reference.clear();
+ references.set(i, null);
+ dirty = true;
+ }
+ }
+ }
+ }
+ if (dirty) {
+ refresh();
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java Mon Dec 15 12:36:36 2014
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
@@ -105,6 +106,17 @@ public class SegmentNodeStore implements
this.maximumBackoff = max;
}
+ boolean locked(Callable<Boolean> c) throws Exception {
+ if (commitSemaphore.tryAcquire()) {
+ try {
+ return c.call();
+ } finally {
+ commitSemaphore.release();
+ }
+ }
+ return false;
+ }
+
/**
* Refreshes the head state. Should only be called while holding a
* permit from the {@link #commitSemaphore}.
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Mon Dec 15 12:36:36 2014
@@ -18,6 +18,12 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkState;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean;
+import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLEANUP_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLONE_BINARIES_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.PAUSE_DEFAULT;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import java.io.Closeable;
@@ -25,6 +31,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.concurrent.Callable;
import org.apache.commons.io.FilenameUtils;
import org.apache.felix.scr.annotations.Activate;
@@ -32,6 +39,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
@@ -42,6 +50,10 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -81,11 +93,27 @@ public class SegmentNodeStoreService ext
@Property(description="Cache size (MB)", intValue=256)
public static final String CACHE = "cache";
- @Property(description = "TarMK compaction paused flag", boolValue = true)
+ @Property(description = "TarMK compaction clone binaries flag", boolValue = CLONE_BINARIES_DEFAULT)
+ public static final String COMPACTION_CLONE_BINARIES = "compaction.cloneBinaries";
+
+ @Property(options = {
+ @PropertyOption(name = "CLEAN_ALL", value = "CLEAN_ALL"),
+ @PropertyOption(name = "CLEAN_NONE", value = "CLEAN_NONE"),
+ @PropertyOption(name = "CLEAN_OLD", value = "CLEAN_OLD") }, value = "CLEAN_OLD")
+ public static final String COMPACTION_CLEANUP = "compaction.cleanup";
+
+ @Property(description = "TarMK compaction strategy timestamp older (ms)", longValue = TIMESTAMP_DEFAULT)
+ public static final String COMPACTION_CLEANUP_TIMESTAMP = "compaction.cleanup.timestamp";
+
+ @Property(description = "TarMK compaction available memory multiplier needed to run compaction", byteValue = MEMORY_THRESHOLD_DEFAULT)
+ public static final String COMPACTION_MEMORY_THRESHOLD = "compaction.memoryThreshold";
+
+ @Property(description = "TarMK compaction paused flag", boolValue = PAUSE_DEFAULT)
public static final String PAUSE_COMPACTION = "pauseCompaction";
@Property(description = "Flag indicating that this component will not register as a NodeStore but just as a NodeStoreProvider", boolValue = false)
public static final String STANDBY = "standby";
+
/**
* Boolean value indicating a blobStore is to be used
*/
@@ -112,6 +140,7 @@ public class SegmentNodeStoreService ext
private Registration checkpointRegistration;
private Registration revisionGCRegistration;
private Registration blobGCRegistration;
+ private Registration compactionStrategyRegistration;
private WhiteboardExecutor executor;
private boolean customBlobStore;
@@ -126,20 +155,33 @@ public class SegmentNodeStoreService ext
this.context = context;
this.customBlobStore = Boolean.parseBoolean(lookup(context, CUSTOM_BLOB_STORE));
- if(blobStore == null && customBlobStore){
+ if (blobStore == null && customBlobStore) {
log.info("BlobStore use enabled. SegmentNodeStore would be initialized when BlobStore would be available");
- }else{
+ } else {
registerNodeStore();
}
}
- public synchronized void registerNodeStore()
- throws IOException {
- if(context == null){
- log.info("Component still not activated. Ignoring the initialization call");
- return;
+ public void registerNodeStore() throws IOException {
+ if (registerSegmentStore()) {
+ Dictionary<String, String> props = new Hashtable<String, String>();
+ props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
+
+ boolean standby = toBoolean(lookup(context, STANDBY), false);
+ providerRegistration = context.getBundleContext().registerService(
+ SegmentStoreProvider.class.getName(), this, props);
+ if (!standby) {
+ storeRegistration = context.getBundleContext().registerService(
+ NodeStore.class.getName(), this, props);
+ }
}
+ }
+ public synchronized boolean registerSegmentStore() throws IOException {
+ if (context == null) {
+ log.info("Component still not activated. Ignoring the initialization call");
+ return false;
+ }
Dictionary<?, ?> properties = context.getProperties();
name = String.valueOf(properties.get(NAME));
@@ -161,31 +203,49 @@ public class SegmentNodeStoreService ext
size = System.getProperty(SIZE, "256");
}
- boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION), true);
+ boolean pauseCompaction = toBoolean(lookup(context, PAUSE_COMPACTION),
+ PAUSE_DEFAULT);
+ boolean cloneBinaries = toBoolean(
+ lookup(context, COMPACTION_CLONE_BINARIES),
+ CLONE_BINARIES_DEFAULT);
+ long cleanupTs = toLong(lookup(context, COMPACTION_CLEANUP_TIMESTAMP),
+ TIMESTAMP_DEFAULT);
+ String cleanup = lookup(context, COMPACTION_CLEANUP);
+ if (cleanup == null) {
+ cleanup = CLEANUP_DEFAULT.toString();
+ }
+
+ String memoryThresholdS = lookup(context, COMPACTION_MEMORY_THRESHOLD);
+ byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT;
+ if (memoryThresholdS != null) {
+ memoryThreshold = Byte.valueOf(memoryThresholdS);
+ }
+ CompactionStrategy compactionStrategy = new CompactionStrategy(
+ pauseCompaction, cloneBinaries, CleanupType.valueOf(cleanup), cleanupTs,
+ memoryThreshold) {
+ @Override
+ public boolean compacted(Callable<Boolean> setHead) throws Exception {
+ // Need to guard against concurrent commits to avoid
+ // mixed segments. See OAK-2192.
+ return delegate.locked(setHead);
+ }
+ };
+
boolean memoryMapping = "64".equals(mode);
+ int cacheSize = Integer.parseInt(size);
if (customBlobStore) {
log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore);
- store = new FileStore(blobStore, new File(directory),
- Integer.parseInt(size), memoryMapping)
- .setPauseCompaction(pauseCompaction);
+ store = new FileStore(blobStore, new File(directory), cacheSize,
+ memoryMapping).setCompactionStrategy(compactionStrategy);
} else {
- store = new FileStore(new File(directory), Integer.parseInt(size),
- memoryMapping).setPauseCompaction(pauseCompaction);
+ store = new FileStore(new File(directory), cacheSize, memoryMapping)
+ .setCompactionStrategy(compactionStrategy);
}
delegate = new SegmentNodeStore(store);
observerTracker = new ObserverTracker(delegate);
observerTracker.start(context.getBundleContext());
- Dictionary<String, String> props = new Hashtable<String, String>();
- props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
-
- boolean standby = toBoolean(lookup(context, STANDBY), false);
- providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, props);
- if (!standby) {
- storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
- }
-
OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
executor = new WhiteboardExecutor();
executor.start(whiteboard);
@@ -218,7 +278,14 @@ public class SegmentNodeStoreService ext
BlobGCMBean.TYPE, "Segment node store blob garbage collection");
}
+ compactionStrategyRegistration = registerMBean(whiteboard,
+ CompactionStrategyMBean.class,
+ new DefaultCompactionStrategyMBean(compactionStrategy),
+ CompactionStrategyMBean.TYPE,
+ "Segment node store compaction strategy settings");
+
log.info("SegmentNodeStore initialized");
+ return true;
}
private static String lookup(ComponentContext context, String property) {
@@ -273,6 +340,10 @@ public class SegmentNodeStoreService ext
blobGCRegistration.unregister();
blobGCRegistration = null;
}
+ if (compactionStrategyRegistration != null) {
+ compactionStrategyRegistration.unregister();
+ compactionStrategyRegistration = null;
+ }
if (executor != null) {
executor.stop();
executor = null;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentTracker.java Mon Dec 15 12:36:36 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
import javax.annotation.Nonnull;
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +74,7 @@ public class SegmentTracker {
* identifiers and identifiers of the corresponding records
* after compaction.
*/
- private final AtomicReference<CompactionMap> compactionMap =
- new AtomicReference<CompactionMap>(new CompactionMap(1));
+ private final AtomicReference<CompactionMap> compactionMap;
private final long cacheSize;
@@ -101,6 +101,8 @@ public class SegmentTracker {
this.store = store;
this.writer = new SegmentWriter(store, this);
this.cacheSize = cacheSizeMB * MB;
+ this.compactionMap = new AtomicReference<CompactionMap>(
+ new CompactionMap(1, this));
}
public SegmentTracker(SegmentStore store) {
@@ -116,9 +118,16 @@ public class SegmentTracker {
}
Segment getSegment(SegmentId id) {
- Segment segment = store.readSegment(id);
- setSegment(id, segment);
- return segment;
+ try {
+ Segment segment = store.readSegment(id);
+ setSegment(id, segment);
+ return segment;
+ } catch (SegmentNotFoundException snfe) {
+ long delta = System.currentTimeMillis() - id.getCreationTime();
+ log.error("Segment not found: {}. Creation date delta is {} ms.",
+ id, delta);
+ throw snfe;
+ }
}
void setSegment(SegmentId id, Segment segment) {
@@ -157,11 +166,12 @@ public class SegmentTracker {
}
public void setCompactionMap(CompactionMap compaction) {
+ compaction.merge(compactionMap.get());
compactionMap.set(compaction);
}
@Nonnull
- CompactionMap getCompactionMap() {
+ public CompactionMap getCompactionMap() {
return compactionMap.get();
}
@@ -234,4 +244,10 @@ public class SegmentTracker {
return getSegmentId(msb, lsb);
}
+ public synchronized void clearSegmentIdTables(CompactionStrategy strategy) {
+ for (int i = 0; i < tables.length; i++) {
+ tables[i].clearSegmentIdTables(strategy);
+ }
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Mon Dec 15 12:36:36 2014
@@ -657,7 +657,7 @@ public class SegmentWriter {
return thisLevel.iterator().next();
}
- public MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) {
+ MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) {
if (base != null && base.isDiff()) {
Segment segment = base.getSegment();
RecordId key = segment.readRecordId(base.getOffset(8));
@@ -1094,7 +1094,7 @@ public class SegmentWriter {
&& store.containsSegment(((SegmentPropertyState) property).getRecordId().getSegmentId())) {
ids.add(((SegmentPropertyState) property).getRecordId());
} else if (!(before instanceof SegmentNodeState)
- || store.containsSegment(((SegmentNodeState) before).getRecordId().getSegmentId())) {
+ || store.containsSegment(before.getRecordId().getSegmentId())) {
ids.add(writeProperty(property));
} else {
// reuse previously stored property, if possible
@@ -1125,4 +1125,8 @@ public class SegmentWriter {
}
}
+ public SegmentTracker getTracker() {
+ return tracker;
+ }
+
}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java?rev=1645637&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategy.java Mon Dec 15 12:36:36 2014
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.compaction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
+
+import java.util.concurrent.Callable;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+
+public class CompactionStrategy {
+
+ public enum CleanupType {
+
+ /**
+ * {@code CLEAN_ALL} <em>must</em> be used in conjunction with {@code cloneBinaries}
+ * otherwise segments can go away ({@code SegmentNotFoundException})
+ * <p>
+ * Pros: best compaction results
+ * <p>
+ * Cons: larger repo size <em>during</em> compaction (2x). High chances that a currently
+ * running diff (e.g. observation) fails with {@code SegmentNotFoundException}.
+ */
+ CLEAN_ALL,
+
+ CLEAN_NONE,
+
+ /**
+ * {@code CLEAN_OLD} with {@code cloneBinaries}
+ * <p>
+ * Pros: better compaction results
+ * <p>
+ * Cons: larger repo size {@code during} compaction (2x). {@code SegmentNotFoundException}
+ * with insufficiently large values for {@code olderThan}.
+ * <p>
+ * {@code CLEAN_OLD} without {@code cloneBinaries}
+ * <p>
+ * Pros: weakest compaction results, smaller size during compaction (1x + size of
+ * data-segments).
+ * <p>
+ * Cons: {@code SegmentNotFoundException} with insufficiently large values for
+ * {@code olderThan}.
+ */
+ CLEAN_OLD
+ }
+
+ public static final boolean PAUSE_DEFAULT = true;
+
+ public static final boolean CLONE_BINARIES_DEFAULT = false;
+
+ public static final CleanupType CLEANUP_DEFAULT = CleanupType.CLEAN_OLD;
+
+ public static final long TIMESTAMP_DEFAULT = 1000 * 60 * 5;
+
+ public static final byte MEMORY_THRESHOLD_DEFAULT = 5;
+
+ private boolean paused;
+
+ private boolean cloneBinaries;
+
+ @Nonnull
+ private CleanupType cleanupType;
+
+ /**
+ * anything that has a lifetime bigger than this will be removed. a value of
+ * 0 (or very small) acts like a CLEANUP.NONE, a value of -1 (or negative)
+ * acts like a CLEANUP.ALL
+ *
+ */
+ private long olderThan;
+
+ private byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT;
+
+ private CompactionMap compactionMap;
+
+ private long compactionStart = currentTimeMillis();
+
+ public CompactionStrategy(boolean paused,
+ boolean cloneBinaries, @Nonnull CleanupType cleanupType, long olderThan, byte memoryThreshold) {
+ checkArgument(olderThan >= 0);
+ this.paused = paused;
+ this.cloneBinaries = cloneBinaries;
+ this.cleanupType = checkNotNull(cleanupType);
+ this.olderThan = olderThan;
+ this.memoryThreshold = memoryThreshold;
+ }
+
+ public boolean canRemove(SegmentId id) {
+ switch (cleanupType) {
+ case CLEAN_ALL:
+ return true;
+ case CLEAN_NONE:
+ return false;
+ case CLEAN_OLD:
+ return compactionStart - id.getCreationTime() > olderThan;
+ }
+ return false;
+ }
+
+ public boolean cloneBinaries() {
+ return cloneBinaries;
+ }
+
+ public boolean isPaused() {
+ return paused;
+ }
+
+ public void setPaused(boolean paused) {
+ this.paused = paused;
+ }
+
+ public void setCloneBinaries(boolean cloneBinaries) {
+ this.cloneBinaries = cloneBinaries;
+ }
+
+ public void setCleanupType(@Nonnull CleanupType cleanupType) {
+ this.cleanupType = checkNotNull(cleanupType);
+ }
+
+ public void setOlderThan(long olderThan) {
+ checkArgument(olderThan >= 0);
+ this.olderThan = olderThan;
+ }
+
+ public void setCompactionMap(@Nonnull CompactionMap compactionMap) {
+ this.compactionMap = checkNotNull(compactionMap);
+ }
+
+ String getCleanupType() {
+ return cleanupType.toString();
+ }
+
+ long getOlderThan() {
+ return olderThan;
+ }
+
+ @CheckForNull
+ public CompactionMap getCompactionMap() {
+ return this.compactionMap;
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultCompactionStrategy [pauseCompaction=" + paused
+ + ", cloneBinaries=" + cloneBinaries + ", cleanup=" + cleanupType
+ + ", olderThan=" + olderThan + ']';
+ }
+
+ public void setCompactionStart(long ms) {
+ this.compactionStart = ms;
+ }
+
+ public byte getMemoryThreshold() {
+ return memoryThreshold;
+ }
+
+ public void setMemoryThreshold(byte memoryThreshold) {
+ this.memoryThreshold = memoryThreshold;
+ }
+
+ public boolean compacted(@Nonnull Callable<Boolean> setHead) throws Exception {
+ return checkNotNull(setHead).call();
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java?rev=1645637&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/CompactionStrategyMBean.java Mon Dec 15 12:36:36 2014
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.compaction;
+
+public interface CompactionStrategyMBean {
+
+ String TYPE = "CompactionStrategy";
+
+ boolean isCloneBinaries();
+
+ void setCloneBinaries(boolean cloneBinaries);
+
+ boolean isPausedCompaction();
+
+ void setPausedCompaction(boolean pausedCompaction);
+
+ String getCleanupStrategy();
+
+ void setCleanupStrategy(String cleanup);
+
+ long getOlderThan();
+
+ void setOlderThan(long olderThan);
+
+ byte getMemoryThreshold();
+
+ void setMemoryThreshold(byte memory);
+
+ String getCompactionMapStats();
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java?rev=1645637&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/compaction/DefaultCompactionStrategyMBean.java Mon Dec 15 12:36:36 2014
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.compaction;
+
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
+
+public class DefaultCompactionStrategyMBean implements CompactionStrategyMBean {
+
+ private final CompactionStrategy strategy;
+
+ public DefaultCompactionStrategyMBean(CompactionStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ @Override
+ public boolean isCloneBinaries() {
+ return strategy.cloneBinaries();
+ }
+
+ @Override
+ public void setCloneBinaries(boolean cloneBinaries) {
+ strategy.setCloneBinaries(cloneBinaries);
+ }
+
+ @Override
+ public boolean isPausedCompaction() {
+ return strategy.isPaused();
+ }
+
+ @Override
+ public void setPausedCompaction(boolean pausedCompaction) {
+ strategy.setPaused(pausedCompaction);
+ }
+
+ @Override
+ public String getCleanupStrategy() {
+ return strategy.getCleanupType();
+ }
+
+ @Override
+ public void setCleanupStrategy(String cleanup) {
+ strategy.setCleanupType(CleanupType.valueOf(cleanup));
+ }
+
+ @Override
+ public long getOlderThan() {
+ return strategy.getOlderThan();
+ }
+
+ @Override
+ public void setOlderThan(long olderThan) {
+ strategy.setOlderThan(olderThan);
+ }
+
+ @Override
+ public byte getMemoryThreshold() {
+ return strategy.getMemoryThreshold();
+ }
+
+ @Override
+ public void setMemoryThreshold(byte memory) {
+ strategy.setMemoryThreshold(memory);
+ }
+
+ @Override
+ public String getCompactionMapStats() {
+ CompactionMap cm = strategy.getCompactionMap();
+ if (cm != null) {
+ return cm.getCompactionStats();
+ }
+ return "";
+ }
+}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Mon Dec 15 12:36:36 2014
@@ -26,10 +26,10 @@ import static com.google.common.collect.
import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT;
import java.io.File;
import java.io.IOException;
@@ -44,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
@@ -53,6 +54,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
@@ -63,6 +65,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -86,7 +89,7 @@ public class FileStore implements Segmen
private static final String JOURNAL_FILE_NAME = "journal.log";
- private static final boolean MEMORY_MAPPING_DEFAULT =
+ static final boolean MEMORY_MAPPING_DEFAULT =
"64".equals(System.getProperty("sun.arch.data.model", "32"));
private final SegmentTracker tracker;
@@ -134,17 +137,15 @@ public class FileStore implements Segmen
*/
private final BackgroundThread compactionThread;
+ private CompactionStrategy compactionStrategy = new CompactionStrategy(
+ true, false, CLEAN_NONE, 0, MEMORY_THRESHOLD_DEFAULT);
+
/**
* Flag to request revision cleanup during the next flush.
*/
private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
/**
- * Flag to set the compaction on pause.
- */
- private volatile boolean pauseCompaction = true;
-
- /**
* List of old tar file generations that are waiting to be removed. They can
* not be removed immediately, because they first need to be closed, and the
* JVM needs to release the memory mapped file references.
@@ -257,39 +258,68 @@ public class FileStore implements Segmen
new Runnable() {
@Override
public void run() {
- log.info("TarMK compaction started");
- Stopwatch watch = Stopwatch.createStarted();
- CompactionGainEstimate estimate = estimateCompactionGain();
- long gain = estimate.estimateCompactionGain();
- if (gain >= 10) {
- log.info(
- "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction",
- watch, gain,
- estimate.getReachableSize(),
- estimate.getTotalSize(),
- humanReadableByteCount(estimate.getReachableSize()),
- humanReadableByteCount(estimate.getTotalSize()));
- if (!pauseCompaction) {
- compact();
- } else {
- log.info("TarMK compaction paused");
- }
- } else {
- log.info(
- "Estimated compaction in {}ms, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now",
- watch, gain,
- estimate.getReachableSize(),
- estimate.getTotalSize(),
- humanReadableByteCount(estimate.getReachableSize()),
- humanReadableByteCount(estimate.getTotalSize()));
- }
- cleanupNeeded.set(true);
+ maybeCompact(true);
}
});
log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
}
+ public boolean maybeCompact(boolean cleanup) {
+ log.info("TarMK compaction started");
+
+ Runtime runtime = Runtime.getRuntime();
+ long avail = runtime.totalMemory() - runtime.freeMemory();
+ long delta = 0;
+ if (compactionStrategy.getCompactionMap() != null) {
+ delta = compactionStrategy.getCompactionMap().getLastMergeWeight();
+ }
+ long needed = delta * compactionStrategy.getMemoryThreshold();
+ if (needed >= avail) {
+ log.info(
+ "Not enough available memory {}, needed {}, last merge delta {}, so skipping compaction for now",
+ humanReadableByteCount(avail),
+ humanReadableByteCount(needed),
+ humanReadableByteCount(delta));
+ if (cleanup) {
+ cleanupNeeded.set(true);
+ }
+ return false;
+ }
+
+ Stopwatch watch = Stopwatch.createStarted();
+ compactionStrategy.setCompactionStart(System.currentTimeMillis());
+ boolean compacted = false;
+
+ CompactionGainEstimate estimate = estimateCompactionGain();
+ long gain = estimate.estimateCompactionGain();
+ if (gain >= 10) {
+ log.info(
+ "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so running compaction",
+ watch, gain, estimate.getReachableSize(),
+ estimate.getTotalSize(),
+ humanReadableByteCount(estimate.getReachableSize()),
+ humanReadableByteCount(estimate.getTotalSize()));
+ if (!compactionStrategy.isPaused()) {
+ compact();
+ compacted = true;
+ } else {
+ log.info("TarMK compaction paused");
+ }
+ } else {
+ log.info(
+ "Estimated compaction in {}, gain is {}% ({}/{}) or ({}/{}), so skipping compaction for now",
+ watch, gain, estimate.getReachableSize(),
+ estimate.getTotalSize(),
+ humanReadableByteCount(estimate.getReachableSize()),
+ humanReadableByteCount(estimate.getTotalSize()));
+ }
+ if (cleanup) {
+ cleanupNeeded.set(true);
+ }
+ return compacted;
+ }
+
static Map<Integer, Map<Character, File>> collectFiles(File directory)
throws IOException {
Map<Integer, Map<Character, File>> dataFiles = newHashMap();
@@ -461,10 +491,10 @@ public class FileStore implements Segmen
}
writer.cleanup(ids);
- List<TarReader> list =
- newArrayListWithCapacity(readers.size());
+ CompactionMap cm = tracker.getCompactionMap();
+ List<TarReader> list = newArrayListWithCapacity(readers.size());
for (TarReader reader : readers) {
- TarReader cleaned = reader.cleanup(ids);
+ TarReader cleaned = reader.cleanup(ids, cm);
if (cleaned == reader) {
list.add(reader);
} else {
@@ -490,12 +520,11 @@ public class FileStore implements Segmen
* reference to them).
*/
public void compact() {
- long start = System.nanoTime();
- log.info("TarMK compaction running");
+ log.info("TarMK compaction running, strategy={}", compactionStrategy);
+ long start = System.currentTimeMillis();
SegmentWriter writer = new SegmentWriter(this, tracker);
- Compactor compactor = new Compactor(writer);
-
+ final Compactor compactor = new Compactor(writer, compactionStrategy.cloneBinaries());
SegmentNodeState before = getHead();
long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
.getChildNodeCount(Long.MAX_VALUE);
@@ -506,26 +535,22 @@ public class FileStore implements Segmen
}
SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
- writer.flush();
- while (!setHead(before, after)) {
- // Some other concurrent changes have been made.
- // Rebase (and compact) those changes on top of the
- // compacted state before retrying to set the head.
- SegmentNodeState head = getHead();
- after = compactor.compact(before, head);
- before = head;
- writer.flush();
- }
- tracker.setCompactionMap(compactor.getCompactionMap());
-
- // Drop the SegmentWriter caches and flush any existing state
- // in an attempt to prevent new references to old pre-compacted
- // content. TODO: There should be a cleaner way to do this.
- tracker.getWriter().dropCache();
- tracker.getWriter().flush();
- log.info("TarMK compaction completed in {}ms", MILLISECONDS
- .convert(System.nanoTime() - start, NANOSECONDS));
+ Callable<Boolean> setHead = new SetHead(before, after, compactor);
+ try {
+ while(!compactionStrategy.compacted(setHead)) {
+ // Some other concurrent changes have been made.
+ // Rebase (and compact) those changes on top of the
+ // compacted state before retrying to set the head.
+ SegmentNodeState head = getHead();
+ after = compactor.compact(after, head);
+ setHead = new SetHead(head, after, compactor);
+ }
+ log.info("TarMK compaction completed in {}ms",
+ System.currentTimeMillis() - start);
+ } catch (Exception e) {
+ log.error("Error while running TarMK compaction", e);
+ }
}
public synchronized Iterable<SegmentId> getSegmentIds() {
@@ -741,8 +766,8 @@ public class FileStore implements Segmen
return emptyMap();
}
- public FileStore setPauseCompaction(boolean pauseCompaction) {
- this.pauseCompaction = pauseCompaction;
+ public FileStore setCompactionStrategy(CompactionStrategy strategy) {
+ this.compactionStrategy = strategy;
return this;
}
@@ -810,4 +835,37 @@ public class FileStore implements Segmen
}
+ private class SetHead implements Callable<Boolean> {
+ private final SegmentNodeState before;
+ private final SegmentNodeState after;
+ private final Compactor compactor;
+
+ public SetHead(SegmentNodeState before, SegmentNodeState after, Compactor compactor) {
+ this.before = before;
+ this.after = after;
+ this.compactor = compactor;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ // When used in conjunction with the SegmentNodeStore, this method
+ // needs to be called inside the commitSemaphore as doing otherwise
+ // might result in mixed segments. See OAK-2192.
+ if (setHead(before, after)) {
+ CompactionMap cm = compactor.getCompactionMap();
+ tracker.setCompactionMap(cm);
+ compactionStrategy.setCompactionMap(cm);
+
+ // Drop the SegmentWriter caches and flush any existing state
+ // in an attempt to prevent new references to old pre-compacted
+ // content. TODO: There should be a cleaner way to do this.
+ tracker.getWriter().dropCache();
+ tracker.getWriter().flush();
+ tracker.clearSegmentIdTables(compactionStrategy);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1645637&r1=1645636&r2=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java Mon Dec 15 12:36:36 2014
@@ -46,6 +46,7 @@ import java.util.regex.Pattern;
import java.util.zip.CRC32;
import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.segment.CompactionMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +148,7 @@ class TarReader {
if (reader != null) {
return reader;
} else {
- throw new IOException("Failed to open recoved tar file " + file);
+ throw new IOException("Failed to open recovered tar file " + file);
}
}
@@ -616,12 +617,15 @@ class TarReader {
* @return this (if the file is kept as is), or the new generation file, or
* null if the file is fully garbage
*/
- synchronized TarReader cleanup(Set<UUID> referencedIds) throws IOException {
+ synchronized TarReader cleanup(Set<UUID> referencedIds, CompactionMap cm) throws IOException {
+ if (referencedIds.isEmpty()) {
+ return null;
+ }
+
Map<UUID, List<UUID>> graph = null;
if (this.graph != null) {
graph = parseGraph();
}
-
TarEntry[] sorted = new TarEntry[index.remaining() / 24];
int position = index.position();
for (int i = 0; position < index.limit(); i++) {
@@ -643,16 +647,27 @@ class TarReader {
// this segment is not referenced anywhere
sorted[i] = null;
} else {
- size += getEntrySize(entry.size());
- count += 1;
-
if (isDataSegmentId(entry.lsb())) {
+ size += getEntrySize(entry.size());
+ count += 1;
+
// this is a referenced data segment, so follow the graph
if (graph != null) {
List<UUID> refids = graph.get(
new UUID(entry.msb(), entry.lsb()));
if (refids != null) {
- referencedIds.addAll(refids);
+ for (UUID r : refids) {
+ if (isDataSegmentId(r.getLeastSignificantBits())) {
+ referencedIds.add(r);
+ } else {
+ if (cm != null && cm.wasCompacted(id)) {
+ // skip bulk compacted segment
+ // references
+ } else {
+ referencedIds.add(r);
+ }
+ }
+ }
}
} else {
// a pre-compiled graph is not available, so read the
@@ -664,11 +679,27 @@ class TarReader {
int refcount = segment.get(pos + REF_COUNT_OFFSET) & 0xff;
int refend = pos + 16 * (refcount + 1);
for (int refpos = pos + 16; refpos < refend; refpos += 16) {
- referencedIds.add(new UUID(
- segment.getLong(refpos),
- segment.getLong(refpos + 8)));
+ UUID r = new UUID(segment.getLong(refpos),
+ segment.getLong(refpos + 8));
+ if (isDataSegmentId(r.getLeastSignificantBits())) {
+ referencedIds.add(r);
+ } else {
+ if (cm != null && cm.wasCompacted(id)) {
+ // skip bulk compacted segment references
+ } else {
+ referencedIds.add(r);
+ }
+ }
}
}
+ } else {
+ // bulk segments compaction check
+ if (cm != null && cm.wasCompacted(id)) {
+ sorted[i] = null;
+ } else {
+ size += getEntrySize(entry.size());
+ count += 1;
+ }
}
}
}
Copied: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java (from r1645616, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java?p2=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java&p1=jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java&r1=1645616&r2=1645637&rev=1645637&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionAndCleanupTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupTest.java Mon Dec 15 12:36:36 2014
@@ -17,9 +17,13 @@
* under the License.
*/
-package org.apache.jackrabbit.oak.plugins.segment.file;
+package org.apache.jackrabbit.oak.plugins.segment;
import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
+import static org.apache.commons.io.FileUtils.deleteDirectory;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_NONE;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType.CLEAN_OLD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -32,17 +36,17 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.io.FileUtils;
+import com.google.common.io.ByteStreams;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.plugins.segment.Compactor;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
-import org.apache.jackrabbit.oak.plugins.segment.SegmentPropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
@@ -68,79 +72,113 @@ public class CompactionAndCleanupTest {
@Test
@Ignore("OAK-2045")
- public void compactionAndWeakReferenceMagic() throws Exception{
+ public void compactionAndWeakReferenceMagic() throws Exception {
final int MB = 1024 * 1024;
final int blobSize = 5 * MB;
- FileStore fileStore = new FileStore(directory, 1, 1, false);
+ final int dataNodes = 10000;
+
+ // really long time span, no binary cloning
+ CompactionStrategy custom = new CompactionStrategy(false,
+ false, CLEAN_OLD, TimeUnit.HOURS.toMillis(1), (byte) 0);
+
+ FileStore fileStore = new FileStore(directory, 1);
SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
+ fileStore.setCompactionStrategy(custom);
+
+ // 1a. Create a bunch of data
+ NodeBuilder extra = nodeStore.getRoot().builder();
+ NodeBuilder content = extra.child("content");
+ for (int i = 0; i < dataNodes; i++) {
+ NodeBuilder c = content.child("c" + i);
+ for (int j = 0; j < 1000; j++) {
+ c.setProperty("p" + i, "v" + i);
+ }
+ }
+ nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ // ----
- //1. Create a property with 5 MB blob
+ final long dataSize = fileStore.size();
+ System.out.printf("File store dataSize %s%n",
+ byteCountToDisplaySize(dataSize));
+
+ // 1. Create a property with 5 MB blob
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("a1", createBlob(nodeStore, blobSize));
builder.setProperty("b", "foo");
- //Keep a reference to this nodeState to simulate long
- //running session
- NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
- System.out.printf("File store pre removal %d%n", mb(fileStore.size()));
- assertEquals(mb(fileStore.size()), mb(blobSize));
-
+ // Keep a reference to this nodeState to simulate long
+ // running session
+ NodeState ns1 = nodeStore.merge(builder, EmptyHook.INSTANCE,
+ CommitInfo.EMPTY);
+
+ System.out.printf("File store pre removal %s expecting %s %n",
+ byteCountToDisplaySize(fileStore.size()),
+ byteCountToDisplaySize(blobSize + dataSize));
+ assertEquals(mb(blobSize + dataSize), mb(fileStore.size()));
- //2. Now remove the property
+ // 2. Now remove the property
builder = nodeStore.getRoot().builder();
builder.removeProperty("a1");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- //Size remains same
- System.out.printf("File store pre compaction %d%n", mb(fileStore.size()));
- assertEquals(mb(fileStore.size()), mb(blobSize));
-
- //3. Compact
- fileStore.compact();
-
- //Size still remains same
- System.out.printf("File store post compaction %d%n", mb(fileStore.size()));
- assertEquals(mb(fileStore.size()), mb(blobSize));
+ // Size remains same
+ System.out.printf("File store pre compaction %s expecting %s%n",
+ byteCountToDisplaySize(fileStore.size()),
+ byteCountToDisplaySize(blobSize + dataSize));
+ assertEquals(mb(blobSize + dataSize), mb(fileStore.size()));
+
+ // 3. Compact
+ assertTrue(fileStore.maybeCompact(false));
+ // fileStore.cleanup();
+
+ // Size still remains same
+ System.out.printf("File store post compaction %s expecting %s%n",
+ byteCountToDisplaySize(fileStore.size()),
+ byteCountToDisplaySize(blobSize + dataSize));
+ assertEquals("File store post compaction size",
+ mb(blobSize + dataSize), mb(fileStore.size()));
- //4. Add some more property to flush the current TarWriter
+ // 4. Add some more property to flush the current TarWriter
builder = nodeStore.getRoot().builder();
builder.setProperty("a2", createBlob(nodeStore, blobSize));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- //Size is double
- System.out.printf("File store pre cleanup %d%n", mb(fileStore.size()));
- assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
-
- //5. Cleanup
- cleanup(fileStore);
-
- //Size is still double. Deleted space not reclaimed
- System.out.printf("File store post cleanup %d%n", mb(fileStore.size()));
- assertEquals(mb(fileStore.size()), 2 * mb(blobSize));
-
- //6. Null out any hard reference
- ns1 = null;
- builder = null;
- cleanup(fileStore);
-
- //Size should not come back to 5 and deleted data
- //space reclaimed
- System.out.printf("File store post cleanup and nullification %d%n", mb(fileStore.size()));
- assertEquals(mb(fileStore.size()), mb(blobSize));
+ // Size is double
+ System.out.printf("File store pre cleanup %s expecting %s%n",
+ byteCountToDisplaySize(fileStore.size()),
+ byteCountToDisplaySize(2 * blobSize + dataSize));
+ assertEquals(mb(2 * blobSize + dataSize), mb(fileStore.size()));
+
+ // 5. Cleanup
+ assertTrue(fileStore.maybeCompact(false));
+ fileStore.cleanup();
+
+ System.out.printf(
+ "File store post cleanup %s expecting between [%s,%s]%n",
+ byteCountToDisplaySize(fileStore.size()),
+ byteCountToDisplaySize(blobSize + dataSize),
+ byteCountToDisplaySize(blobSize + 2 * dataSize));
+
+ // 0 data size: fileStore.size() == blobSize
+ // >0 data size: fileStore.size() in [blobSize + dataSize, blobSize +
+ // 2xdataSize]
+ assertTrue(mb(fileStore.size()) >= mb(blobSize + dataSize)
+ && mb(fileStore.size()) <= mb(blobSize + 2 * dataSize));
+
+ // refresh the ts ref, to simulate a long wait time
+ custom.setOlderThan(0);
+ assertFalse(fileStore.maybeCompact(false));
+
+ // no data loss happened
+ byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
+ .getProperty("a2").getValue(Type.BINARY).getNewStream());
+ assertEquals(blobSize, blob.length);
}
@After
public void cleanDir() throws IOException {
- FileUtils.deleteDirectory(directory);
- }
-
- private static void cleanup(FileStore fileStore) throws IOException {
- fileStore.getTracker().setCompactionMap(new Compactor(null).getCompactionMap());
- fileStore.getTracker().getWriter().dropCache();
-
- fileStore.cleanup();
+ deleteDirectory(directory);
}
private static Blob createBlob(NodeStore nodeStore, int size) throws IOException {
@@ -154,50 +192,15 @@ public class CompactionAndCleanupTest {
}
@Test
- public void testGainEstimator() throws Exception {
- final int MB = 1024 * 1024;
- final int blobSize = 2 * MB;
-
- FileStore fileStore = new FileStore(directory, 2, false);
- SegmentNodeStore nodeStore = new SegmentNodeStore(fileStore);
-
- // 1. Create some blob properties
- NodeBuilder builder = nodeStore.getRoot().builder();
-
- NodeBuilder c1 = builder.child("c1");
- c1.setProperty("a", createBlob(nodeStore, blobSize));
- c1.setProperty("b", "foo");
-
- NodeBuilder c2 = builder.child("c2");
- c2.setProperty("a", createBlob(nodeStore, blobSize));
- c2.setProperty("b", "foo");
-
- NodeBuilder c3 = builder.child("c3");
- c3.setProperty("a", createBlob(nodeStore, blobSize));
- c3.setProperty("b", "foo");
- nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
- // 2. Now remove the property
- builder = nodeStore.getRoot().builder();
- builder.child("c1").remove();
- builder.child("c2").remove();
- nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-
- fileStore.flush();
- try {
- // should be at 66%
- assertTrue(fileStore.estimateCompactionGain()
- .estimateCompactionGain() > 60);
- } finally {
- fileStore.close();
- }
- }
-
- @Ignore("OAK-2192") // FIXME OAK-2192
- @Test
public void testMixedSegments() throws Exception {
FileStore store = new FileStore(directory, 2, false);
final SegmentNodeStore nodeStore = new SegmentNodeStore(store);
+ store.setCompactionStrategy(new CompactionStrategy(true, false, CLEAN_NONE, 0, (byte) 5) {
+ @Override
+ public boolean compacted(Callable<Boolean> setHead) throws Exception {
+ return nodeStore.locked(setHead);
+ }
+ });
NodeBuilder root = nodeStore.getRoot().builder();
createNodes(root.setChildNode("test"), 10, 3);
@@ -276,6 +279,4 @@ public class CompactionAndCleanupTest {
builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString());
}
}
-
-
}