You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:50:38 UTC
svn commit: r1766335 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/blob/
test/java/org/apache/jackrabbit/oak/plugins/blob/
Author: amitj
Date: Mon Oct 24 04:50:38 2016
New Revision: 1766335
URL: http://svn.apache.org/viewvc?rev=1766335&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore
- Added invalidate and getAllIdentifiers to the UploadStagingCache
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Mon Oct 24 04:50:38 2016
@@ -114,6 +114,7 @@ public class CompositeDataStoreCache ext
@Override
public void invalidate(Object key) {
+ stagingCache.invalidate((String) key);
downloadCache.invalidate(key);
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Mon Oct 24 04:50:38 2016
@@ -307,7 +307,7 @@ public class UploadStagingCache implemen
result.setException(t);
}
});
- LOG.info("File [{}] scheduled for upload [{}]", upload, result);
+ LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
} catch (Exception e) {
LOG.error("Error staging file for upload [{}]", upload, e);
}
@@ -315,6 +315,33 @@ public class UploadStagingCache implemen
}
/**
+ * Invalidate called externally: deletes the staged file for the key and removes it from the upload map.
+ * @param key identifier of the staged entry to invalidate
+ */
+ protected void invalidate(String key) {
+ // Proceed only if the key is in the upload map and not already scheduled for deletion (in attic)
+ if (!attic.containsKey(key) && map.containsKey(key)) {
+ try {
+ LOG.debug("Invalidating [{}]", key);
+ File toBeDeleted = map.get(key); // NOTE(review): may be null if the entry is removed concurrently after the check above - confirm
+ deleteInternal(key, toBeDeleted);
+ map.remove(key, toBeDeleted);
+ } catch (IOException e) { // deletion failed: the entry is left in the map and only a warning is logged
+ LOG.warn("Could not delete file from staging", e);
+ }
+ }
+ }
+
+ /**
+ * Returns all identifiers presently staged.
+ * The iterator is obtained directly from the key set of the upload map; no copy is made.
+ * @return iterator over the keys of the upload map, i.e. all identifiers presently staged.
+ */
+ protected Iterator<String> getAllIdentifiers() {
+ return map.keySet().iterator();
+ }
+
+ /**
* Removes all cached from attic
*/
private void remove() {
@@ -325,8 +352,15 @@ public class UploadStagingCache implemen
while (iterator.hasNext()) {
String key = iterator.next();
try {
- if (remove(key)) {
+ // Check if not already scheduled for upload
+ if (!map.containsKey(key)) {
+ LOG.trace("upload map contains id [{}]", key);
+
+ File toBeDeleted = attic.get(key);
+ deleteInternal(key, toBeDeleted);
iterator.remove();
+
+ LOG.debug("Cache [{}] file deleted for id [{}]", toBeDeleted, key);
count++;
}
} catch (IOException e) {
@@ -334,33 +368,28 @@ public class UploadStagingCache implemen
}
}
- LOG.info("Finished purge of [{}] files", count);
+ LOG.info("Finished removal of [{}] files", count);
}
- private boolean remove(String id) throws IOException {
- LOG.trace("Removing upload file for id [{}]", id);
-
- // Check if not already scheduled for upload
- if (!map.containsKey(id)) {
- LOG.trace("upload map contains id [{}]", id);
-
- File toBeDeleted = attic.get(id);
- LOG.trace("Trying to delete file [{}]", toBeDeleted);
- long length = toBeDeleted.length();
-
- delete(toBeDeleted);
- LOG.info("deleted file [{}]", toBeDeleted);
-
- currentSize.addAndGet(-length);
- // Update stats for removal
- cacheStats.decrementSize(length);
- cacheStats.decrementMemSize(memWeigher.weigh(id, toBeDeleted));
- cacheStats.decrementCount();
-
- LOG.info("Cache [{}] file deleted for id [{}]", toBeDeleted, id);
- return true;
- }
- return false;
+ /**
+ * Delete the file, then adjust the current size counter and the cache stats.
+ *
+ * @param key identifier of the entry; passed to the weigher together with the file
+ * @param toBeDeleted the file to delete
+ * @throws IOException if deleting the file fails; in that case size and stats are not adjusted
+ */
+ private void deleteInternal(String key, File toBeDeleted) throws IOException {
+ LOG.debug("Trying to delete file [{}]", toBeDeleted);
+ long length = toBeDeleted.length(); // read before delete(); the size is needed for the updates below
+
+ delete(toBeDeleted);
+ LOG.debug("deleted file [{}]", toBeDeleted);
+
+ currentSize.addAndGet(-length);
+ // Update stats for removal: on-disk size, weighed memory size and entry count
+ cacheStats.decrementSize(length);
+ cacheStats.decrementMemSize(memWeigher.weigh(key, toBeDeleted));
+ cacheStats.decrementCount();
}
/**
@@ -372,7 +401,7 @@ public class UploadStagingCache implemen
private void delete(File f) throws IOException {
if (f.exists()) {
FileUtils.forceDelete(f);
- LOG.debug("Deleted staged upload file [{}]", f);
+ LOG.info("Deleted staged upload file [{}]", f);
}
// delete empty parent folders (except the main directory)
@@ -507,7 +536,7 @@ class StagingCacheStats extends Annotate
}
// Configure cache name
- cacheName = "StagingCacheStats";
+ cacheName = "DataStore-StagingCache";
this.maxWeight = maxWeight;
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Mon Oct 24 04:50:38 2016
@@ -152,7 +152,6 @@ public class CompositeDataStoreCacheTest
//start
taskLatch.countDown();
- /** might be redundant **/
callbackLatch.countDown();
waitFinish();
@@ -183,7 +182,6 @@ public class CompositeDataStoreCacheTest
//start the original upload
taskLatch.countDown();
- /** might be redundant **/
callbackLatch.countDown();
waitFinish();
@@ -195,6 +193,47 @@ public class CompositeDataStoreCacheTest
}
/**
+ * Invalidate from staging: stage two files, invalidate the first, then verify only the second is still present.
+ */
+ @Test
+ public void invalidateStaging() throws IOException {
+ // create executor with latches sized for the two staged uploads
+ taskLatch = new CountDownLatch(2);
+ callbackLatch = new CountDownLatch(2);
+ afterExecuteLatch = new CountDownLatch(2);
+ executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+ cache = new CompositeDataStoreCache(root.getAbsolutePath(),
+ 80 * 1024 /* bytes */, 10 /* staging % */,
+ 1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+ closer.register(cache);
+
+
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ boolean accepted = cache.stage(ID_PREFIX + 0, f);
+ assertTrue(accepted);
+
+ File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+ accepted = cache.stage(ID_PREFIX + 1, f2);
+ assertTrue(accepted);
+
+ cache.invalidate(ID_PREFIX + 0); // invalidate the first entry before the latches are released
+
+ // start the original uploads (each latch is counted down once per staged file)
+ taskLatch.countDown();
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ callbackLatch.countDown();
+
+ waitFinish();
+
+ File file = cache.getIfPresent(ID_PREFIX + 0);
+ assertNull(file); // the invalidated id must not be returned
+ file = cache.getIfPresent(ID_PREFIX + 1);
+ assertFile(file, 1, folder); // the second id is still retrievable
+ assertCacheStats(cache.getStagingCacheStats(), 0, 0, 2, 2);
+ }
+
+ /**
* Test {@link CompositeDataStoreCache#getIfPresent(String)} when file staged
* and then download cache when uploaded.
* @throws IOException
@@ -232,7 +271,6 @@ public class CompositeDataStoreCacheTest
//start the original upload
taskLatch.countDown();
- /** might be redundant **/
callbackLatch.countDown();
waitFinish();
@@ -400,7 +438,6 @@ public class CompositeDataStoreCacheTest
//start the original upload
taskLatch.countDown();
- /** might be redundant **/
callbackLatch.countDown();
waitFinish();
@@ -448,7 +485,6 @@ public class CompositeDataStoreCacheTest
//start the original upload
taskLatch.countDown();
- /** might be redundant **/
callbackLatch.countDown();
future1.get();
@@ -500,6 +536,7 @@ public class CompositeDataStoreCacheTest
ScheduledFuture<?> scheduledFuture = scheduledExecutor
.schedule(cache.getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS);
scheduledFuture.get();
+ LOG.info("After jobs completed");
} catch (Exception e) {
e.printStackTrace();
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Mon Oct 24 04:50:38 2016
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -30,6 +31,7 @@ import java.util.concurrent.ScheduledFut
import java.util.concurrent.TimeUnit;
import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.io.Files;
@@ -183,6 +185,26 @@ public class UploadStagingCacheTest exte
}
/**
+ * GetAllIdentifiers without adding: the returned iterator must be empty.
+ * @throws Exception
+ */
+ @Test
+ public void testGetAllIdentifiersNoAdd() throws Exception {
+ Iterator<String> ids = stagingCache.getAllIdentifiers();
+ assertFalse(ids.hasNext());
+ }
+
+ /**
+ * Invalidate without adding: invalidating an id that was never put must leave all checked stats at 0.
+ * @throws Exception
+ */
+ @Test
+ public void testInvalidateNoAdd() throws Exception {
+ stagingCache.invalidate(ID_PREFIX + 0);
+ assertCacheStats(stagingCache, 0, 0, 0, 0);
+ }
+
+ /**
* Error in putting file to stage.
* @throws Exception
*/
@@ -262,6 +284,57 @@ public class UploadStagingCacheTest exte
}
/**
+ * GetAllIdentifiers after staging before upload, and again after the latches have been released.
+ * @throws Exception
+ */
+ @Test
+ public void testGetAllIdentifiers() throws Exception {
+ // add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // Check getAllIdentifiers: exactly one id (ID_PREFIX + 0) is expected while the upload is held back
+ Iterator<String> idsIter = stagingCache.getAllIdentifiers();
+ assertEquals(ID_PREFIX + 0, Iterators.getOnlyElement(idsIter));
+
+ // start: release the task and callback latches
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ assertFuture(futures, 0);
+
+ assertCacheStats(stagingCache, 0, 0, 1, 1);
+
+ // Should not return anything any more
+ idsIter = stagingCache.getAllIdentifiers();
+ assertEquals(0, Iterators.size(idsIter));
+ }
+
+ /**
+ * Invalidate after staging before upload: the invalidated id must not be returned before or after the latches are released.
+ * @throws Exception
+ */
+ @Test
+ public void testInvalidate() throws Exception {
+ // add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // Check invalidate: the id must be gone immediately afterwards
+ stagingCache.invalidate(ID_PREFIX + 0);
+ File file = stagingCache.getIfPresent(ID_PREFIX + 0);
+ assertNull(file);
+
+ // start: release the task and callback latches
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ waitFinish(futures);
+
+ assertCacheStats(stagingCache, 0, 0, 1, 1);
+
+ // Should still not return anything
+ file = stagingCache.getIfPresent(ID_PREFIX + 0);
+ assertNull(file);
+ }
+
+ /**
* Stage same file concurrently.
* @throws Exception
*/