You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/28 06:37:28 UTC

svn commit: r1766928 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/blob/ test/java/org/apache/jackrabbit/oak/plugins/blob/

Author: amitj
Date: Fri Oct 28 06:37:27 2016
New Revision: 1766928

URL: http://svn.apache.org/viewvc?rev=1766928&view=rev
Log:
OAK-5015: Retry mechanism for failed async uploads

- Added a retry queue to which every failed upload is added.
- A scheduled job, firing every retryInterval seconds, drains the queue and schedules those uploads again.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Fri Oct 28 06:37:27 2016
@@ -67,6 +67,7 @@ import static com.google.common.base.Pre
  *     <param name="{@link #setStagingSplitPercentage(int) staginSplitPercentage}" value="10"/>
  *     <param name="{@link #setUploadThreads(int) uploadThreads}" value="10"/>
  *     <param name="{@link #setStagingPurgeInterval(int) stagingPurgeInterval}" value="300"/>
+ *     <param name="{@link #setStagingRetryInterval(int) stagingRetryInterval}" value="600"/>
  * </DataStore>
  */
 public abstract class AbstractSharedCachingDataStore extends AbstractDataStore
@@ -107,6 +108,11 @@ public abstract class AbstractSharedCach
     private int stagingPurgeInterval = 300;
 
     /**
+     * The interval, in seconds, at which the retry job runs.
+     */
+    private int stagingRetryInterval = 600;
+
+    /**
      * The root rootDirectory where the files are created.
      */
     private File rootDirectory;
@@ -150,25 +156,26 @@ public abstract class AbstractSharedCach
         this.backend = createBackend();
         backend.init();
 
-        this.cache = new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
-            new CacheLoader<String, InputStream>() {
-                @Override public InputStream load(String key) throws Exception {
-                    InputStream is = null;
-                    boolean threw = true;
-                    try {
-                        is = backend.read(new DataIdentifier(key));
-                        threw = false;
-                    } finally {
-                        Closeables.close(is, threw);
+        this.cache =
+            new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
+                new CacheLoader<String, InputStream>() {
+                    @Override public InputStream load(String key) throws Exception {
+                        InputStream is = null;
+                        boolean threw = true;
+                        try {
+                            is = backend.read(new DataIdentifier(key));
+                            threw = false;
+                        } finally {
+                            Closeables.close(is, threw);
+                        }
+                        return is;
                     }
-                    return is;
-                }
-            }, new StagingUploader() {
-                @Override
-                public void write(String id, File file) throws DataStoreException {
-                    backend.write(new DataIdentifier(id), file);
-                }
-            }, statisticsProvider, listeningExecutor, schedulerExecutor, stagingPurgeInterval);
+                }, new StagingUploader() {
+                    @Override public void write(String id, File file) throws DataStoreException {
+                        backend.write(new DataIdentifier(id), file);
+                    }
+            }, statisticsProvider, listeningExecutor, schedulerExecutor, stagingPurgeInterval,
+                stagingRetryInterval);
     }
 
     protected abstract AbstractSharedBackend createBackend();
@@ -363,6 +370,10 @@ public abstract class AbstractSharedCach
         this.stagingPurgeInterval = stagingPurgeInterval;
     }
 
+    public void setStagingRetryInterval(int stagingRetryInterval) {
+        this.stagingRetryInterval = stagingRetryInterval;
+    }
+
     public void setStatisticsProvider(StatisticsProvider statisticsProvider) {
         this.statisticsProvider = statisticsProvider;
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Fri Oct 28 06:37:27 2016
@@ -59,25 +59,25 @@ public class CompositeDataStoreCache ext
      */
     private final File directory;
 
-    public CompositeDataStoreCache(String path, long size,
-        int uploadSplitPercentage, int uploadThreads, CacheLoader<String, InputStream> loader,
-        final StagingUploader uploader, StatisticsProvider statsProvider,
-        ListeningExecutorService executor,
+    public CompositeDataStoreCache(String path, long size, int uploadSplitPercentage,
+        int uploadThreads, CacheLoader<String, InputStream> loader, final StagingUploader uploader,
+        StatisticsProvider statsProvider, ListeningExecutorService executor,
         ScheduledExecutorService scheduledExecutor /* purge scheduled executor */,
-        long purgeInterval /* async purge interval secs */) {
+        int purgeInterval /* async purge interval secs */,
+        int stagingRetryInterval /* async retry interval secs */) {
 
         checkArgument(uploadSplitPercentage >= 0 && uploadSplitPercentage < 100,
             "Upload percentage should be between 0 and 100");
 
         this.directory = new File(path);
 
-        long uploadSize = (size * uploadSplitPercentage)/100;
+        long uploadSize = (size * uploadSplitPercentage) / 100;
 
         this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null);
 
         this.stagingCache = UploadStagingCache
-            .build(directory, uploadThreads, uploadSize, uploader, downloadCache,
-                statsProvider, executor, scheduledExecutor, purgeInterval);
+            .build(directory, uploadThreads, uploadSize, uploader, downloadCache, statsProvider,
+                executor, scheduledExecutor, purgeInterval, stagingRetryInterval);
     }
 
     @Nullable

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Fri Oct 28 06:37:27 2016
@@ -22,9 +22,11 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +37,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.cache.Weigher;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.FutureCallback;
@@ -100,7 +103,7 @@ public class UploadStagingCache implemen
     /**
      * Scheduled executor for build and remove
      */
-    private ScheduledExecutorService removeExecutor;
+    private ScheduledExecutorService scheduledExecutor;
 
     /**
      * In memory map for staged files
@@ -133,10 +136,16 @@ public class UploadStagingCache implemen
     @Nullable
     private FileCache downloadCache;
 
+    /**
+     * Queue of ids whose uploads failed and are awaiting a retry.
+     */
+    private LinkedBlockingQueue<String> retryQueue;
+
     private UploadStagingCache(File dir, int uploadThreads, long size /* bytes */,
         StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider,
         @Nullable ListeningExecutorService executor,
-        @Nullable ScheduledExecutorService scheduledExecutor, long purgeInterval /* secs */) {
+        @Nullable ScheduledExecutorService scheduledExecutor,
+        int purgeInterval /* secs */, int retryInterval /* secs */) {
 
         this.currentSize = new AtomicLong();
         this.size = size;
@@ -146,13 +155,15 @@ public class UploadStagingCache implemen
                 .newFixedThreadPool(uploadThreads, new NamedThreadFactory("oak-ds-async-upload-thread")));
         }
 
