You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/07/20 03:57:35 UTC
svn commit: r1753435 - in /jackrabbit/oak/trunk:
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Author: amitj
Date: Wed Jul 20 03:57:34 2016
New Revision: 1753435
URL: http://svn.apache.org/viewvc?rev=1753435&view=rev
Log:
OAK-4200: [BlobGC] Improve collection times of blobs available
* MarkSweepGarbageCollector code cleanup
* Removing code duplication
* Reusing methods from FileIOUtils
Modified:
jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
Modified: jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java?rev=1753435&r1=1753434&r2=1753435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java (original)
+++ jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java Wed Jul 20 03:57:34 2016
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
+import com.google.common.base.Strings;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
@@ -192,6 +193,23 @@ public final class FileIOUtils {
*/
public static int writeStrings(Iterator<String> iterator, File f, boolean escape)
throws IOException {
+ return writeStrings(iterator, f, escape, null, "");
+ }
+
+ /**
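+ * Writes the strings from the given iterator to the given file, one per line,
+ * and optionally escapes line breaks in the written strings.
+ *
+ * @param iterator the source of the strings
+ * @param f file to write to
+ * @param escape whether to escape for line breaks
+ * @param logger logger to log progress to, or null to disable progress logging
+ * @param message prefix of the progress message logged after every 1000 strings
+ *                (the running count is appended); null is treated as empty
+ * @return the number of strings written
+ * @throws IOException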
+ */
+ public static int writeStrings(Iterator<String> iterator, File f, boolean escape,
+ @Nullable Logger logger, @Nullable String message) throws IOException {
BufferedWriter writer = newWriter(f, UTF_8);
boolean threw = true;
@@ -200,6 +218,11 @@ public final class FileIOUtils {
while (iterator.hasNext()) {
writeAsLine(writer, iterator.next(), escape);
count++;
+ if (logger != null) {
+ if (count % 1000 == 0) {
+ logger.info(Strings.nullToEmpty(message) + count);
+ }
+ }
}
threw = false;
} finally {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1753435&r1=1753434&r2=1753435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Wed Jul 20 03:57:34 2016
@@ -20,20 +20,17 @@ import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.sql.Timestamp;
+import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -46,6 +43,7 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.StandardSystemProperty;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
@@ -57,7 +55,6 @@ import org.apache.jackrabbit.core.data.D
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.FileIOUtils;
import org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator;
-import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
@@ -310,37 +307,11 @@ public class MarkSweepGarbageCollector i
fs.getMarkedRefs(),
fs.getAvailableRefs(),
transformer);
- calculateDifference(fs, iter);
+ int candidates = FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true);
+ LOG.debug("Found candidates - " + candidates);
LOG.debug("Ending difference phase of the garbage collector");
}
-
- private long calculateDifference(GarbageCollectorFileState fs, FileLineDifferenceIterator iter) throws IOException {
- long numCandidates = 0;
- BufferedWriter bufferWriter = null;
- try {
- bufferWriter = Files.newWriter(fs.getGcCandidates(), Charsets.UTF_8);
- List<String> expiredSet = newArrayList();
-
- while (iter.hasNext()) {
- expiredSet.add(iter.next());
- if (expiredSet.size() > getBatchCount()) {
- numCandidates += expiredSet.size();
- saveBatchToFile(expiredSet, bufferWriter);
- }
- }
-
- if (!expiredSet.isEmpty()) {
- numCandidates += expiredSet.size();
- saveBatchToFile(expiredSet, bufferWriter);
- }
- LOG.debug("Found candidates - " + numCandidates);
- } finally {
- IOUtils.closeQuietly(bufferWriter);
- IOUtils.closeQuietly(iter);
- }
- return numCandidates;
- }
/**
* Sweep phase of gc candidate deletion.
@@ -408,29 +379,18 @@ public class MarkSweepGarbageCollector i
LineIterator iterator = null;
try {
removesWriter = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
- ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
+ ArrayDeque<String> removesQueue = new ArrayDeque<String>();
iterator =
FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name());
- List<String> ids = newArrayList();
- while (iterator.hasNext()) {
- ids.add(iterator.next());
-
- if (ids.size() >= getBatchCount()) {
- count += ids.size();
- deleted += BlobCollectionType.get(blobStore)
- .sweepInternal(blobStore,ids, exceptionQueue, lastMaxModifiedTime);
- ids = newArrayList();
- saveBatchToFile(newArrayList(exceptionQueue), removesWriter);
- exceptionQueue.clear();
- }
- }
- if (!ids.isEmpty()) {
+ Iterator<List<String>> partitions = Iterators.partition(iterator, getBatchCount());
+ while (partitions.hasNext()) {
+ List<String> ids = partitions.next();
count += ids.size();
deleted += BlobCollectionType.get(blobStore)
- .sweepInternal(blobStore, ids, exceptionQueue, lastMaxModifiedTime);
- saveBatchToFile(newArrayList(exceptionQueue), removesWriter);
- exceptionQueue.clear();
+ .sweepInternal(blobStore, ids, removesQueue, lastMaxModifiedTime);
+ saveBatchToFile(newArrayList(removesQueue), removesWriter);
+ removesQueue.clear();
}
} finally {
LineIterator.closeQuietly(iterator);
@@ -469,7 +429,6 @@ public class MarkSweepGarbageCollector i
for (String id : ids) {
FileIOUtils.writeAsLine(writer, id, true);
}
- ids.clear();
writer.flush();
}
@@ -484,42 +443,34 @@ public class MarkSweepGarbageCollector i
try {
marker.collectReferences(
new ReferenceCollector() {
- private final List<String> idBatch = Lists.newArrayListWithCapacity(getBatchCount());
-
private final boolean debugMode = LOG.isTraceEnabled();
@Override
- public void addReference(String blobId, String nodeId) {
+ public void addReference(String blobId, final String nodeId) {
if (debugMode) {
LOG.trace("BlobId : {}, NodeId : {}", blobId, nodeId);
}
try {
Iterator<String> idIter = blobStore.resolveChunks(blobId);
- Joiner delimJoiner = Joiner.on(DELIM).skipNulls();
- while (idIter.hasNext()) {
- String id = idIter.next();
-
- if (logPath) {
- idBatch.add(delimJoiner.join(id, nodeId));
- } else {
- idBatch.add(id);
- }
-
- if (idBatch.size() >= getBatchCount()) {
- saveBatchToFile(idBatch, writer);
- idBatch.clear();
- }
-
+ final Joiner delimJoiner = Joiner.on(DELIM).skipNulls();
+ Iterator<List<String>> partitions = Iterators.partition(idIter, getBatchCount());
+ while (partitions.hasNext()) {
+ List<String> idBatch = Lists.transform(partitions.next(), new Function<String,
+ String>() {
+ @Nullable @Override
+ public String apply(@Nullable String id) {
+ if (logPath) {
+ return delimJoiner.join(id, nodeId);
+ }
+ return id;
+ }
+ });
if (debugMode) {
- LOG.trace("chunkId : {}", id);
+ LOG.trace("chunkIds : {}", idBatch);
}
- count.getAndIncrement();
- }
-
- if (!idBatch.isEmpty()) {
+ count.getAndAdd(idBatch.size());
saveBatchToFile(idBatch, writer);
- idBatch.clear();
}
if (count.get() % getBatchCount() == 0) {
@@ -581,7 +532,7 @@ public class MarkSweepGarbageCollector i
fs.getAvailableRefs(),
fs.getMarkedRefs(),
transformer);
- candidates = calculateDifference(fs, iter);
+ candidates = FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true);
LOG.trace("Ending difference phase of the consistency check");
LOG.info("Consistency check found [{}] missing blobs", candidates);
@@ -765,7 +716,7 @@ public class MarkSweepGarbageCollector i
*/
@Override
long sweepInternal(GarbageCollectableBlobStore blobStore, List<String> ids,
- ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+ ArrayDeque<String> exceptionQueue, long maxModified) {
long totalDeleted = 0;
LOG.trace("Blob ids to be deleted {}", ids);
for (String id : ids) {
@@ -821,7 +772,7 @@ public class MarkSweepGarbageCollector i
* @return
*/
long sweepInternal(GarbageCollectableBlobStore blobStore,
- List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+ List<String> ids, ArrayDeque<String> exceptionQueue, long maxModified) {
long deleted = 0;
try {
LOG.trace("Blob ids to be deleted {}", ids);
@@ -829,8 +780,8 @@ public class MarkSweepGarbageCollector i
if (deleted != ids.size()) {
LOG.debug("Some [{}] blobs were not deleted from the batch : [{}]",
ids.size() - deleted, ids);
- exceptionQueue.addAll(ids);
}
+ exceptionQueue.addAll(ids);
} catch (Exception e) {
LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
}
@@ -848,35 +799,16 @@ public class MarkSweepGarbageCollector i
void retrieve(GarbageCollectableBlobStore blobStore,
GarbageCollectorFileState fs, int batchCount) throws Exception {
LOG.debug("Starting retrieve of all blobs");
- BufferedWriter bufferWriter = null;
int blobsCount = 0;
Iterator<String> idsIter = null;
try {
- bufferWriter = new BufferedWriter(
- new FileWriter(fs.getAvailableRefs()));
-
idsIter = blobStore.getAllChunkIds(0);
- List<String> ids = newArrayList();
- while (idsIter.hasNext()) {
- ids.add(idsIter.next());
- if (ids.size() > batchCount) {
- blobsCount += ids.size();
- saveBatchToFile(ids, bufferWriter);
- LOG.info("Retrieved ({}) blobs", blobsCount);
- }
- }
-
- if (!ids.isEmpty()) {
- blobsCount += ids.size();
- saveBatchToFile(ids, bufferWriter);
- LOG.info("Retrieved ({}) blobs", blobsCount);
- }
+ blobsCount = FileIOUtils.writeStrings(idsIter, fs.getAvailableRefs(), true, LOG, "Retrieved blobs - ");
// sort the file
sort(fs.getAvailableRefs());
LOG.info("Number of blobs present in BlobStore : [{}] ", blobsCount);
} finally {
- closeQuietly(bufferWriter);
if (idsIter instanceof Closeable) {
try {
Closeables.close((Closeable) idsIter, false);