You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by sh...@apache.org on 2015/04/03 14:04:44 UTC

svn commit: r1671041 - in /jackrabbit/trunk: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/ jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/

Author: shashank
Date: Fri Apr  3 12:04:44 2015
New Revision: 1671041

URL: http://svn.apache.org/r1671041
Log:
JCR-3864 CachingDatastore -cache file sizes to save remote call to remote datastore( S3DS)

Enabled LRU cache of defaut size 200 ( 28KB memory footprint) of IDs Vs length. 
getRecored/getRecordIfStored optimized to use this recordLength cache.
Fixed closing inputstream in TestLocalCache.java

Modified:
    jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
    jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
    jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java

Modified: jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (original)
+++ jackrabbit/trunk/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java Fri Apr  3 12:04:44 2015
@@ -529,7 +529,7 @@ public class S3Backend implements Backen
                         && s3service.getObjectMetadata(bucket,
                             s3ObjSumm.getKey()).getLastModified().getTime() < min) {
                        
-
+                        store.deleteFromCache(identifier);
                         LOG.debug("add id [{}] to delete lists",
                             s3ObjSumm.getKey());
                         deleteList.add(new DeleteObjectsRequest.KeyVersion(

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java Fri Apr  3 12:04:44 2015
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -77,6 +78,7 @@ import org.slf4j.LoggerFactory;
  *     &lt;param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/>
  *     &lt;param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/>
  *     &lt;param name="{@link #setProactiveCaching(boolean) proactiveCaching}" value="true"/>
+ *     &lt;param name="{@link #setRecLengthCacheSize(int) recLengthCacheSize}" value="200"/>
  * &lt/DataStore>
  */
 public abstract class CachingDataStore extends AbstractDataStore implements
@@ -127,6 +129,12 @@ public abstract class CachingDataStore e
      */
     protected final Map<DataIdentifier, Long> asyncDownloadCache = new ConcurrentHashMap<DataIdentifier, Long>(5);
 
+    /**
+     * In memory cache to hold {@link DataRecord#getLength()} against
+     * {@link DataIdentifier}
+     */
+    protected Map<DataIdentifier, Long> recLenCache = null;
+
     protected Backend backend;
 
     /**
@@ -236,6 +244,12 @@ public abstract class CachingDataStore e
      * repository.xml. By default it is 100
      */
     private int asyncUploadLimit = 100;
+    
+    /**
+     * Size of {@link #recLenCache}. Each entry consumes of approx 140 bytes.
+     * Default total memory consumption of {@link #recLenCache} 28KB.
+     */
+    private int recLengthCacheSize = 200;
 
     /**
      * Initialized the data store. If the path is not set, &lt;repository
@@ -333,6 +347,25 @@ public abstract class CachingDataStore e
                 new NamedThreadFactory("backend-file-download-worker"));
             cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
                 cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
+            /*
+             * Initialize LRU cache of size {@link #recLengthCacheSize}
+             */
+            recLenCache = Collections.synchronizedMap(new LinkedHashMap<DataIdentifier, Long>(
+                recLengthCacheSize, 0.75f, true) {
+
+                private static final long serialVersionUID = -8752749075395630485L;
+
+                @Override
+                protected boolean removeEldestEntry(
+                                Map.Entry<DataIdentifier, Long> eldest) {
+                    if (size() > recLengthCacheSize) {
+                        LOG.trace("evicted from recLengthCache [{}]",
+                            eldest.getKey());
+                        return true;
+                    }
+                    return false;
+                }
+            });
         } catch (Exception e) {
             throw new RepositoryException(e);
         }
@@ -415,24 +448,17 @@ public abstract class CachingDataStore e
 
     @Override
     public DataRecord getRecord(DataIdentifier identifier)
-            throws DataStoreException {
+                    throws DataStoreException {
         String fileName = getFileName(identifier);
-        boolean existsAtBackend = false;
         try {
             if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) {
-                LOG.debug("[{}] record retrieved from asyncUploadmap",
+                LOG.debug("getRecord: [{}]  retrieved from asyncUploadmap",
                     identifier);
                 usesIdentifier(identifier);
                 return new CachingDataRecord(this, identifier);
-            } else if (cache.getFileIfStored(fileName) != null
-                || (existsAtBackend = backend.exists(identifier))) {
-                if (existsAtBackend) {
-                    LOG.debug("[{}] record retrieved from backend", identifier);
-                    asyncDownload(identifier);
-                } else {
-                    LOG.debug("[{}] record retrieved from local cache",
-                        identifier);
-                }
+            } else if (getLength(identifier) > -1) {
+                LOG.debug("getRecord: [{}]  retrieved using getLength",
+                    identifier);
                 touchInternal(identifier);
                 usesIdentifier(identifier);
                 return new CachingDataRecord(this, identifier);
@@ -453,19 +479,36 @@ public abstract class CachingDataStore e
      */
     @Override
     public DataRecord getRecordIfStored(DataIdentifier identifier)
-            throws DataStoreException {
+                    throws DataStoreException {
         String fileName = getFileName(identifier);
         try {
             if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) {
-                LOG.debug("[{}] record retrieved from asyncuploadmap",
+                LOG.debug(
+                    "getRecordIfStored: [{}]  retrieved from asyncuploadmap",
                     identifier);
                 usesIdentifier(identifier);
                 return new CachingDataRecord(this, identifier);
-            } else if (backend.exists(identifier)) {
-                LOG.debug("[{}] record retrieved from backend", identifier);
+            } else if (recLenCache.containsKey(identifier)) {
+                LOG.debug(
+                    "getRecordIfStored: [{}]  retrieved using recLenCache",
+                    identifier);
                 touchInternal(identifier);
                 usesIdentifier(identifier);
                 return new CachingDataRecord(this, identifier);
+            } else {
+                try {
+                    long length = backend.getLength(identifier);
+                    LOG.debug(
+                        "getRecordIfStored :[{}]  retrieved from backend",
+                        identifier);
+                    recLenCache.put(identifier, length);
+                    touchInternal(identifier);
+                    usesIdentifier(identifier);
+                    return new CachingDataRecord(this, identifier);
+                } catch (DataStoreException ignore) {
+                    LOG.warn(" getRecordIfStored: [{}]  not found", identifier);
+                }
+
             }
         } catch (IOException ioe) {
             throw new DataStoreException(ioe);
@@ -507,6 +550,7 @@ public abstract class CachingDataStore e
         synchronized (this) {
             try {
                 // order is important here
+                recLenCache.remove(identifier);
                 asyncWriteCache.delete(fileName);
                 backend.deleteRecord(identifier);
                 cache.delete(fileName);
@@ -520,8 +564,10 @@ public abstract class CachingDataStore e
     public synchronized int deleteAllOlderThan(long min)
             throws DataStoreException {
         Set<DataIdentifier> diSet = backend.deleteAllOlderThan(min);
+        
         // remove entries from local cache
         for (DataIdentifier identifier : diSet) {
+            recLenCache.remove(identifier);
             cache.delete(getFileName(identifier));
         }
         try {
@@ -593,13 +639,24 @@ public abstract class CachingDataStore e
      * otherwise retrieve it from {@link Backend}.
      */
     public long getLength(final DataIdentifier identifier)
-            throws DataStoreException {
+                    throws DataStoreException {
         String fileName = getFileName(identifier);
-        Long length = cache.getFileLength(fileName);
+
+        Long length = recLenCache.get(identifier);
         if (length != null) {
-            return length.longValue();
+            LOG.debug(" identifier [{}] length fetched from recLengthCache",
+                identifier);
+            return length;
+        } else if ((length = cache.getFileLength(fileName)) != null) {
+            LOG.debug(" identifier [{}] length fetched from local cache",
+                identifier);
+            recLenCache.put(identifier, length);
+            return length;
         } else {
             length = backend.getLength(identifier);
+            LOG.debug(" identifier [{}] length fetched from backend",
+                identifier);
+            recLenCache.put(identifier, length);
             asyncDownload(identifier);
             return length;
         }
@@ -618,6 +675,20 @@ public abstract class CachingDataStore e
         return asyncWriteCache.getAll();
     }
     
+    
+    public void deleteFromCache(DataIdentifier identifier)
+                    throws DataStoreException {
+        try {
+            // order is important here
+            recLenCache.remove(identifier);
+            String fileName = getFileName(identifier);
+            asyncWriteCache.delete(fileName);
+            cache.delete(fileName);
+        } catch (IOException ioe) {
+            throw new DataStoreException(ioe);
+        }
+    }
+    
     @Override
     public void onSuccess(AsyncUploadResult result) {
         DataIdentifier identifier = result.getIdentifier();
@@ -1128,6 +1199,10 @@ public abstract class CachingDataStore e
     public void setProactiveCaching(boolean proactiveCaching) {
         this.proactiveCaching = proactiveCaching;
     }
+    
+    public void setRecLengthCacheSize(int recLengthCacheSize) {
+        this.recLengthCacheSize = recLengthCacheSize;
+    }
 
     public Backend getBackend() {
         return backend;

Modified: jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java Fri Apr  3 12:04:44 2015
@@ -615,8 +615,8 @@ public class LocalCache {
                         LOG.info    ("tmp file [{}] skipped ", filePath);
                         continue;
                     }
-                    if (name.startsWith(dataStorePath)) {
-                        name = name.substring(dataStorePath.length());
+                    if (filePath.startsWith(dataStorePath)) {
+                        name = filePath.substring(dataStorePath.length());
                     }
                     if (name.startsWith("/") || name.startsWith("\\")) {
                         name = name.substring(1);

Modified: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java Fri Apr  3 12:04:44 2015
@@ -101,7 +101,7 @@ public class InMemoryBackend implements
     }
 
     @Override
-    public Set<DataIdentifier> deleteAllOlderThan(final long min) {
+    public Set<DataIdentifier> deleteAllOlderThan(final long min) throws DataStoreException {
         log("deleteAllOlderThan " + min);
         Set<DataIdentifier> tobeDeleted = new HashSet<DataIdentifier>();
         for (Map.Entry<DataIdentifier, Long> entry : timeMap.entrySet()) {
@@ -109,6 +109,7 @@ public class InMemoryBackend implements
             long timestamp = entry.getValue();
             if (timestamp < min && !store.isInUse(identifier)
                 && store.confirmDelete(identifier)) {
+                store.deleteFromCache(identifier);
                 tobeDeleted.add(identifier);
             }
         }

Modified: jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java?rev=1671041&r1=1671040&r2=1671041&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java (original)
+++ jackrabbit/trunk/jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java Fri Apr  3 12:04:44 2015
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.TestCase;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,8 +46,8 @@ public class TestLocalCache extends Test
     private static final String TEMP_DIR = "target/temp";
 
     private static final String TARGET_DIR = "target";
-    
-    protected String  cacheDirPath;
+
+    protected String cacheDirPath;
 
     protected String tempDirPath;
 
@@ -61,15 +62,23 @@ public class TestLocalCache extends Test
     protected void setUp() {
         try {
             cacheDirPath = CACHE_DIR + "-"
+                + String.valueOf(randomGen.nextInt(9999)) + "-"
                 + String.valueOf(randomGen.nextInt(9999));
             File cachedir = new File(cacheDirPath);
-            if (cachedir.exists()) FileUtils.deleteQuietly(cachedir);
+            for (int i = 0; i < 4 && cachedir.exists(); i++) {
+                FileUtils.deleteQuietly(cachedir);
+                Thread.sleep(1000);
+            }
             cachedir.mkdirs();
 
             tempDirPath = TEMP_DIR + "-"
+                + String.valueOf(randomGen.nextInt(9999)) + "-"
                 + String.valueOf(randomGen.nextInt(9999));
             File tempdir = new File(tempDirPath);
-            if (tempdir.exists()) FileUtils.deleteQuietly(tempdir);
+            for (int i = 0; i < 4 && tempdir.exists(); i++) {
+                FileUtils.deleteQuietly(tempdir);
+                Thread.sleep(1000);
+            }
             tempdir.mkdirs();
         } catch (Exception e) {
             LOG.error("error:", e);
@@ -78,15 +87,17 @@ public class TestLocalCache extends Test
     }
 
     @Override
-    protected void tearDown() throws IOException {
+    protected void tearDown() throws Exception {
         File cachedir = new File(cacheDirPath);
-        if (cachedir.exists()) {
+        for (int i = 0; i < 4 && cachedir.exists(); i++) {
             FileUtils.deleteQuietly(cachedir);
+            Thread.sleep(1000);
         }
 
         File tempdir = new File(tempDirPath);
-        if (tempdir.exists()) {
+        for (int i = 0; i < 4 && tempdir.exists(); i++) {
             FileUtils.deleteQuietly(tempdir);
+            Thread.sleep(1000);
         }
     }
 
@@ -98,8 +109,8 @@ public class TestLocalCache extends Test
             AsyncUploadCache pendingFiles = new AsyncUploadCache();
             pendingFiles.init(tempDirPath, cacheDirPath, 100);
             pendingFiles.reset();
-            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95,
-                0.70, pendingFiles);
+            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400,
+                0.95, 0.70, pendingFiles);
             Random random = new Random(12345);
             byte[] data = new byte[100];
             Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
@@ -117,12 +128,15 @@ public class TestLocalCache extends Test
             cache.store("a1", new ByteArrayInputStream(byteMap.get("a1")));
             cache.store("a2", new ByteArrayInputStream(byteMap.get("a2")));
             cache.store("a3", new ByteArrayInputStream(byteMap.get("a3")));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
-                cache.getIfStored("a1"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
-                cache.getIfStored("a2"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
-                cache.getIfStored("a3"));
+            InputStream result = cache.getIfStored("a1");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a1")), result);
+            IOUtils.closeQuietly(result);
+            result = cache.getIfStored("a2");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a2")), result);
+            IOUtils.closeQuietly(result);
+            result = cache.getIfStored("a3");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+            IOUtils.closeQuietly(result);
         } catch (Exception e) {
             LOG.error("error:", e);
             fail();
@@ -138,8 +152,8 @@ public class TestLocalCache extends Test
             AsyncUploadCache pendingFiles = new AsyncUploadCache();
             pendingFiles.init(tempDirPath, cacheDirPath, 100);
             pendingFiles.reset();
-            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95,
-                0.70, pendingFiles);
+            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400,
+                0.95, 0.70, pendingFiles);
             Random random = new Random(12345);
             byte[] data = new byte[100];
             Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
@@ -161,12 +175,16 @@ public class TestLocalCache extends Test
             cache.store("a1", new ByteArrayInputStream(byteMap.get("a1")));
             cache.store("a2", new ByteArrayInputStream(byteMap.get("a2")));
             cache.store("a3", new ByteArrayInputStream(byteMap.get("a3")));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
-                cache.getIfStored("a1"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
-                cache.getIfStored("a2"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
-                cache.getIfStored("a3"));
+
+            InputStream result = cache.getIfStored("a1");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a1")), result);
+            IOUtils.closeQuietly(result);
+            result = cache.getIfStored("a2");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a2")), result);
+            IOUtils.closeQuietly(result);
+            result = cache.getIfStored("a3");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+            IOUtils.closeQuietly(result);
 
             data = new byte[90];
             random.nextBytes(data);
@@ -174,18 +192,30 @@ public class TestLocalCache extends Test
             // storing a4 should purge cache
             cache.store("a4", new ByteArrayInputStream(byteMap.get("a4")));
             Thread.sleep(1000);
-            assertNull("a1 should be null", cache.getIfStored("a1"));
-            assertNull("a2 should be null", cache.getIfStored("a2"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
-                cache.getIfStored("a3"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a4")),
-                cache.getIfStored("a4"));
+
+            result = cache.getIfStored("a1");
+            assertNull("a1 should be null", result);
+            IOUtils.closeQuietly(result);
+
+            result = cache.getIfStored("a2");
+            assertNull("a2 should be null", result);
+            IOUtils.closeQuietly(result);
+
+            result = cache.getIfStored("a3");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+            IOUtils.closeQuietly(result);
+
+            result = cache.getIfStored("a4");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a4")), result);
+            IOUtils.closeQuietly(result);
+
             data = new byte[100];
             random.nextBytes(data);
             byteMap.put("a5", data);
             cache.store("a5", new ByteArrayInputStream(byteMap.get("a5")));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
-                cache.getIfStored("a3"));
+            result = cache.getIfStored("a3");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result);
+            IOUtils.closeQuietly(result);
         } catch (Exception e) {
             LOG.error("error:", e);
             fail();
@@ -201,8 +231,8 @@ public class TestLocalCache extends Test
             AsyncUploadCache pendingFiles = new AsyncUploadCache();
             pendingFiles.init(tempDirPath, cacheDirPath, 100);
             pendingFiles.reset();
-            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95,
-                0.70, pendingFiles);
+            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400,
+                0.95, 0.70, pendingFiles);
             Random random = new Random(12345);
             byte[] data = new byte[125];
             Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
@@ -245,12 +275,15 @@ public class TestLocalCache extends Test
             assertTrue("should be able to add to pending upload",
                 result.canAsyncUpload());
 
-            assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
-                cache.getIfStored("a1"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
-                cache.getIfStored("a2"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
-                cache.getIfStored("a3"));
+            InputStream inp = cache.getIfStored("a1");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a1")), inp);
+            IOUtils.closeQuietly(inp);
+            inp = cache.getIfStored("a2");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a2")), inp);
+            IOUtils.closeQuietly(inp);
+            inp = cache.getIfStored("a3");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a3")), inp);
+            IOUtils.closeQuietly(inp);
 
             data = new byte[90];
             random.nextBytes(data);
@@ -266,13 +299,17 @@ public class TestLocalCache extends Test
                 result.canAsyncUpload());
             Thread.sleep(1000);
 
-            assertEquals(new ByteArrayInputStream(byteMap.get("a1")),
-                cache.getIfStored("a1"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a2")),
-                cache.getIfStored("a2"));
-            assertEquals(new ByteArrayInputStream(byteMap.get("a3")),
-                cache.getIfStored("a3"));
-            assertNull("a4 should be null", cache.getIfStored("a4"));
+            inp = cache.getIfStored("a1");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a1")), inp);
+            IOUtils.closeQuietly(inp);
+            inp = cache.getIfStored("a2");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a2")), inp);
+            IOUtils.closeQuietly(inp);
+            inp = cache.getIfStored("a3");
+            assertEquals(new ByteArrayInputStream(byteMap.get("a3")), inp);
+            IOUtils.closeQuietly(inp);
+            inp = cache.getIfStored("a4");
+            assertNull("a4 should be null", inp);
         } catch (Exception e) {
             LOG.error("error:", e);
             fail();
@@ -288,8 +325,8 @@ public class TestLocalCache extends Test
             AsyncUploadCache pendingFiles = new AsyncUploadCache();
             pendingFiles.init(tempDirPath, cacheDirPath, 100);
             pendingFiles.reset();
-            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 10000000,
-                0.95, 0.70, pendingFiles);
+            LocalCache cache = new LocalCache(cacheDirPath, tempDirPath,
+                10000000, 0.95, 0.70, pendingFiles);
             Random random = new Random(12345);
             int fileUploads = 1000;
             Map<String, byte[]> byteMap = new HashMap<String, byte[]>(
@@ -303,11 +340,11 @@ public class TestLocalCache extends Test
                 cache.store(key, new ByteArrayInputStream(byteMap.get(key)));
             }
             cache.close();
-            
+
             ExecutorService executor = Executors.newFixedThreadPool(10,
                 new NamedThreadFactory("localcache-store-worker"));
-            cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, 0.95, 0.70,
-                pendingFiles);
+            cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, 0.95,
+                0.70, pendingFiles);
             executor.execute(new StoreWorker(cache, byteMap));
             executor.shutdown();
             while (!executor.awaitTermination(15, TimeUnit.SECONDS)) {
@@ -318,7 +355,6 @@ public class TestLocalCache extends Test
         }
     }
 
-
     private class StoreWorker implements Runnable {
         Map<String, byte[]> byteMap;
 
@@ -345,11 +381,12 @@ public class TestLocalCache extends Test
             }
         }
     }
+
     /**
      * Assert two inputstream
      */
     protected void assertEquals(InputStream a, InputStream b)
-            throws IOException {
+                    throws IOException {
         while (true) {
             int ai = a.read();
             int bi = b.read();
@@ -358,6 +395,8 @@ public class TestLocalCache extends Test
                 break;
             }
         }
+        IOUtils.closeQuietly(a);
+        IOUtils.closeQuietly(b);
     }
 
 }