-        this.removeExecutor = scheduledExecutor;
+        this.scheduledExecutor = scheduledExecutor;
         if (scheduledExecutor == null) {
-            this.removeExecutor = Executors.newSingleThreadScheduledExecutor();
+            this.scheduledExecutor = Executors
+                .newScheduledThreadPool(2, new NamedThreadFactory("oak-ds-cache-scheduled-thread"));
         }
 
         this.map = Maps.newConcurrentMap();
         this.attic = Maps.newConcurrentMap();
+        this.retryQueue = new LinkedBlockingQueue<String>();
         this.uploadCacheSpace = new File(dir, "upload");
         this.uploader = uploader;
         this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
@@ -160,8 +171,10 @@ public class UploadStagingCache implemen
 
         build();
 
-        removeExecutor.scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval,
-            TimeUnit.SECONDS);
+        this.scheduledExecutor
+            .scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
+        this.scheduledExecutor
+            .scheduleAtFixedRate(new RetryJob(), retryInterval, retryInterval, TimeUnit.SECONDS);
     }
 
     private UploadStagingCache() {
@@ -170,10 +183,11 @@ public class UploadStagingCache implemen
     public static UploadStagingCache build(File dir, int uploadThreads, long size
         /* bytes */, StagingUploader uploader, @Nullable FileCache cache,
         StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor,
-        @Nullable ScheduledExecutorService scheduledExecutor, long purgeInterval /* secs */) {
+        @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval /* secs */,
+        int retryInterval /* secs */) {
         if (size > 0) {
             return new UploadStagingCache(dir, uploadThreads, size, uploader, cache,
-                statisticsProvider, executor, scheduledExecutor, purgeInterval);
+                statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
         }
         return new UploadStagingCache() {
             @Override public Optional<SettableFuture<Integer>> put(String id, File input) {
@@ -346,8 +360,9 @@ public class UploadStagingCache implemen
                 }
 
                 @Override public void onFailure(Throwable t) {
-                    LOG.error("Error adding file to backend", t);
+                    LOG.error("Error adding [{}] with file [{}] to backend", id, upload, t);
                     result.setException(t);
+                    retryQueue.add(id);
                 }
             });
             LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
@@ -469,7 +484,7 @@ public class UploadStagingCache implemen
         LOG.info("Uploads completed but not cleared from cache [{}]", attic.size());
         LOG.info("Staging cache stats on close [{}]", cacheStats.cacheInfoAsString());
         new ExecutorCloser(executor).close();
-        new ExecutorCloser(removeExecutor).close();
+        new ExecutorCloser(scheduledExecutor).close();
     }
 
     /**
@@ -481,6 +496,28 @@ public class UploadStagingCache implemen
             remove();
         }
     }
+
+
+    /**
+     * Job to retry failed uploads.
+     */
+    class RetryJob implements Runnable {
+        @Override
+        public void run() {
+            LOG.debug("Retry job started");
+            int count = 0;
+            List<String> entries = Lists.newArrayList();
+            retryQueue.drainTo(entries);
+            for (String key : entries) {
+                File file = map.get(key);
+                LOG.info("Retrying upload of id [{}] with file [{}] ", key, file);
+                stage(key, file);
+                count++;
+                LOG.info("Scheduled retry for upload of id [{}] with file [{}]", key, file);
+            }
+            LOG.debug("Retry job finished with staging [{}] jobs", count);
+        }
+    }
 }
 
 /**

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Fri Oct 28 06:37:27 2016
@@ -101,7 +101,7 @@ public class CompositeDataStoreCacheTest
         //cache instance
         cache = new CompositeDataStoreCache(root.getAbsolutePath(),
             80 * 1024 /* bytes */, 10, 1/*threads*/, loader,
