You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/28 06:37:28 UTC
svn commit: r1766928 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/blob/
test/java/org/apache/jackrabbit/oak/plugins/blob/
Author: amitj
Date: Fri Oct 28 06:37:27 2016
New Revision: 1766928
URL: http://svn.apache.org/viewvc?rev=1766928&view=rev
Log:
OAK-5015: Retry mechanism for failed async uploads
- Added a retry queue where all failed uploads are added.
- A scheduled job firing after retryInterval then polls the queue to schedule them for upload again.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Fri Oct 28 06:37:27 2016
@@ -67,6 +67,7 @@ import static com.google.common.base.Pre
* <param name="{@link #setStagingSplitPercentage(int) staginSplitPercentage}" value="10"/>
* <param name="{@link #setUploadThreads(int) uploadThreads}" value="10"/>
* <param name="{@link #setStagingPurgeInterval(int) stagingPurgeInterval}" value="300"/>
+ * <param name="{@link #setStagingRetryInterval(int) stagingRetryInterval} " value="600"/>
* </DataStore>
*/
public abstract class AbstractSharedCachingDataStore extends AbstractDataStore
@@ -107,6 +108,11 @@ public abstract class AbstractSharedCach
private int stagingPurgeInterval = 300;
/**
+ * The interval for retry job in seconds.
+ */
+ private int stagingRetryInterval = 600;
+
+ /**
* The root rootDirectory where the files are created.
*/
private File rootDirectory;
@@ -150,25 +156,26 @@ public abstract class AbstractSharedCach
this.backend = createBackend();
backend.init();
- this.cache = new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
- new CacheLoader<String, InputStream>() {
- @Override public InputStream load(String key) throws Exception {
- InputStream is = null;
- boolean threw = true;
- try {
- is = backend.read(new DataIdentifier(key));
- threw = false;
- } finally {
- Closeables.close(is, threw);
+ this.cache =
+ new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
+ new CacheLoader<String, InputStream>() {
+ @Override public InputStream load(String key) throws Exception {
+ InputStream is = null;
+ boolean threw = true;
+ try {
+ is = backend.read(new DataIdentifier(key));
+ threw = false;
+ } finally {
+ Closeables.close(is, threw);
+ }
+ return is;
}
- return is;
- }
- }, new StagingUploader() {
- @Override
- public void write(String id, File file) throws DataStoreException {
- backend.write(new DataIdentifier(id), file);
- }
- }, statisticsProvider, listeningExecutor, schedulerExecutor, stagingPurgeInterval);
+ }, new StagingUploader() {
+ @Override public void write(String id, File file) throws DataStoreException {
+ backend.write(new DataIdentifier(id), file);
+ }
+ }, statisticsProvider, listeningExecutor, schedulerExecutor, stagingPurgeInterval,
+ stagingRetryInterval);
}
protected abstract AbstractSharedBackend createBackend();
@@ -363,6 +370,10 @@ public abstract class AbstractSharedCach
this.stagingPurgeInterval = stagingPurgeInterval;
}
+ public void setStagingRetryInterval(int stagingRetryInterval) {
+ this.stagingRetryInterval = stagingRetryInterval;
+ }
+
public void setStatisticsProvider(StatisticsProvider statisticsProvider) {
this.statisticsProvider = statisticsProvider;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Fri Oct 28 06:37:27 2016
@@ -59,25 +59,25 @@ public class CompositeDataStoreCache ext
*/
private final File directory;
- public CompositeDataStoreCache(String path, long size,
- int uploadSplitPercentage, int uploadThreads, CacheLoader<String, InputStream> loader,
- final StagingUploader uploader, StatisticsProvider statsProvider,
- ListeningExecutorService executor,
+ public CompositeDataStoreCache(String path, long size, int uploadSplitPercentage,
+ int uploadThreads, CacheLoader<String, InputStream> loader, final StagingUploader uploader,
+ StatisticsProvider statsProvider, ListeningExecutorService executor,
ScheduledExecutorService scheduledExecutor /* purge scheduled executor */,
- long purgeInterval /* async purge interval secs */) {
+ int purgeInterval /* async purge interval secs */,
+ int stagingRetryInterval /* async retry interval secs */) {
checkArgument(uploadSplitPercentage >= 0 && uploadSplitPercentage < 100,
"Upload percentage should be between 0 and 100");
this.directory = new File(path);
- long uploadSize = (size * uploadSplitPercentage)/100;
+ long uploadSize = (size * uploadSplitPercentage) / 100;
this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null);
this.stagingCache = UploadStagingCache
- .build(directory, uploadThreads, uploadSize, uploader, downloadCache,
- statsProvider, executor, scheduledExecutor, purgeInterval);
+ .build(directory, uploadThreads, uploadSize, uploader, downloadCache, statsProvider,
+ executor, scheduledExecutor, purgeInterval, stagingRetryInterval);
}
@Nullable
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Fri Oct 28 06:37:27 2016
@@ -22,9 +22,11 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +37,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.cache.Weigher;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.FutureCallback;
@@ -100,7 +103,7 @@ public class UploadStagingCache implemen
/**
* Scheduled executor for build and remove
*/
- private ScheduledExecutorService removeExecutor;
+ private ScheduledExecutorService scheduledExecutor;
/**
* In memory map for staged files
@@ -133,10 +136,16 @@ public class UploadStagingCache implemen
@Nullable
private FileCache downloadCache;
+ /**
+ * Queue containing items to retry.
+ */
+ private LinkedBlockingQueue<String> retryQueue;
+
private UploadStagingCache(File dir, int uploadThreads, long size /* bytes */,
StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider,
@Nullable ListeningExecutorService executor,
- @Nullable ScheduledExecutorService scheduledExecutor, long purgeInterval /* secs */) {
+ @Nullable ScheduledExecutorService scheduledExecutor,
+ int purgeInterval /* secs */, int retryInterval /* secs */) {
this.currentSize = new AtomicLong();
this.size = size;
@@ -146,13 +155,15 @@ public class UploadStagingCache implemen
.newFixedThreadPool(uploadThreads, new NamedThreadFactory("oak-ds-async-upload-thread")));
}
- this.removeExecutor = scheduledExecutor;
+ this.scheduledExecutor = scheduledExecutor;
if (scheduledExecutor == null) {
- this.removeExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.scheduledExecutor = Executors
+ .newScheduledThreadPool(2, new NamedThreadFactory("oak-ds-cache-scheduled-thread"));
}
this.map = Maps.newConcurrentMap();
this.attic = Maps.newConcurrentMap();
+ this.retryQueue = new LinkedBlockingQueue<String>();
this.uploadCacheSpace = new File(dir, "upload");
this.uploader = uploader;
this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
@@ -160,8 +171,10 @@ public class UploadStagingCache implemen
build();
- removeExecutor.scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval,
- TimeUnit.SECONDS);
+ this.scheduledExecutor
+ .scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
+ this.scheduledExecutor
+ .scheduleAtFixedRate(new RetryJob(), retryInterval, retryInterval, TimeUnit.SECONDS);
}
private UploadStagingCache() {
@@ -170,10 +183,11 @@ public class UploadStagingCache implemen
public static UploadStagingCache build(File dir, int uploadThreads, long size
/* bytes */, StagingUploader uploader, @Nullable FileCache cache,
StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor,
- @Nullable ScheduledExecutorService scheduledExecutor, long purgeInterval /* secs */) {
+ @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval /* secs */,
+ int retryInterval /* secs */) {
if (size > 0) {
return new UploadStagingCache(dir, uploadThreads, size, uploader, cache,
- statisticsProvider, executor, scheduledExecutor, purgeInterval);
+ statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
}
return new UploadStagingCache() {
@Override public Optional<SettableFuture<Integer>> put(String id, File input) {
@@ -346,8 +360,9 @@ public class UploadStagingCache implemen
}
@Override public void onFailure(Throwable t) {
- LOG.error("Error adding file to backend", t);
+ LOG.error("Error adding [{}] with file [{}] to backend", id, upload, t);
result.setException(t);
+ retryQueue.add(id);
}
});
LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
@@ -469,7 +484,7 @@ public class UploadStagingCache implemen
LOG.info("Uploads completed but not cleared from cache [{}]", attic.size());
LOG.info("Staging cache stats on close [{}]", cacheStats.cacheInfoAsString());
new ExecutorCloser(executor).close();
- new ExecutorCloser(removeExecutor).close();
+ new ExecutorCloser(scheduledExecutor).close();
}
/**
@@ -481,6 +496,28 @@ public class UploadStagingCache implemen
remove();
}
}
+
+
+ /**
+ * Job to retry failed uploads.
+ */
+ class RetryJob implements Runnable {
+ @Override
+ public void run() {
+ LOG.debug("Retry job started");
+ int count = 0;
+ List<String> entries = Lists.newArrayList();
+ retryQueue.drainTo(entries);
+ for (String key : entries) {
+ File file = map.get(key);
+ LOG.info("Retrying upload of id [{}] with file [{}] ", key, file);
+ stage(key, file);
+ count++;
+ LOG.info("Scheduled retry for upload of id [{}] with file [{}]", key, file);
+ }
+ LOG.debug("Retry job finished with staging [{}] jobs", count);
+ }
+ }
}
/**
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Fri Oct 28 06:37:27 2016
@@ -101,7 +101,7 @@ public class CompositeDataStoreCacheTest
//cache instance
cache = new CompositeDataStoreCache(root.getAbsolutePath(),
80 * 1024 /* bytes */, 10, 1/*threads*/, loader,
- uploader, statsProvider, executor, scheduledExecutor, 3000);
+ uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
closer.register(cache);
}
@@ -114,7 +114,7 @@ public class CompositeDataStoreCacheTest
public void zeroCache() throws IOException {
cache = new CompositeDataStoreCache(root.getAbsolutePath(),
0 /* bytes */, 10, 1/*threads*/, loader,
- uploader, statsProvider, executor, scheduledExecutor, 3000);
+ uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
closer.register(cache);
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
@@ -190,7 +190,8 @@ public class CompositeDataStoreCacheTest
public void addCacheFull() throws IOException {
cache = new CompositeDataStoreCache(root.getAbsolutePath(),
40 * 1024 /* bytes */, 10 /* staging % */,
- 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+ 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
+ 6000);
closer.register(cache);
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
@@ -225,7 +226,8 @@ public class CompositeDataStoreCacheTest
executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
cache = new CompositeDataStoreCache(root.getAbsolutePath(),
80 * 1024 /* bytes */, 10 /* staging % */,
- 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+ 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
+ 6000);
closer.register(cache);
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766928&r1=1766927&r2=1766928&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Fri Oct 28 06:37:27 2016
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
@@ -50,7 +51,6 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.mockito.Matchers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +58,6 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
/**
* Tests for {@link UploadStagingCache}.
@@ -89,8 +87,12 @@ public class UploadStagingCacheTest exte
}
private void init(int i) {
+ init(i, new TestStagingUploader(root));
+ }
+
+ private void init(int i, TestStagingUploader testUploader) {
// uploader
- uploader = new TestStagingUploader(root);
+ uploader = testUploader;
// create executor
taskLatch = new CountDownLatch(1);
@@ -109,7 +111,7 @@ public class UploadStagingCacheTest exte
//cache instance
stagingCache =
UploadStagingCache.build(root, 1/*threads*/, 8 * 1024 /* bytes */,
- uploader, null/*cache*/, statsProvider, executor, null, 3000);
+ uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
closer.register(stagingCache);
}
@@ -122,7 +124,7 @@ public class UploadStagingCacheTest exte
public void testZeroCache() throws IOException {
stagingCache =
UploadStagingCache.build(root, 1/*threads*/, 0 /* bytes */,
- uploader, null/*cache*/, statsProvider, executor, null, 3000);
+ uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
closer.register(stagingCache);
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
@@ -157,16 +159,19 @@ public class UploadStagingCacheTest exte
*/
@Test
public void testAddUploadException() throws Exception {
- // Mock uploader to throw exception on write
- final TestStagingUploader mockedDS = mock(TestStagingUploader.class);
- doThrow(new DataStoreException("Error in writing blob")).when(mockedDS)
- .write(Matchers.any(String.class), Matchers.any(File.class));
+ final AtomicInteger count = new AtomicInteger(0);
+ TestStagingUploader secondTimeUploader = new TestStagingUploader(root) {
+ @Override
+ public void write(String id, File f) throws DataStoreException {
+ if (count.get() == 0) {
+ throw new DataStoreException("Error in writing blob");
+ }
+ super.write(id, f);
+ }
+ };
// initialize staging cache using the mocked uploader
- stagingCache =
- UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */,
- mockedDS, null/*cache*/, statsProvider, executor, null, 3000);
- closer.register(stagingCache);
+ init(2, secondTimeUploader);
// Add load
List<ListenableFuture<Integer>> futures = put(folder);
@@ -174,7 +179,6 @@ public class UploadStagingCacheTest exte
//start
taskLatch.countDown();
callbackLatch.countDown();
-
waitFinish(futures);
// assert file retrieved from staging cache
@@ -184,6 +188,19 @@ public class UploadStagingCacheTest exte
assertEquals(1, stagingCache.getStats().getLoadCount());
assertEquals(1, stagingCache.getStats().getLoadSuccessCount());
assertCacheStats(stagingCache, 1, 4 * 1024, 1, 1);
+
+ // Retry upload and wait for finish
+ count.incrementAndGet();
+ ScheduledFuture<?> scheduledFuture =
+ removeExecutor.schedule(stagingCache.new RetryJob(), 0, TimeUnit.MILLISECONDS);
+ scheduledFuture.get();
+ afterExecuteLatch.await();
+
+ // Now uploaded
+ ret = stagingCache.getIfPresent(ID_PREFIX + 0);
+ assertNull(ret);
+ assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+ secondTimeUploader.read(ID_PREFIX + 0)));
}
/**
@@ -274,7 +291,7 @@ public class UploadStagingCacheTest exte
// initialize cache to have restricted size
stagingCache =
UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */,
- uploader, null/*cache*/, statsProvider, executor, null, 3000);
+ uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
closer.register(stagingCache);
// add load