You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by sh...@apache.org on 2015/04/03 14:04:44 UTC
svn commit: r1671041 - in /jackrabbit/trunk:
jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/
jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/
jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/
Author: shashank
Date: Fri Apr 3 12:04:44 2015
New Revision: 1671041
URL: http://svn.apache.org/r1671041
Log:
JCR-3864 CachingDataStore - cache file sizes to save remote calls to the remote datastore (S3DS)
Enabled an LRU cache of default size 200 (28KB memory footprint) mapping IDs to record lengths.
getRecord/getRecordIfStored optimized to use this recordLength cache.
Fixed closing of input streams in TestLocalCache.java
Modified:
jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java
jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java
Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java Fri Apr 3 12:04:44 2015
@@ -529,7 +529,7 @@ public class S3Backend implements Backen
&& s3service.getObjectMetadata(bucket,
s3ObjSumm.getKey()).getLastModified().getTime() < min) {
-
+ store.deleteFromCache(identifier);
LOG.debug("add id [{}] to delete lists",
s3ObjSumm.getKey());
deleteList.add(new DeleteObjectsRequest.KeyVersion(
Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Fri Apr 3 12:04:44 2015
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,6 +78,7 @@ import org.slf4j.LoggerFactory;
* <param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/>
* <param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/>
* <param name="{@link #setProactiveCaching(boolean) proactiveCaching}" value="true"/>
+ * <param name="{@link #setRecLengthCacheSize(int) recLengthCacheSize}" value="200"/>
* </DataStore>
*/
public abstract class CachingDataStore extends AbstractDataStore implements
@@ -127,6 +129,12 @@ public abstract class CachingDataStore e
*/
protected final Map<DataIdentifier, Long> asyncDownloadCache = new ConcurrentHashMap<DataIdentifier, Long>(5);
+ /**
+ * In memory cache to hold {@link DataRecord#getLength()} against
+ * {@link DataIdentifier}
+ */
+ protected Map<DataIdentifier, Long> recLenCache = null;
+
protected Backend backend;
/**
@@ -236,6 +244,12 @@ public abstract class CachingDataStore e
* repository.xml. By default it is 100
*/
private int asyncUploadLimit = 100;
+
+ /**
+ * Size of {@link #recLenCache}. Each entry consumes approximately 140 bytes.
+ * Default total memory consumption of {@link #recLenCache} is 28KB.
+ */
+ private int recLengthCacheSize = 200;
/**
* Initialized the data store. If the path is not set, <repository
@@ -333,6 +347,25 @@ public abstract class CachingDataStore e
new NamedThreadFactory("backend-file-download-worker"));
cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
+ /*
+ * Initialize LRU cache of size {@link #recLengthCacheSize}
+ */
+ recLenCache = Collections.synchronizedMap(new LinkedHashMap<DataIdentifier, Long>(
+ recLengthCacheSize, 0.75f, true) {
+
+ private static final long serialVersionUID = -8752749075395630485L;
+
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<DataIdentifier, Long> eldest) {
+ if (size() > recLengthCacheSize) {
+ LOG.trace("evicted from recLengthCache [{}]",
+ eldest.getKey());
+ return true;
+ }
+ return false;
+ }
+ });
} catch (Exception e) {
throw new RepositoryException(e);
}
@@ -415,24 +448,17 @@ public abstract class CachingDataStore e
@Override
public DataRecord getRecord(DataIdentifier identifier)
- throws DataStoreException {
+ throws DataStoreException {
String fileName = getFileName(identifier);
- boolean existsAtBackend = false;
try {
if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) {
- LOG.debug("[{}] record retrieved from asyncUploadmap",
+ LOG.debug("getRecord: [{}] retrieved from asyncUploadmap",
identifier);
usesIdentifier(identifier);
return new CachingDataRecord(this, identifier);
- } else if (cache.getFileIfStored(fileName) != null
- || (existsAtBackend = backend.exists(identifier))) {
- if (existsAtBackend) {
- LOG.debug("[{}] record retrieved from backend", identifier);
- asyncDownload(identifier);
- } else {
- LOG.debug("[{}] record retrieved from local cache",
- identifier);
- }
+ } else if (getLength(identifier) > -1) {
+ LOG.debug("getRecord: [{}] retrieved using getLength",
+ identifier);
touchInternal(identifier);
usesIdentifier(identifier);
return new CachingDataRecord(this, identifier);
@@ -453,19 +479,36 @@ public abstract class CachingDataStore e
*/
@Override
public DataRecord getRecordIfStored(DataIdentifier identifier)
- throws DataStoreException {
+ throws DataStoreException {
String fileName = getFileName(identifier);
try {
if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) {
- LOG.debug("[{}] record retrieved from asyncuploadmap",
+ LOG.debug(
+ "getRecordIfStored: [{}] retrieved from asyncuploadmap",
identifier);
usesIdentifier(identifier);
return new CachingDataRecord(this, identifier);
- } else if (backend.exists(identifier)) {
- LOG.debug("[{}] record retrieved from backend", identifier);
+ } else if (recLenCache.containsKey(identifier)) {
+ LOG.debug(
+ "getRecordIfStored: [{}] retrieved using recLenCache",
+ identifier);
touchInternal(identifier);
usesIdentifier(identifier);
return new CachingDataRecord(this, identifier);
+ } else {
+ try {
+ long length = backend.getLength(identifier);
+ LOG.debug(
+ "getRecordIfStored :[{}] retrieved from backend",
+ identifier);
+ recLenCache.put(identifier, length);
+ touchInternal(identifier);
+ usesIdentifier(identifier);
+ return new CachingDataRecord(this, identifier);
+ } catch (DataStoreException ignore) {
+ LOG.warn(" getRecordIfStored: [{}] not found", identifier);
+ }
+
}
} catch (IOException ioe) {
throw new DataStoreException(ioe);
@@ -507,6 +550,7 @@ public abstract class CachingDataStore e
synchronized (this) {
try {
// order is important here
+ recLenCache.remove(identifier);
asyncWriteCache.delete(fileName);
backend.deleteRecord(identifier);
cache.delete(fileName);
@@ -520,8 +564,10 @@ public abstract class CachingDataStore e
public synchronized int deleteAllOlderThan(long min)
throws DataStoreException {
Set<DataIdentifier> diSet = backend.deleteAllOlderThan(min);
+
// remove entries from local cache
for (DataIdentifier identifier : diSet) {
+ recLenCache.remove(identifier);
cache.delete(getFileName(identifier));
}
try {
@@ -593,13 +639,24 @@ public abstract class CachingDataStore e
* otherwise retrieve it from {@link Backend}.
*/
public long getLength(final DataIdentifier identifier)
- throws DataStoreException {
+ throws DataStoreException {
String fileName = getFileName(identifier);
- Long length = cache.getFileLength(fileName);
+
+ Long length = recLenCache.get(identifier);
if (length != null) {
- return length.longValue();
+ LOG.debug(" identifier [{}] length fetched from recLengthCache",
+ identifier);
+ return length;
+ } else if ((length = cache.getFileLength(fileName)) != null) {
+ LOG.debug(" identifier [{}] length fetched from local cache",
+ identifier);
+ recLenCache.put(identifier, length);
+ return length;
} else {
length = backend.getLength(identifier);
+ LOG.debug(" identifier [{}] length fetched from backend",
+ identifier);
+ recLenCache.put(identifier, length);
asyncDownload(identifier);
return length;
}
@@ -618,6 +675,20 @@ public abstract class CachingDataStore e
return asyncWriteCache.getAll();
}
+
+ public void deleteFromCache(DataIdentifier identifier)
+ throws DataStoreException {
+ try {
+ // order is important here
+ recLenCache.remove(identifier);
+ String fileName = getFileName(identifier);
+ asyncWriteCache.delete(fileName);
+ cache.delete(fileName);
+ } catch (IOException ioe) {
+ throw new DataStoreException(ioe);
+ }
+ }
+
@Override
public void onSuccess(AsyncUploadResult result) {
DataIdentifier identifier = result.getIdentifier();
@@ -1128,6 +1199,10 @@ public abstract class CachingDataStore e
public void setProactiveCaching(boolean proactiveCaching) {
this.proactiveCaching = proactiveCaching;
}
+
+ public void setRecLengthCacheSize(int recLengthCacheSize) {
+ this.recLengthCacheSize = recLengthCacheSize;
+ }
public Backend getBackend() {
return backend;
Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java Fri Apr 3 12:04:44 2015
@@ -615,8 +615,8 @@ public class LocalCache {
LOG.info ("tmp file [{}] skipped ", filePath);
continue;
}
- if (name.startsWith(dataStorePath)) {
- name = name.substring(dataStorePath.length());
+ if (filePath.startsWith(dataStorePath)) {
+ name = filePath.substring(dataStorePath.length());
}
if (name.startsWith("/") || name.startsWith("\\")) {
name = name.substring(1);
Modified: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java Fri Apr 3 12:04:44 2015
@@ -101,7 +101,7 @@ public class InMemoryBackend implements
}
@Override
- public Set<DataIdentifier> deleteAllOlderThan(final long min) {
+ public Set<DataIdentifier> deleteAllOlderThan(final long min) throws DataStoreException {
log("deleteAllOlderThan " + min);
Set<DataIdentifier> tobeDeleted = new HashSet<DataIdentifier>();
for (Map.Entry<DataIdentifier, Long> entry : timeMap.entrySet()) {
@@ -109,6 +109,7 @@ public class InMemoryBackend implements
long timestamp = entry.getValue();
if (timestamp < min && !store.isInUse(identifier)
&& store.confirmDelete(identifier)) {
+ store.deleteFromCache(identifier);
tobeDeleted.add(identifier);
}
}
Modified: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java Fri Apr 3 12:04:44 2015
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +46,8 @@ public class TestLocalCache extends Test
private static final String TEMP_DIR = "target/temp";
private static final String TARGET_DIR = "target";
-
- protected String cacheDirPath;
+
+ protected String cacheDirPath;
protected String tempDirPath;
@@ -61,15 +62,23 @@ public class TestLocalCache extends Test
protected void setUp() {
try {
cacheDirPath = CACHE_DIR + "-"
+ + String.valueOf(randomGen.nextInt(9999)) + "-"
+ String.valueOf(randomGen.nextInt(9999));
File cachedir = new File(cacheDirPath);
- if (cachedir.exists()) FileUtils.deleteQuietly(cachedir);
+ for (int i = 0; i < 4 && cachedir.exists(); i++) {
+ FileUtils.deleteQuietly(cachedir);
+ Thread.sleep(1000);
+ }
cachedir.mkdirs();
tempDirPath = TEMP_DIR + "-"
+ + String.valueOf(randomGen.nextInt(9999)) + "-"
+ String.valueOf(randomGen.nextInt(9999));
File tempdir = new File(tempDirPath);
- if (tempdir.exists()) FileUtils.deleteQuietly(tempdir);
+ for (int i = 0; i < 4 && tempdir.exists(); i++) {
+ FileUtils.deleteQuietly(tempdir);
+ Thread.sleep(1000);
+ }
tempdir.mkdirs();
} catch (Exception e) {
LOG.error("error:", e);
@@ -78,15 +87,17 @@ public class TestLocalCache extends Test
}
@Override
- protected void tearDown() throws IOException {
+ protected void tearDown() throws Exception {
File cachedir = new File(cacheDirPath);
- if (cachedir.exists()) {
+ for (int i = 0; i < 4 && cachedir.exists(); i++) {
FileUtils.deleteQuietly(cachedir);
+ Thread.sleep(1000);
}
File tempdir = new File(tempDirPath);
- if (tempdir.exists()) {
+ for (int i = 0; i < 4 && tempdir.exists(); i++) {
FileUtils.deleteQuietly(tempdir);
+ Thread.sleep(1000);
}
}
@@ -98,8 +109,8 @@ public class TestLocalCache extends Test
AsyncUploadCache pendingFiles = new AsyncUploadCache();
pendingFiles.init(tempDirPath, cacheDirPath, 100);
pendingFiles.reset();
- LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95,
- 0.70, pendingFiles);
+ LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400,
+ 0.95, 0.70, pendingFiles);
Random random = new Random(12345);
byte[] data = new byte[100];
Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
@@ -117,12 +128,15 @@ public class TestLocalCache extends Test
cache.store("a1", new ByteArrayInputStream(byteMap.get("a1")));
cache.store("a2", new ByteArrayInputStream(byteMap.get("a2")));
cache.store("a3", new ByteArrayInputStream(byteMap.get("a3")));
- assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
- cache.getIfStored("a1"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
- cache.getIfStored("a2"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
- cache.getIfStored("a3"));
+ InputStream result = cache.getIfStored("a1");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a1")), result);
+ IOUtils.closeQuietly(result);
+ result = cache.getIfStored("a2");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a2")), result);
+ IOUtils.closeQuietly(result);
+ result = cache.getIfStored("a3");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+ IOUtils.closeQuietly(result);
} catch (Exception e) {
LOG.error("error:", e);
fail();
@@ -138,8 +152,8 @@ public class TestLocalCache extends Test
AsyncUploadCache pendingFiles = new AsyncUploadCache();
pendingFiles.init(tempDirPath, cacheDirPath, 100);
pendingFiles.reset();
- LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95,
- 0.70, pendingFiles);
+ LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400,
+ 0.95, 0.70, pendingFiles);
Random random = new Random(12345);
byte[] data = new byte[100];
Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
@@ -161,12 +175,16 @@ public class TestLocalCache extends Test
cache.store("a1", new ByteArrayInputStream(byteMap.get("a1")));
cache.store("a2", new ByteArrayInputStream(byteMap.get("a2")));
cache.store("a3", new ByteArrayInputStream(byteMap.get("a3")));
- assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
- cache.getIfStored("a1"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
- cache.getIfStored("a2"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
- cache.getIfStored("a3"));
+
+ InputStream result = cache.getIfStored("a1");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a1")), result);
+ IOUtils.closeQuietly(result);
+ result = cache.getIfStored("a2");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a2")), result);
+ IOUtils.closeQuietly(result);
+ result = cache.getIfStored("a3");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+ IOUtils.closeQuietly(result);
data = new byte[90];
random.nextBytes(data);
@@ -174,18 +192,30 @@ public class TestLocalCache extends Test
// storing a4 should purge cache
cache.store("a4", new ByteArrayInputStream(byteMap.get("a4")));
Thread.sleep(1000);
- assertNull("a1 should be null", cache.getIfStored("a1"));
- assertNull("a2 should be null", cache.getIfStored("a2"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
- cache.getIfStored("a3"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a4")),
- cache.getIfStored("a4"));
+
+ result = cache.getIfStored("a1");
+ assertNull("a1 should be null", result);
+ IOUtils.closeQuietly(result);
+
+ result = cache.getIfStored("a2");
+ assertNull("a2 should be null", result);
+ IOUtils.closeQuietly(result);
+
+ result = cache.getIfStored("a3");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+ IOUtils.closeQuietly(result);
+
+ result = cache.getIfStored("a4");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a4")), result);
+ IOUtils.closeQuietly(result);
+
data = new byte[100];
random.nextBytes(data);
byteMap.put("a5", data);
cache.store("a5", new ByteArrayInputStream(byteMap.get("a5")));
- assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
- cache.getIfStored("a3"));
+ result = cache.getIfStored("a3");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+ IOUtils.closeQuietly(result);
} catch (Exception e) {
LOG.error("error:", e);
fail();
@@ -201,8 +231,8 @@ public class TestLocalCache extends Test
AsyncUploadCache pendingFiles = new AsyncUploadCache();
pendingFiles.init(tempDirPath, cacheDirPath, 100);
pendingFiles.reset();
- LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95,
- 0.70, pendingFiles);
+ LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400,
+ 0.95, 0.70, pendingFiles);
Random random = new Random(12345);
byte[] data = new byte[125];
Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
@@ -245,12 +275,15 @@ public class TestLocalCache extends Test
assertTrue("should be able to add to pending upload",
result.canAsyncUpload());
- assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
- cache.getIfStored("a1"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
- cache.getIfStored("a2"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
- cache.getIfStored("a3"));
+ InputStream inp = cache.getIfStored("a1");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a1")), inp);
+ IOUtils.closeQuietly(inp);
+ inp = cache.getIfStored("a2");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a2")), inp);
+ IOUtils.closeQuietly(inp);
+ inp = cache.getIfStored("a3");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), inp);
+ IOUtils.closeQuietly(inp);
data = new byte[90];
random.nextBytes(data);
@@ -266,13 +299,17 @@ public class TestLocalCache extends Test
result.canAsyncUpload());
Thread.sleep(1000);
- assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
- cache.getIfStored("a1"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
- cache.getIfStored("a2"));
- assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
- cache.getIfStored("a3"));
- assertNull("a4 should be null", cache.getIfStored("a4"));
+ inp = cache.getIfStored("a1");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a1")), inp);
+ IOUtils.closeQuietly(inp);
+ inp = cache.getIfStored("a2");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a2")), inp);
+ IOUtils.closeQuietly(inp);
+ inp = cache.getIfStored("a3");
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), inp);
+ IOUtils.closeQuietly(inp);
+ inp = cache.getIfStored("a4");
+ assertNull("a4 should be null", inp);
} catch (Exception e) {
LOG.error("error:", e);
fail();
@@ -288,8 +325,8 @@ public class TestLocalCache extends Test
AsyncUploadCache pendingFiles = new AsyncUploadCache();
pendingFiles.init(tempDirPath, cacheDirPath, 100);
pendingFiles.reset();
- LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 10000000,
- 0.95, 0.70, pendingFiles);
+ LocalCache cache = new LocalCache(cacheDirPath, tempDirPath,
+ 10000000, 0.95, 0.70, pendingFiles);
Random random = new Random(12345);
int fileUploads = 1000;
Map<String, byte[]> byteMap = new HashMap<String, byte[]>(
@@ -303,11 +340,11 @@ public class TestLocalCache extends Test
cache.store(key, new ByteArrayInputStream(byteMap.get(key)));
}
cache.close();
-
+
ExecutorService executor = Executors.newFixedThreadPool(10,
new NamedThreadFactory("localcache-store-worker"));
- cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, 0.95, 0.70,
- pendingFiles);
+ cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, 0.95,
+ 0.70, pendingFiles);
executor.execute(new StoreWorker(cache, byteMap));
executor.shutdown();
while (!executor.awaitTermination(15, TimeUnit.SECONDS)) {
@@ -318,7 +355,6 @@ public class TestLocalCache extends Test
}
}
-
private class StoreWorker implements Runnable {
Map<String, byte[]> byteMap;
@@ -345,11 +381,12 @@ public class TestLocalCache extends Test
}
}
}
+
/**
* Assert two inputstream
*/
protected void assertEquals(InputStream a, InputStream b)
- throws IOException {
+ throws IOException {
while (true) {
int ai = a.read();
int bi = b.read();
@@ -358,6 +395,8 @@ public class TestLocalCache extends Test
break;
}
}
+ IOUtils.closeQuietly(a);
+ IOUtils.closeQuietly(b);
}
}