You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:49:49 UTC
svn commit: r1766330 - in /jackrabbit/oak/trunk: oak-blob-cloud/ oak-blob/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/
Author: amitj
Date: Mon Oct 24 04:49:49 2016
New Revision: 1766330
URL: http://svn.apache.org/viewvc?rev=1766330&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore
* Implementation for local file system DataStore cache which includes:
** FileCache to cache downloaded files on the local fs. Uses in-memory CacheLIRS map to handle evictions and loading from backend
** An UploadStagingCache which uploads blobs asynchronously to the backend
** CacheStats for both types of cache
** CompositeDataStoreCache which supports both caching for upload and download
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
jackrabbit/oak/trunk/oak-blob/pom.xml
Modified: jackrabbit/oak/trunk/oak-blob-cloud/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-cloud/pom.xml?rev=1766330&r1=1766329&r2=1766330&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-cloud/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob-cloud/pom.xml Mon Oct 24 04:49:49 2016
@@ -101,6 +101,11 @@
<dependency>
<groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-core</artifactId>
<version>${project.version}</version>
</dependency>
Modified: jackrabbit/oak/trunk/oak-blob/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/pom.xml?rev=1766330&r1=1766329&r2=1766330&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob/pom.xml Mon Oct 24 04:49:49 2016
@@ -78,6 +78,11 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-data</artifactId>
+ <version>${jackrabbit.version}</version>
+ </dependency>
<!-- Dependencies to other Oak components -->
<dependency>
<groupId>org.apache.jackrabbit</groupId>
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.annotation.Nullable;
+
+import com.google.common.cache.AbstractCache;
+import com.google.common.cache.CacheLoader;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ */
+public class CompositeDataStoreCache extends AbstractCache<String, File> implements Closeable {
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeDataStoreCache.class);
+
+ /**
+ * Cache for downloaded blobs
+ */
+ private final FileCache downloadCache;
+
+ /**
+ * Cache for staging async uploads
+ */
+ private final UploadStagingCache stagingCache;
+
+ /**
+ * The directory where the files are created.
+ */
+ private final File directory;
+
+ public CompositeDataStoreCache(String path, long size,
+ int uploadSplitPercentage, int uploadThreads, CacheLoader<String, InputStream> loader,
+ final StagingUploader uploader, StatisticsProvider statsProvider,
+ ListeningExecutorService executor,
+ ScheduledExecutorService scheduledExecutor /* purge scheduled executor */,
+ long purgeInterval /* async purge interval */) {
+
+ checkArgument(uploadSplitPercentage >= 0 && uploadSplitPercentage < 100,
+ "Upload percentage should be between 0 and 100");
+
+ this.directory = new File(path);
+
+ long uploadSize = (size * uploadSplitPercentage)/100;
+
+ this.downloadCache =
+ new FileCache((size - uploadSize), directory, loader, null);
+
+ this.stagingCache =
+ new UploadStagingCache(directory, uploadThreads, uploadSize, uploader, downloadCache,
+ statsProvider, executor, scheduledExecutor, purgeInterval);
+ }
+
+ @Nullable
+ public File getIfPresent(String key) {
+ // Check if the file scheduled for async upload
+ File staged = stagingCache.getIfPresent(key);
+ if (staged != null && staged.exists()) {
+ return staged;
+ }
+ return downloadCache.getIfPresent(key);
+ }
+
+ @Nullable
+ @Override
+ public File getIfPresent(Object key) {
+ return getIfPresent((String) key);
+ }
+
+ public File get(String key) throws IOException {
+ try {
+ // Check if the file scheduled for async upload
+ File staged = stagingCache.getIfPresent(key);
+ if (staged != null && staged.exists()) {
+ return staged;
+ }
+ // get from cache and download if not available
+ return downloadCache.get(key);
+ } catch (IOException e) {
+ LOG.error("Error loading [{}] from cache", key);
+ throw e;
+ }
+ }
+
+ @Override
+ public void invalidate(Object key) {
+ downloadCache.invalidate(key);
+ }
+
+ public boolean stage(String key, File file) {
+ return stagingCache.put(key, file).isPresent();
+ }
+
+
+ public DataStoreCacheStatsMBean getStagingCacheStats() {
+ return stagingCache.getStats();
+ }
+
+ public DataStoreCacheStatsMBean getCacheStats() {
+ return downloadCache.getStats();
+ }
+
+ @Override
+ public void close() throws IOException {
+ downloadCache.close();
+ stagingCache.close();
+ }
+
+ UploadStagingCache getStagingCache() {
+ return stagingCache;
+ }
+
+ FileCache getDownloadCache() {
+ return downloadCache;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,14 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
+
+/**
+ */
+public interface DataStoreCacheStatsMBean extends CacheStatsMBean {
+ /**
+ * Total weight of the in-memory cache
+ * @return to weight of the cache
+ */
+ //Computing weight is costly hence its an operation
+ long estimateCurrentMemoryWeight();
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheStatsMBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.AbstractCache;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.Weigher;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.cache.CacheLIRS;
+import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
+import static org.apache.commons.io.FileUtils.forceDelete;
+import static org.apache.commons.io.FilenameUtils.normalizeNoEndSeparator;
+
+/**
+ */
+public class FileCache extends AbstractCache<String, File> implements Closeable {
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
+
+ /**
+ * The cacheRoot directory of the cache.
+ */
+ private final File cacheRoot;
+
+ private final CacheLIRS<String, File> cache;
+
+ private final FileCacheStats cacheStats;
+
+ private final ExecutorService executor;
+
+ /**
+ * Convert the size calculation to KB to support max file size of 2 TB
+ */
+ private final Weigher<String, File> weigher = new Weigher<String, File>() {
+ @Override public int weigh(String key, File value) {
+ return Math.round(value.length() / (4 * 1024)); // convert to KB
+ }};
+
+ //Rough estimate of the in-memory key, value pair
+ private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
+ @Override public int weigh(String key, File value) {
+ return (StringUtils.estimateMemoryUsage(key) +
+ StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
+ }};
+
+ public FileCache(long maxSize /** bytes **/, File root,
+ final CacheLoader<String, InputStream> loader,
+ @Nullable final ExecutorService executor) {
+
+ this.cacheRoot = new File(root, "download");
+
+ /* convert to 4 KB block */
+ long size = Math.round(maxSize / (1024L * 4));
+
+ cache = new CacheLIRS.Builder<String, File>()
+ .maximumWeight(size)
+ .recordStats()
+ .weigher(weigher)
+ .evictionCallback(new EvictionCallback<String, File>() {
+ @Override
+ public void evicted(@Nonnull String key, @Nullable File cachedFile,
+ @Nonnull RemovalCause cause) {
+ try {
+ if (cachedFile != null && cachedFile.exists()) {
+ forceDelete(cachedFile);
+ LOG.info("File [{}] evicted with reason [{}]", cachedFile, cause
+ .toString());
+ }
+ } catch (IOException e) {
+ LOG.info("Cached file deletion failed after eviction", e);
+ }
+ }})
+ .build(new CacheLoader<String, File>() {
+ @Override
+ public File load(String key) throws Exception {
+ // Fetch from local cache directory and if not found load from backend
+ File cachedFile = getCacheFile(key);
+ if (cachedFile.exists()) {
+ return cachedFile;
+ } else {
+ InputStream is = null;
+ boolean threw = true;
+ try {
+ is = loader.load(key);
+ copyInputStreamToFile(is, cachedFile);
+ threw = false;
+ } finally {
+ Closeables.close(is, threw);
+ }
+ return cachedFile;
+ }
+ }
+ });
+ this.cacheStats =
+ new FileCacheStats(cache, "DataStore-DownloadCache", weigher, memWeigher, maxSize);
+
+ // TODO: Check persisting the in-memory map and initializing Vs building from fs
+ // Build in-memory cache asynchronously from the file system entries
+ if (executor == null) {
+ this.executor = Executors.newSingleThreadExecutor();
+ } else {
+ this.executor = executor;
+ }
+ this.executor.submit(new CacheBuildJob());
+ }
+
+ /**
+ * Puts the given key and file into the cache.
+ * The file is moved to the cache. So, the original file
+ * won't be available after this operation. It can be retrieved
+ * using {@link #getIfPresent(String)}.
+ *
+ * @param key of the file
+ * @param file to put into cache
+ */
+ @Override
+ public void put(String key, File file) {
+ try {
+ File cached = getCacheFile(key);
+ if (!cached.exists()) {
+ FileUtils.moveFile(file, cached);
+ }
+ cache.put(key, cached);
+ } catch (IOException e) {
+ LOG.error("Exception adding id [{}] with file [{}] to cache", key, file);
+ }
+ }
+
+ public boolean containsKey(String key) {
+ return cache.containsKey(key);
+ }
+
+ /**
+ * Retrieves the file handle from the cache if present and null otherwise.
+ *
+ * @param key of the file to retrieve
+ * @return File handle if available
+ */
+ @Nullable
+ public File getIfPresent(String key) {
+ try {
+ if (cache.containsKey(key)) {
+ return cache.get(key);
+ }
+ } catch (Exception e) {
+ LOG.error("Error in retrieving [{}] from cache", key, e);
+ }
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public File getIfPresent(Object key) {
+ return getIfPresent((String) key);
+ }
+
+ public File get(String key) throws IOException {
+ try {
+ // get from cache and download if not available
+ return cache.get(key);
+ } catch (ExecutionException e) {
+ LOG.error("Error loading [{}] from cache", key);
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void invalidate(Object key) {
+ cache.invalidate(key);
+ }
+
+ public DataStoreCacheStatsMBean getStats() {
+ return cacheStats;
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Cache stats on close [{}]", cacheStats.cacheInfoAsString());
+ new ExecutorCloser(executor).close();
+ }
+
+ /**
+ * Called to initialize the in-memory cache from the fs folder
+ */
+ private class CacheBuildJob implements Callable {
+ @Override
+ public Integer call() {
+ Stopwatch watch = Stopwatch.createStarted();
+ int count = build();
+ LOG.info("Cache built with [{}] files from file system in [{}] seconds",
+ count, watch.elapsed(TimeUnit.SECONDS));
+ return count;
+ }
+ }
+
+ /**
+ * Retrieves all the files present in the fs cache folder and builds the in-memory cache.
+ */
+ private int build() {
+ int count = 0;
+ // Iterate over all files in the cache folder
+ Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(cacheRoot)
+ .filter(new Predicate<File>() {
+ @Override public boolean apply(File input) {
+ return input.isFile() && !normalizeNoEndSeparator(input.getParent())
+ .equals(cacheRoot.getAbsolutePath());
+ }
+ }).iterator();
+ while (iter.hasNext()) {
+ File toBeSyncedFile = iter.next();
+ try {
+ put(toBeSyncedFile.getName(), toBeSyncedFile);
+ count++;
+ LOG.trace("Added file [{}} to in-memory cache", toBeSyncedFile);
+ } catch (Exception e) {
+ LOG.error("Error in putting cached file in map[{}]", toBeSyncedFile);
+ }
+ }
+ LOG.trace("[{}] files put in im-memory cache", count);
+ return count;
+ }
+
+ /**
+ * Create a placeholder in the file system cache folder for the given identifier.
+ *
+ * @param key
+ * @return File handle for the id
+ */
+ private File getCacheFile(String key) {
+ File file = cacheRoot;
+ file = new File(file, key.substring(0, 2));
+ file = new File(file, key.substring(2, 4));
+ file = new File(file, key.substring(4, 6));
+ return new File(file, key);
+ }
+}
+
+class FileCacheStats extends CacheStats implements DataStoreCacheStatsMBean {
+ private static final long KB = 4 * 1024;
+ private final Weigher<Object, Object> memWeigher;
+ private final Weigher<Object, Object> weigher;
+ private final Cache<Object, Object> cache;
+
+ /**
+ * Construct the cache stats object.
+ *
+ * @param cache the cache
+ * @param name the name of the cache
+ * @param weigher the weigher used to estimate the current weight
+ * @param maxWeight the maximum weight
+ */
+ public FileCacheStats(Cache<?, ?> cache, String name, Weigher<?, ?> weigher,
+ Weigher<?, ?> memWeigher, long maxWeight) {
+ super(cache, name, weigher, maxWeight);
+ this.memWeigher = (Weigher<Object, Object>) memWeigher;
+ this.weigher = (Weigher<Object, Object>) weigher;
+ this.cache = (Cache<Object, Object>) cache;
+ }
+
+ @Override
+ public long estimateCurrentMemoryWeight() {
+ if (memWeigher == null) {
+ return -1;
+ }
+ long size = 0;
+ for (Map.Entry<?, ?> e : cache.asMap().entrySet()) {
+ Object k = e.getKey();
+ Object v = e.getValue();
+ size += memWeigher.weigh(k, v);
+ }
+ return size;
+ }
+
+ @Override
+ public long estimateCurrentWeight() {
+ if (weigher == null) {
+ return -1;
+ }
+ long size = 0;
+ for (Map.Entry<?, ?> e : cache.asMap().entrySet()) {
+ Object k = e.getKey();
+ Object v = e.getValue();
+ size += weigher.weigh(k, v) * KB;
+ }
+ return size;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Objects.toStringHelper;
+import static java.lang.String.format;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+/**
+ * Cache for staging async uploads. This serves as a temporary cache for serving local
+ * requests till the time the upload has not been synced with the backend.
+ * <p>
+ * The appropriate backend for this cache are wrapped in {@link StagingUploader}
+ * implementations.
+ * <p>
+ * Stats:
+ * - Status for a particular upload
+ * - Upload time
+ */
+public class UploadStagingCache implements Closeable {
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
+
+ //Rough estimate of the in-memory key, value pair
+ private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
+ @Override public int weigh(String key, File value) {
+ return (StringUtils.estimateMemoryUsage(key) +
+ StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
+ }};
+
+ /**
+ * Max size of the upload staging cache in bytes
+ */
+ private final long size;
+
+ /**
+ * Current cache size in bytes
+ */
+ private final AtomicLong currentSize;
+
+ /**
+ * Executor for async uploads
+ */
+ private ListeningExecutorService executor;
+
+ /**
+ * Scheduled executor for build and remove
+ */
+ private ScheduledExecutorService removeExecutor;
+
+ /**
+ * In memory map for staged files
+ */
+ private final ConcurrentMap<String, File> map;
+
+ /**
+ * In memory map for files to be deleted after uploads
+ */
+ private final ConcurrentMap<String, File> attic;
+
+ /**
+ * Local directory where uploads are staged
+ */
+ private final File uploadCacheSpace;
+
+ /**
+ * Wrapper to where the blobs are uploaded/written
+ */
+ private final StagingUploader uploader;
+
+ /**
+ * Cache stats
+ */
+ private final StagingCacheStats cacheStats;
+
+ /**
+ * Handle for download cache if any
+ */
+ @Nullable
+ private final FileCache downloadCache;
+
+ public UploadStagingCache(File dir, int uploadThreads, long size /** bytes **/,
+ StagingUploader uploader,
+ @Nullable FileCache cache, StatisticsProvider statisticsProvider,
+ @Nullable ListeningExecutorService executor,
+ @Nullable ScheduledExecutorService scheduledExecutor,
+ long removalPeriod) {
+
+ this.currentSize = new AtomicLong();
+ this.size = size;
+ this.executor = executor;
+ if (executor == null) {
+ this.executor = MoreExecutors.listeningDecorator(Executors
+ .newFixedThreadPool(uploadThreads, new NamedThreadFactory("oak-ds-async-upload-thread")));
+ }
+
+ this.removeExecutor = scheduledExecutor;
+ if (scheduledExecutor == null) {
+ this.removeExecutor = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ this.map = Maps.newConcurrentMap();
+ this.attic = Maps.newConcurrentMap();
+ this.uploadCacheSpace = new File(dir, "upload");
+ this.uploader = uploader;
+ this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
+ this.downloadCache = cache;
+
+ build();
+
+ removeExecutor.scheduleAtFixedRate(new RemoveJob(), removalPeriod, removalPeriod,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * Retrieves all the files staged in the staging area and schedules them for uploads.
+ */
+ private void build() {
+ LOG.info("Scheduling pending uploads");
+
+ Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
+ .filter(new Predicate<File>() {
+ @Override public boolean apply(File input) {
+ return input.isFile();
+ }
+ }).iterator();
+ int count = 0;
+ while (iter.hasNext()) {
+ File toBeSyncedFile = iter.next();
+ Optional<SettableFuture<Integer>> scheduled = put(toBeSyncedFile.getName(), toBeSyncedFile);
+ if (scheduled.isPresent()) {
+ count++;
+ }
+ }
+
+ LOG.info("Scheduled [{}] pending uploads", count);
+ }
+
+ /**
+ * Puts the file into the staging cache if possible.
+ * Returns an optional SettableFuture if staged for upload otherwise empty.
+ *
+ * @param id the id of the file to be staged
+ * @param input the file to be staged
+ * @return An Optional SettableFuture.
+ */
+ public Optional<SettableFuture<Integer>> put(String id, File input) {
+ cacheStats.markRequest();
+
+ long length = input.length();
+ File uploadFile = getFile(id);
+
+ // if size permits and not upload complete or already scheduled for upload
+ if (currentSize.addAndGet(length) <= size
+ && !attic.containsKey(id)
+ && map.putIfAbsent(id, uploadFile) == null ) {
+
+ try {
+ if (!uploadFile.exists()) {
+ FileUtils.moveFile(input, uploadFile);
+ LOG.trace("File [{}] moved to staging cache [{}]", input, uploadFile);
+ }
+
+ // update stats
+ cacheStats.markHit();
+ cacheStats.incrementCount();
+ cacheStats.incrementSize(length);
+ cacheStats.incrementMemSize(memWeigher.weigh(id, uploadFile));
+
+ return Optional.of(stage(id, uploadFile));
+ } catch (Exception e) {
+ LOG.info("Error moving file to staging", e);
+ //reset the current state and return empty flag as not added to cache
+ currentSize.addAndGet(-length);
+ map.remove(id, uploadFile);
+ }
+ } else {
+ currentSize.addAndGet(-length);
+ }
+ return Optional.absent();
+ }
+
+ /**
+ * Stages the file for async upload.
+ * * Puts the file into the stage caching file system directory
+ * * Schedules a job for upload to write using the given {@link StagingUploader}
+ * * Updates the internal map and size variable
+ * * Adds a callback listener to remove the file once finished
+ * @param id of the file to be staged
+ * @param upload the file to be staged
+ * @return a SettableFuture instance
+ */
+ private SettableFuture<Integer> stage(final String id, final File upload) {
+ final SettableFuture<Integer> result = SettableFuture.create();
+
+ try {
+ // create an async job
+ ListenableFuture<Integer> future = executor.submit(new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ try {
+ final TimerStats.Context uploadContext = cacheStats.startUpLoaderTimer();
+
+ uploader.write(id, upload);
+ LOG.debug("File added to backend [{}]", upload);
+
+ uploadContext.stop();
+
+ return 1;
+ } catch (Exception e) {
+ LOG.error("Error adding file to backend", e);
+ throw e;
+ }
+ }
+ });
+
+ // Add a callback to the returned Future object for handling success and error
+ Futures.addCallback(future, new FutureCallback<Integer>() {
+ @Override public void onSuccess(@Nullable Integer r) {
+ LOG.info("Successfully added [{}], [{}]", id, upload);
+
+ try {
+ // move to attic to be deleted and remove from in-memory map
+ attic.put(id, upload);
+
+ // Add the uploaded file to the download cache if available
+ if (downloadCache != null) {
+ // Touch the file to update timestamp and record length
+ FileUtils.touch(upload);
+ long length = upload.length();
+
+ downloadCache.put(id, upload);
+ LOG.debug("[{}] added to cache", id);
+
+ // Update stats for removal
+ currentSize.addAndGet(-length);
+ cacheStats.decrementSize(length);
+ }
+
+ map.remove(id);
+
+ // Remove from upload staging area
+ //LOG.info("File [{}] removed from cache [{}]", upload, remove(id));
+ } catch (IOException e) {
+ LOG.warn("Error in cleaning up [{}] from staging", upload);
+ }
+ result.set(r);
+ }
+
+ @Override public void onFailure(Throwable t) {
+ LOG.error("Error adding file to backend", t);
+ result.setException(t);
+ }
+ });
+ LOG.info("File [{}] scheduled for upload [{}]", upload, result);
+ } catch (Exception e) {
+ LOG.error("Error staging file for upload [{}]", upload, e);
+ }
+ return result;
+ }
+
+ /**
+ * Removes all cached from attic
+ */
+ private void remove() {
+ LOG.info("Starting purge of uploaded files");
+
+ Iterator<String> iterator = attic.keySet().iterator();
+ int count = 0;
+ while (iterator.hasNext()) {
+ String key = iterator.next();
+ try {
+ if (remove(key)) {
+ iterator.remove();
+ count++;
+ }
+ } catch (IOException e) {
+ LOG.error("Error in removing entry for id [{}]", key);
+ }
+ }
+
+ LOG.info("Finished purge of [{}] files", count);
+ }
+
+ private boolean remove(String id) throws IOException {
+ LOG.trace("Removing upload file for id [{}]", id);
+
+ // Check if not already scheduled for upload
+ if (!map.containsKey(id)) {
+ LOG.trace("upload map contains id [{}]", id);
+
+ File toBeDeleted = attic.get(id);
+ LOG.trace("Trying to delete file [{}]", toBeDeleted);
+ long length = toBeDeleted.length();
+
+ delete(toBeDeleted);
+ LOG.info("deleted file [{}]", toBeDeleted);
+
+ currentSize.addAndGet(-length);
+ // Update stats for removal
+ cacheStats.decrementSize(length);
+ cacheStats.decrementMemSize(memWeigher.weigh(id, toBeDeleted));
+ cacheStats.decrementCount();
+
+ LOG.info("Cache [{}] file deleted for id [{}]", toBeDeleted, id);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Delete the file from the staged cache and all its empty sub-directories.
+ *
+ * @param f the file to be deleted
+ * @throws IOException
+ */
+ private void delete(File f) throws IOException {
+ if (f.exists()) {
+ FileUtils.forceDelete(f);
+ LOG.debug("Deleted staged upload file [{}]", f);
+ }
+
+ // delete empty parent folders (except the main directory)
+ while (true) {
+ f = f.getParentFile();
+ if ((f == null) || f.equals(uploadCacheSpace)
+ || (f.list() == null)
+ || (f.list().length > 0)) {
+ break;
+ }
+ LOG.debug("Deleted directory [{}], [{}]", f, f.delete());
+ }
+ }
+
+ /**
+ * Returns the File if present or null otherwise.
+ * Any usage of the returned file should assert for its existence as the file
+ * could be purged from the file system once uploaded using
+ * {@link org.apache.jackrabbit.core.data.CachingDataStore.FilesUploader}.
+ *
+ * @param key of the file to check
+ * @return a File object if found
+ */
+ @Nullable
+ public File getIfPresent(String key) {
+ cacheStats.markLoad();
+ if (map.containsKey(key)) {
+ cacheStats.markLoadSuccess();
+ return map.get(key);
+ }
+ return null;
+ }
+
+ /**
+ * Create a placeholder in the file system cache folder for the given identifier.
+ *
+ * @param id of the file
+ * @return file handle
+ */
+ private File getFile(String id) {
+ File file = uploadCacheSpace;
+ file = new File(file, id.substring(0, 2));
+ file = new File(file, id.substring(2, 4));
+ file = new File(file, id.substring(4, 6));
+ return new File(file, id);
+ }
+
+ /**
+ * Cache related stats
+ *
+ * @return an instance of the {@link DataStoreCacheStatsMBean}.
+ */
+ public DataStoreCacheStatsMBean getStats() {
+ return cacheStats;
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Uploads in progress on close [{}]", map.size());
+ LOG.info("Uploads completed but not cleared from cache [{}]", attic.size());
+ LOG.info("Staging cache stats on close [{}]", cacheStats.cacheInfoAsString());
+ new ExecutorCloser(executor).close();
+ new ExecutorCloser(removeExecutor).close();
+ }
+
+ /**
+ * Class which calls remove on all
+ */
+ class RemoveJob implements Runnable {
+ @Override
+ public void run() {
+ remove();
+ }
+ }
+}
+
+/**
+ * Upload Staging Cache Statistics.
+ */
+class StagingCacheStats extends AnnotatedStandardMBean implements DataStoreCacheStatsMBean {
+
+ private static final String HITS = "HITS";
+ private static final String REQUESTS = "REQUESTS";
+ private static final String UPLOAD_TIMER = "UPLOAD_TIMER";
+ private static final String LOAD_SUCCESS = "CACHE_LOAD_SUCCESS";
+ private static final String LOAD = "CACHE_LOAD";
+ private static final String CURRENT_SIZE = "CURRENT_SIZE";
+ private static final String CURRENT_MEM_SIZE = "CURRENT_MEM_SIZE";
+ private static final String COUNT = "COUNT";
+
+ private final String cacheName;
+
+ /** Max size in bytes configured for the cache **/
+ private final long maxWeight;
+
+ /** Tracking the number of uploads that could be staged **/
+ private final MeterStats hitMeter;
+
+ /** Tracking the number of requests to upload & stage **/
+ private final MeterStats requestMeter;
+
+ /** Tracking the number of get requests serviced by the cache **/
+ private final MeterStats loadSuccessMeter;
+
+ /** Tracking the number of get requests received by the cache **/
+ private final MeterStats loadMeter;
+
+ /** Tracking the upload time **/
+ private final TimerStats uploadTimer;
+
+ /** Tracking the current size in MB **/
+ private final CounterStats currentSizeMeter;
+
+ /** Tracking the in-memory size of cache **/
+ private final CounterStats currentMemSizeMeter;
+
+ /** Tracking the cache element count **/
+ private final CounterStats countMeter;
+
+ /** Handle to the cache **/
+ private final UploadStagingCache cache;
+
+ StagingCacheStats(UploadStagingCache cache, StatisticsProvider provider, long maxWeight) {
+ super(DataStoreCacheStatsMBean.class);
+ this.cache = cache;
+
+ StatisticsProvider statisticsProvider;
+ if (provider == null) {
+ statisticsProvider = StatisticsProvider.NOOP;
+ } else {
+ statisticsProvider = provider;
+ }
+
+ // Configure cache name
+ cacheName = "StagingCacheStats";
+
+ this.maxWeight = maxWeight;
+
+ // Fetch stats and time series
+ String statName;
+
+ statName = getStatName(HITS, cacheName);
+ hitMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(REQUESTS, cacheName);
+ requestMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(UPLOAD_TIMER, cacheName);
+ uploadTimer = statisticsProvider.getTimer(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(LOAD_SUCCESS, cacheName);
+ loadSuccessMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(LOAD, cacheName);
+ loadMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(CURRENT_SIZE, cacheName);
+ currentSizeMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(CURRENT_MEM_SIZE, cacheName);
+ currentMemSizeMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
+
+ statName = getStatName(COUNT, cacheName);
+ countMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
+ }
+
+ //~--------------------------------------< stats update methods
+
+ void markHit() {
+ hitMeter.mark();
+ }
+
+ void markRequest() {
+ requestMeter.mark();
+ }
+
+ void markLoadSuccess() {
+ loadSuccessMeter.mark();
+ }
+
+ void markLoad() {
+ loadMeter.mark();
+ }
+
+ TimerStats.Context startUpLoaderTimer() {
+ return this.uploadTimer.time();
+ }
+
+ void incrementCount() {
+ countMeter.inc();
+ }
+
+ void incrementSize(long size) {
+ currentSizeMeter.inc(size);
+ }
+
+ void incrementMemSize(long size) {
+ currentMemSizeMeter.inc(size);
+ }
+
+ void decrementCount() {
+ countMeter.dec();
+ }
+
+ void decrementSize(long size) {
+ currentSizeMeter.dec(size);
+ }
+
+
+ void decrementMemSize(int size) {
+ currentMemSizeMeter.dec(size);
+ }
+
+ @Override
+ public String getName() {
+ return cacheName;
+ }
+
+ @Override
+ public long getRequestCount() {
+ return requestMeter.getCount();
+ }
+
+ @Override
+ public long getHitCount() {
+ return hitMeter.getCount();
+ }
+
+ @Override
+ public double getHitRate() {
+ long hitCount = hitMeter.getCount();
+ long requestCount = requestMeter.getCount();
+ return (requestCount == 0L ? 0L : (double)hitCount/requestCount);
+ }
+
+ @Override
+ public long getMissCount() {
+ return requestMeter.getCount() - hitMeter.getCount();
+ }
+
+ @Override
+ public double getMissRate() {
+ long missCount = getMissCount();
+ long requestCount = requestMeter.getCount();
+ return (requestCount == 0L ? 0L : (double) missCount/requestCount);
+ }
+
+ @Override
+ public long getLoadCount() {
+ return loadMeter.getCount();
+ }
+
+ @Override
+ public long getLoadSuccessCount() {
+ return loadSuccessMeter.getCount();
+ }
+
+ @Override
+ public long getLoadExceptionCount() {
+ return (getLoadCount() - getLoadSuccessCount());
+ }
+
+ @Override
+ public double getLoadExceptionRate() {
+ long loadExceptionCount = getLoadExceptionCount();
+ long loadCount = loadMeter.getCount();
+ return (loadCount == 0L ? 0L : (double) loadExceptionCount/loadCount);
+ }
+
+ @Override
+ public long getElementCount() {
+ return countMeter.getCount();
+ }
+
+ @Override
+ public long getMaxTotalWeight() {
+ return maxWeight;
+ }
+
+ @Override
+ public long estimateCurrentWeight() {
+ return currentSizeMeter.getCount();
+ }
+
+ @Override
+ public long estimateCurrentMemoryWeight() {
+ return currentMemSizeMeter.getCount();
+ }
+
+ @Override
+ public String cacheInfoAsString() {
+ return toStringHelper("StagingCacheStats")
+ .add("requestCount", getRequestCount())
+ .add("hitCount", getHitCount())
+ .add("hitRate", format("%1.2f", getHitRate()))
+ .add("missCount", getMissCount())
+ .add("missRate", format("%1.2f", getMissRate()))
+ .add("loadCount", getLoadCount())
+ .add("loadSuccessCount", getLoadSuccessCount())
+ .add("elementCount", getElementCount())
+ .add("currentMemSize", estimateCurrentMemoryWeight())
+ .add("totalWeight", humanReadableByteCount(estimateCurrentWeight()))
+ .add("maxWeight", humanReadableByteCount(getMaxTotalWeight()))
+ .toString();
+ }
+
+ //~--------------------------------------< CacheStatsMbean - stats that are not (yet) available
+ @Override
+ public long getTotalLoadTime() {
+ return 0;
+ }
+
+ @Override
+ public double getAverageLoadPenalty() {
+ return 0;
+ }
+
+ @Override
+ public long getEvictionCount() {
+ return 0;
+ }
+
+ @Override
+ public void resetStats() {
+ }
+
+ //~--------------------------------------< private helpers
+
+ private static String getStatName(String meter, String cacheName) {
+ return cacheName + "." + meter;
+ }
+}
+
+/**
+ * Wrapper for backend used for uploading
+ */
+interface StagingUploader {
+ void write(String id, File f) throws DataStoreException;
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractListeningExecutorService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class for DataStore cache related tests.
+ */
+public class AbstractDataStoreCacheTest {
+ static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreCacheTest.class);
+
+ static class TestStagingUploader implements StagingUploader {
+ private final File root;
+
+ public TestStagingUploader(File dir) {
+ this.root = new File(dir, "datastore");
+ root.mkdirs();
+ }
+
+ @Override public void write(String id, File f) throws DataStoreException {
+ try {
+ File move = getFile(id, root);
+ move.getParentFile().mkdirs();
+ Files.copy(f, move);
+ LOG.info("In TestStagingUploader after write [{}]", move);
+ } catch (IOException e) {
+ throw new DataStoreException(e);
+ }
+ }
+
+ public File read(String id) {
+ return getFile(id, root);
+ }
+ }
+
+
+ static class TestCacheLoader<S, I> extends CacheLoader<String, FileInputStream> {
+ private final File root;
+
+ public TestCacheLoader(File dir) {
+ this.root = new File(dir, "datastore");
+ root.mkdirs();
+ }
+
+ public void write(String id, File f) throws DataStoreException {
+ try {
+ File move = getFile(id, root);
+ move.getParentFile().mkdirs();
+ Files.copy(f, move);
+ LOG.info("In TestCacheLoader after write [{}], [{}]", id, move);
+ } catch (IOException e) {
+ throw new DataStoreException(e);
+ }
+ }
+
+ @Override public FileInputStream load(@Nonnull String key) throws Exception {
+ return FileUtils.openInputStream(getFile(key, root));
+ }
+ }
+
+ static class TestPoolExecutor extends ThreadPoolExecutor {
+ private final CountDownLatch beforeLatch;
+ private final CountDownLatch afterLatch;
+
+ TestPoolExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch) {
+ super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("oak-async-thread"));
+ this.beforeLatch = beforeLatch;
+ this.afterLatch = afterLatch;
+ }
+
+ @Override public void beforeExecute(Thread t, Runnable command) {
+ try {
+ LOG.trace("Before execution....waiting for latch");
+ beforeLatch.await();
+ LOG.trace("Before execution....after acquiring latch");
+ super.beforeExecute(t, command);
+ LOG.trace("Completed beforeExecute");
+ } catch (Exception e) {
+ LOG.trace("Error in before execute", e);
+ }
+ }
+
+ @Override protected void afterExecute(Runnable r, Throwable t) {
+ try {
+ LOG.trace("After execution....waiting for latch");
+ afterLatch.countDown();
+ LOG.trace("After execution....after acquiring latch");
+ super.afterExecute(r, t);
+ LOG.trace("Completed afterExecute");
+ } catch (Exception e) {
+ LOG.trace("Error in after execute", e);
+ }
+ }
+ }
+
+
+ static class TestExecutor extends AbstractListeningExecutorService {
+ private final CountDownLatch afterLatch;
+ private final ExecutorService delegate;
+ final List<ListenableFuture<Integer>> futures;
+
+ public TestExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch,
+ CountDownLatch afterExecuteLatch) {
+ this.delegate = new TestPoolExecutor(threads, beforeLatch, afterExecuteLatch);
+ this.futures = Lists.newArrayList();
+ this.afterLatch = afterLatch;
+ }
+
+ @Override @Nonnull public ListenableFuture<?> submit(@Nonnull Callable task) {
+ LOG.trace("Before submitting to super....");
+ ListenableFuture<Integer> submit = super.submit(task);
+ LOG.trace("After submitting to super....");
+
+ futures.add(submit);
+ Futures.addCallback(submit, new TestFutureCallback<Integer>(afterLatch));
+ LOG.trace("Added callback");
+
+ return submit;
+ }
+
+ @Override public void execute(@Nonnull Runnable command) {
+ delegate.execute(command);
+ }
+
+ @Override public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override @Nonnull public List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit)
+ throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ static class TestFutureCallback<Integer> implements FutureCallback {
+ private final CountDownLatch latch;
+
+ public TestFutureCallback(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override public void onSuccess(@Nullable Object result) {
+ try {
+ LOG.trace("Waiting for latch in callback");
+ latch.await(100, TimeUnit.MILLISECONDS);
+ LOG.trace("Acquired latch in onSuccess");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override public void onFailure(@Nonnull Throwable t) {
+ try {
+ LOG.trace("Waiting for latch onFailure in callback");
+ latch.await(100, TimeUnit.MILLISECONDS);
+ LOG.trace("Acquired latch in onFailure");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ static InputStream randomStream(int seed, int size) {
+ Random r = new Random(seed);
+ byte[] data = new byte[size];
+ r.nextBytes(data);
+ return new ByteArrayInputStream(data);
+ }
+
+ private static File getFile(String id, File root) {
+ File file = root;
+ file = new File(file, id.substring(0, 2));
+ file = new File(file, id.substring(2, 4));
+ return new File(file, id);
+ }
+
+ static File copyToFile(InputStream stream, File file) throws IOException {
+ FileUtils.copyInputStreamToFile(stream, file);
+ return file;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1766330&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Mon Oct 24 04:49:49 2016
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link UploadStagingCache}.
+ */
+public class UploadStagingCacheTest extends AbstractDataStoreCacheTest {
+ private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCacheTest.class);
+ private static final String ID_PREFIX = "12345";
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ private final Closer closer = Closer.create();
+ private TestStagingUploader uploader;
+ private File root;
+ private CountDownLatch taskLatch;
+ private CountDownLatch callbackLatch;
+ private CountDownLatch afterExecuteLatch;
+ private TestExecutor executor;
+ private UploadStagingCache stagingCache;
+ private StatisticsProvider statsProvider;
+ private ScheduledExecutorService removeExecutor;
+
+ @Before
+ public void setup() throws IOException {
+ root = folder.newFolder();
+ init();
+ }
+
+ private void init() {
+ // uploader
+ uploader = new TestStagingUploader(root);
+
+ // create executor
+ taskLatch = new CountDownLatch(1);
+ callbackLatch = new CountDownLatch(1);
+ afterExecuteLatch = new CountDownLatch(1);
+ executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+
+ // stats
+ ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
+ closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
+ statsProvider = new DefaultStatisticsProvider(statsExecutor);
+
+ removeExecutor = Executors.newSingleThreadScheduledExecutor();
+ closer.register(new ExecutorCloser(removeExecutor, 500, TimeUnit.MILLISECONDS));
+
+ //cache instance
+ stagingCache =
+ new UploadStagingCache(root, 1/*threads*/, 8 * 1024 /* bytes */,
+ uploader, null/*cache*/, statsProvider, executor, null, 3000);
+ closer.register(stagingCache);
+ }
+
+ @After
+ public void tear() throws IOException {
+ closer.close();
+ }
+
+ /**
+ * Stage file successful upload.
+ * @throws Exception
+ */
+ @Test
+ public void testAdd() throws Exception {
+ // add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+
+ assertFuture(futures, 0);
+ assertCacheStats(stagingCache, 0, 0, 1, 1);
+ }
+
+ /**
+ * Stage file unsuccessful upload.
+ * @throws Exception
+ */
+ @Test
+ public void testAddUploadException() throws Exception {
+ // Mock uploader to throw exception on write
+ final TestStagingUploader mockedDS = mock(TestStagingUploader.class);
+ doThrow(new DataStoreException("Error in writing blob")).when(mockedDS)
+ .write(Matchers.any(String.class), Matchers.any(File.class));
+
+ // initialize staging cache using the mocked uploader
+ stagingCache =
+ new UploadStagingCache(root, 1/*threads*/, 4 * 1024 /* bytes */,
+ mockedDS, null/*cache*/, statsProvider, executor, null, 3000);
+ closer.register(stagingCache);
+
+ // Add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+
+ waitFinish(futures);
+
+ // assert file retrieved from staging cache
+ File ret = stagingCache.getIfPresent(ID_PREFIX + 0);
+ assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()), ret));
+
+ assertEquals(1, stagingCache.getStats().getLoadCount());
+ assertEquals(1, stagingCache.getStats().getLoadSuccessCount());
+ assertCacheStats(stagingCache, 1, 4 * 1024, 1, 1);
+ }
+
+ /**
+ * Retrieve without adding.
+ * @throws Exception
+ */
+ @Test
+ public void testGetNoAdd() throws Exception {
+ File ret = stagingCache.getIfPresent(ID_PREFIX + 0);
+
+ // assert no file
+ assertNull(ret);
+ assertEquals(1, stagingCache.getStats().getLoadCount());
+ assertCacheStats(stagingCache, 0, 0, 0, 0);
+ }
+
+ /**
+ * Error in putting file to stage.
+ * @throws Exception
+ */
+ @Test
+ public void testPutMoveFileError() throws Exception {
+ Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, new File("empty"));
+ // assert no file
+ assertFalse(future.isPresent());
+ assertEquals(1, stagingCache.getStats().getMissCount());
+ assertCacheStats(stagingCache, 0, 0, 0, 1);
+ }
+
+ /**
+ * Put and retrieve different files concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void testGetAddDifferent() throws Exception {
+ //add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // Create an async retrieve task
+ final SettableFuture<File> retFuture = SettableFuture.create();
+ Thread t = new Thread(new Runnable() {
+ @Override public void run() {
+ retFuture.set(stagingCache.getIfPresent(ID_PREFIX + 1));
+ }
+ });
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ t.start();
+
+ //assert no file retrieve
+ assertNull(retFuture.get());
+ assertEquals(1, stagingCache.getStats().getLoadCount());
+
+ assertFuture(futures, 0);
+ assertCacheStats(stagingCache, 0, 0, 1, 1);
+ }
+
+ /**
+ * Stage file when cache full.
+ * @throws Exception
+ */
+ @Test
+ public void testCacheFullAdd() throws Exception {
+ // initialize cache to have restricted size
+ stagingCache =
+ new UploadStagingCache(root, 1/*threads*/, 4 * 1024 /* bytes */,
+ uploader, null/*cache*/, statsProvider, executor, null, 3000);
+ closer.register(stagingCache);
+
+ // add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // Add another load
+ File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+ Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2);
+ assertFalse(future2.isPresent());
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ assertFuture(futures, 0);
+
+ // Try 2nd upload again
+ Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 1, f2);
+ futures = Lists.newArrayList();
+ if (future.isPresent()) {
+ futures.add(future.get());
+ }
+ assertFuture(futures, 1);
+
+ assertCacheStats(stagingCache, 0, 0, 2, 3);
+ }
+
+ /**
+ * Stage same file concurrently.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSameAdd() throws Exception {
+ // Add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 0, f);
+ assertFalse(future2.isPresent());
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ assertFuture(futures, 0);
+
+ assertCacheStats(stagingCache, 0, 0, 1, 2);
+ }
+
+ /**
+ * Stage different files concurrently
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentDifferentAdd() throws Exception {
+ // Add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // Add diff load
+ File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
+ Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2);
+ if (future2.isPresent()) {
+ futures.add(future2.get());
+ }
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ assertFuture(futures, 0, 1);
+
+ assertCacheStats(stagingCache, 0, 0, 2, 2);
+ }
+
+ /**
+ * Concurrently retrieve after stage but before upload.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentGetDelete() throws Exception {
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ closer.register(new ExecutorCloser(executorService));
+
+ // Add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // Get a handle to the file and open stream
+ File file = stagingCache.getIfPresent(ID_PREFIX + 0);
+ final FileInputStream fStream = Files.newInputStreamSupplier(file).getInput();
+
+ // task to copy the steam to a file simulating read from the stream
+ File temp = folder.newFile();
+ CountDownLatch copyThreadLatch = new CountDownLatch(1);
+ SettableFuture<File> future1 =
+ copyStreamThread(executorService, fStream, temp, copyThreadLatch);
+
+ //start
+ taskLatch.countDown();
+ callbackLatch.countDown();
+
+ waitFinish(futures);
+
+ // trying copying now
+ copyThreadLatch.countDown();
+ future1.get();
+
+ assertTrue(Files.equal(temp, uploader.read(ID_PREFIX + 0)));
+ }
+
+ /**
+ * Concurrently stage and trigger delete after upload for same file.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentPutDeleteSame() throws Exception {
+ testConcurrentPutDelete(0);
+ }
+
+ /**
+ * Concurrently stage and trigger delete after upload for different file.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentPutDeleteDifferent() throws Exception {
+ testConcurrentPutDelete(1);
+ }
+
+ private void testConcurrentPutDelete(int diff) throws Exception {
+ ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ closer.register(new ExecutorCloser(executorService));
+ //start immediately
+ taskLatch.countDown();
+
+ // Add immediately
+ List<ListenableFuture<Integer>> futures = put(folder);
+
+ // New task to put another file
+ File f2 = copyToFile(randomStream(diff, 4 * 1024), folder.newFile());
+ CountDownLatch putThreadLatch = new CountDownLatch(1);
+ CountDownLatch triggerLatch = new CountDownLatch(1);
+ SettableFuture<Optional<SettableFuture<Integer>>> future1 =
+ putThread(executorService, diff, f2, stagingCache, putThreadLatch, triggerLatch);
+ putThreadLatch.countDown();
+
+ // wait for put thread to go ahead
+ callbackLatch.countDown();
+ ScheduledFuture<?> scheduledFuture =
+ removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+ triggerLatch.await();
+ if (future1.get().isPresent()) {
+ futures.add(future1.get().get());
+ }
+
+ ListenableFuture<List<Integer>> listListenableFuture = Futures.successfulAsList(futures);
+ try {
+ listListenableFuture.get();
+ scheduledFuture.get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+ uploader.read(ID_PREFIX + 0)));
+ assertTrue(Files.equal(copyToFile(randomStream(diff, 4 * 1024), folder.newFile()),
+ uploader.read(ID_PREFIX + diff)));
+ }
+
+ /**
+ * Test build on start.
+ * @throws Exception
+ */
+ @Test
+ public void testBuild() throws Exception {
+ // Add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+ // Close before uploading finished
+ closer.close();
+
+ // Start again
+ init();
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ afterExecuteLatch.await();
+
+ waitFinish(futures);
+
+ assertNull(stagingCache.getIfPresent(ID_PREFIX + 0));
+ assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+ uploader.read(ID_PREFIX + 0)));
+ assertCacheStats(stagingCache, 0, 0, 1, 1);
+ }
+
+ /** -------------------- Helper methods ----------------------------------------------------**/
+
+ private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor,
+ final InputStream fStream, final File temp, final CountDownLatch start) {
+ final SettableFuture<File> future = SettableFuture.create();
+ executor.submit(new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.info("Waiting for start of copying");
+ start.await();
+ LOG.info("Starting copy of [{}]", temp);
+ FileUtils.copyInputStreamToFile(fStream, temp);
+ LOG.info("Finished retrieve");
+ future.set(temp);
+ } catch (Exception e) {
+ LOG.info("Exception in get", e);
+ }
+ }
+ });
+ return future;
+ }
+
+ private static SettableFuture<Optional<SettableFuture<Integer>>> putThread(
+ ListeningExecutorService executor, final int seed, final File f, final UploadStagingCache cache,
+ final CountDownLatch start, final CountDownLatch trigger) {
+ final SettableFuture<Optional<SettableFuture<Integer>>> future = SettableFuture.create();
+ executor.submit(new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.info("Waiting for start to put");
+ start.await();
+ LOG.info("Starting put");
+ trigger.countDown();
+ Optional<SettableFuture<Integer>> opt = cache.put(ID_PREFIX + seed, f);
+ LOG.info("Finished put");
+ future.set(opt);
+ } catch (Exception e) {
+ LOG.info("Exception in get", e);
+ }
+ }
+ });
+ return future;
+ }
+
+ private void waitFinish(List<ListenableFuture<Integer>> futures) {
+ ListenableFuture<List<Integer>> listListenableFuture = Futures.successfulAsList(futures);
+ try {
+ listListenableFuture.get();
+ ScheduledFuture<?> scheduledFuture =
+ removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+ scheduledFuture.get();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private List<ListenableFuture<Integer>> put(TemporaryFolder folder)
+ throws IOException {
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f);
+ List<ListenableFuture<Integer>> futures = Lists.newArrayList();
+ if (future.isPresent()) {
+ futures.add(future.get());
+ }
+ return futures;
+ }
+
+ private void assertFuture(List<ListenableFuture<Integer>> futures, int... seeds)
+ throws Exception {
+ waitFinish(futures);
+
+ for (int i = 0; i < seeds.length; i++) {
+ File upload = uploader.read(ID_PREFIX + seeds[i]);
+ assertFile(upload, seeds[i], folder);
+ }
+ }
+
+ private void assertFile(File f, int seed, TemporaryFolder folder) throws IOException {
+ assertTrue(f.exists());
+ File temp = copyToFile(randomStream(seed, 4 * 1024), folder.newFile());
+ assertTrue("Uploaded file content differs", FileUtils.contentEquals(temp, f));
+ }
+
+ private static void assertCacheStats(UploadStagingCache cache, long elems, long weight,
+ long hits, long count) {
+ assertEquals(elems, cache.getStats().getElementCount());
+ assertEquals(weight, cache.getStats().estimateCurrentWeight());
+ assertEquals(hits, cache.getStats().getHitCount());
+ assertEquals(count, cache.getStats().getRequestCount());
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
------------------------------------------------------------------------------
svn:eol-style = native