You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/01 11:39:32 UTC
svn commit: r1583583 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/blob/
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/segment/
test/java/org/apache/jackrabbit/o...
Author: chetanm
Date: Tue Apr 1 09:39:31 2014
New Revision: 1583583
URL: http://svn.apache.org/r1583583
Log:
OAK-1639 - MarkSweepGarbageCollector improvements
Refactoring the MarkSweepGarbageCollector.
-- Replaced creation of internal executor with explicitly passed executor
-- Document/Segment store now creates a new instance of
MarkSweepGarbageCollector for every invocation
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java Tue Apr 1 09:39:31 2014
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.blob;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
@@ -31,55 +32,36 @@ import org.apache.jackrabbit.oak.commons
* Also, manages any temporary files needed as well as external sorting.
*
*/
-class GarbageCollectorFileState {
-
- private static final String GC_DIR = "gc";
-
- private static final String MARKED_PREFIX = "marked";
-
- private static final String AVAIL_PREFIX = "avail";
-
- private static final String GC_CANDIDATE_PREFIX = "gccand";
-
- private static final String GC_PREFIX = "gc";
-
- /** The startTime which records the starting time. */
- private long startTime;
-
+class GarbageCollectorFileState implements Closeable{
/** The root of the gc file state directory. */
- private File home;
+ private final File home;
/** The marked references. */
- private File markedRefs;
+ private final File markedRefs;
/** The available references. */
- private File availableRefs;
+ private final File availableRefs;
/** The gc candidates. */
- private File gcCandidates;
+ private final File gcCandidates;
/** The garbage stores the garbage collection candidates which were not deleted . */
- private File garbage;
+ private final File garbage;
/**
* Instantiates a new garbage collector file state.
*
- * @param root
- * the root
- * @throws IOException
- * Signals that an I/O exception has occurred.
+ * @param root path of the root directory under which the
+ * files created during gc are stored
*/
public GarbageCollectorFileState(String root) throws IOException {
- init(root);
- }
-
- /**
- * Gets the home directory.
- *
- * @return the home
- */
- protected File getHome() {
- return home;
+ long startTime = System.currentTimeMillis();
+ home = new File(root, "gcworkdir-" + startTime);
+ markedRefs = new File(home, "marked-" + startTime);
+ availableRefs = new File(home,"avail-" + startTime);
+ gcCandidates = new File(home, "gccand-" + startTime);
+ garbage = new File(home, "gc-" + startTime);
+ FileUtils.forceMkdir(home);
}
/**
@@ -87,8 +69,8 @@ class GarbageCollectorFileState {
*
* @return the marked references
*/
- protected File getMarkedRefs() {
- return createMarkedRefsFile();
+ public File getMarkedRefs() {
+ return markedRefs;
}
/**
@@ -96,8 +78,8 @@ class GarbageCollectorFileState {
*
* @return the available references
*/
- protected File getAvailableRefs() {
- return createAvailableRefsFile();
+ public File getAvailableRefs() {
+ return availableRefs;
}
/**
@@ -105,8 +87,8 @@ class GarbageCollectorFileState {
*
* @return the gc candidates
*/
- protected File getGcCandidates() {
- return createGcCandidatesFile();
+ public File getGcCandidates() {
+ return gcCandidates;
}
/**
@@ -114,100 +96,17 @@ class GarbageCollectorFileState {
*
* @return the garbage
*/
- protected File getGarbage() {
- return createGarbageFile();
- }
-
- /**
- * Initialize the state.
- *
- * @param root
- * the root
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- private void init(String root) throws IOException {
- startTime = System.currentTimeMillis();
-
- home = new File(root, GC_DIR);
- FileUtils.forceMkdir(home);
- home.deleteOnExit();
- }
-
- /**
- * Creates the marked references file.
- *
- * @return the file
- */
- private File createMarkedRefsFile() {
- if (markedRefs == null) {
- markedRefs = new File(home,
- MARKED_PREFIX + "-" + startTime);
- markedRefs.deleteOnExit();
- }
- return markedRefs;
- }
-
- /**
- * Creates the available references file.
- *
- * @return the file
- */
- private File createAvailableRefsFile() {
- if (availableRefs == null) {
- availableRefs = new File(home,
- AVAIL_PREFIX + "-" + startTime);
- availableRefs.deleteOnExit();
- }
- return availableRefs;
- }
-
- /**
- * Creates the gc candidates file.
- *
- * @return the file
- */
- private File createGcCandidatesFile() {
- if (gcCandidates == null) {
- gcCandidates = new File(home,
- GC_CANDIDATE_PREFIX + "-" + startTime);
- gcCandidates.deleteOnExit();
- }
- return gcCandidates;
- }
-
- /**
- * Creates the garbage file.
- *
- * @return the file
- */
- private File createGarbageFile() {
- if (garbage == null) {
- garbage = new File(home,
- GC_PREFIX + "-" + startTime);
- garbage.deleteOnExit();
- }
+ public File getGarbage() {
return garbage;
}
/**
- * Creates a temp file.
- *
- * @return the file
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- protected File createTempFile() throws IOException {
- return File.createTempFile("temp", null, home);
- }
-
- /**
* Completes the process by deleting the files.
*
* @throws IOException
* Signals that an I/O exception has occurred.
*/
- protected void complete() throws IOException {
+ public void close() throws IOException {
if (!getGarbage().exists() ||
FileUtils.sizeOf(getGarbage()) == 0) {
FileUtils.deleteDirectory(home);
@@ -217,10 +116,7 @@ class GarbageCollectorFileState {
/**
* Sorts the given file externally.
*
- * @param file
- * the file
- * @throws IOException
- * Signals that an I/O exception has occurred.
+ * @param file file whose contents need to be sorted
*/
public void sort(File file) throws IOException {
File sorted = createTempFile();
@@ -235,4 +131,8 @@ class GarbageCollectorFileState {
sorted, lexComparator, true);
Files.move(sorted, file);
}
+
+ private File createTempFile() throws IOException {
+ return File.createTempFile("temp", null, home);
+ }
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Tue Apr 1 09:39:31 2014
@@ -26,23 +26,22 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.base.StandardSystemProperty;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.jackrabbit.oak.commons.IOUtils;
@@ -54,6 +53,7 @@ import org.slf4j.LoggerFactory;
* Mark and sweep garbage collector.
*
* Uses the file system to store internal state while in process to account for huge data.
+ * This class is not thread safe.
*
*/
public class MarkSweepGarbageCollector implements BlobGarbageCollector {
@@ -66,127 +66,58 @@ public class MarkSweepGarbageCollector i
public static final int DEFAULT_BATCH_COUNT = 2048;
- public static final String NOT_RUNNING = "NotRunning";
-
- public static final String MARKING = "Running-Marking";
-
- public static final String SWEEPING = "Running-Sweeping";
+ public static enum State {NOT_RUNNING, MARKING, SWEEPING}
/** The last modified time before current time of blobs to consider for garbage collection. */
- private long maxLastModifiedInterval = TimeUnit.HOURS.toMillis(24);
+ private final long maxLastModifiedInterval;
/** Run concurrently when possible. */
- private boolean runConcurrently = true;
-
- /** The number of sweeper threads to use. */
- private int numSweepers = 1;
+ private final boolean runConcurrently;
/** The blob store to be garbage collected. */
- private GarbageCollectableBlobStore blobStore;
+ private final GarbageCollectableBlobStore blobStore;
/** Helper class to mark blob references which **/
- private BlobReferenceRetriever marker;
+ private final BlobReferenceRetriever marker;
/** The garbage collector file state */
- private GarbageCollectorFileState fs;
+ private final GarbageCollectorFileState fs;
- /** The configured root to store gc process files. */
- private String root = TEMP_DIR;
+ private final Executor executor;
/** The batch count. */
- private int batchCount = DEFAULT_BATCH_COUNT;
+ private final int batchCount;
/** Flag to indicate the state of the gc **/
- private String state = NOT_RUNNING;
-
- /**
- * Gets the max last modified interval considered for garbage collection.
- *
- * @return the max last modified interval
- */
- protected long getMaxLastModifiedInterval() {
- return maxLastModifiedInterval;
- }
+ private State state = State.NOT_RUNNING;
/**
- * Sets the max last modified interval considered for garbage collection.
- *
- * @param maxLastModifiedInterval the new max last modified interval
- */
- protected void setMaxLastModifiedInterval(long maxLastModifiedInterval) {
- this.maxLastModifiedInterval = maxLastModifiedInterval;
- }
-
- /**
- * Gets the root.
- *
- * @return the root
- */
- protected String getRoot() {
- return root;
- }
-
- /**
- * Gets the batch count.
- *
- * @return the batch count
- */
- protected int getBatchCount() {
- return batchCount;
- }
-
- /**
- * Checks if run concurrently.
- *
- * @return true, if is run concurrently
- */
- protected boolean isRunConcurrently() {
- return runConcurrently;
- }
-
- /**
- * Gets the number sweepers.
- *
- * @return the number sweepers
- */
- protected int getNumSweepers() {
- return numSweepers;
- }
-
- /**
- * Gets the state of the gc process.
- *
- * @return the state
- */
- protected String getState() {
- return state;
- }
-
- /**
- * @param marker
+ * Creates an instance of MarkSweepGarbageCollector
+ *
+ * @param marker BlobReferenceRetriever instance used to fetch referenced blob entries
* @param blobStore
- * @param root the root
- * @param batchCount the batch count
+ * @param root absolute path of the root directory under which temporary
+ * files would be created
+ * @param batchCount batch size used for saving intermediate state
* @param runBackendConcurrently - run the backend iterate concurrently
- * @param maxSweeperThreads the max sweeper threads
- * @param maxLastModifiedInterval
+ * @param maxLastModifiedInterval - interval in millis before the current time. Only blobs
+ * last modified earlier than that would be considered for GC
* @throws IOException Signals that an I/O exception has occurred.
*/
public MarkSweepGarbageCollector(
BlobReferenceRetriever marker,
GarbageCollectableBlobStore blobStore,
+ Executor executor,
String root,
int batchCount,
boolean runBackendConcurrently,
- int maxSweeperThreads,
long maxLastModifiedInterval)
throws IOException {
+ this.executor = executor;
this.blobStore = blobStore;
this.marker = marker;
this.batchCount = batchCount;
- this.root = root;
this.runConcurrently = runBackendConcurrently;
- this.numSweepers = maxSweeperThreads;
this.maxLastModifiedInterval = maxLastModifiedInterval;
fs = new GarbageCollectorFileState(root);
}
@@ -200,13 +131,10 @@ public class MarkSweepGarbageCollector i
*/
public MarkSweepGarbageCollector(
BlobReferenceRetriever marker,
- GarbageCollectableBlobStore blobStore)
+ GarbageCollectableBlobStore blobStore,
+ Executor executor)
throws IOException {
- Preconditions.checkState(!Strings.isNullOrEmpty(root));
-
- this.blobStore = blobStore;
- this.marker = marker;
- fs = new GarbageCollectorFileState(root);
+ this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, TimeUnit.HOURS.toMillis(24));
}
@Override
@@ -215,56 +143,63 @@ public class MarkSweepGarbageCollector i
}
/**
+ * Gets the state of the gc process.
+ *
+ * @return the state
+ */
+ public State getState() {
+ return state;
+ }
+
+ /**
* Mark and sweep. Main method for GC.
*
* @throws Exception
* the exception
*/
- public void markAndSweep() throws Exception {
+ private void markAndSweep() throws IOException, InterruptedException {
+ boolean threw = true;
try {
Stopwatch sw = Stopwatch.createStarted();
LOG.info("Starting Blob garbage collection");
mark();
int deleteCount = sweep();
+ threw = false;
LOG.info("Blob garbage collection completed in {}. Number of blobs " +
"deleted [{}]", sw.toString(), deleteCount);
} finally {
- fs.complete();
- state = NOT_RUNNING;
+ Closeables.close(fs, threw);
+ state = State.NOT_RUNNING;
}
}
/**
* Mark phase of the GC.
- *
- * @throws Exception
- * the exception
*/
- private void mark() throws Exception {
- state = MARKING;
+ private void mark() throws IOException, InterruptedException {
+ state = State.MARKING;
LOG.debug("Starting mark phase of the garbage collector");
// Find all blobs available in the blob store
- Thread blobIdRetrieverThread = null;
+ ListenableFutureTask<Integer> blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever());
if (runConcurrently) {
- blobIdRetrieverThread = new Thread(new BlobIdRetriever(),
- this.getClass().getSimpleName() + "-MarkThread");
- blobIdRetrieverThread.setDaemon(true);
- blobIdRetrieverThread.start();
+ executor.execute(blobIdRetriever);
} else {
- (new BlobIdRetriever()).retrieve();
+ MoreExecutors.sameThreadExecutor().execute(blobIdRetriever);
}
// Find all blob references after iterating over the whole repository
iterateNodeTree();
- if (runConcurrently) {
- if (blobIdRetrieverThread.isAlive()) {
- blobIdRetrieverThread.join();
- }
+ try {
+ blobIdRetriever.get();
+ } catch (ExecutionException e) {
+ LOG.warn("Error occurred while fetching all the blobIds from the BlobStore. GC would " +
+ "continue with the blobIds retrieved so far", e.getCause());
}
+
difference();
LOG.debug("Ending mark phase of the garbage collector");
}
@@ -278,9 +213,9 @@ public class MarkSweepGarbageCollector i
private void difference() throws IOException {
LOG.debug("Starting difference phase of the garbage collector");
- FileLineDifferenceIterator<String> iter = new FileLineDifferenceIterator<String>(
+ FileLineDifferenceIterator iter = new FileLineDifferenceIterator(
fs.getMarkedRefs(),
- fs.getAvailableRefs());
+ fs.getAvailableRefs(), batchCount);
BufferedWriter bufferWriter = null;
try {
@@ -316,87 +251,54 @@ public class MarkSweepGarbageCollector i
*/
private int sweep() throws IOException {
int count = 0;
- try {
- state = SWEEPING;
- LOG.debug("Starting sweep phase of the garbage collector");
-
- ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
- ExecutorService executorService =
- new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(), 1,
- TimeUnit.MINUTES,
- new LinkedBlockingQueue<Runnable>(),
- new ThreadFactory() {
- private final AtomicInteger threadCounter = new AtomicInteger();
-
- private String getName() {
- return "MarkSweepGarbageCollector-Sweeper-" + threadCounter.getAndIncrement();
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, getName());
- thread.setDaemon(true);
- return thread;
- }
- });
-
- LineIterator iterator =
- FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name());
- List<String> ids = Lists.newArrayList();
+ state = State.SWEEPING;
+ LOG.debug("Starting sweep phase of the garbage collector");
- while (iterator.hasNext()) {
- ids.add(iterator.next());
-
- if (ids.size() > getBatchCount()) {
- count += ids.size();
- executorService.execute(new Sweeper(ids, exceptionQueue));
- ids = Lists.newArrayList();
- }
- }
- if (!ids.isEmpty()) {
+ ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
+
+ LineIterator iterator =
+ FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name());
+ List<String> ids = Lists.newArrayList();
+
+ while (iterator.hasNext()) {
+ ids.add(iterator.next());
+
+ if (ids.size() > getBatchCount()) {
count += ids.size();
- executorService.execute(new Sweeper(ids, exceptionQueue));
+ executor.execute(new Sweeper(ids, exceptionQueue));
+ ids = Lists.newArrayList();
}
-
- try {
- executorService.shutdown();
- executorService.awaitTermination(100, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- LOG.error("Exception while waiting for termination of the executor service. ExecutorService " +
- "would be immediately shutdown", e);
- executorService.shutdownNow();
- Thread.currentThread().interrupt();
- }
-
- count -= exceptionQueue.size();
- BufferedWriter writer = null;
- try {
- if (!exceptionQueue.isEmpty()) {
- writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
- saveBatchToFile(Lists.newArrayList(exceptionQueue), writer);
- }
- } finally {
- LineIterator.closeQuietly(iterator);
- IOUtils.closeQuietly(writer);
+ }
+ if (!ids.isEmpty()) {
+ count += ids.size();
+ executor.execute(new Sweeper(ids, exceptionQueue));
+ }
+
+ count -= exceptionQueue.size();
+ BufferedWriter writer = null;
+ try {
+ if (!exceptionQueue.isEmpty()) {
+ writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
+ saveBatchToFile(Lists.newArrayList(exceptionQueue), writer);
}
-
- LOG.debug("Ending sweep phase of the garbage collector");
} finally {
- fs.complete();
- state = NOT_RUNNING;
+ LineIterator.closeQuietly(iterator);
+ IOUtils.closeQuietly(writer);
+ }
+ if(!exceptionQueue.isEmpty()) {
+ LOG.warn("Unable to delete some blob entries from the blob store. Details around such blob entries " +
+ "can be found in [{}]", fs.getGarbage().getAbsolutePath());
}
+ LOG.debug("Ending sweep phase of the garbage collector");
return count;
}
+ private int getBatchCount() {
+ return batchCount;
+ }
+
/**
* Save batch to file.
- *
- * @param ids
- * the ids
- * @param writer
- * the writer
- * @throws IOException
- * Signals that an I/O exception has occurred.
*/
static void saveBatchToFile(List<String> ids, BufferedWriter writer) throws IOException {
writer.append(Joiner.on(NEWLINE).join(ids));
@@ -416,14 +318,6 @@ public class MarkSweepGarbageCollector i
/** The ids to sweep. */
private final List<String> ids;
- /**
- * Instantiates a new sweeper.
- *
- * @param ids
- * the ids
- * @param exceptionQueue
- * the exception queue
- */
public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue) {
this.exceptionQueue = exceptionQueue;
this.ids = ids;
@@ -441,30 +335,25 @@ public class MarkSweepGarbageCollector i
exceptionQueue.addAll(ids);
}
} catch (Exception e) {
- LOG.error("Error occurred while deleting blob with ids [{}]", ids, e);
+ LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
exceptionQueue.addAll(ids);
}
}
}
/**
- * Iterates the complete node tree.
- *
- * @return the list
- * @throws Exception
- * the exception
+ * Iterates the complete node tree and collects all blob references
*/
- private void iterateNodeTree() throws Exception {
+ private void iterateNodeTree() throws IOException {
final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
try {
marker.collectReferences(
new ReferenceCollector() {
- private final List<String> idBatch = Lists
- .newArrayListWithCapacity(getBatchCount());
+ private final List<String> idBatch = Lists.newArrayListWithCapacity(getBatchCount());
private int count = 0;
- private boolean debugMode = LOG.isTraceEnabled();
+ private final boolean debugMode = LOG.isTraceEnabled();
@Override
public void addReference(String blobId) {
@@ -514,26 +403,19 @@ public class MarkSweepGarbageCollector i
/**
* BlobIdRetriever class to retrieve all blob ids.
*/
- class BlobIdRetriever implements Runnable {
+ private class BlobIdRetriever implements Callable<Integer> {
@Override
- public void run() {
- retrieve();
- }
-
- /**
- * Retrieve.
- */
- protected void retrieve() {
+ public Integer call() throws Exception {
LOG.debug("Starting retrieve of all blobs");
-
BufferedWriter bufferWriter = null;
+ int blobsCount = 0;
try {
bufferWriter = new BufferedWriter(
new FileWriter(fs.getAvailableRefs()));
Iterator<String> idsIter = blobStore.getAllChunkIds(maxLastModifiedInterval);
List<String> ids = Lists.newArrayList();
- int blobsCount = 0;
+
while (idsIter.hasNext()) {
ids.add(idsIter.next());
if (ids.size() > getBatchCount()) {
@@ -550,21 +432,19 @@ public class MarkSweepGarbageCollector i
// sort the file
fs.sort(fs.getAvailableRefs());
LOG.debug("Ending retrieving all blobs : {}", blobsCount);
- } catch (Exception e) {
- LOG.error("Error retrieving available blob ids", e);
} finally {
IOUtils.closeQuietly(bufferWriter);
}
+ return blobsCount;
}
+
+
}
/**
* FileLineDifferenceIterator class which iterates over the difference of 2 files line by line.
- *
- * @param <T>
- * the generic type
*/
- class FileLineDifferenceIterator<T> implements Iterator<String> {
+ static class FileLineDifferenceIterator implements Iterator<String> {
/** The marked references iterator. */
private final LineIterator markedIter;
@@ -574,6 +454,8 @@ public class MarkSweepGarbageCollector i
private final ArrayDeque<String> queue;
+ private final int batchSize;
+
private boolean done;
/** Temporary buffer. */
@@ -581,19 +463,14 @@ public class MarkSweepGarbageCollector i
/**
* Instantiates a new file line difference iterator.
- *
- * @param marked
- * the marked
- * @param available
- * the available
- * @throws IOException
- * Signals that an I/O exception has occurred.
*/
- public FileLineDifferenceIterator(File marked, File available) throws IOException {
+ public FileLineDifferenceIterator(File marked, File available, int batchSize) throws IOException {
this.markedIter = FileUtils.lineIterator(marked);
this.allIter = FileUtils.lineIterator(available);
- queue = new ArrayDeque<String>(getBatchCount());
+ this.batchSize = batchSize;
+ queue = new ArrayDeque<String>(batchSize);
markedBuffer = Sets.newTreeSet();
+
}
/**
@@ -658,16 +535,16 @@ public class MarkSweepGarbageCollector i
// the
// blob id set iteration is complete
while (allIter.hasNext() &&
- gcSet.size() < getBatchCount()) {
+ gcSet.size() < batchSize) {
TreeSet<String> allBuffer = new TreeSet<String>();
while (markedIter.hasNext() &&
- markedBuffer.size() < getBatchCount()) {
+ markedBuffer.size() < batchSize) {
String stre = markedIter.next();
markedBuffer.add(stre);
}
while (allIter.hasNext() &&
- allBuffer.size() < getBatchCount()) {
+ allBuffer.size() < batchSize) {
String stre = allIter.next();
allBuffer.add(stre);
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Tue Apr 1 09:39:31 2014
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document;
import java.io.InputStream;
+import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -25,6 +26,7 @@ import javax.sql.DataSource;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.MoreExecutors;
import com.mongodb.DB;
import org.apache.jackrabbit.mk.api.MicroKernel;
@@ -462,6 +464,7 @@ public class DocumentMK implements Micro
private long splitDocumentAgeMillis = 5 * 60 * 1000;
private long offHeapCacheSize = -1;
private Clock clock = Clock.SIMPLE;
+ private Executor executor;
public Builder() {
memoryCacheSize(DEFAULT_MEMORY_CACHE_SIZE);
@@ -731,6 +734,18 @@ public class DocumentMK implements Micro
return this;
}
+ public Executor getExecutor() {
+ if(executor == null){
+ return MoreExecutors.sameThreadExecutor();
+ }
+ return executor;
+ }
+
+ public Builder setExecutor(Executor executor){
+ this.executor = executor;
+ return this;
+ }
+
public Builder clock(Clock clock) {
this.clock = clock;
return this;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Apr 1 09:39:31 2014
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
@@ -288,7 +289,7 @@ public final class DocumentNodeStore
private final VersionGarbageCollector versionGarbageCollector;
- private final MarkSweepGarbageCollector blobGarbageCollector;
+ private final Executor executor;
public DocumentNodeStore(DocumentMK.Builder builder) {
this.blobStore = builder.getBlobStore();
@@ -303,6 +304,7 @@ public final class DocumentNodeStore
s = new LoggingDocumentStoreWrapper(s);
}
this.store = s;
+ this.executor = builder.getExecutor();
this.clock = builder.getClock();
int cid = builder.getClusterId();
cid = Integer.getInteger("oak.documentMK.clusterId", cid);
@@ -320,7 +322,6 @@ public final class DocumentNodeStore
this.branches = new UnmergedBranches(getRevisionComparator());
this.asyncDelay = builder.getAsyncDelay();
this.versionGarbageCollector = new VersionGarbageCollector(this);
- this.blobGarbageCollector = createBlobGarbageCollector(blobStore);
this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
@Override
public int getMemory() {
@@ -1607,19 +1608,17 @@ public final class DocumentNodeStore
* Creates and returns a MarkSweepGarbageCollector if the current BlobStore
* supports garbage collection
*
- * @param blobStore the blobStore used by DocumentNodeStore
- *
* @return garbage collector of the BlobStore supports GC otherwise null
*/
- @Nullable
- private MarkSweepGarbageCollector createBlobGarbageCollector(BlobStore blobStore) {
+ @CheckForNull
+ public MarkSweepGarbageCollector createBlobGarbageCollector() {
MarkSweepGarbageCollector blobGC = null;
if(blobStore instanceof GarbageCollectableBlobStore){
- blobGC = null;
try {
blobGC = new MarkSweepGarbageCollector(
new DocumentBlobReferenceRetriever(this),
- (GarbageCollectableBlobStore) blobStore);
+ (GarbageCollectableBlobStore) blobStore,
+ executor);
} catch (IOException e) {
throw new RuntimeException("Error occurred while initializing " +
"the MarkSweepGarbageCollector",e);
@@ -1708,9 +1707,4 @@ public final class DocumentNodeStore
public VersionGarbageCollector getVersionGarbageCollector() {
return versionGarbageCollector;
}
-
- @CheckForNull
- public MarkSweepGarbageCollector getBlobGarbageCollector() {
- return blobGarbageCollector;
- }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Tue Apr 1 09:39:31 2014
@@ -48,17 +48,17 @@ import org.apache.jackrabbit.oak.osgi.Ob
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
-import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.RevisionGC;
import org.apache.jackrabbit.oak.spi.state.RevisionGCMBean;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
-import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
@@ -128,6 +128,7 @@ public class DocumentNodeStoreService {
private DocumentMK mk;
private ObserverTracker observerTracker;
private ComponentContext context;
+ private Whiteboard whiteboard;
private static final long DEFAULT_VER_GC_MAX_AGE = TimeUnit.DAYS.toSeconds(1);
@@ -140,6 +141,9 @@ public class DocumentNodeStoreService {
@Activate
protected void activate(ComponentContext context, Map<String, ?> config) throws Exception {
this.context = context;
+ this.whiteboard = new OsgiWhiteboard(context.getBundleContext());
+ this.executor = new WhiteboardExecutor();
+ executor.start(whiteboard);
if (blobStore == null &&
PropertiesUtil.toBoolean(prop(CUSTOM_BLOB_STORE), false)) {
@@ -191,12 +195,12 @@ public class DocumentNodeStoreService {
}
mkBuilder.setMongoDB(mongoDB, changesSize);
-
+ mkBuilder.setExecutor(executor);
mk = mkBuilder.open();
log.info("Connected to database {}", mongoDB);
- registerJMXBeans(mk.getNodeStore(), context.getBundleContext());
+ registerJMXBeans(mk.getNodeStore());
NodeStore store;
if (useMK) {
@@ -234,12 +238,14 @@ public class DocumentNodeStoreService {
unregisterNodeStore();
}
+ @SuppressWarnings("UnusedDeclaration")
protected void bindBlobStore(BlobStore blobStore) throws IOException {
log.info("Initializing DocumentNodeStore with BlobStore [{}]", blobStore);
this.blobStore = blobStore;
registerNodeStore();
}
+ @SuppressWarnings("UnusedDeclaration")
protected void unbindBlobStore(BlobStore blobStore) {
this.blobStore = null;
unregisterNodeStore();
@@ -264,23 +270,22 @@ public class DocumentNodeStoreService {
}
}
- private void registerJMXBeans(final DocumentNodeStore store, BundleContext context) throws IOException {
- Whiteboard wb = new OsgiWhiteboard(context);
+ private void registerJMXBeans(final DocumentNodeStore store) throws IOException {
registrations.add(
- registerMBean(wb,
+ registerMBean(whiteboard,
CacheStatsMBean.class,
store.getNodeCacheStats(),
CacheStatsMBean.TYPE,
store.getNodeCacheStats().getName()));
registrations.add(
- registerMBean(wb,
+ registerMBean(whiteboard,
CacheStatsMBean.class,
store.getNodeChildrenCacheStats(),
CacheStatsMBean.TYPE,
store.getNodeChildrenCacheStats().getName())
);
registrations.add(
- registerMBean(wb,
+ registerMBean(whiteboard,
CacheStatsMBean.class,
store.getDocChildrenCacheStats(),
CacheStatsMBean.TYPE,
@@ -290,7 +295,7 @@ public class DocumentNodeStoreService {
if (cl instanceof MemoryDiffCache) {
MemoryDiffCache mcl = (MemoryDiffCache) cl;
registrations.add(
- registerMBean(wb,
+ registerMBean(whiteboard,
CacheStatsMBean.class,
mcl.getDiffCacheStats(),
CacheStatsMBean.TYPE,
@@ -302,7 +307,7 @@ public class DocumentNodeStoreService {
if (ds instanceof CachingDocumentStore) {
CachingDocumentStore cds = (CachingDocumentStore) ds;
registrations.add(
- registerMBean(wb,
+ registerMBean(whiteboard,
CacheStatsMBean.class,
cds.getCacheStats(),
CacheStatsMBean.TYPE,
@@ -310,11 +315,16 @@ public class DocumentNodeStoreService {
);
}
- executor = new WhiteboardExecutor();
- executor.start(wb);
- MarkSweepGarbageCollector gc = store.getBlobGarbageCollector();
- if(gc != null){
- registrations.add(registerMBean(wb, BlobGCMBean.class, new BlobGC(gc, executor),
+
+
+ if (blobStore instanceof GarbageCollectableBlobStore) {
+ BlobGarbageCollector gc = new BlobGarbageCollector() {
+ @Override
+ public void collectGarbage() throws Exception {
+ store.createBlobGarbageCollector().collectGarbage();
+ }
+ };
+ registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
BlobGCMBean.TYPE, "Document node store blob garbage collection"));
}
@@ -324,7 +334,7 @@ public class DocumentNodeStoreService {
store.getVersionGarbageCollector().gc(versionGcMaxAgeInSecs, TimeUnit.SECONDS);
}
}, executor);
- registrations.add(registerMBean(wb, RevisionGCMBean.class, revisionGC,
+ registrations.add(registerMBean(whiteboard, RevisionGCMBean.class, revisionGC,
RevisionGCMBean.TYPE, "Document node store revision garbage collection"));
//TODO Register JMX bean for Off Heap Cache stats
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Tue Apr 1 09:39:31 2014
@@ -38,6 +38,7 @@ import org.apache.jackrabbit.oak.osgi.Ob
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -174,9 +175,17 @@ public class SegmentNodeStoreService ext
RevisionGCMBean.TYPE, "Segment node store revision garbage collection");
if (blobStore instanceof GarbageCollectableBlobStore) {
- MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
- new SegmentBlobReferenceRetriever(store.getTracker()),
- (GarbageCollectableBlobStore) blobStore);
+ BlobGarbageCollector gc = new BlobGarbageCollector() {
+ @Override
+ public void collectGarbage() throws Exception {
+ MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
+ new SegmentBlobReferenceRetriever(store.getTracker()),
+ (GarbageCollectableBlobStore) blobStore,
+ executor);
+ gc.collectGarbage();
+ }
+ };
+
blobGCRegistration = registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
BlobGCMBean.TYPE, "Segment node store blob garbage collection");
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Tue Apr 1 09:39:31 2014
@@ -28,6 +28,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.MoreExecutors;
import junit.framework.Assert;
import com.google.common.collect.Lists;
@@ -122,11 +123,13 @@ public class MongoBlobGCTest extends Abs
gc(set);
}
- private void gc(HashSet<String> set) throws IOException, Exception {
+ private void gc(HashSet<String> set) throws Exception {
DocumentNodeStore store = mk.getNodeStore();
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new DocumentBlobReferenceRetriever(store),
- (GarbageCollectableBlobStore) store.getBlobStore(), "./target", 2048, true, 2, 0);
+ (GarbageCollectableBlobStore) store.getBlobStore(),
+ MoreExecutors.sameThreadExecutor(),
+ "./target", 2048, true, 0);
gc.collectGarbage();
Set<String> existing = iterate();
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Tue Apr 1 09:39:31 2014
@@ -33,6 +33,7 @@ import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.core.data.FileDataStore;
import org.apache.jackrabbit.oak.api.Blob;
@@ -123,7 +124,9 @@ public class SegmentDataStoreBlobGCTest
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new SegmentBlobReferenceRetriever(store.getTracker()),
- (GarbageCollectableBlobStore) store.getBlobStore(), "./target", 2048, true, 2, 0);
+ (GarbageCollectableBlobStore) store.getBlobStore(),
+ MoreExecutors.sameThreadExecutor(),
+ "./target", 2048, true, 0);
gc.collectGarbage();
Set<String> existing = iterate();