You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by am...@apache.org on 2014/12/11 05:00:58 UTC

svn commit: r1644549 - /jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java

Author: amitj
Date: Thu Dec 11 04:00:58 2014
New Revision: 1644549

URL: http://svn.apache.org/r1644549
Log:
JCR-3838:  [aws-ext] Proactive & Asynchronous caching of binary when its metadata is accessed from S3
Patch from Shashank

Modified:
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1644549&r1=1644548&r2=1644549&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Thu Dec 11 04:00:58 2014
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
  *     <param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/>
  *     <param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/>
  *     <param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/>
+ *     <param name="{@link #setProactiveCaching(boolean) proactiveCaching}" value="true"/>
  * &lt/DataStore>
  */
 public abstract class CachingDataStore extends AbstractDataStore implements
@@ -120,6 +121,12 @@ public abstract class CachingDataStore e
      */
     protected final Map<DataIdentifier, Long> asyncTouchCache = new ConcurrentHashMap<DataIdentifier, Long>(5);
 
+    /**
+     * In-memory map that holds in-progress asynchronous downloads. Once a
+     * download finishes, the corresponding entry is removed from the map.
+     */
+    protected final Map<DataIdentifier, Long> asyncDownloadCache = new ConcurrentHashMap<DataIdentifier, Long>(5);
+
     protected Backend backend;
 
     /**
@@ -141,6 +148,12 @@ public abstract class CachingDataStore e
     private boolean touchAsync = false;
 
     /**
+     * Flag to indicate that binary content will be cached proactively and
+     * asynchronously when binary metadata is retrieved from {@link Backend}.
+     */
+    private boolean proactiveCaching = true;
+
+    /**
      * The optional backend configuration.
      */
     private String config;
@@ -184,6 +197,11 @@ public abstract class CachingDataStore e
      */
     private AsyncUploadCache asyncWriteCache;
 
+    /**
+     * {@link ExecutorService} used to run asynchronous downloads.
+     */
+    private ExecutorService downloadExecService;
+
     protected abstract Backend createBackend();
 
     protected abstract String getMarkerFile();
@@ -311,6 +329,8 @@ public abstract class CachingDataStore e
                     asyncWriteCache.reset();
                 }
             }
+            downloadExecService = Executors.newFixedThreadPool(5,
+                new NamedThreadFactory("backend-file-download-worker"));
             cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
                 cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
         } catch (Exception e) {
@@ -397,6 +417,7 @@ public abstract class CachingDataStore e
     public DataRecord getRecord(DataIdentifier identifier)
             throws DataStoreException {
         String fileName = getFileName(identifier);
+        boolean existsAtBackend = false;
         try {
             if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) {
                 LOG.debug("[{}] record retrieved from asyncUploadmap",
@@ -404,14 +425,18 @@ public abstract class CachingDataStore e
                 usesIdentifier(identifier);
                 return new CachingDataRecord(this, identifier);
             } else if (cache.getFileIfStored(fileName) != null
-                || backend.exists(identifier)) {
-                LOG.debug("[{}] record retrieved from local cache or backend",
-                    identifier);
+                || (existsAtBackend = backend.exists(identifier))) {
+                if (existsAtBackend) {
+                    LOG.debug("[{}] record retrieved from backend", identifier);
+                    asyncDownload(identifier);
+                } else {
+                    LOG.debug("[{}] record retrieved from local cache",
+                        identifier);
+                }
                 touchInternal(identifier);
                 usesIdentifier(identifier);
                 return new CachingDataRecord(this, identifier);
             }
-
         } catch (IOException ioe) {
             throw new DataStoreException("error in getting record ["
                 + identifier + "]", ioe);
@@ -558,6 +583,7 @@ public abstract class CachingDataStore e
             LOG.debug(
                 "identifier [{}], lastModified=[{}] retrireved from backend ",
                 identifier, lastModified);
+            asyncDownload(identifier);
         }
         return lastModified;
     }
@@ -566,13 +592,16 @@ public abstract class CachingDataStore e
      * Return the length of record from {@link LocalCache} if available,
      * otherwise retrieve it from {@link Backend}.
      */
-    public long getLength(DataIdentifier identifier) throws DataStoreException {
+    public long getLength(final DataIdentifier identifier)
+            throws DataStoreException {
         String fileName = getFileName(identifier);
         Long length = cache.getFileLength(fileName);
         if (length != null) {
             return length.longValue();
         } else {
-            return backend.getLength(identifier);
+            length = backend.getLength(identifier);
+            asyncDownload(identifier);
+            return length;
         }
     }
 
@@ -763,6 +792,33 @@ public abstract class CachingDataStore e
         }
     }
     
+    /**
+     * Invokes {@link #getStream(DataIdentifier)} on a background thread so
+     * that the binary is cached asynchronously.
+     */
+    private void asyncDownload(final DataIdentifier identifier) {
+        if (proactiveCaching
+            && cacheSize != 0
+            && asyncDownloadCache.put(identifier, System.currentTimeMillis()) == null) {
+            downloadExecService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    InputStream input = null;
+                    try {
+                        // getStream to cache file
+                        LOG.debug("Async download [{}] started.", identifier);
+                        input = getStream(identifier);
+                    } catch (RepositoryException re) {
+                        // ignore exception
+                    } finally {
+                        asyncDownloadCache.remove(identifier);
+                        IOUtils.closeQuietly(input);
+                        LOG.debug("Async download [{}] completed.", identifier);
+                    }
+                }
+            });
+        }
+    }
 
     /**
      * Returns a unique temporary file to be used for creating a new data
@@ -914,6 +970,7 @@ public abstract class CachingDataStore e
     public void close() throws DataStoreException {
         cache.close();
         backend.close();
+        downloadExecService.shutdown();
     }
 
     /**
@@ -1063,13 +1120,15 @@ public abstract class CachingDataStore e
     public void setUploadRetries(int uploadRetries) {
         this.uploadRetries = uploadRetries;
     }
-    
-    
 
     public void setTouchAsync(boolean touchAsync) {
         this.touchAsync = touchAsync;
     }
 
+    public void setProactiveCaching(boolean proactiveCaching) {
+        this.proactiveCaching = proactiveCaching;
+    }
+
     public Backend getBackend() {
         return backend;
     }