You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2018/10/18 06:24:03 UTC
svn commit: r1844205 - in /jackrabbit/oak/branches/1.8: ./
oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/
oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/
Author: amitj
Date: Thu Oct 18 06:24:02 2018
New Revision: 1844205
URL: http://svn.apache.org/viewvc?rev=1844205&view=rev
Log:
OAK-7638: Race condition when simultaneous request to stage file for async upload
Merge r1836082 from trunk
Modified:
jackrabbit/oak/branches/1.8/ (props changed)
jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
Propchange: jackrabbit/oak/branches/1.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 18 06:24:02 2018
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1820660-1820661,1820729,1820734,1820859,1820861,1820878,1820888,1820947,1821027,1821130,1821140-1821141,1821178,1821237,1821240,1821249,1821258,1821325,1821358,1821361-1821362,1821370,1821375,1821393,1821477,1821487,1821516,1821617,1821663,1821665,1821668,1821681,1821847,1821975-1821983,1822121,1822201,1822207,1822527,1822723,1822808,1822850,1822934,1823135,1823163,1823169,1823172,1823655,1823669,1824196,1824198,1824253,1824255,1824896,1824962,1825065,1825362,1825381,1825442,1825448,1825466,1825470-1825471,1825475,1825523,1825525,1825561,1825619-1825621,1825651,1825654,1825992,1826079,1826090,1826096,1826216,1826237,1826338,1826516,1826532,1826551,1826560,1826638,1826640,1826730,1826932,1826957,1827423,1827472,1827486,1827816,1827977,1828349,1828439,1828502,1828529,1828948,1829527,1829534,1829546,1829569,1829587,1829665,1829854,1829864,1829978,1829985,1829987,1829998,1830019,1830048,1830160,1830171,1830197,1830209,1830239,1830347,1830748,1830911,1830923,1831157-1831158,1831163,1831190,1831374,1831560,1831689,1832258,1832376,1832379,1832535,1833308,1833347,1833833,1834112,1834117,1834287,1834291,1834302,1834326,1834328,1834336,1834428,1834468,1834483,1834610,1834648-1834649,1834681,1834823,1834857-1834858,1835060,1835518,1835521,1835635,1835642,1835780,1835819,1836121,1836487,1836493,1837057,1837274,1837296,1837326,1837475,1837503,1837547,1837569,1837600,1837657,1837718,1837998,1838076,1838637,1839549,1839570,1839637,1839746,1840024,1840455,1840574,1841314,1841352,1843398
+/jackrabbit/oak/trunk:1820660-1820661,1820729,1820734,1820859,1820861,1820878,1820888,1820947,1821027,1821130,1821140-1821141,1821178,1821237,1821240,1821249,1821258,1821325,1821358,1821361-1821362,1821370,1821375,1821393,1821477,1821487,1821516,1821617,1821663,1821665,1821668,1821681,1821847,1821975-1821983,1822121,1822201,1822207,1822527,1822723,1822808,1822850,1822934,1823135,1823163,1823169,1823172,1823655,1823669,1824196,1824198,1824253,1824255,1824896,1824962,1825065,1825362,1825381,1825442,1825448,1825466,1825470-1825471,1825475,1825523,1825525,1825561,1825619-1825621,1825651,1825654,1825992,1826079,1826090,1826096,1826216,1826237,1826338,1826516,1826532,1826551,1826560,1826638,1826640,1826730,1826932,1826957,1827423,1827472,1827486,1827816,1827977,1828349,1828439,1828502,1828529,1828948,1829527,1829534,1829546,1829569,1829587,1829665,1829854,1829864,1829978,1829985,1829987,1829998,1830019,1830048,1830160,1830171,1830197,1830209,1830239,1830347,1830748,1830911,1830923,1831157-1831158,1831163,1831190,1831374,1831560,1831689,1832258,1832376,1832379,1832535,1833308,1833347,1833833,1834112,1834117,1834287,1834291,1834302,1834326,1834328,1834336,1834428,1834468,1834483,1834610,1834648-1834649,1834681,1834823,1834857-1834858,1835060,1835518,1835521,1835635,1835642,1835780,1835819,1836082,1836121,1836487,1836493,1837057,1837274,1837296,1837326,1837475,1837503,1837547,1837569,1837600,1837657,1837718,1837998,1838076,1838637,1839549,1839570,1839637,1839746,1840024,1840455,1840574,1841314,1841352,1843398
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1844205&r1=1844204&r2=1844205&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Thu Oct 18 06:24:02 2018
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.plugin
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestOutputStream;
@@ -38,8 +39,8 @@ import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
-import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.AbstractDataStore;
@@ -169,6 +170,10 @@ public abstract class AbstractSharedCach
@Override public void write(String id, File file) throws DataStoreException {
backend.write(new DataIdentifier(id), file);
}
+
+ @Override public void adopt(File f, File moved) throws IOException {
+ FileUtils.moveFile(f, moved);
+ }
}, statisticsProvider, listeningExecutor, schedulerExecutor, executor, stagingPurgeInterval,
stagingRetryInterval);
}
Modified: jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1844205&r1=1844204&r2=1844205&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/branches/1.8/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Thu Oct 18 06:24:02 2018
@@ -294,14 +294,10 @@ public class UploadStagingCache implemen
if (((ignoreSize && currentSize.addAndGet(length) >= 0)
|| currentSize.addAndGet(length) <= size)
&& !attic.containsKey(id)
+ && existsOrNotExistsMoveFile(input, uploadFile, currentSize, length)
&& map.putIfAbsent(id, uploadFile) == null ) {
try {
- if (!uploadFile.exists()) {
- FileUtils.moveFile(input, uploadFile);
- LOG.trace("File [{}] moved to staging cache [{}]", input, uploadFile);
- }
-
// update stats
cacheStats.markHit();
cacheStats.incrementCount();
@@ -328,6 +324,23 @@ public class UploadStagingCache implemen
return Optional.absent();
}
+ private synchronized boolean existsOrNotExistsMoveFile(File source, File destination, AtomicLong currentSize,
+ long length) {
+ if (!destination.exists()) {
+ try {
+ uploader.adopt(source, destination);
+ LOG.trace("Moved file to staging");
+ } catch (IOException e) {
+ LOG.info("Error moving file to staging", e);
+ currentSize.addAndGet(-length);
+ return false;
+ }
+ LOG.trace("File [{}] moved to staging cache [{}]", source, destination);
+ return true;
+ }
+ return true;
+ }
+
/**
* Stages the file for async upload.
* * Puts the file into the stage caching file system directory
@@ -809,4 +822,6 @@ class StagingCacheStats extends Annotate
*/
interface StagingUploader {
void write(String id, File f) throws DataStoreException;
+
+ void adopt(File f, File moved) throws IOException;
}
Modified: jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1844205&r1=1844204&r2=1844205&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (original)
+++ jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Thu Oct 18 06:24:02 2018
@@ -70,12 +70,19 @@ public class AbstractDataStoreCacheTest
static class TestStagingUploader implements StagingUploader {
private final File root;
+ private CountDownLatch adoptLatch;
public TestStagingUploader(File dir) {
this.root = new File(dir, "datastore");
root.mkdirs();
}
+ public TestStagingUploader(File dir, CountDownLatch adoptLatch) {
+ this.root = new File(dir, "datastore");
+ root.mkdirs();
+ this.adoptLatch = adoptLatch;
+ }
+
@Override public void write(String id, File f) throws DataStoreException {
try {
File move = getFile(id, root);
@@ -87,6 +94,17 @@ public class AbstractDataStoreCacheTest
}
}
+ @Override public void adopt(File f, File moved) throws IOException {
+ try {
+ if (adoptLatch != null) {
+ adoptLatch.await();
+ }
+ } catch (Exception e) {
+ LOG.info("Error in adopt", e);
+ }
+ FileUtils.moveFile(f, moved);
+ }
+
public File read(String id) {
return getFile(id, root);
}
Modified: jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1844205&r1=1844204&r2=1844205&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/branches/1.8/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Thu Oct 18 06:24:02 2018
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -401,6 +402,8 @@ public class UploadStagingCacheTest exte
*/
@Test
public void testConcurrentSameAdd() throws Exception {
+ LOG.info("Starting testConcurrentSameAdd");
+
// Add load
List<ListenableFuture<Integer>> futures = put(folder);
@@ -415,6 +418,44 @@ public class UploadStagingCacheTest exte
assertFuture(futures, 0);
assertCacheStats(stagingCache, 0, 0, 1, 2);
+
+ LOG.info("Finished testConcurrentSameAdd");
+ }
+
+ /**
+ * Stage requests for the same file concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSameAddRequest() throws Exception {
+ LOG.info("Starting testConcurrentSameAddRequest");
+
+ closer.close();
+
+ List<ListenableFuture<Integer>> futures = Lists.newArrayList();
+ CountDownLatch moveLatch = new CountDownLatch(1);
+ init(1, new TestStagingUploader(folder.newFolder(), moveLatch), null);
+
+ //1st request
+ ListenableFuture<Boolean> resultReq1 = putThread(folder, futures);
+
+ //2nd Request
+ ListenableFuture<Boolean> resultReq2 = putThread(folder, futures);
+ Thread.sleep(200);
+
+ // Allow any thread to start moving
+ moveLatch.countDown();
+
+ assertTrue(resultReq1.get());
+ assertTrue(resultReq2.get());
+
+ taskLatch.countDown();
+ callbackLatch.countDown();
+
+ assertFuture(futures, 0);
+ assertCacheStats(stagingCache, 0, 0, 1, 2);
+
+ LOG.info("Finished testConcurrentSameAddRequest");
}
/**
@@ -742,6 +783,31 @@ public class UploadStagingCacheTest exte
}
}
+
+ private ListenableFuture<Boolean> putThread(TemporaryFolder folder, List<ListenableFuture<Integer>> futures) {
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+ closer.register(new ExecutorCloser(executorService));
+
+ ListenableFuture<Boolean> result = executorService.submit(new Callable<Boolean>() {
+ @Override public Boolean call() {
+ try {
+ LOG.info("Starting put");
+ futures.addAll(put(folder));
+ LOG.info("Finished put");
+ File f = stagingCache.getIfPresent(ID_PREFIX + 0);
+ LOG.info("Retrieved file {}, {}", f, f.exists());
+ return f != null && f.exists();
+ } catch (Exception e) {
+ LOG.info("Exception in get", e);
+ }
+ return false;
+ }
+ });
+
+ return result;
+ }
+
private List<ListenableFuture<Integer>> put(TemporaryFolder folder)
throws IOException {
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());