You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/07/20 03:57:35 UTC

svn commit: r1753435 - in /jackrabbit/oak/trunk: oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java

Author: amitj
Date: Wed Jul 20 03:57:34 2016
New Revision: 1753435

URL: http://svn.apache.org/viewvc?rev=1753435&view=rev
Log:
OAK-4200:  [BlobGC] Improve collection times of blobs available

* MarkSweepGarbageCollector code cleanup
* Removing code duplication
* Reusing methods from FileIOUtils

Modified:
    jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java

Modified: jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java?rev=1753435&r1=1753434&r2=1753435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java (original)
+++ jackrabbit/oak/trunk/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/FileIOUtils.java Wed Jul 20 03:57:34 2016
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
+import com.google.common.base.Strings;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -192,6 +193,23 @@ public final class FileIOUtils {
      */
     public static int writeStrings(Iterator<String> iterator, File f, boolean escape)
         throws IOException {
+        return writeStrings(iterator, f, escape, null, "");
+    }
+
+    /**
+     * Writes strings from the given iterator to the given file and optionally
+     * escapes the written strings for line breaks.
+     *
+     * @param iterator the source of the strings
+     * @param f file to write to
+     * @param escape whether to escape for line breaks
+     * @param logger logger to log progress
+     * @param message message to log
+     * @return the number of strings written
+     * @throws IOException
+     */
+    public static int writeStrings(Iterator<String> iterator, File f, boolean escape,
+        @Nullable Logger logger, @Nullable String message) throws IOException {
         BufferedWriter writer =  newWriter(f, UTF_8);
         boolean threw = true;
 
@@ -200,6 +218,11 @@ public final class FileIOUtils {
             while (iterator.hasNext()) {
                 writeAsLine(writer, iterator.next(), escape);
                 count++;
+                if (logger != null) {
+                    if (count % 1000 == 0) {
+                        logger.info(Strings.nullToEmpty(message) + count);
+                    }
+                }
             }
             threw = false;
         } finally {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1753435&r1=1753434&r2=1753435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Wed Jul 20 03:57:34 2016
@@ -20,20 +20,17 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.LineNumberReader;
 import java.sql.Timestamp;
+import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +43,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.StandardSystemProperty;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
@@ -57,7 +55,6 @@ import org.apache.jackrabbit.core.data.D
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.commons.FileIOUtils;
 import org.apache.jackrabbit.oak.commons.FileIOUtils.FileLineDifferenceIterator;
-import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
@@ -310,37 +307,11 @@ public class MarkSweepGarbageCollector i
                 fs.getMarkedRefs(),
                 fs.getAvailableRefs(),
                 transformer);
-        calculateDifference(fs, iter);
+        int candidates = FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true);
+        LOG.debug("Found candidates - " + candidates);
 
         LOG.debug("Ending difference phase of the garbage collector");
     }
