You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/12/01 08:42:03 UTC
svn commit: r1772153 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/blob/
test/java/org/apache/jackrabbit/oak/plugins/blob/
Author: amitj
Date: Thu Dec 1 08:42:02 2016
New Revision: 1772153
URL: http://svn.apache.org/viewvc?rev=1772153&view=rev
Log:
OAK-5201: Support upgrade of DataStore cache
- Upgrade utility class exposes methods to move older cached files to newer cache areas.
- UploadStagingCache and FIleCache now upgrade the previous version of cache files if available to their respective caches.
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Thu Dec 1 08:42:02 2016
@@ -159,8 +159,10 @@ public abstract class AbstractSharedCach
this.backend = createBackend();
backend.init();
+ String home = FilenameUtils.normalizeNoEndSeparator(new File(homeDir).getAbsolutePath());
this.cache =
- new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
+ new CompositeDataStoreCache(path, new File(home), cacheSize, stagingSplitPercentage,
+ uploadThreads,
new CacheLoader<String, InputStream>() {
@Override public InputStream load(String key) throws Exception {
InputStream is = null;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Thu Dec 1 08:42:02 2016
@@ -59,7 +59,7 @@ public class CompositeDataStoreCache ext
*/
private final File directory;
- public CompositeDataStoreCache(String path, long size, int uploadSplitPercentage,
+ public CompositeDataStoreCache(String path, File home, long size, int uploadSplitPercentage,
int uploadThreads, CacheLoader<String, InputStream> loader, final StagingUploader uploader,
StatisticsProvider statsProvider, ListeningExecutorService executor,
ScheduledExecutorService scheduledExecutor /* purge scheduled executor */,
@@ -73,11 +73,11 @@ public class CompositeDataStoreCache ext
long uploadSize = (size * uploadSplitPercentage) / 100;
- this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null);
-
this.stagingCache = UploadStagingCache
- .build(directory, uploadThreads, uploadSize, uploader, downloadCache, statsProvider,
+ .build(directory, home, uploadThreads, uploadSize, uploader, null, statsProvider,
executor, scheduledExecutor, purgeInterval, stagingRetryInterval);
+ this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null);
+ stagingCache.setDownloadCache(downloadCache);
}
@Nullable
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java?rev=1772153&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java Thu Dec 1 08:42:02 2016
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUtils.recursiveDelete;
+
+/**
+ * Utility methods to upgrade Old DataStore cache
+ * {@link org.apache.jackrabbit.core.data.CachingDataStore}.
+ */
+public class DataStoreCacheUpgradeUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(DataStoreCacheUpgradeUtils.class);
+
+ static final String UPLOAD_MAP = "async-pending-uploads.ser";
+ static final String UPLOAD_STAGING_DIR = UploadStagingCache.UPLOAD_STAGING_DIR;
+ static final String DOWNLOAD_DIR = FileCache.DOWNLOAD_DIR;
+
+ private static Map<String, Long> deSerializeUploadMap(File homeDir) {
+ Map<String, Long> asyncUploadMap = Maps.newHashMap();
+
+ File asyncUploadMapFile = new File(homeDir, UPLOAD_MAP);
+ if (asyncUploadMapFile.exists()) {
+ String path = asyncUploadMapFile.getAbsolutePath();
+
+ InputStream fis = null;
+ try {
+ fis = new BufferedInputStream(new FileInputStream(path));
+ ObjectInput input = new ObjectInputStream(fis);
+ asyncUploadMap = (Map<String, Long>) input.readObject();
+ } catch (Exception e) {
+ LOG.warn("Error in reading pending uploads map [{}] from location [{}]", UPLOAD_MAP,
+ homeDir);
+ } finally {
+ IOUtils.closeQuietly(fis);
+ }
+ LOG.debug("AsyncUploadMap read [{}]", asyncUploadMap);
+ }
+ return asyncUploadMap;
+ }
+
+ private static void deleteSerializedUploadMap(File homeDir) {
+ File uploadMap = new File(homeDir, UPLOAD_MAP);
+ FileUtils.deleteQuietly(uploadMap);
+ LOG.info("Deleted asyncUploadMap [{}] from [{}]", UPLOAD_MAP, homeDir);
+ }
+
+ private static boolean notInExceptions(File file, List<String> exceptions) {
+ String parent = file.getParent();
+ for (String exception : exceptions) {
+ if (parent.contains(exception)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static void moveDownloadCache(final File path) {
+ final List<String> exceptions = ImmutableList.of("tmp", UPLOAD_STAGING_DIR, DOWNLOAD_DIR);
+ File newDownloadDir = new File(path, DOWNLOAD_DIR);
+
+ Iterator<File> iterator =
+ Files.fileTreeTraverser().postOrderTraversal(path)
+ .filter(new Predicate<File>() {
+ @Override public boolean apply(File input) {
+ return input.isFile()
+ && !input.getParentFile().equals(path)
+ && !notInExceptions(input, exceptions);
+ }
+ }).iterator();
+
+ while (iterator.hasNext()) {
+ File download = iterator.next();
+ LOG.trace("Download cache file absolute pre-upgrade path " + download);
+
+ File newDownload = new File(newDownloadDir,
+ download.getAbsolutePath().substring(path.getAbsolutePath().length()));
+ newDownload.getParentFile().mkdirs();
+ LOG.trace("Downloaded cache file absolute post-upgrade path " + newDownload);
+
+ try {
+ FileUtils.moveFile(download, newDownload);
+ LOG.info("Download cache file [{}] moved to [{}]", download, newDownload);
+ recursiveDelete(download, path);
+ } catch (Exception e) {
+ LOG.warn("Unable to move download cache file [{}] to [{}]", download, newDownload);
+ }
+ }
+ }
+
+ public static void movePendingUploadsToStaging(File homeDir, File path, boolean deleteMap) {
+ File newUploadDir = new File(path, UPLOAD_STAGING_DIR);
+
+ Map<String, Long> pendingUploads = deSerializeUploadMap(homeDir);
+ Iterator<String> pendingFileIter = pendingUploads.keySet().iterator();
+
+ while(pendingFileIter.hasNext()) {
+ String file = pendingFileIter.next();
+ File upload = new File(path, file);
+ LOG.trace("Pending upload absolute path " + upload.getAbsolutePath());
+
+ File newUpload = new File(newUploadDir, file);
+ LOG.trace("Pending upload upgrade absolute path " + newUpload.getAbsolutePath());
+
+ newUpload.getParentFile().mkdirs();
+
+ if (upload.exists()) {
+ LOG.trace(upload + " File exists");
+ try {
+ FileUtils.moveFile(upload, newUpload);
+ LOG.info("Pending upload file [{}] moved to [{}]", upload, newUpload);
+ recursiveDelete(upload, path);
+ } catch (Exception e) {
+ LOG.warn("Unable to move pending upload file [{}] to [{}]", upload, newUpload);
+ }
+ } else {
+ LOG.warn("File [{}] does not exist", upload);
+ }
+ }
+
+ if (deleteMap) {
+ deleteSerializedUploadMap(homeDir);
+ }
+ }
+
+ public static void upgrade(File homeDir, File path, boolean moveCache, boolean deleteMap) {
+ movePendingUploadsToStaging(homeDir, path, deleteMap);
+
+ if (moveCache) {
+ moveDownloadCache(path);
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java Thu Dec 1 08:42:02 2016
@@ -62,6 +62,13 @@ public class FileCache extends AbstractC
*/
private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
+ protected static final String DOWNLOAD_DIR = "download";
+
+ /**
+ * Parent of the cache root directory
+ */
+ private File parent;
+
/**
* The cacheRoot directory of the cache.
*/
@@ -91,7 +98,8 @@ public class FileCache extends AbstractC
private FileCache(long maxSize /* bytes */, File root,
final CacheLoader<String, InputStream> loader, @Nullable final ExecutorService executor) {
- this.cacheRoot = new File(root, "download");
+ this.parent = root;
+ this.cacheRoot = new File(root, DOWNLOAD_DIR);
/* convert to 4 KB block */
long size = Math.round(maxSize / (1024L * 4));
@@ -286,6 +294,10 @@ public class FileCache extends AbstractC
*/
private int build() {
int count = 0;
+
+ // Move older generation cache downloaded files to the new folder
+ DataStoreCacheUpgradeUtils.moveDownloadCache(parent);
+
// Iterate over all files in the cache folder
Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(cacheRoot)
.filter(new Predicate<File>() {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Thu Dec 1 08:42:02 2016
@@ -63,6 +63,8 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Objects.toStringHelper;
import static java.lang.String.format;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils
+ .movePendingUploadsToStaging;
/**
* Cache for staging async uploads. This serves as a temporary cache for serving local
@@ -78,6 +80,8 @@ public class UploadStagingCache implemen
*/
private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
+ protected static final String UPLOAD_STAGING_DIR = "upload";
+
//Rough estimate of the in-memory key, value pair
private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
@Override public int weigh(String key, File value) {
@@ -141,7 +145,7 @@ public class UploadStagingCache implemen
*/
private LinkedBlockingQueue<String> retryQueue;
- private UploadStagingCache(File dir, int uploadThreads, long size /* bytes */,
+ private UploadStagingCache(File dir, File home, int uploadThreads, long size /* bytes */,
StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider,
@Nullable ListeningExecutorService executor,
@Nullable ScheduledExecutorService scheduledExecutor,
@@ -169,7 +173,7 @@ public class UploadStagingCache implemen
this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
this.downloadCache = cache;
- build();
+ build(home, dir);
this.scheduledExecutor
.scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
@@ -180,13 +184,13 @@ public class UploadStagingCache implemen
private UploadStagingCache() {
}
- public static UploadStagingCache build(File dir, int uploadThreads, long size
+ public static UploadStagingCache build(File dir, File home, int uploadThreads, long size
/* bytes */, StagingUploader uploader, @Nullable FileCache cache,
StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor,
@Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval /* secs */,
int retryInterval /* secs */) {
if (size > 0) {
- return new UploadStagingCache(dir, uploadThreads, size, uploader, cache,
+ return new UploadStagingCache(dir, home, uploadThreads, size, uploader, cache,
statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
}
return new UploadStagingCache() {
@@ -216,9 +220,13 @@ public class UploadStagingCache implemen
/**
* Retrieves all the files staged in the staging area and schedules them for uploads.
+ * @param home the home of the repo
+ * @param rootPath the parent of the cache
*/
- private void build() {
+ private void build(File home, File rootPath) {
LOG.info("Scheduling pending uploads");
+ // Move any older cache pending uploads
+ movePendingUploadsToStaging(home, rootPath, true);
Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
.filter(new Predicate<File>() {
@@ -487,6 +495,10 @@ public class UploadStagingCache implemen
new ExecutorCloser(scheduledExecutor).close();
}
+ protected void setDownloadCache(@Nullable FileCache downloadCache) {
+ this.downloadCache = downloadCache;
+ }
+
/**
* Class which calls remove on all
*/
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Thu Dec 1 08:42:02 2016
@@ -18,12 +18,17 @@
*/
package org.apache.jackrabbit.oak.plugins.blob;
+import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +52,7 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
@@ -334,7 +340,7 @@ public class AbstractDataStoreCacheTest
return new ByteArrayInputStream(data);
}
- private static File getFile(String id, File root) {
+ protected static File getFile(String id, File root) {
File file = root;
file = new File(file, id.substring(0, 2));
file = new File(file, id.substring(2, 4));
@@ -345,4 +351,17 @@ public class AbstractDataStoreCacheTest
FileUtils.copyInputStreamToFile(stream, file);
return file;
}
+
+ static void serializeMap(Map<String,Long> pendingupload, File file) throws IOException {
+ OutputStream fos = new FileOutputStream(file);
+ OutputStream buffer = new BufferedOutputStream(fos);
+ ObjectOutput output = new ObjectOutputStream(buffer);
+ try {
+ output.writeObject(pendingupload);
+ output.flush();
+ } finally {
+ output.close();
+ IOUtils.closeQuietly(buffer);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Thu Dec 1 08:42:02 2016
@@ -81,8 +81,8 @@ public class CompositeDataStoreCacheTest
@Before
public void setup() throws IOException {
root = folder.newFolder();
- loader = new TestCacheLoader<String, InputStream>(root);
- uploader = new TestStagingUploader(root);
+ loader = new TestCacheLoader<String, InputStream>(folder.newFolder());
+ uploader = new TestStagingUploader(folder.newFolder());
// create executor
taskLatch = new CountDownLatch(1);
@@ -99,9 +99,10 @@ public class CompositeDataStoreCacheTest
closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
//cache instance
- cache = new CompositeDataStoreCache(root.getAbsolutePath(),
- 80 * 1024 /* bytes */, 10, 1/*threads*/, loader,
- uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
+ cache =
+ new CompositeDataStoreCache(root.getAbsolutePath(), null, 80 * 1024 /* bytes */, 10,
+ 1/*threads*/,
+ loader, uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
closer.register(cache);
}
@@ -112,8 +113,8 @@ public class CompositeDataStoreCacheTest
@Test
public void zeroCache() throws IOException {
- cache = new CompositeDataStoreCache(root.getAbsolutePath(),
- 0 /* bytes */, 10, 1/*threads*/, loader,
+ cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 0 /* bytes
+ */, 10, 1/*threads*/, loader,
uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
closer.register(cache);
@@ -188,8 +189,8 @@ public class CompositeDataStoreCacheTest
*/
@Test
public void addCacheFull() throws IOException {
- cache = new CompositeDataStoreCache(root.getAbsolutePath(),
- 40 * 1024 /* bytes */, 10 /* staging % */,
+ cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 40 * 1024 /*
+ bytes */, 10 /* staging % */,
1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
6000);
closer.register(cache);
@@ -224,8 +225,8 @@ public class CompositeDataStoreCacheTest
callbackLatch = new CountDownLatch(2);
afterExecuteLatch = new CountDownLatch(2);
executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
- cache = new CompositeDataStoreCache(root.getAbsolutePath(),
- 80 * 1024 /* bytes */, 10 /* staging % */,
+ cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 80 * 1024 /*
+ bytes */, 10 /* staging % */,
1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
6000);
closer.register(cache);
@@ -383,9 +384,11 @@ public class CompositeDataStoreCacheTest
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
loader.write(ID_PREFIX + 0, f);
+ assertTrue(f.exists());
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
loader.write(ID_PREFIX + 1, f2);
+ assertTrue(f2.exists());
CountDownLatch thread1Start = new CountDownLatch(1);
SettableFuture<File> future1 =
@@ -433,6 +436,7 @@ public class CompositeDataStoreCacheTest
// Add file to backend
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
loader.write(ID_PREFIX + 1, f2);
+ assertTrue(f2.exists());
// stage for upload
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java?rev=1772153&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java Thu Dec 1 08:42:02 2016
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.DOWNLOAD_DIR;
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.UPLOAD_STAGING_DIR;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DataStoreCacheUpgradeUtils}
+ */
+public class DataStoreCacheUpgradeUtilsTest extends AbstractDataStoreCacheTest {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ File homeDir;
+ File path;
+ File pendingUploads;
+
+ @Before
+ public void setup() throws IOException {
+ homeDir = folder.getRoot();
+ path = folder.newFolder("repository", "datastore");
+ pendingUploads = new File(homeDir + "/" + DataStoreCacheUpgradeUtils.UPLOAD_MAP);
+ }
+
+ @Test
+ public void upgradeNoDownloads() throws Exception {
+ setupUploads("1111110", "2222220", "3333330");
+
+ DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true);
+
+ assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+ assertFalse(pendingUploads.exists());
+ }
+
+ @Test
+ public void upgradeNoDownloadsDelPendingFileFalse() throws Exception {
+ setupUploads("1111110", "2222220", "3333330");
+
+ DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, false);
+
+ assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+ assertTrue(pendingUploads.exists());
+ }
+
+ @Test
+ public void upgradeMoveDownloadsFalse() throws Exception {
+ setupUploads("1111110", "2222220", "3333330");
+ setupDownloads("4444440", "5555550", "6666660");
+
+ DataStoreCacheUpgradeUtils.upgrade(homeDir, path, false, true);
+
+ assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+ assertFalse(pendingUploads.exists());
+ assertFilesNoMove(DOWNLOAD_DIR, "4444440", "5555550", "6666660");
+ }
+
+ @Test
+ public void upgradeNoUploads() throws Exception {
+ setupDownloads("1111110", "2222220", "3333330");
+
+ DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true);
+
+ assertFiles(DOWNLOAD_DIR, "1111110", "2222220", "3333330");
+ }
+
+ @Test
+ public void upgradeNoUploadMap() throws Exception {
+ setupUploads("1111110", "2222220", "3333330");
+ FileUtils.deleteQuietly(pendingUploads);
+
+ DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true);
+
+ assertFiles(DOWNLOAD_DIR, "1111110", "2222220", "3333330");
+ assertFalse(pendingUploads.exists());
+ }
+
+ @Test
+ public void upgrade() throws Exception {
+ upgrade(true);
+ }
+
+ @Test
+ public void upgradeDelPendingFileFalse() throws Exception {
+ upgrade(false);
+ }
+
+ private void upgrade(boolean pendingFileDelete) throws Exception {
+ setupUploads("1111110", "2222220", "3333330");
+ setupDownloads("4444440", "5555550", "6666660");
+
+ DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, pendingFileDelete);
+
+ assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+ if (pendingFileDelete) {
+ assertFalse(pendingUploads.exists());
+ } else {
+ assertTrue(pendingUploads.exists());
+ }
+ assertFiles(DOWNLOAD_DIR, "4444440", "5555550", "6666660");
+ }
+
+ private void setupUploads(String... ids) throws IOException {
+ Map<String, Long> pendingMap = Maps.newHashMap();
+
+ for (String id : ids) {
+ File f1 = copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), getFile(id, path));
+ pendingMap.put(getFileName(id), System.currentTimeMillis());
+ }
+ serializeMap(pendingMap, pendingUploads);
+ }
+
+ private void setupDownloads(String... ids) throws IOException {
+ for (String id : ids) {
+ copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), getFile(id, path));
+ }
+ }
+
+ private void assertFiles(String moveFolder, String... ids) throws Exception {
+ for (String id : ids) {
+ File file = getFile(id, path);
+ assertFalse(file.exists());
+ file = getFile(id, new File(path, moveFolder));
+ assertTrue(file.exists());
+ assertTrue(Files.equal(file,
+ copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), folder.newFile())));
+ }
+ }
+
+ private void assertFilesNoMove(String moveFolder, String... ids) throws Exception {
+ for (String id : ids) {
+ File file = getFile(id, path);
+ assertTrue(file.exists());
+ assertTrue(Files.equal(file,
+ copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), folder.newFile())));
+ file = getFile(id, new File(path, moveFolder));
+ assertFalse(file.exists());
+ }
+ }
+
+ private static String getFileName(String name) {
+ return name.substring(0, 2) + "/" + name.substring(2, 4) + "/" + name;
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java Thu Dec 1 08:42:02 2016
@@ -64,26 +64,24 @@ public class FileCacheTest extends Abstr
@Rule
public TestName testName = new TestName();
+ CountDownLatch afterExecuteLatch;
@Before
public void setup() throws Exception {
root = folder.newFolder();
closer = Closer.create();
- loader = new TestCacheLoader<String, InputStream>(root);
+ loader = new TestCacheLoader<String, InputStream>(folder.newFolder());
- if (!testName.getMethodName().equals("rebuild")) {
-
- CountDownLatch beforeLatch = new CountDownLatch(1);
- CountDownLatch afterLatch = new CountDownLatch(1);
- CountDownLatch afterExecuteLatch = new CountDownLatch(1);
+ CountDownLatch beforeLatch = new CountDownLatch(1);
+ CountDownLatch afterLatch = new CountDownLatch(1);
+ afterExecuteLatch = new CountDownLatch(1);
- TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
- beforeLatch.countDown();
- afterLatch.countDown();
- cache = FileCache.build(4 * 1024/* KB */, root, loader, executor);
- Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
+ TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+ beforeLatch.countDown();
+ afterLatch.countDown();
+ cache = FileCache.build(4 * 1024/* KB */, root, loader, executor);
+ Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
- closer.register(cache);
- }
+ closer.register(cache);
}
@After
@@ -354,30 +352,53 @@ public class FileCacheTest extends Abstr
@Test
public void rebuild() throws Exception {
LOG.info("Started rebuild");
+ afterExecuteLatch.await();
+ LOG.info("Cache built");
+
+ File f = createFile(0, loader, cache, folder);
+ assertCache(0, cache, f);
+ cache.close();
- root = folder.newFolder();
CountDownLatch beforeLatch = new CountDownLatch(1);
CountDownLatch afterLatch = new CountDownLatch(1);
- CountDownLatch afterExecuteLatch = new CountDownLatch(1);
+ afterExecuteLatch = new CountDownLatch(1);
TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
beforeLatch.countDown();
afterLatch.countDown();
cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
-
+ closer.register(cache);
afterExecuteLatch.await();
Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
- LOG.info("Cache built");
+ LOG.info("Cache rebuilt");
+
+ assertCacheIfPresent(0, cache, f);
+ assertCacheStats(cache, 1, 4 * 1024, 0, 0);
+
+ LOG.info("Finished rebuild");
+ }
+
+ /**
+ * Trigger upgrade cache on start.
+ * @throws Exception
+ */
+ @Test
+ public void upgrade() throws Exception {
+ LOG.info("Started upgrade");
+
+ afterExecuteLatch.await();
File f = createFile(0, loader, cache, folder);
assertCache(0, cache, f);
cache.close();
- beforeLatch = new CountDownLatch(1);
- afterLatch = new CountDownLatch(1);
+ copyToFile(randomStream(1, 4 * 1024), getFile(ID_PREFIX + 1, root));
+
+ CountDownLatch beforeLatch = new CountDownLatch(1);
+ CountDownLatch afterLatch = new CountDownLatch(1);
afterExecuteLatch = new CountDownLatch(1);
- executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+ TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
beforeLatch.countDown();
afterLatch.countDown();
cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
@@ -387,9 +408,12 @@ public class FileCacheTest extends Abstr
LOG.info("Cache rebuilt");
assertCacheIfPresent(0, cache, f);
- assertCacheStats(cache, 1, 4 * 1024, 0, 0);
+ assertCacheIfPresent(1, cache, copyToFile(randomStream(1, 4 * 1024), folder.newFile()));
+ assertFalse(getFile(ID_PREFIX + 1, root).exists());
- LOG.info("Finished rebuild");
+ assertCacheStats(cache, 2, 8 * 1024, 0, 0);
+
+ LOG.info("Finished upgrade");
}
/**------------------------------ Helper methods --------------------------------------------**/
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Thu Dec 1 08:42:02 2016
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -34,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Futures;
@@ -86,11 +88,11 @@ public class UploadStagingCacheTest exte
init(0);
}
- private void init(int i) {
- init(i, new TestStagingUploader(root));
+ private void init(int i) throws IOException {
+ init(i, new TestStagingUploader(folder.newFolder()), null);
}
- private void init(int i, TestStagingUploader testUploader) {
+ private void init(int i, TestStagingUploader testUploader, File homeDir) {
// uploader
uploader = testUploader;
@@ -110,7 +112,7 @@ public class UploadStagingCacheTest exte
//cache instance
stagingCache =
- UploadStagingCache.build(root, 1/*threads*/, 8 * 1024 /* bytes */,
+ UploadStagingCache.build(root, homeDir, 1/*threads*/, 8 * 1024 /* bytes */,
uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
closer.register(stagingCache);
}
@@ -123,7 +125,7 @@ public class UploadStagingCacheTest exte
@Test
public void testZeroCache() throws IOException {
stagingCache =
- UploadStagingCache.build(root, 1/*threads*/, 0 /* bytes */,
+ UploadStagingCache.build(root, null, 1/*threads*/, 0 /* bytes */,
uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
closer.register(stagingCache);
@@ -160,7 +162,7 @@ public class UploadStagingCacheTest exte
@Test
public void testAddUploadException() throws Exception {
final AtomicInteger count = new AtomicInteger(0);
- TestStagingUploader secondTimeUploader = new TestStagingUploader(root) {
+ TestStagingUploader secondTimeUploader = new TestStagingUploader(folder.newFolder()) {
@Override
public void write(String id, File f) throws DataStoreException {
if (count.get() == 0) {
@@ -171,7 +173,7 @@ public class UploadStagingCacheTest exte
};
// initialize staging cache using the mocked uploader
- init(2, secondTimeUploader);
+ init(2, secondTimeUploader, null);
// Add load
List<ListenableFuture<Integer>> futures = put(folder);
@@ -290,7 +292,7 @@ public class UploadStagingCacheTest exte
public void testCacheFullAdd() throws Exception {
// initialize cache to have restricted size
stagingCache =
- UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */,
+ UploadStagingCache.build(root, null, 1/*threads*/, 4 * 1024 /* bytes */,
uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
closer.register(stagingCache);
@@ -578,8 +580,58 @@ public class UploadStagingCacheTest exte
assertCacheStats(stagingCache, 0, 0, 3, 4);
}
+ /**
+ * Test upgrade with build on start.
+ * @throws Exception
+ */
+ @Test
+ public void testUpgrade() throws Exception {
+ // Add load
+ List<ListenableFuture<Integer>> futures = put(folder);
+ // Close before uploading finished
+ closer.close();
+
+ // Create pre-upgrade load
+ File home = folder.newFolder();
+ File pendingUploadsFile = new File(home, DataStoreCacheUpgradeUtils.UPLOAD_MAP);
+ createUpgradeLoad(home, pendingUploadsFile);
+
+ // Start again
+ init(1, new TestStagingUploader(folder.newFolder()), home);
+
+ taskLatch.countDown();
+ callbackLatch.countDown();
+ afterExecuteLatch.await();
+
+ waitFinish(futures);
+
+ assertNull(stagingCache.getIfPresent(ID_PREFIX + 0));
+ assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+ uploader.read(ID_PREFIX + 0)));
+
+ assertUpgrade(pendingUploadsFile);
+
+ assertCacheStats(stagingCache, 0, 0, 2, 2);
+ }
+
/** -------------------- Helper methods ----------------------------------------------------**/
+ private void createUpgradeLoad(File home, File pendingUploadFile) throws IOException {
+ String id = ID_PREFIX + 1;
+ copyToFile(randomStream(1, 4 * 1024), getFile(id, root));
+ String name = id.substring(0, 2) + "/" + id.substring(2, 4) + "/" + id;
+ Map<String, Long> pendingUploads = Maps.newHashMap();
+ pendingUploads.put(name, System.currentTimeMillis());
+ serializeMap(pendingUploads, pendingUploadFile);
+ }
+
+ private void assertUpgrade(File pendingUploadFile) throws IOException {
+ assertNull(stagingCache.getIfPresent(ID_PREFIX + 1));
+ assertTrue(Files.equal(copyToFile(randomStream(1, 4 * 1024), folder.newFile()),
+ uploader.read(ID_PREFIX + 1)));
+ assertFalse(pendingUploadFile.exists());
+ }
+
private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor,
final InputStream fStream, final File temp, final CountDownLatch start) {
final SettableFuture<File> future = SettableFuture.create();