You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by am...@apache.org on 2014/12/11 05:00:58 UTC
svn commit: r1644549 -
/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
Author: amitj
Date: Thu Dec 11 04:00:58 2014
New Revision: 1644549
URL: http://svn.apache.org/r1644549
Log:
JCR-3838: [aws-ext] Proactive & Asynchronous caching of binary when its metadata is accessed from S3
Patch from Shashank
Modified:
jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1644549&r1=1644548&r2=1644549&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Thu Dec 11 04:00:58 2014
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
* <param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/>
* <param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/>
* <param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/>
+ * <param name="{@link #setProactiveCaching(boolean) proactiveCaching}" value="true"/>
* </DataStore>
*/
public abstract class CachingDataStore extends AbstractDataStore implements
@@ -120,6 +121,12 @@ public abstract class CachingDataStore e
*/
protected final Map<DataIdentifier, Long> asyncTouchCache = new ConcurrentHashMap<DataIdentifier, Long>(5);
+ /**
+ * In memory map to hold in-progress asynchronous downloads. Once
+ * download is finished corresponding entry is flushed from the map.
+ */
+ protected final Map<DataIdentifier, Long> asyncDownloadCache = new ConcurrentHashMap<DataIdentifier, Long>(5);
+
protected Backend backend;
/**
@@ -141,6 +148,12 @@ public abstract class CachingDataStore e
private boolean touchAsync = false;
/**
+ * Flag to indicate that binary content will be cached proactively and
+ * asynchronously when binary metadata is retrieved from {@link Backend}.
+ */
+ private boolean proactiveCaching = true;
+
+ /**
* The optional backend configuration.
*/
private String config;
@@ -184,6 +197,11 @@ public abstract class CachingDataStore e
*/
private AsyncUploadCache asyncWriteCache;
+ /**
+ * {@link ExecutorService} to asynchronous downloads
+ */
+ private ExecutorService downloadExecService;
+
protected abstract Backend createBackend();
protected abstract String getMarkerFile();
@@ -311,6 +329,8 @@ public abstract class CachingDataStore e
asyncWriteCache.reset();
}
}
+ downloadExecService = Executors.newFixedThreadPool(5,
+ new NamedThreadFactory("backend-file-download-worker"));
cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
} catch (Exception e) {
@@ -397,6 +417,7 @@ public abstract class CachingDataStore e
public DataRecord getRecord(DataIdentifier identifier)
throws DataStoreException {
String fileName = getFileName(identifier);
+ boolean existsAtBackend = false;
try {
if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) {
LOG.debug("[{}] record retrieved from asyncUploadmap",
@@ -404,14 +425,18 @@ public abstract class CachingDataStore e
usesIdentifier(identifier);
return new CachingDataRecord(this, identifier);
} else if (cache.getFileIfStored(fileName) != null
- || backend.exists(identifier)) {
- LOG.debug("[{}] record retrieved from local cache or backend",
- identifier);
+ || (existsAtBackend = backend.exists(identifier))) {
+ if (existsAtBackend) {
+ LOG.debug("[{}] record retrieved from backend", identifier);
+ asyncDownload(identifier);
+ } else {
+ LOG.debug("[{}] record retrieved from local cache",
+ identifier);
+ }
touchInternal(identifier);
usesIdentifier(identifier);
return new CachingDataRecord(this, identifier);
}
-
} catch (IOException ioe) {
throw new DataStoreException("error in getting record ["
+ identifier + "]", ioe);
@@ -558,6 +583,7 @@ public abstract class CachingDataStore e
LOG.debug(
"identifier [{}], lastModified=[{}] retrireved from backend ",
identifier, lastModified);
+ asyncDownload(identifier);
}
return lastModified;
}
@@ -566,13 +592,16 @@ public abstract class CachingDataStore e
* Return the length of record from {@link LocalCache} if available,
* otherwise retrieve it from {@link Backend}.
*/
- public long getLength(DataIdentifier identifier) throws DataStoreException {
+ public long getLength(final DataIdentifier identifier)
+ throws DataStoreException {
String fileName = getFileName(identifier);
Long length = cache.getFileLength(fileName);
if (length != null) {
return length.longValue();
} else {
- return backend.getLength(identifier);
+ length = backend.getLength(identifier);
+ asyncDownload(identifier);
+ return length;
}
}
@@ -763,6 +792,33 @@ public abstract class CachingDataStore e
}
}
+ /**
+ * Invoke {@link #getStream(DataIdentifier)} asynchronously to cache binary
+ * asynchronously.
+ */
+ private void asyncDownload(final DataIdentifier identifier) {
+ if (proactiveCaching
+ && cacheSize != 0
+ && asyncDownloadCache.put(identifier, System.currentTimeMillis()) == null) {
+ downloadExecService.execute(new Runnable() {
+ @Override
+ public void run() {
+ InputStream input = null;
+ try {
+ // getStream to cache file
+ LOG.debug("Async download [{}] started.", identifier);
+ input = getStream(identifier);
+ } catch (RepositoryException re) {
+ // ignore exception
+ } finally {
+ asyncDownloadCache.remove(identifier);
+ IOUtils.closeQuietly(input);
+ LOG.debug("Async download [{}] completed.", identifier);
+ }
+ }
+ });
+ }
+ }
/**
* Returns a unique temporary file to be used for creating a new data
@@ -914,6 +970,7 @@ public abstract class CachingDataStore e
public void close() throws DataStoreException {
cache.close();
backend.close();
+ downloadExecService.shutdown();
}
/**
@@ -1063,13 +1120,15 @@ public abstract class CachingDataStore e
public void setUploadRetries(int uploadRetries) {
this.uploadRetries = uploadRetries;
}
-
-
public void setTouchAsync(boolean touchAsync) {
this.touchAsync = touchAsync;
}
+ public void setProactiveCaching(boolean proactiveCaching) {
+ this.proactiveCaching = proactiveCaching;
+ }
+
public Backend getBackend() {
return backend;
}