Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:51:04 UTC

svn commit: r1766337 - in /jackrabbit/oak/trunk: oak-blob/ oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/

Author: amitj
Date: Mon Oct 24 04:51:04 2016
New Revision: 1766337

URL: http://svn.apache.org/viewvc?rev=1766337&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore

* AbstractSharedCachingDataStore implementation using the CompositeDataStoreCache
* Introduce a SharedBackend interface using the JR2 Backend interface as a base
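
For illustration, a concrete store extends the new abstract class and supplies
its backend through createBackend(). A minimal sketch, assuming a hypothetical
MyBackend that implements the new SharedBackend interface (the class names here
are illustrative, not part of this commit):

    import org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore;
    import org.apache.jackrabbit.oak.spi.blob.SharedBackend;

    // Hypothetical subclass for illustration; an S3- or file-backed
    // implementation would follow the same pattern.
    public class MyCachingDataStore extends AbstractSharedCachingDataStore {
        @Override
        protected SharedBackend createBackend() {
            return new MyBackend(); // assumed SharedBackend implementation
        }

        @Override
        public int getMinRecordLength() {
            return 16 * 1024; // binaries smaller than this may be inlined by callers
        }
    }

Cache size, staging split and upload threads would then be tuned via the
setters on the abstract class, as shown in the configuration snippet in its
javadoc below.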

Added:
    jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-blob/pom.xml
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java

Modified: jackrabbit/oak/trunk/oak-blob/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/pom.xml?rev=1766337&r1=1766336&r2=1766337&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob/pom.xml Mon Oct 24 04:51:04 2016
@@ -79,6 +79,11 @@
     </dependency>
 
     <dependency>
+      <groupId>javax.jcr</groupId>
+      <artifactId>jcr</artifactId>
+      <version>2.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.jackrabbit</groupId>
       <artifactId>jackrabbit-data</artifactId>
       <version>${jackrabbit.version}</version>

Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java?rev=1766337&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java Mon Oct 24 04:51:04 2016
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.blob;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * A backend that can be shared by multiple data store instances, modeled on
+ * the Jackrabbit 2 (JR2) Backend interface.
+ */
+public interface SharedBackend {
+    /**
+     * Returns the input stream of the record identified by the given identifier.
+     *
+     * @param identifier
+     *            identifier of the record.
+     * @return the input stream of the record.
+     * @throws DataStoreException
+     *             if the record is not found or on any other error.
+     */
+    InputStream read(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Stores the file in the backend with the identifier used as the key. If the
+     * key already exists, the timestamp of the key is updated.
+     *
+     * @param identifier
+     *            key of the file
+     * @param file
+     *            file to be stored in the backend.
+     * @throws DataStoreException
+     *             for any error.
+     */
+    void write(DataIdentifier identifier, File file) throws DataStoreException;
+
+    /**
+     * Gets the record with the specified identifier.
+     *
+     * @param id the record identifier
+     * @return the DataRecord for the given identifier
+     */
+    DataRecord getRecord(DataIdentifier id) throws DataStoreException;
+
+
+    /**
+     * Returns the identifiers of all records that exist in the backend.
+     *
+     * @return an iterator over all identifiers
+     * @throws DataStoreException
+     */
+    Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException;
+
+
+    /**
+     * Returns all DataRecords in the backend.
+     *
+     * @return an iterator over all DataRecords
+     * @throws DataStoreException
+     */
+    Iterator<DataRecord> getAllRecords() throws DataStoreException;
+
+
+    /**
+     * Checks whether a record exists in the backend.
+     *
+     * @param identifier
+     *            identifier to be checked.
+     * @return true if the record exists, false otherwise.
+     * @throws DataStoreException
+     */
+    boolean exists(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Closes the backend and releases resources such as database connections, if any.
+     *
+     * @throws DataStoreException
+     */
+    void close() throws DataStoreException;
+
+    /**
+     * Deletes the record identified by the given identifier. This is a no-op if
+     * the identifier is not found.
+     *
+     * @param identifier
+     *            identifier of the record to delete.
+     * @throws DataStoreException
+     */
+    void deleteRecord(DataIdentifier identifier) throws DataStoreException;
+
+    /**
+     * Adds a metadata record with the specified name.
+     *
+     * @param input the record input stream
+     * @param name the name
+     * @throws DataStoreException
+     */
+    void addMetadataRecord(final InputStream input, final String name) throws DataStoreException;
+
+    /**
+     * Adds a metadata record with the specified name.
+     *
+     * @param input the record file
+     * @param name the name
+     * @throws DataStoreException
+     */
+    void addMetadataRecord(final File input, final String name) throws DataStoreException;
+
+    /**
+     * Gets the metadata record with the specified name.
+     *
+     * @param name the name of the record
+     * @return the metadata DataRecord
+     */
+    DataRecord getMetadataRecord(String name);
+
+    /**
+     * Gets all the metadata records with the specified prefix.
+     *
+     * @param prefix the prefix of the records to retrieve
+     * @return list of all the metadata DataRecords
+     */
+    List<DataRecord> getAllMetadataRecords(String prefix);
+
+    /**
+     * Deletes the metadata record with the specified name.
+     *
+     * @param name the name of the record
+     * @return true if the deletion was successful, false otherwise
+     */
+    boolean deleteMetadataRecord(String name);
+
+    /**
+     * Deletes all the metadata records with the specified prefix.
+     *
+     * @param prefix the prefix of the record
+     */
+    void deleteAllMetadataRecords(String prefix);
+
+    /**
+     * Initializes the backend.
+     *
+     * @throws DataStoreException if initialization fails
+     */
+    void init() throws DataStoreException;
+}

Propchange: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java
------------------------------------------------------------------------------
    svn:eol-style = native
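
To make the contract concrete, here is a minimal sketch of how a caller drives
a SharedBackend: write a staged file under its identifier, check existence, and
stream the record back. The SharedBackendExample class and roundTrip helper are
illustrative only and not part of this commit:

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.commons.io.IOUtils;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataStoreException;
    import org.apache.jackrabbit.oak.spi.blob.SharedBackend;

    public class SharedBackendExample {
        // Mirrors the round trip the caching layer performs: upload a file
        // under its identifier, then stream the stored record back.
        static byte[] roundTrip(SharedBackend backend, DataIdentifier id, File staged)
                throws DataStoreException, IOException {
            backend.write(id, staged);   // upload, or refresh the key's timestamp
            if (!backend.exists(id)) {
                throw new DataStoreException("Record not found after write: " + id);
            }
            InputStream in = backend.read(id);
            try {
                return IOUtils.toByteArray(in);
            } finally {
                in.close();   // the caller owns the stream returned by read()
            }
        }
    }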

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1766337&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Mon Oct 24 04:51:04 2016
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.AbstractDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.util.TransientFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Caches files locally and stages files locally for async uploads.
+ * Configuration:
+ *
+ * <pre>
+ * &lt;DataStore class="org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore">
+ *
+ *     &lt;param name="{@link #setPath(String) path}"/>
+ *     &lt;param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/>
+ *     &lt;param name="{@link #setStagingSplitPercentage(int) stagingSplitPercentage}" value="10"/>
+ *     &lt;param name="{@link #setUploadThreads(int) uploadThreads}" value="10"/>
+ *     &lt;param name="{@link #setStagingPurgeInterval(int) stagingPurgeInterval}" value="300"/>
+ * &lt;/DataStore>
+ * </pre>
+ */
+public abstract class AbstractSharedCachingDataStore extends AbstractDataStore
+    implements MultiDataStoreAware, SharedDataStore {
+    /**
+     * Logger instance.
+     */
+    static final Logger LOG = LoggerFactory.getLogger(AbstractSharedCachingDataStore.class);
+
+    /**
+     * The digest algorithm used to uniquely identify records.
+     */
+    private static final String DIGEST = "SHA-1";
+
+    /**
+     * The root path
+     */
+    private String path;
+
+    /**
+     * The number of bytes in the cache. The default value is 64 GB.
+     */
+    private long cacheSize = 64L * 1024 * 1024 * 1024;
+
+    /**
+     * The % of cache utilized for upload staging.
+     */
+    private int stagingSplitPercentage = 10;
+
+    /**
+     * The number of upload threads used for asynchronous uploads from staging.
+     */
+    private int uploadThreads = 10;
+
+    /**
+     * The interval for remove job in seconds.
+     */
+    private int stagingPurgeInterval = 300;
+
+    /**
+     * The root directory where the files are created.
+     */
+    private File rootDirectory;
+
+    /**
+     * The directory where temporary files are created.
+     */
+    private File tmp;
+
+    /**
+     * Statistics provider.
+     */
+    private StatisticsProvider statisticsProvider;
+
+    /**
+     * DataStore cache
+     */
+    private CompositeDataStoreCache cache;
+
+    /**
+     * The delegate backend
+     */
+    protected SharedBackend backend;
+
+    protected ListeningExecutorService listeningExecutor;
+
+    protected ScheduledExecutorService schedulerExecutor;
+
+    public void init(String homeDir) throws DataStoreException {
+        if (path == null) {
+            path = homeDir + "/repository/datastore";
+        }
+
+        checkArgument(stagingSplitPercentage >= 0 && stagingSplitPercentage <= 50,
+            "Staging percentage cache should be between 0 and 50");
+
+        this.rootDirectory = new File(path);
+        this.tmp = new File(rootDirectory, "tmp");
+        tmp.mkdirs();
+
+        this.backend = createBackend();
+        backend.init();
+
+        this.cache = new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
+            new CacheLoader<String, InputStream>() {
+                @Override public InputStream load(String key) throws Exception {
+                    // The stream is returned to the cache for copying, so it
+                    // must only be closed here if the read itself fails
+                    InputStream is = null;
+                    try {
+                        is = backend.read(new DataIdentifier(key));
+                        return is;
+                    } catch (Exception e) {
+                        Closeables.close(is, true);
+                        throw e;
+                    }
+                }
+            }, new StagingUploader() {
+                @Override
+                public void write(String id, File file) throws DataStoreException {
+                    backend.write(new DataIdentifier(id), file);
+                }
+            }, statisticsProvider, listeningExecutor, schedulerExecutor, stagingPurgeInterval);
+    }
+
+    protected abstract SharedBackend createBackend();
+
+    @Override
+    @Nullable
+    public DataRecord getRecordIfStored(DataIdentifier dataIdentifier)
+        throws DataStoreException {
+        // Return file attributes from cache only if corresponding file is cached
+        // This avoids downloading the file for just accessing the meta data
+        File cached = cache.getIfPresent(dataIdentifier.toString());
+        if (cached != null && cached.exists()) {
+            return new FileCacheDataRecord(this, dataIdentifier, cached.length(),
+                cached.lastModified());
+        }
+
+        // File not in cache, so retrieve the metadata from the backend explicitly
+        try {
+            return backend.getRecord(dataIdentifier);
+        } catch (Exception e) {
+            LOG.error("Error retrieving record [{}] from backend", dataIdentifier, e);
+        }
+        return null;
+    }
+
+    @Override
+    public DataRecord addRecord(InputStream inputStream) throws DataStoreException {
+        Stopwatch watch = Stopwatch.createStarted();
+        try {
+            TransientFileFactory fileFactory = TransientFileFactory.getInstance();
+            File tmpFile = fileFactory.createTransientFile("upload", null, tmp);
+
+            // Copy the stream to the temporary file and calculate the
+            // stream length and the message digest of the stream
+            MessageDigest digest = MessageDigest.getInstance(DIGEST);
+            OutputStream output = new DigestOutputStream(new FileOutputStream(tmpFile), digest);
+            long length = 0;
+            try {
+                length = IOUtils.copyLarge(inputStream, output);
+            } finally {
+                output.close();
+            }
+
+            DataIdentifier identifier = new DataIdentifier(encodeHexString(digest.digest()));
+            LOG.debug("SHA1 of [{}], length =[{}] took [{}]ms ",
+                new Object[] {identifier, length, watch.elapsed(TimeUnit.MILLISECONDS)});
+
+            File cachedFile;
+            // asynchronously stage for upload if the size limit of the staging cache permits
+            if (!cache.stage(identifier.toString(), tmpFile)) {
+                backend.write(identifier, tmpFile);
+                // Update the last modified for the file if present in the download cache
+                cachedFile = cache.getIfPresent(identifier.toString());
+                if (cachedFile != null) {
+                    FileUtils.touch(cachedFile);
+                }
+            } else {
+                cachedFile = cache.getIfPresent(identifier.toString());
+            }
+
+            // Fall back to the temporary file's attributes if the record is no
+            // longer in either cache, to avoid a NullPointerException below
+            if (cachedFile == null) {
+                return new FileCacheDataRecord(this, identifier, length, tmpFile.lastModified());
+            }
+            return new FileCacheDataRecord(this, identifier, cachedFile.length(), cachedFile.lastModified());
+        } catch (Exception e) {
+            LOG.error("Error in adding record");
+            throw new DataStoreException("Error in adding record ", e);
+        }
+    }
+
+    @Override
+    public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+        return Iterators.concat(Iterators.transform(cache.getStagingCache().getAllIdentifiers(),
+            new Function<String, DataIdentifier>() {
+                @Nullable @Override public DataIdentifier apply(@Nullable String id) {
+                    return new DataIdentifier(id);
+                }
+            }), backend.getAllIdentifiers());
+    }
+
+    @Override
+    public void deleteRecord(DataIdentifier dataIdentifier) throws DataStoreException {
+        cache.invalidate(dataIdentifier.toString());
+        backend.deleteRecord(dataIdentifier);
+    }
+
+    @Override
+    public void close() throws DataStoreException {
+        backend.close();
+        cache.close();
+    }
+
+    /**
+     * A DataRecord implementation that either decorates the data record of the
+     * backend, if available, or creates a record from the parameters of the
+     * file in the cache.
+     */
+    static class FileCacheDataRecord extends AbstractDataRecord {
+        private long length;
+        private long lastModified;
+        private AbstractSharedCachingDataStore store;
+
+        public FileCacheDataRecord(AbstractSharedCachingDataStore store, DataIdentifier identifier, long length,
+            long lastModified) {
+            super(store, identifier);
+            this.length = length;
+            this.lastModified = lastModified;
+            this.store = store;
+        }
+
+        @Override
+        public long getLength() throws DataStoreException {
+            return length;
+        }
+
+        @Override
+        public InputStream getStream() throws DataStoreException {
+            try {
+                return new FileInputStream(store.cache.get(getIdentifier().toString()));
+            } catch (final Exception e) {
+                throw new DataStoreException(
+                    "Error opening input stream for identifier " + getIdentifier(), e);
+            }
+        }
+
+        @Override
+        public long getLastModified() {
+            return lastModified;
+        }
+    }
+
+    /**
+     * Look in the backend for a record matching the given identifier.  Returns true
+     * if such a record exists.
+     *
+     * @param identifier - An identifier for the record.
+     * @return true if a record for the provided identifier can be found.
+     */
+    public boolean exists(final DataIdentifier identifier) {
+        try {
+            if (identifier != null) {
+                return backend.exists(identifier);
+            }
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads",
+                identifier), e);
+        }
+        return false;
+    }
+
+    public List<DataStoreCacheStatsMBean> getStats() {
+        return ImmutableList.of(cache.getCacheStats(), cache.getStagingCacheStats());
+    }
+
+    protected CompositeDataStoreCache getCache() {
+        return cache;
+    }
+
+    /**------------------------- setters ----------------------------------------------**/
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public void setCacheSize(long cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    public void setStagingSplitPercentage(int stagingSplitPercentage) {
+        this.stagingSplitPercentage = stagingSplitPercentage;
+    }
+
+    public void setUploadThreads(int uploadThreads) {
+        this.uploadThreads = uploadThreads;
+    }
+
+    public void setStagingPurgeInterval(int stagingPurgeInterval) {
+        this.stagingPurgeInterval = stagingPurgeInterval;
+    }
+
+    public void setStatisticsProvider(StatisticsProvider statisticsProvider) {
+        this.statisticsProvider = statisticsProvider;
+    }
+
+    /**------------------------ SharedDataStore methods -----------------------------------------**/
+
+    @Override
+    public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+        backend.addMetadataRecord(stream, name);
+    }
+
+    @Override
+    public void addMetadataRecord(File f, String name) throws DataStoreException {
+        backend.addMetadataRecord(f, name);
+    }
+
+    @Override
+    public DataRecord getMetadataRecord(String name) {
+        return backend.getMetadataRecord(name);
+    }
+
+    @Override
+    public List<DataRecord> getAllMetadataRecords(String prefix) {
+        return backend.getAllMetadataRecords(prefix);
+    }
+
+    @Override
+    public boolean deleteMetadataRecord(String name) {
+        return backend.deleteMetadataRecord(name);
+    }
+
+    @Override
+    public void deleteAllMetadataRecords(String prefix) {
+        backend.deleteAllMetadataRecords(prefix);
+    }
+
+    @Override
+    public Iterator<DataRecord> getAllRecords() throws DataStoreException {
+        return backend.getAllRecords();
+    }
+
+    @Override
+    public DataRecord getRecordForId(DataIdentifier identifier) throws DataStoreException {
+        return backend.getRecord(identifier);
+    }
+
+    @Override
+    public SharedDataStore.Type getType() {
+        return SharedDataStore.Type.SHARED;
+    }
+
+    /**------------------------ unimplemented methods -------------------------------------------**/
+
+    @Override
+    public void clearInUse() {
+        throw new UnsupportedOperationException("Operation not supported");
+    }
+
+    @Override
+    public void updateModifiedDateOnAccess(long l) {
+        throw new UnsupportedOperationException("Operation not supported");
+    }
+
+    @Override
+    public int deleteAllOlderThan(long l) throws DataStoreException {
+        throw new UnsupportedOperationException("Operation not supported");
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native
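
The record identifier computed in addRecord above is the hex-encoded SHA-1
digest of the full content. A standalone sketch of the same computation (this
mirrors the getIdForInputStream helper in the test below; the DigestExample
class and contentIdentifier names are illustrative):

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.security.DigestOutputStream;
    import java.security.MessageDigest;

    import org.apache.commons.io.IOUtils;
    import org.apache.commons.io.output.NullOutputStream;

    import static org.apache.commons.codec.binary.Hex.encodeHexString;

    public class DigestExample {
        // Streams the content through a SHA-1 digest, discarding the bytes,
        // and returns the hex string used as the record identifier.
        static String contentIdentifier(InputStream in) throws Exception {
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            OutputStream out = new DigestOutputStream(new NullOutputStream(), digest);
            try {
                IOUtils.copyLarge(in, out);
            } finally {
                out.close();
                in.close();
            }
            return encodeHexString(digest.digest());
        }
    }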

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1766337&r1=1766336&r2=1766337&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Mon Oct 24 04:51:04 2016
@@ -21,9 +21,12 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -37,14 +40,18 @@ import javax.annotation.Nullable;
 
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractListeningExecutorService;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,9 +135,9 @@ public class AbstractDataStoreCacheTest
 
         @Override protected void afterExecute(Runnable r, Throwable t) {
             try {
-                LOG.trace("After execution....waiting for latch");
+                LOG.trace("After execution....counting down latch");
                 afterLatch.countDown();
-                LOG.trace("After execution....after acquiring latch");
+                LOG.trace("After execution....after counting down latch");
                 super.afterExecute(r, t);
                 LOG.trace("Completed afterExecute");
             } catch (Exception e) {
@@ -218,6 +225,78 @@ public class AbstractDataStoreCacheTest
         }
     }
 
+    // A mock Backend implementation that uses a Map to keep track of what
+    // records have been added and removed, for test purposes only.
+    class TestMemoryBackend implements SharedBackend {
+        final Map<DataIdentifier, File> _backend = Maps.newHashMap();
+
+        @Override public InputStream read(DataIdentifier identifier) throws DataStoreException {
+            try {
+                return new FileInputStream(_backend.get(identifier));
+            } catch (FileNotFoundException e) {
+                throw new DataStoreException(e);
+            }
+        }
+
+        @Override public void write(DataIdentifier identifier, File file)
+            throws DataStoreException {
+            if (file != null && file.exists()) {
+                _backend.put(identifier, file);
+            } else {
+                throw new DataStoreException(
+                    String.format("File %s for id %s does not exist", file, identifier));
+            }
+        }
+
+        @Override public DataRecord getRecord(DataIdentifier id) throws DataStoreException {
+            return null;
+        }
+
+        @Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+            return _backend.keySet().iterator();
+        }
+
+        @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
+            return null;
+        }
+
+        @Override public boolean exists(DataIdentifier identifier) throws DataStoreException {
+            return _backend.containsKey(identifier);
+        }
+
+        @Override public void close() throws DataStoreException {
+        }
+
+        @Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+            _backend.remove(identifier);
+        }
+
+        @Override public void addMetadataRecord(InputStream input, String name)
+            throws DataStoreException {
+        }
+
+        @Override public void addMetadataRecord(File input, String name) throws DataStoreException {
+        }
+
+        @Override public DataRecord getMetadataRecord(String name) {
+            return null;
+        }
+
+        @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
+            return null;
+        }
+
+        @Override public boolean deleteMetadataRecord(String name) {
+            return false;
+        }
+
+        @Override public void deleteAllMetadataRecords(String prefix) {
+        }
+
+        @Override public void init() throws DataStoreException {
+        }
+    }
+
     static InputStream randomStream(int seed, int size) {
         Random r = new Random(seed);
         byte[] data = new byte[size];

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java?rev=1766337&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java Mon Oct 24 04:51:04 2016
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class CachingDataStoreTest extends AbstractDataStoreCacheTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CachingDataStoreTest.class);
+    private static final String ID_PREFIX = "12345";
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    private final Closer closer = Closer.create();
+    private File root;
+
+    private CountDownLatch taskLatch;
+    private CountDownLatch callbackLatch;
+    private CountDownLatch afterExecuteLatch;
+    private TestExecutor executor;
+    private StatisticsProvider statsProvider;
+    private ScheduledExecutorService scheduledExecutor;
+    private AbstractSharedCachingDataStore dataStore;
+
+    @Before
+    public void setup() throws Exception {
+        root = folder.newFolder();
+        init(1);
+    }
+
+    private void init(int i) throws Exception {
+        // create executor
+        taskLatch = new CountDownLatch(1);
+        callbackLatch = new CountDownLatch(1);
+        afterExecuteLatch = new CountDownLatch(i);
+        executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+
+        // stats
+        ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
+        statsProvider = new DefaultStatisticsProvider(statsExecutor);
+
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
+        dataStore = new AbstractSharedCachingDataStore() {
+            @Override protected SharedBackend createBackend() {
+                return new TestMemoryBackend();
+            }
+
+            @Override public int getMinRecordLength() {
+                return 0;
+            }
+        };
+        dataStore.setStatisticsProvider(statsProvider);
+        dataStore.listeningExecutor = executor;
+        dataStore.schedulerExecutor = scheduledExecutor;
+        dataStore.init(root.getAbsolutePath());
+    }
+
+    /**
+     * Tests {@link CompositeDataStoreCache#getIfPresent(String)} when the record
+     * is neither in the cache nor in the backend.
+     */
+    @Test
+    public void getRecordNotAvailable() throws DataStoreException {
+        DataRecord rec = dataStore.getRecordIfStored(new DataIdentifier(ID_PREFIX + 0));
+        assertNull(rec);
+    }
+
+    /**
+     * Tests {@link AbstractSharedCachingDataStore#exists(DataIdentifier)} when the
+     * record does not exist in the backend.
+     * @throws IOException
+     */
+    @Test
+    public void exists() throws IOException {
+        assertFalse(dataStore.exists(new DataIdentifier(ID_PREFIX + 0)));
+    }
+
+    /**
+     * Adds a record to the datastore, reads it back and then deletes it.
+     */
+    @Test
+    public void addDelete() throws Exception {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        String id = getIdForInputStream(f);
+        FileInputStream fin = new FileInputStream(f);
+        closer.register(fin);
+
+        DataRecord rec = dataStore.addRecord(fin);
+        assertEquals(id, rec.getIdentifier().toString());
+
+        //start & finish
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        waitFinish();
+
+        rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+        assertNotNull(rec);
+        assertFile(rec.getStream(), f, folder);
+
+        dataStore.deleteRecord(new DataIdentifier(id));
+        rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+        assertNull(rec);
+    }
+
+    @Test
+    public void addStagingAndDelete() throws Exception {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        String id = getIdForInputStream(f);
+        FileInputStream fin = new FileInputStream(f);
+        closer.register(fin);
+
+        DataRecord rec = dataStore.addRecord(fin);
+        assertEquals(id, rec.getIdentifier().toString());
+
+        rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+        assertNotNull(rec);
+        assertFile(rec.getStream(), f, folder);
+
+        dataStore.deleteRecord(new DataIdentifier(id));
+        rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+        assertNull(rec);
+
+        Thread.sleep(1000);
+        //start & finish
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        waitFinish();
+
+        rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+        assertNull(rec);
+    }
+
+    /**
+     * Verifies that identifiers from both the staging cache and the backend are returned.
+     */
+    @Test
+    public void getAllIdentifiers() throws Exception {
+        File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+        String id = getIdForInputStream(f);
+        FileInputStream fin = new FileInputStream(f);
+        closer.register(fin);
+
+        DataRecord rec = dataStore.addRecord(fin);
+        assertEquals(id, rec.getIdentifier().toString());
+
+        assertTrue(Iterators.contains(dataStore.getAllIdentifiers(), new DataIdentifier(id)));
+
+        //start & finish
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        waitFinish();
+
+        assertTrue(Iterators.contains(dataStore.getAllIdentifiers(), new DataIdentifier(id)));
+    }
+
+    @After
+    public void tear() throws Exception {
+        closer.close();
+        dataStore.close();
+    }
+
+    private static void assertFile(InputStream is, File org, TemporaryFolder folder)
+        throws IOException {
+        try {
+            File ret = folder.newFile();
+            FileUtils.copyInputStreamToFile(is, ret);
+            assertTrue(Files.equal(org, ret));
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+    }
+
+    private String getIdForInputStream(File f)
+        throws Exception {
+        FileInputStream in = new FileInputStream(f);
+        MessageDigest digest = MessageDigest.getInstance("SHA-1");
+        OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
+        try {
+            IOUtils.copyLarge(in, output);
+        } finally {
+            IOUtils.closeQuietly(output);
+            IOUtils.closeQuietly(in);
+        }
+        return encodeHexString(digest.digest());
+    }
+
+    private void waitFinish() {
+        try {
+            // wait for upload finish
+            afterExecuteLatch.await();
+            // Force execute removal from staging cache
+            ScheduledFuture<?> scheduledFuture = scheduledExecutor
+                .schedule(dataStore.getCache().getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+            scheduledFuture.get();
+            LOG.info("After jobs completed");
+        } catch (Exception e) {
+            LOG.warn("Error waiting for uploads and staging purge to finish", e);
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native