-    
-    private long calculateDifference(GarbageCollectorFileState fs, FileLineDifferenceIterator iter) throws IOException {
-        long numCandidates = 0;
-        BufferedWriter bufferWriter = null;
-        try {
-            bufferWriter = Files.newWriter(fs.getGcCandidates(), Charsets.UTF_8);
-            List<String> expiredSet = newArrayList();
-
-            while (iter.hasNext()) {
-                expiredSet.add(iter.next());
-                if (expiredSet.size() > getBatchCount()) {
-                    numCandidates += expiredSet.size();
-                    saveBatchToFile(expiredSet, bufferWriter);
-                }
-            }
-
-            if (!expiredSet.isEmpty()) {
-                numCandidates += expiredSet.size();
-                saveBatchToFile(expiredSet, bufferWriter);
-            }
-            LOG.debug("Found candidates - " + numCandidates);
-        } finally {
-            IOUtils.closeQuietly(bufferWriter);
-            IOUtils.closeQuietly(iter);
-        }
-        return numCandidates;
-    }
 
     /**
      * Sweep phase of gc candidate deletion.
@@ -408,29 +379,18 @@ public class MarkSweepGarbageCollector i
         LineIterator iterator = null;
         try {
             removesWriter = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
-            ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
+            ArrayDeque<String> removesQueue = new ArrayDeque<String>();
             iterator =
                     FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name());
 
-            List<String> ids = newArrayList();
-            while (iterator.hasNext()) {
-                ids.add(iterator.next());
-
-                if (ids.size() >= getBatchCount()) {
-                    count += ids.size();
-                    deleted += BlobCollectionType.get(blobStore)
-                        .sweepInternal(blobStore,ids, exceptionQueue, lastMaxModifiedTime);
-                    ids = newArrayList();
-                    saveBatchToFile(newArrayList(exceptionQueue), removesWriter);
-                    exceptionQueue.clear();
-                }
-            }
-            if (!ids.isEmpty()) {
+            Iterator<List<String>> partitions = Iterators.partition(iterator, getBatchCount());
+            while (partitions.hasNext()) {
+                List<String> ids = partitions.next();
                 count += ids.size();
                 deleted += BlobCollectionType.get(blobStore)
-                    .sweepInternal(blobStore, ids, exceptionQueue, lastMaxModifiedTime);
-                saveBatchToFile(newArrayList(exceptionQueue), removesWriter);
-                exceptionQueue.clear();
+                    .sweepInternal(blobStore, ids, removesQueue, lastMaxModifiedTime);
+                saveBatchToFile(newArrayList(removesQueue), removesWriter);
+                removesQueue.clear();
             }
         } finally {
             LineIterator.closeQuietly(iterator);
@@ -469,7 +429,6 @@ public class MarkSweepGarbageCollector i
         for (String id : ids) {
             FileIOUtils.writeAsLine(writer, id, true);
         }
-        ids.clear();
         writer.flush();
     }
 
@@ -484,42 +443,34 @@ public class MarkSweepGarbageCollector i
         try {
             marker.collectReferences(
                     new ReferenceCollector() {
-                        private final List<String> idBatch = Lists.newArrayListWithCapacity(getBatchCount());
-
                         private final boolean debugMode = LOG.isTraceEnabled();
 
                         @Override
-                        public void addReference(String blobId, String nodeId) {
+                        public void addReference(String blobId, final String nodeId) {
                             if (debugMode) {
                                 LOG.trace("BlobId : {}, NodeId : {}", blobId, nodeId);
                             }
 
                             try {
                                 Iterator<String> idIter = blobStore.resolveChunks(blobId);
-                                Joiner delimJoiner = Joiner.on(DELIM).skipNulls();
-                                while (idIter.hasNext()) {
-                                    String id = idIter.next();
-
-                                    if (logPath) {
-                                        idBatch.add(delimJoiner.join(id, nodeId));
-                                    } else {
-                                        idBatch.add(id);
-                                    }
-
-                                    if (idBatch.size() >= getBatchCount()) {
-                                        saveBatchToFile(idBatch, writer);
-                                        idBatch.clear();
-                                    }
-
+                                final Joiner delimJoiner = Joiner.on(DELIM).skipNulls();
+                                Iterator<List<String>> partitions = Iterators.partition(idIter, getBatchCount());
+                                while (partitions.hasNext()) {
+                                    List<String> idBatch = Lists.transform(partitions.next(), new Function<String,
+                                        String>() {
+                                        @Nullable @Override
+                                        public String apply(@Nullable String id) {
+                                            if (logPath) {
+                                                return delimJoiner.join(id, nodeId);
+                                            }
+                                            return id;
+                                        }
+                                    });
                                     if (debugMode) {
-                                        LOG.trace("chunkId : {}", id);
+                                        LOG.trace("chunkIds : {}", idBatch);
                                     }
-                                    count.getAndIncrement();
-                                }
-
-                                if (!idBatch.isEmpty()) {
+                                    count.getAndAdd(idBatch.size());
                                     saveBatchToFile(idBatch, writer);
-                                    idBatch.clear();
                                 }
 
                                 if (count.get() % getBatchCount() == 0) {
@@ -581,7 +532,7 @@ public class MarkSweepGarbageCollector i
                 fs.getAvailableRefs(),
                 fs.getMarkedRefs(),
                 transformer);
-            candidates = calculateDifference(fs, iter);
+            candidates = FileIOUtils.writeStrings(iter, fs.getGcCandidates(), true);
             LOG.trace("Ending difference phase of the consistency check");
             
             LOG.info("Consistency check found [{}] missing blobs", candidates);
@@ -765,7 +716,7 @@ public class MarkSweepGarbageCollector i
              */
             @Override
             long sweepInternal(GarbageCollectableBlobStore blobStore, List<String> ids,
-                ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+                ArrayDeque<String> exceptionQueue, long maxModified) {
                 long totalDeleted = 0;
                 LOG.trace("Blob ids to be deleted {}", ids);
                 for (String id : ids) {
@@ -821,7 +772,7 @@ public class MarkSweepGarbageCollector i
          * @return
          */
         long sweepInternal(GarbageCollectableBlobStore blobStore,
-            List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+            List<String> ids, ArrayDeque<String> exceptionQueue, long maxModified) {
             long deleted = 0;
             try {
                 LOG.trace("Blob ids to be deleted {}", ids);
@@ -829,8 +780,8 @@ public class MarkSweepGarbageCollector i
                 if (deleted != ids.size()) {
                     LOG.debug("Some [{}] blobs were not deleted from the batch : [{}]",
                         ids.size() - deleted, ids);
-                    exceptionQueue.addAll(ids);
                 }
+                exceptionQueue.addAll(ids);
             } catch (Exception e) {
                 LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
             }
@@ -848,35 +799,16 @@ public class MarkSweepGarbageCollector i
         void retrieve(GarbageCollectableBlobStore blobStore,
                 GarbageCollectorFileState fs, int batchCount) throws Exception {
             LOG.debug("Starting retrieve of all blobs");
-            BufferedWriter bufferWriter = null;
             int blobsCount = 0;
             Iterator<String> idsIter = null;
             try {
-                bufferWriter = new BufferedWriter(
-                    new FileWriter(fs.getAvailableRefs()));
-
                 idsIter = blobStore.getAllChunkIds(0);
-                List<String> ids = newArrayList();
-                while (idsIter.hasNext()) {
-                    ids.add(idsIter.next());
-                    if (ids.size() > batchCount) {
-                        blobsCount += ids.size();
-                        saveBatchToFile(ids, bufferWriter);
-                        LOG.info("Retrieved ({}) blobs", blobsCount);
-                    }
-                }
-
-                if (!ids.isEmpty()) {
-                    blobsCount += ids.size();
-                    saveBatchToFile(ids, bufferWriter);
-                    LOG.info("Retrieved ({}) blobs", blobsCount);
-                }
+                blobsCount = FileIOUtils.writeStrings(idsIter, fs.getAvailableRefs(), true, LOG, "Retrieved blobs - ");
 
                 // sort the file
                 sort(fs.getAvailableRefs());
                 LOG.info("Number of blobs present in BlobStore : [{}] ", blobsCount);
             } finally {
-                closeQuietly(bufferWriter);
                 if (idsIter instanceof Closeable) {
                     try {
                         Closeables.close((Closeable) idsIter, false);