You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:49:49 UTC

svn commit: r1766330 - in /jackrabbit/oak/trunk: oak-blob-cloud/ oak-blob/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/

Author: amitj
Date: Mon Oct 24 04:49:49 2016
New Revision: 1766330

URL: http://svn.apache.org/viewvc?rev=1766330&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore

* Implementation for local file system DataStore cache which includes:
** FileCache to cache downloaded files on the local fs. Uses in-memory CacheLIRS map to handle evictions and loading from backend
** An UploadStagingCache which uploads blobs asynchronously to the backend
** CacheStats for both types of cache
** CompositeDataStoreCache which supports both caching for upload and download

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
    jackrabbit/oak/trunk/oak-blob/pom.xml

Modified: jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/pom.xml?rev=1766330&r1=1766329&r2=1766330&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/pom.xml Mon Oct 24 04:49:49 2016
@@ -101,6 +101,11 @@
 
         <dependency>
             <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-blob</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-core</artifactId>
             <version>${project.version}</version>
         </dependency>

Modified: jackrabbit/oak/trunk/oak-blob/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/pom.xml?rev=1766330&r1=1766329&r2=1766330&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob/pom.xml Mon Oct 24 04:49:49 2016
@@ -78,6 +78,11 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-data</artifactId>
+      <version>${jackrabbit.version}</version>
+    </dependency>
     <!-- Dependencies to other Oak components -->
     <dependency>
       <groupId>org.apache.jackrabbit</groupId>

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.annotation.Nullable;
+
+import com.google.common.cache.AbstractCache;
+import com.google.common.cache.CacheLoader;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ */
+public class CompositeDataStoreCache extends AbstractCache<String, File> implements Closeable {
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeDataStoreCache.class);
+
+    /**
+     * Cache for downloaded blobs
+     */
+    private final FileCache downloadCache;
+
+    /**
+     * Cache for staging async uploads
+     */
+    private final UploadStagingCache stagingCache;
+
+    /**
+     * The directory where the files are created.
+     */
+    private final File directory;
+
+    public CompositeDataStoreCache(String path, long size,
+        int uploadSplitPercentage, int uploadThreads, CacheLoader<String, InputStream> loader,
+        final StagingUploader uploader, StatisticsProvider statsProvider,
+        ListeningExecutorService executor,
+        ScheduledExecutorService scheduledExecutor /* purge scheduled executor */,
+        long purgeInterval /* async purge interval */) {
+
+        checkArgument(uploadSplitPercentage >= 0 && uploadSplitPercentage < 100,
+            "Upload percentage should be between 0 and 100");
+
+        this.directory = new File(path);
+
+        long uploadSize = (size * uploadSplitPercentage)/100;
+
+        this.downloadCache =
+            new FileCache((size - uploadSize), directory, loader, null);
+
+        this.stagingCache =
+            new UploadStagingCache(directory, uploadThreads, uploadSize, uploader, downloadCache,
+                statsProvider, executor, scheduledExecutor, purgeInterval);
+    }
+
+    @Nullable
+    public File getIfPresent(String key) {
+        // Check if the file scheduled for async upload
+        File staged = stagingCache.getIfPresent(key);
+        if (staged != null && staged.exists()) {
+            return staged;
+        }
+        return downloadCache.getIfPresent(key);
+    }
+
+    @Nullable
+    @Override
+    public File getIfPresent(Object key) {
+        return getIfPresent((String) key);
+    }
+
+    public File get(String key) throws IOException {
+        try {
+            // Check if the file scheduled for async upload
+            File staged = stagingCache.getIfPresent(key);
+            if (staged != null && staged.exists()) {
+                return staged;
+            }
+            // get from cache and download if not available
+            return downloadCache.get(key);
+        } catch (IOException e) {
+            LOG.error("Error loading [{}] from cache", key);
+            throw e;
+        }
+    }
+
+    @Override
+    public void invalidate(Object key) {
+        downloadCache.invalidate(key);
+    }
+
+    public boolean stage(String key, File file) {
+        return stagingCache.put(key, file).isPresent();
+    }
+
+
+    public DataStoreCacheStatsMBean getStagingCacheStats() {
+        return stagingCache.getStats();
+    }
+
+    public DataStoreCacheStatsMBean getCacheStats() {
+        return downloadCache.getStats();
+    }
+
+    @Override
+    public void close() throws IOException {
+        downloadCache.close();
+        stagingCache.close();
+    }
+
+    UploadStagingCache getStagingCache() {
+        return stagingCache;
+    }
+
+    FileCache getDownloadCache() {
+        return downloadCache;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,14 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
+
+/**
+ */
+public interface DataStoreCacheStatsMBean extends CacheStatsMBean {
+    /**
+     * Total weight of the in-memory cache
+     * @return to weight of the cache
+     */
+    //Computing weight is costly hence its an operation
+    long estimateCurrentMemoryWeight();
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.AbstractCache;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.Weigher;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.FilenameUtils.normalizeNoEndSeparator;
+
+/**
+ */
+public class FileCache extends AbstractCache<String, File> implements Closeable {
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
+
+    /**
+     * The cacheRoot directory of the cache.
+     */
+    private final File cacheRoot;
+
+    private final CacheLIRS<String, File> cache;
+
+    private final FileCacheStats cacheStats;
+
+    private final ExecutorService executor;
+
+    /**
+     * Convert the size calculation to KB to support max file size of 2 TB
+     */
+    private final Weigher<String, File> weigher = new Weigher<String, File>() {
+        @Override public int weigh(String key, File value) {
+            return Math.round(value.length() / (4 * 1024)); // convert to KB
+        }};
+
+    //Rough estimate of the in-memory key, value pair
+    private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
+        @Override public int weigh(String key, File value) {
+            return (StringUtils.estimateMemoryUsage(key) +
+                StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
+        }};
+
+    public FileCache(long maxSize /** bytes **/, File root,
+        final CacheLoader<String, InputStream> loader,
+        @Nullable final ExecutorService executor) {
+
+        this.cacheRoot = new File(root, "download");
+
+        /* convert to 4 KB block */
+        long size = Math.round(maxSize / (1024L * 4));
+
+        cache = new CacheLIRS.Builder<String, File>()
+            .maximumWeight(size)
+            .recordStats()
+            .weigher(weigher)
+            .evictionCallback(new EvictionCallback<String, File>() {
+                @Override
+                public void evicted(@Nonnull String key, @Nullable File cachedFile,
+                    @Nonnull RemovalCause cause) {
+                    try {
+                        if (cachedFile != null && cachedFile.exists()) {
+                            forceDelete(cachedFile);
+                            LOG.info("File [{}] evicted with reason [{}]", cachedFile, cause
+                                .toString());
+                        }
+                    } catch (IOException e) {
+                        LOG.info("Cached file deletion failed after eviction", e);
+                    }
+                }})
+            .build(new CacheLoader<String, File>() {
+                @Override
+                public File load(String key) throws Exception {
+                    // Fetch from local cache directory and if not found load from backend
+                    File cachedFile = getCacheFile(key);
+                    if (cachedFile.exists()) {
+                        return cachedFile;
+                    } else {
+                        InputStream is = null;
+                        boolean threw = true;
+                        try {
+                            is = loader.load(key);
+                            copyInputStreamToFile(is, cachedFile);
+                            threw = false;
+                        } finally {
+                            Closeables.close(is, threw);
+                        }
+                        return cachedFile;
+                    }
+                }
+            });
+        this.cacheStats =
+            new FileCacheStats(cache, "DataStore-DownloadCache", weigher, memWeigher, maxSize);
+
+        //  TODO: Check persisting the in-memory map and initializing Vs building from fs
+        // Build in-memory cache asynchronously from the file system entries
+        if (executor == null) {
+            this.executor = Executors.newSingleThreadExecutor();
+        } else {
+            this.executor = executor;
+        }
+        this.executor.submit(new CacheBuildJob());
+    }
+
+    /**
+     * Puts the given key and file into the cache.
+     * The file is moved to the cache. So, the original file
+     * won't be available after this operation. It can be retrieved
+     * using {@link #getIfPresent(String)}.
+     *
+     * @param key of the file
+     * @param file to put into cache
+     */
+    @Override
+    public void put(String key, File file) {
+        try {
+            File cached = getCacheFile(key);
+            if (!cached.exists()) {
+                FileUtils.moveFile(file, cached);
+            }
+            cache.put(key, cached);
+        } catch (IOException e) {
+            LOG.error("Exception adding id [{}] with file [{}] to cache", key, file);
+        }
+    }
+
+    public boolean containsKey(String key) {
+        return cache.containsKey(key);
+    }
+
+    /**
+     * Retrieves the file handle from the cache if present and null otherwise.
+     *
+     * @param key of the file to retrieve
+     * @return File handle if available
+     */
+    @Nullable
+    public File getIfPresent(String key) {
+        try {
+            if (cache.containsKey(key)) {
+                return cache.get(key);
+            }
+        } catch (Exception e) {
+            LOG.error("Error in retrieving [{}] from cache", key, e);
+        }
+        return null;
+    }
+
+    @Nullable
+    @Override
+    public File getIfPresent(Object key) {
+        return getIfPresent((String) key);
+    }
+
+    public File get(String key) throws IOException {
+        try {
+            // get from cache and download if not available
+            return cache.get(key);
+        } catch (ExecutionException e) {
+            LOG.error("Error loading [{}] from cache", key);
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void invalidate(Object key) {
+        cache.invalidate(key);
+    }
+
+    public DataStoreCacheStatsMBean getStats() {
+        return cacheStats;
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.info("Cache stats on close [{}]", cacheStats.cacheInfoAsString());
+        new ExecutorCloser(executor).close();
+    }
+
+    /**
+     * Called to initialize the in-memory cache from the fs folder
+     */
+    private class CacheBuildJob implements Callable {
+        @Override
+        public Integer call() {
+            Stopwatch watch = Stopwatch.createStarted();
+            int count = build();
+            LOG.info("Cache built with [{}] files from file system in [{}] seconds",
+                count, watch.elapsed(TimeUnit.SECONDS));
+            return count;
+        }
+    }
+
+    /**
+     * Retrieves all the files present in the fs cache folder and builds the in-memory cache.
+     */
+    private int build() {
+        int count = 0;
+        // Iterate over all files in the cache folder
+        Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(cacheRoot)
+            .filter(new Predicate<File>() {
+                @Override public boolean apply(File input) {
+                    return input.isFile() && !normalizeNoEndSeparator(input.getParent())
+                        .equals(cacheRoot.getAbsolutePath());
+                }
+            }).iterator();
+        while (iter.hasNext()) {
+            File toBeSyncedFile = iter.next();
+            try {
+                put(toBeSyncedFile.getName(), toBeSyncedFile);
+                count++;
+                LOG.trace("Added file [{}} to in-memory cache", toBeSyncedFile);
+            } catch (Exception e) {
+                LOG.error("Error in putting cached file in map[{}]", toBeSyncedFile);
+            }
+        }
+        LOG.trace("[{}] files put in im-memory cache", count);
+        return count;
+    }
+
+    /**
+     * Create a placeholder in the file system cache folder for the given identifier.
+     *
+     * @param key
+     * @return File handle for the id
+     */
+    private File getCacheFile(String key) {
+        File file = cacheRoot;
+        file = new File(file, key.substring(0, 2));
+        file = new File(file, key.substring(2, 4));
+        file = new File(file, key.substring(4, 6));
+        return new File(file, key);
+    }
+}
+
+class FileCacheStats extends CacheStats implements DataStoreCacheStatsMBean {
+    private static final long KB = 4 * 1024;
+    private final Weigher<Object, Object> memWeigher;
+    private final Weigher<Object, Object> weigher;
+    private final Cache<Object, Object> cache;
+
+    /**
+     * Construct the cache stats object.
+     *
+     * @param cache     the cache
+     * @param name      the name of the cache
+     * @param weigher   the weigher used to estimate the current weight
+     * @param maxWeight the maximum weight
+     */
+    public FileCacheStats(Cache<?, ?> cache, String name, Weigher<?, ?> weigher,
+        Weigher<?, ?> memWeigher, long maxWeight) {
+        super(cache, name, weigher, maxWeight);
+        this.memWeigher = (Weigher<Object, Object>) memWeigher;
+        this.weigher = (Weigher<Object, Object>) weigher;
+        this.cache = (Cache<Object, Object>) cache;
+    }
+
+    @Override
+    public long estimateCurrentMemoryWeight() {
+        if (memWeigher == null) {
+            return -1;
+        }
+        long size = 0;
+        for (Map.Entry<?, ?> e : cache.asMap().entrySet()) {
+            Object k = e.getKey();
+            Object v = e.getValue();
+            size += memWeigher.weigh(k, v);
+        }
+        return size;
+    }
+
+    @Override
+    public long estimateCurrentWeight() {
+        if (weigher == null) {
+            return -1;
+        }
+        long size = 0;
+        for (Map.Entry<?, ?> e : cache.asMap().entrySet()) {
+            Object k = e.getKey();
+            Object v = e.getValue();
+            size += weigher.weigh(k, v) * KB;
+        }
+        return size;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Objects.toStringHelper;
+import static java.lang.String.format;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+/**
+ * Cache for staging async uploads. This serves as a temporary cache for serving local
+ * requests till the time the upload has not been synced with the backend.
+ * <p>
+ * The appropriate backend for this cache are wrapped in {@link StagingUploader}
+ * implementations.
+ * <p>
+ * Stats:
+ * - Status for a particular upload
+ * - Upload time
+ */
+public class UploadStagingCache implements Closeable {
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
+
+    //Rough estimate of the in-memory key, value pair
+    private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
+        @Override public int weigh(String key, File value) {
+            return (StringUtils.estimateMemoryUsage(key) +
+                StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
+        }};
+
+    /**
+     * Max size of the upload staging cache in bytes
+     */
+    private final long size;
+
+    /**
+     * Current cache size in bytes
+     */
+    private final AtomicLong currentSize;
+
+    /**
+     * Executor for async uploads
+     */
+    private ListeningExecutorService executor;
+
+    /**
+     * Scheduled executor for build and remove
+     */
+    private ScheduledExecutorService removeExecutor;
+
+    /**
+     * In memory map for staged files
+     */
+    private final ConcurrentMap<String, File> map;
+
+    /**
+     * In memory map for files to be deleted after uploads
+     */
+    private final ConcurrentMap<String, File> attic;
+
+    /**
+     * Local directory where uploads are staged
+     */
+    private final File uploadCacheSpace;
+
+    /**
+     * Wrapper to where the blobs are uploaded/written
+     */
+    private final StagingUploader uploader;
+
+    /**
+     * Cache stats
+     */
+    private final StagingCacheStats cacheStats;
+
+    /**
+     * Handle for download cache if any
+     */
+    @Nullable
+    private final FileCache downloadCache;
+
+    public UploadStagingCache(File dir, int uploadThreads, long size /** bytes **/,
+        StagingUploader uploader,
+        @Nullable FileCache cache, StatisticsProvider statisticsProvider,
+        @Nullable ListeningExecutorService executor,
+        @Nullable ScheduledExecutorService scheduledExecutor,
+        long removalPeriod) {
+
+        this.currentSize = new AtomicLong();
+        this.size = size;
+        this.executor = executor;
+        if (executor == null) {
+            this.executor = MoreExecutors.listeningDecorator(Executors
+                .newFixedThreadPool(uploadThreads, new NamedThreadFactory("oak-ds-async-upload-thread")));
+        }
+
+        this.removeExecutor = scheduledExecutor;
+        if (scheduledExecutor == null) {
+            this.removeExecutor = Executors.newSingleThreadScheduledExecutor();
+        }
+
+        this.map = Maps.newConcurrentMap();
+        this.attic = Maps.newConcurrentMap();
+        this.uploadCacheSpace = new File(dir, "upload");
+        this.uploader = uploader;
+        this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
+        this.downloadCache = cache;
+
+        build();
+
+        removeExecutor.scheduleAtFixedRate(new RemoveJob(), removalPeriod, removalPeriod,
+            TimeUnit.SECONDS);
+    }
+
+    /**
+     * Retrieves all the files staged in the staging area and schedules them for uploads.
+     */
+    private void build() {
+        LOG.info("Scheduling pending uploads");
+
+        Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
+            .filter(new Predicate<File>() {
+                @Override public boolean apply(File input) {
+                    return input.isFile();
+                }
+            }).iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            File toBeSyncedFile = iter.next();
+            Optional<SettableFuture<Integer>> scheduled = put(toBeSyncedFile.getName(), toBeSyncedFile);
+            if (scheduled.isPresent()) {
+                count++;
+            }
+        }
+
+        LOG.info("Scheduled [{}] pending uploads", count);
+    }
+
+    /**
+     * Puts the file into the staging cache if possible.
+     * Returns an optional SettableFuture if staged for upload otherwise empty.
+     *
+     * @param id the id of the file to be staged
+     * @param input the file to be staged
+     * @return An Optional SettableFuture.
+     */
+    public Optional<SettableFuture<Integer>> put(String id, File input) {
+        cacheStats.markRequest();
+
+        long length = input.length();
+        File uploadFile = getFile(id);
+
+        // if size permits and not upload complete or already scheduled for upload
+        if (currentSize.addAndGet(length) <= size
+            && !attic.containsKey(id)
+            && map.putIfAbsent(id, uploadFile) == null ) {
+
+            try {
+                if (!uploadFile.exists()) {
+                    FileUtils.moveFile(input, uploadFile);
+                    LOG.trace("File [{}] moved to staging cache [{}]", input, uploadFile);
+                }
+
+                // update stats
+                cacheStats.markHit();
+                cacheStats.incrementCount();
+                cacheStats.incrementSize(length);
+                cacheStats.incrementMemSize(memWeigher.weigh(id, uploadFile));
+
+                return Optional.of(stage(id, uploadFile));
+            } catch (Exception e) {
+                LOG.info("Error moving file to staging", e);
+                //reset the current state and return empty flag as not added to cache
+                currentSize.addAndGet(-length);
+                map.remove(id, uploadFile);
+            }
+        } else {
+            currentSize.addAndGet(-length);
+        }
+        return Optional.absent();
+    }
+
+    /**
+     * Stages the file for async upload.
+     * * Puts the file into the stage caching file system directory
+     * * Schedules a job for upload to write using the given {@link StagingUploader}
+     * * Updates the internal map and size variable
+     * * Adds a callback listener to remove the file once finished
+     * @param id of the file to be staged
+     * @param upload the file to be staged
+     * @return a SettableFuture instance
+     */
+    private SettableFuture<Integer> stage(final String id, final File upload) {
+        final SettableFuture<Integer> result = SettableFuture.create();
+
+        try {
+            // create an async job
+            ListenableFuture<Integer> future = executor.submit(new Callable<Integer>() {
+                @Override public Integer call() throws Exception {
+                    try {
+                        final TimerStats.Context uploadContext = cacheStats.startUpLoaderTimer();
+
+                        uploader.write(id, upload);
+                        LOG.debug("File added to backend [{}]", upload);
+
+                        uploadContext.stop();
+
+                        return 1;
+                    } catch (Exception e) {
+                        LOG.error("Error adding file to backend", e);
+                        throw e;
+                    }
+                }
+            });
+
+            // Add a callback to the returned Future object for handling success and error
+            Futures.addCallback(future, new FutureCallback<Integer>() {
+                @Override public void onSuccess(@Nullable Integer r) {
+                    LOG.info("Successfully added [{}], [{}]", id, upload);
+
+                    try {
+                        // move to attic to be deleted and remove from in-memory map
+                        attic.put(id, upload);
+
+                        // Add the uploaded file to the download cache if available
+                        if (downloadCache != null) {
+                            // Touch the file to update timestamp and record length
+                            FileUtils.touch(upload);
+                            long length = upload.length();
+
+                            downloadCache.put(id, upload);
+                            LOG.debug("[{}] added to cache", id);
+
+                            // Update stats for removal
+                            currentSize.addAndGet(-length);
+                            cacheStats.decrementSize(length);
+                        }
+
+                        map.remove(id);
+
+                        // Remove from upload staging area
+                        //LOG.info("File [{}] removed from cache [{}]", upload, remove(id));
+                    } catch (IOException e) {
+                        LOG.warn("Error in cleaning up [{}] from staging", upload);
+                    }
+                    result.set(r);
+                }
+
+                @Override public void onFailure(Throwable t) {
+                    LOG.error("Error adding file to backend", t);
+                    result.setException(t);
+                }
+            });
+            LOG.info("File [{}] scheduled for upload [{}]", upload, result);
+        } catch (Exception e) {
+            LOG.error("Error staging file for upload [{}]", upload, e);
+        }
+        return result;
+    }
+
+    /**
+     * Removes all cached from attic
+     */
+    private void remove() {
+        LOG.info("Starting purge of uploaded files");
+
+        Iterator<String> iterator = attic.keySet().iterator();
+        int count = 0;
+        while (iterator.hasNext()) {
+            String key = iterator.next();
+            try {
+                if (remove(key)) {
+                    iterator.remove();
+                    count++;
+                }
+            } catch (IOException e) {
+                LOG.error("Error in removing entry for id [{}]", key);
+            }
+        }
+
+        LOG.info("Finished purge of [{}] files", count);
+    }
+
+    private boolean remove(String id) throws IOException {
+        LOG.trace("Removing upload file for id [{}]", id);
+
+        // Check if not already scheduled for upload
+        if (!map.containsKey(id)) {
+            LOG.trace("upload map contains id [{}]", id);
+
+            File toBeDeleted = attic.get(id);
+            LOG.trace("Trying to delete file [{}]", toBeDeleted);
+            long length = toBeDeleted.length();
+
+            delete(toBeDeleted);
+            LOG.info("deleted file [{}]", toBeDeleted);
+
+            currentSize.addAndGet(-length);
+            // Update stats for removal
+            cacheStats.decrementSize(length);
+            cacheStats.decrementMemSize(memWeigher.weigh(id, toBeDeleted));
+            cacheStats.decrementCount();
+
+            LOG.info("Cache [{}] file deleted for id [{}]", toBeDeleted, id);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Delete the file from the staged cache and all its empty sub-directories.
+     *
+     * @param f the file to be deleted
+     * @throws IOException
+     */
+    private void delete(File f) throws IOException {
+        if (f.exists()) {
+            FileUtils.forceDelete(f);
+            LOG.debug("Deleted staged upload file [{}]", f);
+        }
+
+        // delete empty parent folders (except the main directory)
+        while (true) {
+            f = f.getParentFile();
+            if ((f == null) || f.equals(uploadCacheSpace)
+                || (f.list() == null)
+                || (f.list().length > 0)) {
+                break;
+            }
+            LOG.debug("Deleted directory [{}], [{}]", f, f.delete());
+        }
+    }
+
+    /**
+     * Returns the File if present or null otherwise.
+     * Any usage of the returned file should assert for its existence as the file
+     * could be purged from the file system once uploaded using
+     * {@link org.apache.jackrabbit.core.data.CachingDataStore.FilesUploader}.
+     *
+     * @param key of the file to check
+     * @return a File object if found
+     */
+    @Nullable
+    public File getIfPresent(String key) {
+        cacheStats.markLoad();
+        if (map.containsKey(key)) {
+            cacheStats.markLoadSuccess();
+            return map.get(key);
+        }
+        return null;
+    }
+
+    /**
+     * Create a placeholder in the file system cache folder for the given identifier.
+     *
+     * @param id of the file
+     * @return file handle
+     */
+    private File getFile(String id) {
+        File file = uploadCacheSpace;
+        file = new File(file, id.substring(0, 2));
+        file = new File(file, id.substring(2, 4));
+        file = new File(file, id.substring(4, 6));
+        return new File(file, id);
+    }
+
+    /**
+     * Cache related stats
+     *
+     * @return an instance of the {@link DataStoreCacheStatsMBean}.
+     */
+    public DataStoreCacheStatsMBean getStats() {
+        return cacheStats;
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.info("Uploads in progress on close [{}]", map.size());
+        LOG.info("Uploads completed but not cleared from cache [{}]", attic.size());
+        LOG.info("Staging cache stats on close [{}]", cacheStats.cacheInfoAsString());
+        new ExecutorCloser(executor).close();
+        new ExecutorCloser(removeExecutor).close();
+    }
+
+    /**
+     * Class which calls remove on all
+     */
+    class RemoveJob implements Runnable {
+        @Override
+        public void run() {
+            remove();
+        }
+    }
+}
+
+/**
+ * Upload Staging Cache Statistics.
+ */
+class StagingCacheStats extends AnnotatedStandardMBean implements DataStoreCacheStatsMBean {
+
+    private static final String HITS = "HITS";
+    private static final String REQUESTS = "REQUESTS";
+    private static final String UPLOAD_TIMER = "UPLOAD_TIMER";
+    private static final String LOAD_SUCCESS = "CACHE_LOAD_SUCCESS";
+    private static final String LOAD = "CACHE_LOAD";
+    private static final String CURRENT_SIZE = "CURRENT_SIZE";
+    private static final String CURRENT_MEM_SIZE = "CURRENT_MEM_SIZE";
+    private static final String COUNT = "COUNT";
+
+    private final String cacheName;
+
+    /** Max size in bytes configured for the cache **/
+    private final long maxWeight;
+
+    /** Tracking the number of uploads that could be staged **/
+    private final MeterStats hitMeter;
+
+    /** Tracking the number of requests to upload & stage **/
+    private final MeterStats requestMeter;
+
+    /** Tracking the number of get requests serviced by the cache **/
+    private final MeterStats loadSuccessMeter;
+
+    /** Tracking the number of get requests received by the cache **/
+    private final MeterStats loadMeter;
+
+    /** Tracking the upload time **/
+    private final TimerStats uploadTimer;
+
+    /** Tracking the current size in MB **/
+    private final CounterStats currentSizeMeter;
+
+    /** Tracking the in-memory size of cache **/
+    private final CounterStats currentMemSizeMeter;
+
+    /** Tracking the cache element count **/
+    private final CounterStats countMeter;
+
+    /** Handle to the cache **/
+    private final UploadStagingCache cache;
+
+    StagingCacheStats(UploadStagingCache cache, StatisticsProvider provider, long maxWeight) {
+        super(DataStoreCacheStatsMBean.class);
+        this.cache = cache;
+
+        StatisticsProvider statisticsProvider;
+        if (provider == null) {
+            statisticsProvider = StatisticsProvider.NOOP;
+        } else {
+            statisticsProvider = provider;
+        }
+
+        // Configure cache name
+        cacheName = "StagingCacheStats";
+
+        this.maxWeight = maxWeight;
+
+        // Fetch stats and time series
+        String statName;
+
+        statName = getStatName(HITS, cacheName);
+        hitMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(REQUESTS, cacheName);
+        requestMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(UPLOAD_TIMER, cacheName);
+        uploadTimer = statisticsProvider.getTimer(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(LOAD_SUCCESS, cacheName);
+        loadSuccessMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(LOAD, cacheName);
+        loadMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(CURRENT_SIZE, cacheName);
+        currentSizeMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(CURRENT_MEM_SIZE, cacheName);
+        currentMemSizeMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
+
+        statName = getStatName(COUNT, cacheName);
+        countMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
+    }
+
+    //~--------------------------------------< stats update methods
+
+    void markHit() {
+        hitMeter.mark();
+    }
+
+    void markRequest() {
+        requestMeter.mark();
+    }
+
+    void markLoadSuccess() {
+        loadSuccessMeter.mark();
+    }
+
+    void markLoad() {
+        loadMeter.mark();
+    }
+
+    TimerStats.Context startUpLoaderTimer() {
+        return this.uploadTimer.time();
+    }
+
+    void incrementCount() {
+        countMeter.inc();
+    }
+
+    void incrementSize(long size) {
+        currentSizeMeter.inc(size);
+    }
+
+    void incrementMemSize(long size) {
+        currentMemSizeMeter.inc(size);
+    }
+
+    void decrementCount() {
+        countMeter.dec();
+    }
+
+    void decrementSize(long size) {
+        currentSizeMeter.dec(size);
+    }
+
+
+    void decrementMemSize(int size) {
+        currentMemSizeMeter.dec(size);
+    }
+
+    @Override
+    public String getName() {
+        return cacheName;
+    }
+
+    @Override
+    public long getRequestCount() {
+        return requestMeter.getCount();
+    }
+
+    @Override
+    public long getHitCount() {
+        return hitMeter.getCount();
+    }
+
+    @Override
+    public double getHitRate() {
+        long hitCount = hitMeter.getCount();
+        long requestCount = requestMeter.getCount();
+        return (requestCount == 0L ? 0L : (double)hitCount/requestCount);
+    }
+
+    @Override
+    public long getMissCount() {
+        return requestMeter.getCount() - hitMeter.getCount();
+    }
+
+    @Override
+    public double getMissRate() {
+        long missCount = getMissCount();
+        long requestCount = requestMeter.getCount();
+        return (requestCount == 0L ? 0L : (double) missCount/requestCount);
+    }
+
+    @Override
+    public long getLoadCount() {
+        return loadMeter.getCount();
+    }
+
+    @Override
+    public long getLoadSuccessCount() {
+        return loadSuccessMeter.getCount();
+    }
+
+    @Override
+    public long getLoadExceptionCount() {
+        return (getLoadCount() - getLoadSuccessCount());
+    }
+
+    @Override
+    public double getLoadExceptionRate() {
+        long loadExceptionCount = getLoadExceptionCount();
+        long loadCount = loadMeter.getCount();
+        return (loadCount == 0L ? 0L : (double) loadExceptionCount/loadCount);
+    }
+
+    @Override
+    public long getElementCount() {
+        return countMeter.getCount();
+    }
+
+    @Override
+    public long getMaxTotalWeight() {
+        return maxWeight;
+    }
+
+    @Override
+    public long estimateCurrentWeight() {
+        return currentSizeMeter.getCount();
+    }
+
+    @Override
+    public long estimateCurrentMemoryWeight() {
+        return currentMemSizeMeter.getCount();
+    }
+
+    @Override
+    public String cacheInfoAsString() {
+        return toStringHelper("StagingCacheStats")
+            .add("requestCount", getRequestCount())
+            .add("hitCount", getHitCount())
+            .add("hitRate", format("%1.2f", getHitRate()))
+            .add("missCount", getMissCount())
+            .add("missRate", format("%1.2f", getMissRate()))
+            .add("loadCount", getLoadCount())
+            .add("loadSuccessCount", getLoadSuccessCount())
+            .add("elementCount", getElementCount())
+            .add("currentMemSize", estimateCurrentMemoryWeight())
+            .add("totalWeight", humanReadableByteCount(estimateCurrentWeight()))
+            .add("maxWeight", humanReadableByteCount(getMaxTotalWeight()))
+            .toString();
+    }
+
+    //~--------------------------------------< CacheStatsMbean - stats that are not (yet) available
+    @Override
+    public long getTotalLoadTime() {
+        return 0;
+    }
+
+    @Override
+    public double getAverageLoadPenalty() {
+        return 0;
+    }
+
+    @Override
+    public long getEvictionCount() {
+        return 0;
+    }
+
+    @Override
+    public void resetStats() {
+    }
+
+    //~--------------------------------------< private helpers
+
+    private static String getStatName(String meter, String cacheName) {
+        return cacheName + "." + meter;
+    }
+}
+
+/**
+ * Wrapper for backend used for uploading
+ */
+interface StagingUploader {
+    void write(String id, File f) throws DataStoreException;
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractListeningExecutorService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class for DataStore cache related tests.
+ */
+public class AbstractDataStoreCacheTest {
+    static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreCacheTest.class);
+
+    static class TestStagingUploader implements StagingUploader {
+        private final File root;
+
+        public TestStagingUploader(File dir) {
+            this.root = new File(dir, "datastore");
+            root.mkdirs();
+        }
+
+        @Override public void write(String id, File f) throws DataStoreException {
+            try {
+                File move = getFile(id, root);
+                move.getParentFile().mkdirs();
+                Files.copy(f, move);
+                LOG.info("In TestStagingUploader after write [{}]", move);
+            } catch (IOException e) {
+                throw new DataStoreException(e);
+            }
+        }
+
+        public File read(String id) {
+            return getFile(id, root);
+        }
+    }
+
+
+    static class TestCacheLoader<S, I> extends CacheLoader<String, FileInputStream> {
+        private final File root;
+
+        public TestCacheLoader(File dir) {
+            this.root = new File(dir, "datastore");
+            root.mkdirs();
+        }
+
+        public void write(String id, File f) throws DataStoreException {
+            try {
+                File move = getFile(id, root);
+                move.getParentFile().mkdirs();
+                Files.copy(f, move);
+                LOG.info("In TestCacheLoader after write [{}], [{}]", id, move);
+            } catch (IOException e) {
+                throw new DataStoreException(e);
+            }
+        }
+
+        @Override public FileInputStream load(@Nonnull String key) throws Exception {
+            return FileUtils.openInputStream(getFile(key, root));
+        }
+    }
+
+    static class TestPoolExecutor extends ThreadPoolExecutor {
+        private final CountDownLatch beforeLatch;
+        private final CountDownLatch afterLatch;
+
+        TestPoolExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch) {
+            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+                new NamedThreadFactory("oak-async-thread"));
+            this.beforeLatch = beforeLatch;
+            this.afterLatch = afterLatch;
+        }
+
+        @Override public void beforeExecute(Thread t, Runnable command) {
+            try {
+                LOG.trace("Before execution....waiting for latch");
+                beforeLatch.await();
+                LOG.trace("Before execution....after acquiring latch");
+                super.beforeExecute(t, command);
+                LOG.trace("Completed beforeExecute");
+            } catch (Exception e) {
+                LOG.trace("Error in before execute", e);
+            }
+        }
+
+        @Override protected void afterExecute(Runnable r, Throwable t) {
+            try {
+                LOG.trace("After execution....waiting for latch");
+                afterLatch.countDown();
+                LOG.trace("After execution....after acquiring latch");
+                super.afterExecute(r, t);
+                LOG.trace("Completed afterExecute");
+            } catch (Exception e) {
+                LOG.trace("Error in after execute", e);
+            }
+        }
+    }
+
+
+    static class TestExecutor extends AbstractListeningExecutorService {
+        private final CountDownLatch afterLatch;
+        private final ExecutorService delegate;
+        final List<ListenableFuture<Integer>> futures;
+
+        public TestExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch,
+            CountDownLatch afterExecuteLatch) {
+            this.delegate = new TestPoolExecutor(threads, beforeLatch, afterExecuteLatch);
+            this.futures = Lists.newArrayList();
+            this.afterLatch = afterLatch;
+        }
+
+        @Override @Nonnull public ListenableFuture<?> submit(@Nonnull Callable task) {
+            LOG.trace("Before submitting to super....");
+            ListenableFuture<Integer> submit = super.submit(task);
+            LOG.trace("After submitting to super....");
+
+            futures.add(submit);
+            Futures.addCallback(submit, new TestFutureCallback<Integer>(afterLatch));
+            LOG.trace("Added callback");
+
+            return submit;
+        }
+
+        @Override public void execute(@Nonnull Runnable command) {
+            delegate.execute(command);
+        }
+
+        @Override public void shutdown() {
+            delegate.shutdown();
+        }
+
+        @Override @Nonnull public List<Runnable> shutdownNow() {
+            return delegate.shutdownNow();
+        }
+
+        @Override public boolean isShutdown() {
+            return delegate.isShutdown();
+        }
+
+        @Override public boolean isTerminated() {
+            return delegate.isTerminated();
+        }
+
+        @Override public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit)
+            throws InterruptedException {
+            return delegate.awaitTermination(timeout, unit);
+        }
+
+        static class TestFutureCallback<Integer> implements FutureCallback {
+            private final CountDownLatch latch;
+
+            public TestFutureCallback(CountDownLatch latch) {
+                this.latch = latch;
+            }
+
+            @Override public void onSuccess(@Nullable Object result) {
+                try {
+                    LOG.trace("Waiting for latch in callback");
+                    latch.await(100, TimeUnit.MILLISECONDS);
+                    LOG.trace("Acquired latch in onSuccess");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+
+            @Override public void onFailure(@Nonnull Throwable t) {
+                try {
+                    LOG.trace("Waiting for latch onFailure in callback");
+                    latch.await(100, TimeUnit.MILLISECONDS);
+                    LOG.trace("Acquired latch in onFailure");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+
+    private static File getFile(String id, File root) {
+        File file = root;
+        file = new File(file, id.substring(0, 2));
+        file = new File(file, id.substring(2, 4));
+        return new File(file, id);
+    }
+
+    static File copyToFile(InputStream stream, File file) throws IOException {
+        FileUtils.copyInputStreamToFile(stream, file);
+        return file;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link UploadStagingCache}.
+ */
+public class UploadStagingCacheTest extends AbstractDataStoreCacheTest {
+    private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCacheTest.class);
+    private static final String ID_PREFIX = "12345";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private final Closer closer = Closer.create();
+    private TestStagingUploader uploader;
+    private File root;
+    private CountDownLatch taskLatch;
+    private CountDownLatch callbackLatch;
+    private CountDownLatch afterExecuteLatch;
+    private TestExecutor executor;
+    private UploadStagingCache stagingCache;
+    private StatisticsProvider statsProvider;
+    private ScheduledExecutorService removeExecutor;
+
+    @Before
+    public void setup() throws IOException {
+        root = folder.newFolder();
+        init();
+    }
+
+    private void init() {
+        // uploader
+        uploader = new TestStagingUploader(root);
+
+        // create executor
+        taskLatch = new CountDownLatch(1);
+        callbackLatch = new CountDownLatch(1);
+        afterExecuteLatch = new CountDownLatch(1);
+        executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+
+        // stats
+        ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
+        statsProvider = new DefaultStatisticsProvider(statsExecutor);
+
+        removeExecutor = Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(removeExecutor, 500, TimeUnit.MILLISECONDS));
+
+        //cache instance
+        stagingCache =
+            new UploadStagingCache(root, 1/*threads*/, 8 * 1024 /* bytes */,
+                uploader, null/*cache*/, statsProvider, executor, null, 3000);
+        closer.register(stagingCache);
+    }
+
+    @After
+    public void tear() throws IOException {
+        closer.close();
+    }
+
+    /**
+     *  Stage file successful upload.
+     * @throws Exception
+     */
+    @Test
+    public void testAdd() throws Exception {
+        // add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+
+        assertFuture(futures, 0);
+        assertCacheStats(stagingCache, 0, 0, 1, 1);
+    }
+
+    /**
+     * Stage file unsuccessful upload.
+     * @throws Exception
+     */
+    @Test
+    public void testAddUploadException() throws Exception {
+        // Mock uploader to throw exception on write
+        final TestStagingUploader mockedDS = mock(TestStagingUploader.class);
+        doThrow(new DataStoreException("Error in writing blob")).when(mockedDS)
+            .write(Matchers.any(String.class), Matchers.any(File.class));
+
+        // initialize staging cache using the mocked uploader
+        stagingCache =
+            new UploadStagingCache(root, 1/*threads*/, 4 * 1024 /* bytes */,
+                mockedDS, null/*cache*/, statsProvider, executor, null, 3000);
+        closer.register(stagingCache);
+
+        // Add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+
+        waitFinish(futures);
+
+        // assert file retrieved from staging cache
+        File ret = stagingCache.getIfPresent(ID_PREFIX + 0);
+        assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()), ret));
+
+        assertEquals(1, stagingCache.getStats().getLoadCount());
+        assertEquals(1, stagingCache.getStats().getLoadSuccessCount());
+        assertCacheStats(stagingCache, 1, 4 * 1024, 1, 1);
+    }
+
+    /**
+     * Retrieve without adding.
+     * @throws Exception
+     */
+    @Test
+    public void testGetNoAdd() throws Exception {
+        File ret = stagingCache.getIfPresent(ID_PREFIX + 0);
+
+        // assert no file
+        assertNull(ret);
+        assertEquals(1, stagingCache.getStats().getLoadCount());
+        assertCacheStats(stagingCache, 0, 0, 0, 0);
+    }
+
+    /**
+     * Error in putting file to stage.
+     * @throws Exception
+     */
+    @Test
+    public void testPutMoveFileError() throws Exception {
+        Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, new File("empty"));
+        // assert no file
+        assertFalse(future.isPresent());
+        assertEquals(1, stagingCache.getStats().getMissCount());
+        assertCacheStats(stagingCache, 0, 0, 0, 1);
+    }
+
+    /**
+     * Put and retrieve different files concurrently.
+     * @throws Exception
+     */
+    @Test
+    public void testGetAddDifferent() throws Exception {
+        //add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // Create an async retrieve task
+        final SettableFuture<File> retFuture = SettableFuture.create();
+        Thread t = new Thread(new Runnable() {
+            @Override public void run() {
+                retFuture.set(stagingCache.getIfPresent(ID_PREFIX + 1));
+            }
+        });
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        t.start();
+
+        //assert no file retrieve
+        assertNull(retFuture.get());
+        assertEquals(1, stagingCache.getStats().getLoadCount());
+
+        assertFuture(futures, 0);
+        assertCacheStats(stagingCache, 0, 0, 1, 1);
+    }
+
+    /**
+     * Stage file when cache full.
+     * @throws Exception
+     */
+    @Test
+    public void testCacheFullAdd() throws Exception {
+        // initialize cache to have restricted size
+        stagingCache =
+            new UploadStagingCache(root, 1/*threads*/, 4 * 1024 /* bytes */,
+                uploader, null/*cache*/, statsProvider, executor, null, 3000);
+        closer.register(stagingCache);
+
+        // add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // Add another load
+        File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+        Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2);
+        assertFalse(future2.isPresent());
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        assertFuture(futures, 0);
+
+        // Try 2nd upload again
+        Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 1, f2);
+        futures = Lists.newArrayList();
+        if (future.isPresent()) {
+            futures.add(future.get());
+        }
+        assertFuture(futures, 1);
+
+        assertCacheStats(stagingCache, 0, 0, 2, 3);
+    }
+
+    /**
+     * Stage same file concurrently.
+     * @throws Exception
+     */
+    @Test
+    public void testConcurrentSameAdd() throws Exception {
+        // Add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 0, f);
+        assertFalse(future2.isPresent());
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        assertFuture(futures, 0);
+
+        assertCacheStats(stagingCache, 0, 0, 1, 2);
+    }
+
+    /**
+     * Stage different files concurrently
+     * @throws Exception
+     */
+    @Test
+    public void testConcurrentDifferentAdd() throws Exception {
+        // Add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // Add diff load
+        File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+        Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2);
+        if (future2.isPresent()) {
+            futures.add(future2.get());
+        }
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        assertFuture(futures, 0, 1);
+
+        assertCacheStats(stagingCache, 0, 0, 2, 2);
+    }
+
+    /**
+     * Concurrently retrieve after stage but before upload.
+     * @throws Exception
+     */
+    @Test
+    public void testConcurrentGetDelete() throws Exception {
+        ListeningExecutorService executorService =
+            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        closer.register(new ExecutorCloser(executorService));
+
+        // Add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // Get a handle to the file and open stream
+        File file = stagingCache.getIfPresent(ID_PREFIX + 0);
+        final FileInputStream fStream = Files.newInputStreamSupplier(file).getInput();
+
+        // task to copy the steam to a file simulating read from the stream
+        File temp = folder.newFile();
+        CountDownLatch copyThreadLatch = new CountDownLatch(1);
+        SettableFuture<File> future1 =
+            copyStreamThread(executorService, fStream, temp, copyThreadLatch);
+
+        //start
+        taskLatch.countDown();
+        callbackLatch.countDown();
+
+        waitFinish(futures);
+
+        // trying copying now
+        copyThreadLatch.countDown();
+        future1.get();
+
+        assertTrue(Files.equal(temp, uploader.read(ID_PREFIX + 0)));
+    }
+
+    /**
+     * Concurrently stage and trigger delete after upload for same file.
+     * @throws Exception
+     */
+    @Test
+    public void testConcurrentPutDeleteSame() throws Exception {
+        testConcurrentPutDelete(0);
+    }
+
+    /**
+     * Concurrently stage and trigger delete after upload for different file.
+     * @throws Exception
+     */
+    @Test
+    public void testConcurrentPutDeleteDifferent() throws Exception {
+        testConcurrentPutDelete(1);
+    }
+
+    private void testConcurrentPutDelete(int diff) throws Exception {
+        ListeningExecutorService executorService =
+            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+        closer.register(new ExecutorCloser(executorService));
+        //start immediately
+        taskLatch.countDown();
+
+        // Add immediately
+        List<ListenableFuture<Integer>> futures = put(folder);
+
+        // New task to put another file
+        File f2 = copyToFile(randomStream(diff, 4 * 1024), folder.newFile());
+        CountDownLatch putThreadLatch = new CountDownLatch(1);
+        CountDownLatch triggerLatch = new CountDownLatch(1);
+        SettableFuture<Optional<SettableFuture<Integer>>> future1 =
+            putThread(executorService, diff, f2, stagingCache, putThreadLatch, triggerLatch);
+        putThreadLatch.countDown();
+
+        // wait for put thread to go ahead
+        callbackLatch.countDown();
+        ScheduledFuture<?> scheduledFuture =
+            removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+        triggerLatch.await();
+        if (future1.get().isPresent()) {
+            futures.add(future1.get().get());
+        }
+
+        ListenableFuture<List<Integer>> listListenableFuture = Futures.successfulAsList(futures);
+        try {
+            listListenableFuture.get();
+            scheduledFuture.get();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+            uploader.read(ID_PREFIX + 0)));
+        assertTrue(Files.equal(copyToFile(randomStream(diff, 4 * 1024), folder.newFile()),
+            uploader.read(ID_PREFIX + diff)));
+    }
+
+    /**
+     * Test build on start.
+     * @throws Exception
+     */
+    @Test
+    public void testBuild() throws Exception {
+        // Add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+        // Close before uploading finished
+        closer.close();
+
+        // Start again
+        init();
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        afterExecuteLatch.await();
+
+        waitFinish(futures);
+
+        assertNull(stagingCache.getIfPresent(ID_PREFIX + 0));
+        assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+            uploader.read(ID_PREFIX + 0)));
+        assertCacheStats(stagingCache, 0, 0, 1, 1);
+    }
+
+    /** -------------------- Helper methods ----------------------------------------------------**/
+
+    private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor,
+        final InputStream fStream, final File temp, final CountDownLatch start) {
+        final SettableFuture<File> future = SettableFuture.create();
+        executor.submit(new Runnable() {
+            @Override public void run() {
+                try {
+                    LOG.info("Waiting for start of copying");
+                    start.await();
+                    LOG.info("Starting copy of [{}]", temp);
+                    FileUtils.copyInputStreamToFile(fStream, temp);
+                    LOG.info("Finished retrieve");
+                    future.set(temp);
+                } catch (Exception e) {
+                    LOG.info("Exception in get", e);
+                }
+            }
+        });
+        return future;
+    }
+
+    private static SettableFuture<Optional<SettableFuture<Integer>>> putThread(
+        ListeningExecutorService executor, final int seed, final File f, final UploadStagingCache cache,
+        final CountDownLatch start, final CountDownLatch trigger) {
+        final SettableFuture<Optional<SettableFuture<Integer>>> future = SettableFuture.create();
+        executor.submit(new Runnable() {
+            @Override public void run() {
+                try {
+                    LOG.info("Waiting for start to put");
+                    start.await();
+                    LOG.info("Starting put");
+                    trigger.countDown();
+                    Optional<SettableFuture<Integer>> opt = cache.put(ID_PREFIX + seed, f);
+                    LOG.info("Finished put");
+                    future.set(opt);
+                } catch (Exception e) {
+                    LOG.info("Exception in get", e);
+                }
+            }
+        });
+        return future;
+    }
+
+    private void waitFinish(List<ListenableFuture<Integer>> futures) {
+        ListenableFuture<List<Integer>> listListenableFuture = Futures.successfulAsList(futures);
+        try {
+            listListenableFuture.get();
+            ScheduledFuture<?> scheduledFuture =
+                removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+            scheduledFuture.get();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private List<ListenableFuture<Integer>> put(TemporaryFolder folder)
+        throws IOException {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f);
+        List<ListenableFuture<Integer>> futures = Lists.newArrayList();
+        if (future.isPresent()) {
+            futures.add(future.get());
+        }
+        return futures;
+    }
+
+    private void assertFuture(List<ListenableFuture<Integer>> futures, int... seeds)
+        throws Exception {
+        waitFinish(futures);
+
+        for (int i = 0; i < seeds.length; i++) {
+            File upload = uploader.read(ID_PREFIX + seeds[i]);
+            assertFile(upload, seeds[i], folder);
+        }
+    }
+
+    private void assertFile(File f, int seed, TemporaryFolder folder) throws IOException {
+        assertTrue(f.exists());
+        File temp = copyToFile(randomStream(seed, 4 * 1024), folder.newFile());
+        assertTrue("Uploaded file content differs", FileUtils.contentEquals(temp, f));
+    }
+
+    private static void assertCacheStats(UploadStagingCache cache, long elems, long weight,
+        long hits, long count) {
+        assertEquals(elems, cache.getStats().getElementCount());
+        assertEquals(weight, cache.getStats().estimateCurrentWeight());
+        assertEquals(hits, cache.getStats().getHitCount());
+        assertEquals(count, cache.getStats().getRequestCount());
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
------------------------------------------------------------------------------
    svn:eol-style = native