-            uploader, statsProvider, executor, scheduledExecutor, 3000);
+            uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
         closer.register(cache);
     }
 
@@ -114,7 +114,7 @@ public class CompositeDataStoreCacheTest
     public void zeroCache() throws IOException {
         cache = new CompositeDataStoreCache(root.getAbsolutePath(),
             0 /* bytes */, 10, 1/*threads*/, loader,
-            uploader, statsProvider, executor, scheduledExecutor, 3000);
+            uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
         closer.register(cache);
 
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
@@ -190,7 +190,8 @@ public class CompositeDataStoreCacheTest
     public void addCacheFull() throws IOException {
         cache = new CompositeDataStoreCache(root.getAbsolutePath(),
             40 * 1024 /* bytes */, 10 /* staging % */,
-            1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+            1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
+            6000);
         closer.register(cache);
 
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
@@ -225,7 +226,8 @@ public class CompositeDataStoreCacheTest
         executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
         cache = new CompositeDataStoreCache(root.getAbsolutePath(),
             80 * 1024 /* bytes */, 10 /* staging % */,
-            1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+            1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
+            6000);
         closer.register(cache);
 
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Fri Oct 28 06:37:27 2016
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
@@ -50,7 +51,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,8 +58,6 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for {@link UploadStagingCache}.
@@ -89,8 +87,12 @@ public class UploadStagingCacheTest exte
     }
 
     private void init(int i) {
+        init(i, new TestStagingUploader(root));
+    }
+
+    private void init(int i, TestStagingUploader testUploader) {
         // uploader
-        uploader = new TestStagingUploader(root);
+        uploader = testUploader;
 
         // create executor
         taskLatch = new CountDownLatch(1);
@@ -109,7 +111,7 @@ public class UploadStagingCacheTest exte
         //cache instance
         stagingCache =
             UploadStagingCache.build(root, 1/*threads*/, 8 * 1024 /* bytes */,
-                uploader, null/*cache*/, statsProvider, executor, null, 3000);
+                uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
         closer.register(stagingCache);
     }
 
@@ -122,7 +124,7 @@ public class UploadStagingCacheTest exte
     public void testZeroCache() throws IOException {
         stagingCache =
             UploadStagingCache.build(root, 1/*threads*/, 0 /* bytes */,
-                uploader, null/*cache*/, statsProvider, executor, null, 3000);
+                uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
         closer.register(stagingCache);
 
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
@@ -157,16 +159,19 @@ public class UploadStagingCacheTest exte
      */
     @Test
     public void testAddUploadException() throws Exception {
-        // Mock uploader to throw exception on write
-        final TestStagingUploader mockedDS = mock(TestStagingUploader.class);
-        doThrow(new DataStoreException("Error in writing blob")).when(mockedDS)
-            .write(Matchers.any(String.class), Matchers.any(File.class));
+        final AtomicInteger count = new AtomicInteger(0);
+        TestStagingUploader secondTimeUploader = new TestStagingUploader(root) {
+            @Override
+            public void write(String id, File f) throws DataStoreException {
+                if (count.get() == 0) {
+                    throw new DataStoreException("Error in writing blob");
+                }
+                super.write(id, f);
+            }
+        };
 
         // initialize staging cache using the mocked uploader
-        stagingCache =
-            UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */,
-                mockedDS, null/*cache*/, statsProvider, executor, null, 3000);
-        closer.register(stagingCache);
+        init(2, secondTimeUploader);
 
         // Add load
         List<ListenableFuture<Integer>> futures = put(folder);
@@ -174,7 +179,6 @@ public class UploadStagingCacheTest exte
         //start
         taskLatch.countDown();
         callbackLatch.countDown();
-
         waitFinish(futures);
 
         // assert file retrieved from staging cache
@@ -184,6 +188,19 @@ public class UploadStagingCacheTest exte
         assertEquals(1, stagingCache.getStats().getLoadCount());
         assertEquals(1, stagingCache.getStats().getLoadSuccessCount());
         assertCacheStats(stagingCache, 1, 4 * 1024, 1, 1);
+
+        // Retry upload and wait for finish
+        count.incrementAndGet();
+        ScheduledFuture<?> scheduledFuture =
+            removeExecutor.schedule(stagingCache.new RetryJob(), 0, TimeUnit.MILLISECONDS);
+        scheduledFuture.get();
+        afterExecuteLatch.await();
+
+        // Now uploaded
+        ret = stagingCache.getIfPresent(ID_PREFIX + 0);
+        assertNull(ret);
+        assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+            secondTimeUploader.read(ID_PREFIX + 0)));
     }
 
     /**
@@ -274,7 +291,7 @@ public class UploadStagingCacheTest exte
         // initialize cache to have restricted size
         stagingCache =
             UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */,
-                uploader, null/*cache*/, statsProvider, executor, null, 3000);
+                uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
         closer.register(stagingCache);
 
         // add load