You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:50:38 UTC

svn commit: r1766335 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/blob/ test/java/org/apache/jackrabbit/oak/plugins/blob/

Author: amitj
Date: Mon Oct 24 04:50:38 2016
New Revision: 1766335

URL: http://svn.apache.org/viewvc?rev=1766335&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore

- Added invalidate and getAllIdentifiers to the UploadStagingCache

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Mon Oct 24 04:50:38 2016
@@ -114,6 +114,7 @@ public class CompositeDataStoreCache ext
 
     @Override
     public void invalidate(Object key) {
+        stagingCache.invalidate((String) key);
         downloadCache.invalidate(key);
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Mon Oct 24 04:50:38 2016
@@ -307,7 +307,7 @@ public class UploadStagingCache implemen
                     result.setException(t);
                 }
             });
-            LOG.info("File [{}] scheduled for upload [{}]", upload, result);
+            LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
         } catch (Exception e) {
             LOG.error("Error staging file for upload [{}]", upload, e);
         }
@@ -315,6 +315,33 @@ public class UploadStagingCache implemen
     }
 
     /**
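+     * Invalidate called externally: deletes the staged file for the key and removes its entry, unless it is already scheduled for deletion.
+     * @param key identifier of the staged entry to invalidate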
+     */
+    protected void invalidate(String key) {
+        // Check if not already scheduled for deletion
+        if (!attic.containsKey(key) && map.containsKey(key)) {
+            try {
+                LOG.debug("Invalidating [{}]", key);
+                File toBeDeleted = map.get(key);
+                deleteInternal(key, toBeDeleted);
+                map.remove(key, toBeDeleted);
+            } catch (IOException e) {
+                LOG.warn("Could not delete file from staging", e);
+            }
+        }
+    }
+
+    /**
+     * Returns all identifiers presently staged.
+     *
+     * @return iterator of all identifiers presently staged.
+     */
+    protected Iterator<String> getAllIdentifiers() {
+        return map.keySet().iterator();
+    }
+
+    /**
      * Removes all cached from attic
      */
     private void remove() {
@@ -325,8 +352,15 @@ public class UploadStagingCache implemen
         while (iterator.hasNext()) {
             String key = iterator.next();
             try {
-                if (remove(key)) {
+                // Check if not already scheduled for upload
+                if (!map.containsKey(key)) {
+                    LOG.trace("upload map contains id [{}]", key);
+
+                    File toBeDeleted = attic.get(key);
+                    deleteInternal(key, toBeDeleted);
                     iterator.remove();
+
+                    LOG.debug("Cache [{}] file deleted for id [{}]", toBeDeleted, key);
                     count++;
                 }
             } catch (IOException e) {
@@ -334,33 +368,28 @@ public class UploadStagingCache implemen
             }
         }
 
-        LOG.info("Finished purge of [{}] files", count);
+        LOG.info("Finished removal of [{}] files", count);
     }
 
-    private boolean remove(String id) throws IOException {
-        LOG.trace("Removing upload file for id [{}]", id);
-
-        // Check if not already scheduled for upload
-        if (!map.containsKey(id)) {
-            LOG.trace("upload map contains id [{}]", id);
-
-            File toBeDeleted = attic.get(id);
-            LOG.trace("Trying to delete file [{}]", toBeDeleted);
-            long length = toBeDeleted.length();
-
-            delete(toBeDeleted);
-            LOG.info("deleted file [{}]", toBeDeleted);
-
-            currentSize.addAndGet(-length);
-            // Update stats for removal
-            cacheStats.decrementSize(length);
-            cacheStats.decrementMemSize(memWeigher.weigh(id, toBeDeleted));
-            cacheStats.decrementCount();
-
-            LOG.info("Cache [{}] file deleted for id [{}]", toBeDeleted, id);
-            return true;
-        }
-        return false;
+    /**
+     * Adjust stats and delete file.
+     *
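+     * @param key identifier of the entry, used to weigh it when decrementing the memory stats
+     * @param toBeDeleted the file to delete
+     * @throws IOException if deleting the file fails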
+     */
+    private void deleteInternal(String key, File toBeDeleted) throws IOException {
+        LOG.debug("Trying to delete file [{}]", toBeDeleted);
+        long length = toBeDeleted.length();
+
+        delete(toBeDeleted);
+        LOG.debug("deleted file [{}]", toBeDeleted);
+
+        currentSize.addAndGet(-length);
+        // Update stats for removal
+        cacheStats.decrementSize(length);
+        cacheStats.decrementMemSize(memWeigher.weigh(key, toBeDeleted));
+        cacheStats.decrementCount();
     }
 
     /**
@@ -372,7 +401,7 @@ public class UploadStagingCache implemen
     private void delete(File f) throws IOException {
         if (f.exists()) {
             FileUtils.forceDelete(f);
-            LOG.debug("Deleted staged upload file [{}]", f);
+            LOG.info("Deleted staged upload file [{}]", f);
         }
 
         // delete empty parent folders (except the main directory)
@@ -507,7 +536,7 @@ class StagingCacheStats extends Annotate
         }
 
         // Configure cache name
-        cacheName = "StagingCacheStats";
+        cacheName = "DataStore-StagingCache";
 
         this.maxWeight = maxWeight;
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Mon Oct 24 04:50:38 2016
@@ -152,7 +152,6 @@ public class CompositeDataStoreCacheTest
 
         //start
         taskLatch.countDown();
-        /** might be redundant **/
         callbackLatch.countDown();
 
         waitFinish();
@@ -183,7 +182,6 @@ public class CompositeDataStoreCacheTest
 
         //start the original upload
         taskLatch.countDown();
-        /** might be redundant **/
         callbackLatch.countDown();
 
         waitFinish();
@@ -195,6 +193,47 @@ public class CompositeDataStoreCacheTest
     }
 
     /**
+     * Invalidate from staging.
+     */
+    @Test
+    public void invalidateStaging() throws IOException {
+        // create executor
+        taskLatch = new CountDownLatch(2);
+        callbackLatch = new CountDownLatch(2);
+        afterExecuteLatch = new CountDownLatch(2);
+        executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
+            80 * 1024 /* bytes */, 10 /* staging % */,
+            1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000);
+        closer.register(cache);
+
+
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        boolean accepted = cache.stage(ID_PREFIX + 0, f);
+        assertTrue(accepted);
+
+        File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+        accepted = cache.stage(ID_PREFIX + 1, f2);
+        assertTrue(accepted);
+
+        cache.invalidate(ID_PREFIX + 0);
+
+        //start the original uploads
+        taskLatch.countDown();
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        callbackLatch.countDown();
+
+        waitFinish();
+
+        File file = cache.getIfPresent(ID_PREFIX + 0);
+        assertNull(file);
+        file = cache.getIfPresent(ID_PREFIX + 1);
+        assertFile(file, 1, folder);
+        assertCacheStats(cache.getStagingCacheStats(), 0, 0, 2, 2);
+    }
+
+    /**
      * Test {@link CompositeDataStoreCache#getIfPresent(String)} when file staged
      * and then download cache when uploaded.
      * @throws IOException
@@ -232,7 +271,6 @@ public class CompositeDataStoreCacheTest
 
         //start the original upload
         taskLatch.countDown();
-        /** might be redundant **/
         callbackLatch.countDown();
 
         waitFinish();
@@ -400,7 +438,6 @@ public class CompositeDataStoreCacheTest
 
         //start the original upload
         taskLatch.countDown();
-        /** might be redundant **/
         callbackLatch.countDown();
 
         waitFinish();
@@ -448,7 +485,6 @@ public class CompositeDataStoreCacheTest
 
         //start the original upload
         taskLatch.countDown();
-        /** might be redundant **/
         callbackLatch.countDown();
 
         future1.get();
@@ -500,6 +536,7 @@ public class CompositeDataStoreCacheTest
             ScheduledFuture<?> scheduledFuture = scheduledExecutor
                 .schedule(cache.getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS);
             scheduledFuture.get();
+            LOG.info("After jobs completed");
         } catch (Exception e) {
             e.printStackTrace();
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766335&r1=1766334&r2=1766335&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Mon Oct 24 04:50:38 2016
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -30,6 +31,7 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
@@ -183,6 +185,26 @@ public class UploadStagingCacheTest exte
     }
 
     /**
+     * GetAllIdentifiers without adding.
+     * @throws Exception
+     */
+    @Test
+    public void testGetAllIdentifiersNoAdd() throws Exception {
+        Iterator<String> ids = stagingCache.getAllIdentifiers();
+        assertFalse(ids.hasNext());
+    }
+
+    /**
+     * Invalidate without adding.
+     * @throws Exception
+     */
+    @Test
+    public void testInvalidateNoAdd() throws Exception {
+        stagingCache.invalidate(ID_PREFIX + 0);
+        assertCacheStats(stagingCache, 0, 0, 0, 0);
+    }
+
+    /**
      * Error in putting file to stage.
      * @throws Exception
      */
@@ -262,6 +284,57 @@ public class UploadStagingCacheTest exte
     }
 
     /**
+     * GetAllIdentifiers after staging before upload.
+     * @throws Exception
+     */
+    @Test
+    public void testGetAllIdentifiers() throws Exception {
+        // add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // Check getAllIdentifiers
+        Iterator<String> idsIter = stagingCache.getAllIdentifiers();
+        assertEquals(ID_PREFIX + 0, Iterators.getOnlyElement(idsIter));
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        assertFuture(futures, 0);
+
+        assertCacheStats(stagingCache, 0, 0, 1, 1);
+
+        // Should not return anything
+        idsIter = stagingCache.getAllIdentifiers();
+        assertEquals(0, Iterators.size(idsIter));
+    }
+
+    /**
+     * Invalidate after staging before upload.
+     * @throws Exception
+     */
+    @Test
+    public void testInvalidate() throws Exception {
+        // add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // Check invalidate
+        stagingCache.invalidate(ID_PREFIX + 0);
+        File file = stagingCache.getIfPresent(ID_PREFIX + 0);
+        assertNull(file);
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        waitFinish(futures);
+
+        assertCacheStats(stagingCache, 0, 0, 1, 1);
+
+        // Should not return anything
+        file = stagingCache.getIfPresent(ID_PREFIX + 0);
+        assertNull(file);
+    }
+
+    /**
      * Stage same file concurrently.
      * @throws Exception
      */