You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/10/24 04:51:04 UTC
svn commit: r1766337 - in /jackrabbit/oak/trunk: oak-blob/
oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/
Author: amitj
Date: Mon Oct 24 04:51:04 2016
New Revision: 1766337
URL: http://svn.apache.org/viewvc?rev=1766337&view=rev
Log:
OAK-4979: Caching sub-system implementation for DataStore
* AbstractCachingDataStore implementation using the CompositeDataStoreCache
* Introduce SharedBackend interface using the JR2 Backend interface as a base
Added:
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-blob/pom.xml
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
Modified: jackrabbit/oak/trunk/oak-blob/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/pom.xml?rev=1766337&r1=1766336&r2=1766337&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob/pom.xml Mon Oct 24 04:51:04 2016
@@ -79,6 +79,11 @@
</dependency>
<dependency>
+ <groupId>javax.jcr</groupId>
+ <artifactId>jcr</artifactId>
+ <version>2.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>jackrabbit-data</artifactId>
<version>${jackrabbit.version}</version>
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java?rev=1766337&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java Mon Oct 24 04:51:04 2016
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.blob;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * Contract for a storage backend that keeps records keyed by
+ * {@link DataIdentifier} and, alongside them, metadata records addressed by
+ * name.
+ */
+public interface SharedBackend {
+ /**
+ * Returns an input stream of the record identified by identifier.
+ *
+ * @param identifier
+ * identifier of record.
+ * @return input stream of the record.
+ * @throws DataStoreException
+ * if record not found or any error.
+ */
+ InputStream read(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Stores file to backend with identifier used as key. If key pre-exists, it
+ * updates the timestamp of the key.
+ *
+ * @param identifier
+ * key of the file
+ * @param file
+ * file that would be stored in backend.
+ * @throws DataStoreException
+ * for any error.
+ */
+ void write(DataIdentifier identifier, File file) throws DataStoreException;
+
+ /**
+ * Gets the record with the specified identifier.
+ *
+ * @param id the record identifier
+ * @return the metadata DataRecord
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ DataRecord getRecord(DataIdentifier id) throws DataStoreException;
+
+
+ /**
+ * Returns identifiers of all records that exist in backend.
+ *
+ * @return iterator consisting of all identifiers
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException;
+
+
+ /**
+ * Returns an iterator over all DataRecords.
+ *
+ * @return iterator over DataRecords
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ Iterator<DataRecord> getAllRecords() throws DataStoreException;
+
+
+ /**
+ * Checks the existence of a record in backend.
+ *
+ * @param identifier
+ * identifier to be checked.
+ * @return true if the record exists else false.
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ boolean exists(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Close backend and release resources like database connection if any.
+ *
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ void close() throws DataStoreException;
+
+ /**
+ * Delete record identified by identifier. No-op if identifier not found.
+ *
+ * @param identifier
+ * identifier of the record to delete.
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ void deleteRecord(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Adds a metadata record with the specified name, reading its content from
+ * a stream.
+ *
+ * @param input the record input stream
+ * @param name the name
+ * @throws org.apache.jackrabbit.core.data.DataStoreException
+ */
+ void addMetadataRecord(final InputStream input, final String name) throws DataStoreException;
+
+ /**
+ * Adds a metadata record with the specified name, reading its content from
+ * a file.
+ *
+ * @param input the record file
+ * @param name the name
+ * @throws org.apache.jackrabbit.core.data.DataStoreException
+ */
+ void addMetadataRecord(final File input, final String name) throws DataStoreException;
+
+ /**
+ * Gets the metadata of the specified name.
+ *
+ * @param name the name of the record
+ * @return the metadata DataRecord
+ */
+ DataRecord getMetadataRecord(String name);
+
+ /**
+ * Gets all the metadata with a specified prefix.
+ *
+ * @param prefix the prefix of the records to retrieve
+ * @return list of all the metadata DataRecords
+ */
+ List<DataRecord> getAllMetadataRecords(String prefix);
+
+ /**
+ * Deletes the metadata record with the specified name.
+ *
+ * @param name the name of the record
+ * @return boolean to indicate success of deletion
+ */
+ boolean deleteMetadataRecord(String name);
+
+ /**
+ * Deletes all the metadata records with the specified prefix.
+ *
+ * @param prefix the prefix of the record
+ */
+ void deleteAllMetadataRecords(String prefix);
+
+ /**
+ * Initializes the backend.
+ *
+ * @throws DataStoreException
+ * declared by the signature; the exact conditions are left to the
+ * implementation.
+ */
+ void init() throws DataStoreException;
+}
Propchange: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/SharedBackend.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1766337&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Mon Oct 24 04:51:04 2016
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.AbstractDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.util.TransientFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Caches files locally and stages files locally for async uploads.
+ * Configuration:
+ *
+ * <pre>
+ * <DataStore class="org.apache.jackrabbit.oak.plugins.blob.AbstractSharedCachingDataStore">
+ *
+ * <param name="{@link #setPath(String) path}"/>
+ * <param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/>
+ * <param name="{@link #setStagingSplitPercentage(int) stagingSplitPercentage}" value="10"/>
+ * <param name="{@link #setUploadThreads(int) uploadThreads}" value="10"/>
+ * <param name="{@link #setStagingPurgeInterval(int) stagingPurgeInterval}" value="300"/>
+ * </DataStore>
+ * </pre>
+ */
+public abstract class AbstractSharedCachingDataStore extends AbstractDataStore
+ implements MultiDataStoreAware, SharedDataStore {
+ /**
+ * Logger instance, registered under this class's own name so that log
+ * output is attributed to this data store.
+ */
+ static final Logger LOG = LoggerFactory.getLogger(AbstractSharedCachingDataStore.class);
+
+ /**
+ * The digest algorithm used to uniquely identify records.
+ */
+ private static final String DIGEST = "SHA-1";
+
+ /**
+ * The root path
+ */
+ private String path;
+
+ /**
+ * The number of bytes in the cache. The default value is 64 GB.
+ */
+ private long cacheSize = 64L * 1024 * 1024 * 1024;
+
+ /**
+ * The % of cache utilized for upload staging.
+ */
+ private int stagingSplitPercentage = 10;
+
+ /**
+ * The number of upload threads used for asynchronous uploads from staging.
+ */
+ private int uploadThreads = 10;
+
+ /**
+ * The interval for remove job in seconds.
+ */
+ private int stagingPurgeInterval = 300;
+
+ /**
+ * The root rootDirectory where the files are created.
+ */
+ private File rootDirectory;
+
+ /**
+ * The rootDirectory where tmp files are created.
+ */
+ private File tmp;
+
+ /**
+ * Statistics provider.
+ */
+ private StatisticsProvider statisticsProvider;
+
+ /**
+ * DataStore cache
+ */
+ private CompositeDataStoreCache cache;
+
+ /**
+ * The delegate backend
+ */
+ protected SharedBackend backend;
+
+ protected ListeningExecutorService listeningExecutor;
+
+ protected ScheduledExecutorService schedulerExecutor;
+
+ /**
+ * Initializes the data store: resolves the root path, creates the
+ * {@code tmp} directory below it, creates and initializes the backend and
+ * builds the {@link CompositeDataStoreCache} that fronts it.
+ *
+ * @param homeDir used to derive the default path
+ *            ({@code homeDir + "/repository/datastore"}) when no path was
+ *            configured through {@link #setPath(String)}
+ * @throws DataStoreException if the backend fails to initialize
+ * @throws IllegalArgumentException if the staging split percentage is not
+ *             within 0..50
+ */
+ public void init(String homeDir) throws DataStoreException {
+     if (path == null) {
+         path = homeDir + "/repository/datastore";
+     }
+
+     checkArgument(stagingSplitPercentage >= 0 && stagingSplitPercentage <= 50,
+         "Staging percentage cache should be between 0 and 50");
+
+     this.rootDirectory = new File(path);
+     this.tmp = new File(rootDirectory, "tmp");
+     tmp.mkdirs();
+
+     this.backend = createBackend();
+     backend.init();
+
+     this.cache = new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
+         new CacheLoader<String, InputStream>() {
+             @Override public InputStream load(String key) throws Exception {
+                 // Return the stream still open: closing it here would hand a
+                 // dead stream to the caller. Closing is left to whoever
+                 // consumes it (presumably the cache - TODO confirm).
+                 return backend.read(new DataIdentifier(key));
+             }
+         }, new StagingUploader() {
+             @Override
+             public void write(String id, File file) throws DataStoreException {
+                 backend.write(new DataIdentifier(id), file);
+             }
+         }, statisticsProvider, listeningExecutor, schedulerExecutor, stagingPurgeInterval);
+ }
+
+ protected abstract SharedBackend createBackend();
+
+ /**
+ * Returns the record for the given identifier, preferring the locally
+ * cached file and falling back to the backend.
+ *
+ * @param dataIdentifier identifier of the record to look up
+ * @return a record built from the cached file's length and modification
+ *         time when the file is present in the cache, otherwise whatever
+ *         the backend returns; {@code null} when the backend lookup fails
+ *         (the failure is logged, not propagated)
+ * @throws DataStoreException declared by the signature; backend failures
+ *             are caught and logged here
+ */
+ @Override
+ @Nullable
+ public DataRecord getRecordIfStored(DataIdentifier dataIdentifier)
+         throws DataStoreException {
+     // Return file attributes from cache only if corresponding file is cached
+     // This avoids downloading the file for just accessing the meta data
+     File cached = cache.getIfPresent(dataIdentifier.toString());
+     if (cached != null && cached.exists()) {
+         return new FileCacheDataRecord(this, dataIdentifier, cached.length(),
+             cached.lastModified());
+     }
+
+     // File not in cache so, retrieve the meta data from the backend explicitly
+     try {
+         return backend.getRecord(dataIdentifier);
+     } catch (Exception e) {
+         LOG.error("Error retrieving record [{}] from backend", dataIdentifier, e);
+     }
+     return null;
+ }
+
+ @Override
+ public DataRecord addRecord(InputStream inputStream) throws DataStoreException {
+ Stopwatch watch = Stopwatch.createStarted();
+ try {
+ TransientFileFactory fileFactory = TransientFileFactory.getInstance();
+ File tmpFile = fileFactory.createTransientFile("upload", null, tmp);
+
+ // Copy the stream to the temporary file and calculate the
+ // stream length and the message digest of the stream
+ MessageDigest digest = MessageDigest.getInstance(DIGEST);
+ OutputStream output = new DigestOutputStream(new FileOutputStream(tmpFile), digest);
+ long length = 0;
+ try {
+ length = IOUtils.copyLarge(inputStream, output);
+ } finally {
+ output.close();
+ }
+
+ DataIdentifier identifier = new DataIdentifier(encodeHexString(digest.digest()));
+ LOG.debug("SHA1 of [{}], length =[{}] took [{}]ms ",
+ new Object[] {identifier, length, watch.elapsed(TimeUnit.MILLISECONDS)});
+
+ File cachedFile;
+ // asynchronously stage for upload if the size limit of staging cache permits
+ if (!cache.stage(identifier.toString(), tmpFile)) {
+ backend.write(identifier, tmpFile);
+ // Update the last modified for the file if present in the download cache
+ cachedFile = cache.getIfPresent(identifier.toString());
+ if (cachedFile != null) {
+ FileUtils.touch(cachedFile);
+ }
+ } else {
+ cachedFile = cache.getIfPresent(identifier.toString());
+ }
+
+ return new FileCacheDataRecord(this, identifier, cachedFile.length(), cachedFile.lastModified());
+ } catch (Exception e) {
+ LOG.error("Error in adding record");
+ throw new DataStoreException("Error in adding record ", e);
+ }
+ }
+
+ @Override
+ public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+ return Iterators.concat(Iterators.transform(cache.getStagingCache().getAllIdentifiers(),
+ new Function<String, DataIdentifier>() {
+ @Nullable @Override public DataIdentifier apply(@Nullable String id) {
+ return new DataIdentifier(id);
+ }
+ }), backend.getAllIdentifiers());
+ }
+
+ @Override
+ public void deleteRecord(DataIdentifier dataIdentifier) throws DataStoreException {
+ cache.invalidate(dataIdentifier.toString());
+ backend.deleteRecord(dataIdentifier);
+ }
+
+ /**
+ * Closes the backend and then the cache. The cache is closed even when
+ * closing the backend fails, so its resources are not left behind.
+ *
+ * @throws DataStoreException if closing the backend fails
+ */
+ @Override
+ public void close() throws DataStoreException {
+     try {
+         backend.close();
+     } finally {
+         cache.close();
+     }
+ }
+
+ /**
+ * {@link DataRecord} whose length and last-modified time are fixed at
+ * construction (callers in this class take them from a locally cached
+ * file) and whose content is opened on demand through the owning store's
+ * cache.
+ */
+ static class FileCacheDataRecord extends AbstractDataRecord {
+     /** Length in bytes, as supplied at construction. */
+     private final long length;
+     /** Last-modified time, as supplied at construction. */
+     private final long lastModified;
+     /** Owning store; its cache is used to open the content. */
+     private final AbstractSharedCachingDataStore store;
+
+     /**
+      * @param store the owning data store
+      * @param identifier identifier of the record
+      * @param length value returned by {@link #getLength()}
+      * @param lastModified value returned by {@link #getLastModified()}
+      */
+     public FileCacheDataRecord(AbstractSharedCachingDataStore store, DataIdentifier identifier, long length,
+             long lastModified) {
+         super(store, identifier);
+         this.length = length;
+         this.lastModified = lastModified;
+         this.store = store;
+     }
+
+     @Override
+     public long getLength() throws DataStoreException {
+         return length;
+     }
+
+     /**
+      * Opens the record's content from the file the store's cache returns
+      * for this identifier.
+      *
+      * @throws DataStoreException wrapping any failure to obtain or open
+      *             the file
+      */
+     @Override
+     public InputStream getStream() throws DataStoreException {
+         try {
+             return new FileInputStream(store.cache.get(getIdentifier().toString()));
+         } catch (final Exception e) {
+             throw new DataStoreException(
+                 "Error opening input stream for identifier " + getIdentifier(), e);
+         }
+     }
+
+     @Override
+     public long getLastModified() {
+         return lastModified;
+     }
+ }
+
+ /**
+ * Look in the backend for a record matching the given identifier. Returns true
+ * if such a record exists.
+ *
+ * @param identifier - An identifier for the record.
+ * @return true if a record for the provided identifier can be found.
+ */
+ public boolean exists(final DataIdentifier identifier) {
+ try {
+ if (identifier != null) {
+ return backend.exists(identifier);
+ }
+ }
+ catch (DataStoreException e) {
+ LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads",
+ identifier), e);
+ }
+ return false;
+ }
+
+ public List<DataStoreCacheStatsMBean> getStats() {
+ return ImmutableList.of(cache.getCacheStats(), cache.getStagingCacheStats());
+ }
+
+ /**
+ * @return the cache built by {@code init(String)}; {@code null} before
+ *         initialization
+ */
+ protected CompositeDataStoreCache getCache() {
+     return this.cache;
+ }
+
+ /**------------------------- setters ----------------------------------------------
+ * Plain configuration setters: each one only stores its argument in the
+ * field of the same name. init(String) reads these fields (path, cacheSize,
+ * stagingSplitPercentage, uploadThreads, stagingPurgeInterval and
+ * statisticsProvider) when it builds the cache, so they need to be set
+ * before init(String) runs to be picked up there. init(String) rejects a
+ * stagingSplitPercentage outside 0..50.
+ **/
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public void setCacheSize(long cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
+ public void setStagingSplitPercentage(int stagingSplitPercentage) {
+ this.stagingSplitPercentage = stagingSplitPercentage;
+ }
+
+ public void setUploadThreads(int uploadThreads) {
+ this.uploadThreads = uploadThreads;
+ }
+
+ public void setStagingPurgeInterval(int stagingPurgeInterval) {
+ this.stagingPurgeInterval = stagingPurgeInterval;
+ }
+
+ public void setStatisticsProvider(StatisticsProvider statisticsProvider) {
+ this.statisticsProvider = statisticsProvider;
+ }
+
+ /**------------------------ SharedDataStore methods -----------------------------------------
+ * Every method in this section except getType() forwards its arguments
+ * unchanged to the backend and returns the backend's result; the local
+ * cache is not consulted. getType() always reports
+ * SharedDataStore.Type.SHARED.
+ **/
+
+ @Override
+ public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+ backend.addMetadataRecord(stream, name);
+ }
+
+ @Override
+ public void addMetadataRecord(File f, String name) throws DataStoreException {
+ backend.addMetadataRecord(f, name);
+ }
+
+ @Override
+ public DataRecord getMetadataRecord(String name) {
+ return backend.getMetadataRecord(name);
+ }
+
+ @Override
+ public List<DataRecord> getAllMetadataRecords(String prefix) {
+ return backend.getAllMetadataRecords(prefix);
+ }
+
+ @Override
+ public boolean deleteMetadataRecord(String name) {
+ return backend.deleteMetadataRecord(name);
+ }
+
+ @Override
+ public void deleteAllMetadataRecords(String prefix) {
+ backend.deleteAllMetadataRecords(prefix);
+ }
+
+ @Override
+ public Iterator<DataRecord> getAllRecords() throws DataStoreException {
+ return backend.getAllRecords();
+ }
+
+ @Override
+ public DataRecord getRecordForId(DataIdentifier identifier) throws DataStoreException {
+ return backend.getRecord(identifier);
+ }
+
+ @Override
+ public SharedDataStore.Type getType() {
+ return SharedDataStore.Type.SHARED;
+ }
+
+ /**------------------------ unimplemented methods -------------------------------------------
+ * Each method in this section throws UnsupportedOperationException
+ * unconditionally; its argument, if any, is ignored.
+ **/
+
+ @Override
+ public void clearInUse() {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public void updateModifiedDateOnAccess(long l) {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+
+ @Override
+ public int deleteAllOlderThan(long l) throws DataStoreException {
+ throw new UnsupportedOperationException("Operation not supported");
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1766337&r1=1766336&r2=1766337&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Mon Oct 24 04:51:04 2016
@@ -21,9 +21,12 @@ package org.apache.jackrabbit.oak.plugin
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -37,14 +40,18 @@ import javax.annotation.Nullable;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractListeningExecutorService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,9 +135,9 @@ public class AbstractDataStoreCacheTest
@Override protected void afterExecute(Runnable r, Throwable t) {
try {
- LOG.trace("After execution....waiting for latch");
+ LOG.trace("After execution....counting down latch");
afterLatch.countDown();
- LOG.trace("After execution....after acquiring latch");
+ LOG.trace("After execution....after counting down latch");
super.afterExecute(r, t);
LOG.trace("Completed afterExecute");
} catch (Exception e) {
@@ -218,6 +225,81 @@ public class AbstractDataStoreCacheTest
}
}
+ // A mock Backend implementation that uses a Map to keep track of what
+ // records have been added and removed, for test purposes only.
+ // Metadata operations and record listing are stubs (no-ops or null/false).
+ class TestMemoryBackend implements SharedBackend {
+     final Map<DataIdentifier, File> _backend = Maps.newHashMap();
+
+     // Opens the file registered for the identifier; an unknown identifier
+     // is reported as a DataStoreException, as the interface documents,
+     // rather than surfacing as a NullPointerException.
+     @Override public InputStream read(DataIdentifier identifier) throws DataStoreException {
+         File file = _backend.get(identifier);
+         if (file == null) {
+             throw new DataStoreException("No record found for id " + identifier);
+         }
+         try {
+             return new FileInputStream(file);
+         } catch (FileNotFoundException e) {
+             throw new DataStoreException(e);
+         }
+     }
+
+     // Registers the file under the identifier; only the reference is kept,
+     // the content is not copied.
+     @Override public void write(DataIdentifier identifier, File file)
+             throws DataStoreException {
+         if (file != null && file.exists()) {
+             _backend.put(identifier, file);
+         } else {
+             throw new DataStoreException(
+                 String.format("file %s of id %s", new Object[] {file, identifier.toString()}));
+         }
+     }
+
+     @Override public DataRecord getRecord(DataIdentifier id) throws DataStoreException {
+         return null;
+     }
+
+     @Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+         return _backend.keySet().iterator();
+     }
+
+     @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
+         return null;
+     }
+
+     @Override public boolean exists(DataIdentifier identifier) throws DataStoreException {
+         return _backend.containsKey(identifier);
+     }
+
+     @Override public void close() throws DataStoreException {
+     }
+
+     // Map.remove is already a no-op for an absent key
+     @Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+         _backend.remove(identifier);
+     }
+
+     @Override public void addMetadataRecord(InputStream input, String name)
+             throws DataStoreException {
+     }
+
+     @Override public void addMetadataRecord(File input, String name) throws DataStoreException {
+     }
+
+     @Override public DataRecord getMetadataRecord(String name) {
+         return null;
+     }
+
+     @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
+         return null;
+     }
+
+     @Override public boolean deleteMetadataRecord(String name) {
+         return false;
+     }
+
+     @Override public void deleteAllMetadataRecords(String prefix) {
+     }
+
+     @Override public void init() throws DataStoreException {
+
+     }
+ }
+
static InputStream randomStream(int seed, int size) {
Random r = new Random(seed);
byte[] data = new byte[size];
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java?rev=1766337&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java Mon Oct 24 04:51:04 2016
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.Closer;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.spi.blob.SharedBackend;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class CachingDataStoreTest extends AbstractDataStoreCacheTest {
+ private static final Logger LOG = LoggerFactory.getLogger(CachingDataStoreTest.class);
+ private static final String ID_PREFIX = "12345";
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ private final Closer closer = Closer.create();
+ private File root;
+
+ private CountDownLatch taskLatch;
+ private CountDownLatch callbackLatch;
+ private CountDownLatch afterExecuteLatch;
+ private TestExecutor executor;
+ private StatisticsProvider statsProvider;
+ private ScheduledExecutorService scheduledExecutor;
+ private AbstractSharedCachingDataStore dataStore;
+
+ @Before
+ public void setup() throws Exception {
+ root = folder.newFolder();
+ init(1);
+ }
+
+ private void init(int i) throws Exception {
+ // create executor
+ taskLatch = new CountDownLatch(1);
+ callbackLatch = new CountDownLatch(1);
+ afterExecuteLatch = new CountDownLatch(i);
+ executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
+
+ // stats
+ ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor();
+ closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS));
+ statsProvider = new DefaultStatisticsProvider(statsExecutor);
+
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
+ dataStore = new AbstractSharedCachingDataStore() {
+ @Override protected SharedBackend createBackend() {
+ return new TestMemoryBackend();
+ }
+
+ @Override public int getMinRecordLength() {
+ return 0;
+ }
+ };
+ dataStore.setStatisticsProvider(statsProvider);
+ dataStore.listeningExecutor = executor;
+ dataStore.schedulerExecutor = scheduledExecutor;
+ dataStore.init(root.getAbsolutePath());
+ }
+
+ /**
+ * {@link AbstractSharedCachingDataStore#getRecordIfStored(DataIdentifier)}
+ * returns null for an identifier that was never added.
+ */
+ @Test
+ public void getRecordNotAvailable() throws DataStoreException {
+ DataRecord rec = dataStore.getRecordIfStored(new DataIdentifier(ID_PREFIX + 0));
+ assertNull(rec);
+ }
+
+ /**
+ * {@link AbstractSharedCachingDataStore#exists(DataIdentifier)} returns
+ * false for an identifier that was never added.
+ * @throws IOException declared by the signature
+ */
+ @Test
+ public void exists() throws IOException {
+ assertFalse(dataStore.exists(new DataIdentifier(ID_PREFIX + 0)));
+ }
+
+ /**
+ * Add in datastore.
+ */
+ @Test
+ public void addDelete() throws Exception {
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ String id = getIdForInputStream(f);
+ FileInputStream fin = new FileInputStream(f);
+ closer.register(fin);
+
+ DataRecord rec = dataStore.addRecord(fin);
+ assertEquals(id, rec.getIdentifier().toString());
+
+ //start & finish
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ waitFinish();
+
+ rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+ assertNotNull(rec);
+ assertFile(rec.getStream(), f, folder);
+
+ dataStore.deleteRecord(new DataIdentifier(id));
+ rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+ assertNull(rec);
+ }
+
+ @Test
+ public void addStagingAndDelete() throws Exception {
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ String id = getIdForInputStream(f);
+ FileInputStream fin = new FileInputStream(f);
+ closer.register(fin);
+
+ DataRecord rec = dataStore.addRecord(fin);
+ assertEquals(id, rec.getIdentifier().toString());
+
+ rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+ assertNotNull(rec);
+ assertFile(rec.getStream(), f, folder);
+
+ dataStore.deleteRecord(new DataIdentifier(id));
+ rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+ assertNull(rec);
+
+ Thread.sleep(1000);
+ //start & finish
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ waitFinish();
+
+ rec = dataStore.getRecordIfStored(new DataIdentifier(id));
+ assertNull(rec);
+ }
+
+ /**
+ * Get all Identifiers.
+ */
+ @Test
+ public void getAllIdentifiers() throws Exception {
+ File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
+ String id = getIdForInputStream(f);
+ FileInputStream fin = new FileInputStream(f);
+ closer.register(fin);
+
+ DataRecord rec = dataStore.addRecord(fin);
+ assertEquals(id, rec.getIdentifier().toString());
+
+ assertTrue(Iterators.contains(dataStore.getAllIdentifiers(), new DataIdentifier(id)));
+
+ //start & finish
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ waitFinish();
+
+ assertTrue(Iterators.contains(dataStore.getAllIdentifiers(), new DataIdentifier(id)));
+ }
+
+ @After
+ public void tear() throws Exception {
+ closer.close();
+ dataStore.close();
+ }
+
+ private static void assertFile(InputStream is, File org, TemporaryFolder folder)
+ throws IOException {
+ try {
+ File ret = folder.newFile();
+ FileUtils.copyInputStreamToFile(is, ret);
+ assertTrue(Files.equal(org, ret));
+ } finally {
+ IOUtils.closeQuietly(is);
+ }
+ }
+
+ private String getIdForInputStream(File f)
+ throws Exception {
+ FileInputStream in = new FileInputStream(f);
+ MessageDigest digest = MessageDigest.getInstance("SHA-1");
+ OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
+ try {
+ IOUtils.copyLarge(in, output);
+ } finally {
+ IOUtils.closeQuietly(output);
+ IOUtils.closeQuietly(in);
+ }
+ return encodeHexString(digest.digest());
+ }
+
+ private void waitFinish() {
+ try {
+ // wait for upload finish
+ afterExecuteLatch.await();
+ // Force execute removal from staging cache
+ ScheduledFuture<?> scheduledFuture = scheduledExecutor
+ .schedule(dataStore.getCache().getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS);
+ scheduledFuture.get();
+ LOG.info("After jobs completed");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CachingDataStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native