You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2016/12/01 08:42:03 UTC

svn commit: r1772153 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/blob/ test/java/org/apache/jackrabbit/oak/plugins/blob/

Author: amitj
Date: Thu Dec  1 08:42:02 2016
New Revision: 1772153

URL: http://svn.apache.org/viewvc?rev=1772153&view=rev
Log:
OAK-5201: Support upgrade of DataStore cache

- A new upgrade utility class (DataStoreCacheUpgradeUtils) exposes methods to move older cached files to the newer cache areas.
- UploadStagingCache and FileCache now move cache files from the previous version, if available, into their respective caches.

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/AbstractSharedCachingDataStore.java Thu Dec  1 08:42:02 2016
@@ -159,8 +159,10 @@ public abstract class AbstractSharedCach
         this.backend = createBackend();
         backend.init();
 
+        String home = FilenameUtils.normalizeNoEndSeparator(new File(homeDir).getAbsolutePath());
         this.cache =
-            new CompositeDataStoreCache(path, cacheSize, stagingSplitPercentage, uploadThreads,
+            new CompositeDataStoreCache(path, new File(home), cacheSize, stagingSplitPercentage,
+                uploadThreads,
                 new CacheLoader<String, InputStream>() {
                     @Override public InputStream load(String key) throws Exception {
                         InputStream is = null;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCache.java Thu Dec  1 08:42:02 2016
@@ -59,7 +59,7 @@ public class CompositeDataStoreCache ext
      */
     private final File directory;
 
-    public CompositeDataStoreCache(String path, long size, int uploadSplitPercentage,
+    public CompositeDataStoreCache(String path, File home, long size, int uploadSplitPercentage,
         int uploadThreads, CacheLoader<String, InputStream> loader, final StagingUploader uploader,
         StatisticsProvider statsProvider, ListeningExecutorService executor,
         ScheduledExecutorService scheduledExecutor /* purge scheduled executor */,
@@ -73,11 +73,11 @@ public class CompositeDataStoreCache ext
 
         long uploadSize = (size * uploadSplitPercentage) / 100;
 
-        this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null);
-
         this.stagingCache = UploadStagingCache
-            .build(directory, uploadThreads, uploadSize, uploader, downloadCache, statsProvider,
+            .build(directory, home, uploadThreads, uploadSize, uploader, null, statsProvider,
                 executor, scheduledExecutor, purgeInterval, stagingRetryInterval);
+        this.downloadCache = FileCache.build((size - uploadSize), directory, loader, null);
+        stagingCache.setDownloadCache(downloadCache);
     }
 
     @Nullable

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java?rev=1772153&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java Thu Dec  1 08:42:02 2016
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUtils.recursiveDelete;
+
+/**
+ * Utility methods to upgrade Old DataStore cache
+ * {@link org.apache.jackrabbit.core.data.CachingDataStore}.
+ */
+public class DataStoreCacheUpgradeUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(DataStoreCacheUpgradeUtils.class);
+
+    static final String UPLOAD_MAP = "async-pending-uploads.ser";
+    static final String UPLOAD_STAGING_DIR = UploadStagingCache.UPLOAD_STAGING_DIR;
+    static final String DOWNLOAD_DIR = FileCache.DOWNLOAD_DIR;
+
+    private static Map<String, Long> deSerializeUploadMap(File homeDir) {
+        Map<String, Long> asyncUploadMap = Maps.newHashMap();
+
+        File asyncUploadMapFile = new File(homeDir, UPLOAD_MAP);
+        if (asyncUploadMapFile.exists()) {
+            String path = asyncUploadMapFile.getAbsolutePath();
+
+            InputStream fis = null;
+            try {
+                fis = new BufferedInputStream(new FileInputStream(path));
+                ObjectInput input = new ObjectInputStream(fis);
+                asyncUploadMap = (Map<String, Long>) input.readObject();
+            } catch (Exception e) {
+                LOG.warn("Error in reading pending uploads map [{}] from location [{}]", UPLOAD_MAP,
+                    homeDir);
+            } finally {
+                IOUtils.closeQuietly(fis);
+            }
+            LOG.debug("AsyncUploadMap read [{}]", asyncUploadMap);
+        }
+        return asyncUploadMap;
+    }
+
+    private static void deleteSerializedUploadMap(File homeDir) {
+        File uploadMap = new File(homeDir, UPLOAD_MAP);
+        FileUtils.deleteQuietly(uploadMap);
+        LOG.info("Deleted asyncUploadMap [{}] from [{}]", UPLOAD_MAP, homeDir);
+    }
+
+    private static boolean notInExceptions(File file, List<String> exceptions) {
+        String parent = file.getParent();
+        for (String exception : exceptions) {
+            if (parent.contains(exception)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static void moveDownloadCache(final File path) {
+        final List<String> exceptions = ImmutableList.of("tmp", UPLOAD_STAGING_DIR, DOWNLOAD_DIR);
+        File newDownloadDir = new File(path, DOWNLOAD_DIR);
+
+        Iterator<File> iterator =
+            Files.fileTreeTraverser().postOrderTraversal(path)
+                .filter(new Predicate<File>() {
+                    @Override public boolean apply(File input) {
+                        return input.isFile()
+                            && !input.getParentFile().equals(path)
+                            && !notInExceptions(input, exceptions);
+                    }
+                }).iterator();
+
+        while (iterator.hasNext()) {
+            File download = iterator.next();
+            LOG.trace("Download cache file absolute pre-upgrade path " + download);
+
+            File newDownload = new File(newDownloadDir,
+                download.getAbsolutePath().substring(path.getAbsolutePath().length()));
+            newDownload.getParentFile().mkdirs();
+            LOG.trace("Downloaded cache file absolute post-upgrade path " + newDownload);
+
+            try {
+                FileUtils.moveFile(download, newDownload);
+                LOG.info("Download cache file [{}] moved to [{}]", download, newDownload);
+                recursiveDelete(download, path);
+            } catch (Exception e) {
+                LOG.warn("Unable to move download cache file [{}] to [{}]", download, newDownload);
+            }
+        }
+    }
+
+    public static void movePendingUploadsToStaging(File homeDir, File path, boolean deleteMap) {
+        File newUploadDir = new File(path, UPLOAD_STAGING_DIR);
+
+        Map<String, Long> pendingUploads = deSerializeUploadMap(homeDir);
+        Iterator<String> pendingFileIter = pendingUploads.keySet().iterator();
+
+        while(pendingFileIter.hasNext()) {
+            String file = pendingFileIter.next();
+            File upload = new File(path, file);
+            LOG.trace("Pending upload absolute path " + upload.getAbsolutePath());
+
+            File newUpload = new File(newUploadDir, file);
+            LOG.trace("Pending upload upgrade absolute path " + newUpload.getAbsolutePath());
+
+            newUpload.getParentFile().mkdirs();
+
+            if (upload.exists()) {
+                LOG.trace(upload + " File exists");
+                try {
+                    FileUtils.moveFile(upload, newUpload);
+                    LOG.info("Pending upload file [{}] moved to [{}]", upload, newUpload);
+                    recursiveDelete(upload, path);
+                } catch (Exception e) {
+                    LOG.warn("Unable to move pending upload file [{}] to [{}]", upload, newUpload);
+                }
+            } else {
+                LOG.warn("File [{}] does not exist", upload);
+            }
+        }
+
+        if (deleteMap) {
+            deleteSerializedUploadMap(homeDir);
+        }
+    }
+
+    public static void upgrade(File homeDir, File path, boolean moveCache, boolean deleteMap) {
+        movePendingUploadsToStaging(homeDir, path, deleteMap);
+
+        if (moveCache) {
+            moveDownloadCache(path);
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/FileCache.java Thu Dec  1 08:42:02 2016
@@ -62,6 +62,13 @@ public class FileCache extends AbstractC
      */
     private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
 
+    protected static final String DOWNLOAD_DIR = "download";
+
+    /**
+     * Parent of the cache root directory
+     */
+    private File parent;
+
     /**
      * The cacheRoot directory of the cache.
      */
@@ -91,7 +98,8 @@ public class FileCache extends AbstractC
     private FileCache(long maxSize /* bytes */, File root,
         final CacheLoader<String, InputStream> loader, @Nullable final ExecutorService executor) {
 
-        this.cacheRoot = new File(root, "download");
+        this.parent = root;
+        this.cacheRoot = new File(root, DOWNLOAD_DIR);
 
         /* convert to 4 KB block */
         long size = Math.round(maxSize / (1024L * 4));
@@ -286,6 +294,10 @@ public class FileCache extends AbstractC
      */
     private int build() {
         int count = 0;
+
+        // Move older generation cache downloaded files to the new folder
+        DataStoreCacheUpgradeUtils.moveDownloadCache(parent);
+
         // Iterate over all files in the cache folder
         Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(cacheRoot)
             .filter(new Predicate<File>() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java Thu Dec  1 08:42:02 2016
@@ -63,6 +63,8 @@ import org.slf4j.LoggerFactory;
 import static com.google.common.base.Objects.toStringHelper;
 import static java.lang.String.format;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils
+    .movePendingUploadsToStaging;
 
 /**
  * Cache for staging async uploads. This serves as a temporary cache for serving local
@@ -78,6 +80,8 @@ public class UploadStagingCache implemen
      */
     private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
 
+    protected static final String UPLOAD_STAGING_DIR = "upload";
+
     //Rough estimate of the in-memory key, value pair
     private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
         @Override public int weigh(String key, File value) {
@@ -141,7 +145,7 @@ public class UploadStagingCache implemen
      */
     private LinkedBlockingQueue<String> retryQueue;
 
-    private UploadStagingCache(File dir, int uploadThreads, long size /* bytes */,
+    private UploadStagingCache(File dir, File home, int uploadThreads, long size /* bytes */,
         StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider,
         @Nullable ListeningExecutorService executor,
         @Nullable ScheduledExecutorService scheduledExecutor,
@@ -169,7 +173,7 @@ public class UploadStagingCache implemen
         this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
         this.downloadCache = cache;
 
-        build();
+        build(home, dir);
 
         this.scheduledExecutor
             .scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
@@ -180,13 +184,13 @@ public class UploadStagingCache implemen
     private UploadStagingCache() {
     }
 
-    public static UploadStagingCache build(File dir, int uploadThreads, long size
+    public static UploadStagingCache build(File dir, File home, int uploadThreads, long size
         /* bytes */, StagingUploader uploader, @Nullable FileCache cache,
         StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor,
         @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval /* secs */,
         int retryInterval /* secs */) {
         if (size > 0) {
-            return new UploadStagingCache(dir, uploadThreads, size, uploader, cache,
+            return new UploadStagingCache(dir, home, uploadThreads, size, uploader, cache,
                 statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
         }
         return new UploadStagingCache() {
@@ -216,9 +220,13 @@ public class UploadStagingCache implemen
 
     /**
      * Retrieves all the files staged in the staging area and schedules them for uploads.
+     * @param home the home of the repo
+     * @param rootPath the parent of the cache
      */
-    private void build() {
+    private void build(File home, File rootPath) {
         LOG.info("Scheduling pending uploads");
+        // Move any older cache pending uploads
+        movePendingUploadsToStaging(home, rootPath, true);
 
         Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
             .filter(new Predicate<File>() {
@@ -487,6 +495,10 @@ public class UploadStagingCache implemen
         new ExecutorCloser(scheduledExecutor).close();
     }
 
+    protected void setDownloadCache(@Nullable FileCache downloadCache) {
+        this.downloadCache = downloadCache;
+    }
+
     /**
      * Class which calls remove on all
      */

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/AbstractDataStoreCacheTest.java Thu Dec  1 08:42:02 2016
@@ -18,12 +18,17 @@
  */
 package org.apache.jackrabbit.oak.plugins.blob;
 
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +52,7 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.core.data.DataIdentifier;
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
@@ -334,7 +340,7 @@ public class AbstractDataStoreCacheTest
         return new ByteArrayInputStream(data);
     }
 
-    private static File getFile(String id, File root) {
+    protected static File getFile(String id, File root) {
         File file = root;
         file = new File(file, id.substring(0, 2));
         file = new File(file, id.substring(2, 4));
@@ -345,4 +351,17 @@ public class AbstractDataStoreCacheTest
         FileUtils.copyInputStreamToFile(stream, file);
         return file;
     }
+
+    static void serializeMap(Map<String,Long> pendingupload, File file) throws IOException {
+        OutputStream fos = new FileOutputStream(file);
+        OutputStream buffer = new BufferedOutputStream(fos);
+        ObjectOutput output = new ObjectOutputStream(buffer);
+        try {
+            output.writeObject(pendingupload);
+            output.flush();
+        } finally {
+            output.close();
+            IOUtils.closeQuietly(buffer);
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/CompositeDataStoreCacheTest.java Thu Dec  1 08:42:02 2016
@@ -81,8 +81,8 @@ public class CompositeDataStoreCacheTest
     @Before
     public void setup() throws IOException {
         root = folder.newFolder();
-        loader = new TestCacheLoader<String, InputStream>(root);
-        uploader = new TestStagingUploader(root);
+        loader = new TestCacheLoader<String, InputStream>(folder.newFolder());
+        uploader = new TestStagingUploader(folder.newFolder());
 
         // create executor
         taskLatch = new CountDownLatch(1);
@@ -99,9 +99,10 @@ public class CompositeDataStoreCacheTest
         closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS));
 
         //cache instance
-        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
-            80 * 1024 /* bytes */, 10, 1/*threads*/, loader,
-            uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
+        cache =
+            new CompositeDataStoreCache(root.getAbsolutePath(), null, 80 * 1024 /* bytes */, 10,
+                1/*threads*/,
+                loader, uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
         closer.register(cache);
     }
 
@@ -112,8 +113,8 @@ public class CompositeDataStoreCacheTest
 
     @Test
     public void zeroCache() throws IOException {
-        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
-            0 /* bytes */, 10, 1/*threads*/, loader,
+        cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 0 /* bytes
+        */, 10, 1/*threads*/, loader,
             uploader, statsProvider, executor, scheduledExecutor, 3000, 6000);
         closer.register(cache);
 
@@ -188,8 +189,8 @@ public class CompositeDataStoreCacheTest
      */
     @Test
     public void addCacheFull() throws IOException {
-        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
-            40 * 1024 /* bytes */, 10 /* staging % */,
+        cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 40 * 1024 /*
+        bytes */, 10 /* staging % */,
             1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
             6000);
         closer.register(cache);
@@ -224,8 +225,8 @@ public class CompositeDataStoreCacheTest
         callbackLatch = new CountDownLatch(2);
         afterExecuteLatch = new CountDownLatch(2);
         executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch);
-        cache = new CompositeDataStoreCache(root.getAbsolutePath(),
-            80 * 1024 /* bytes */, 10 /* staging % */,
+        cache = new CompositeDataStoreCache(root.getAbsolutePath(), null, 80 * 1024 /*
+        bytes */, 10 /* staging % */,
             1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000,
             6000);
         closer.register(cache);
@@ -383,9 +384,11 @@ public class CompositeDataStoreCacheTest
 
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());
         loader.write(ID_PREFIX + 0, f);
+        assertTrue(f.exists());
 
         File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
         loader.write(ID_PREFIX + 1, f2);
+        assertTrue(f2.exists());
 
         CountDownLatch thread1Start = new CountDownLatch(1);
         SettableFuture<File> future1 =
@@ -433,6 +436,7 @@ public class CompositeDataStoreCacheTest
         // Add file to backend
         File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile());
         loader.write(ID_PREFIX + 1, f2);
+        assertTrue(f2.exists());
 
         // stage for upload
         File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile());

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java?rev=1772153&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java Thu Dec  1 08:42:02 2016
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.DOWNLOAD_DIR;
+import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.UPLOAD_STAGING_DIR;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DataStoreCacheUpgradeUtils}
+ */
+public class DataStoreCacheUpgradeUtilsTest extends AbstractDataStoreCacheTest {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    File homeDir;
+    File path;
+    File pendingUploads;
+
+    @Before
+    public void setup() throws IOException {
+        homeDir = folder.getRoot();
+        path = folder.newFolder("repository", "datastore");
+        pendingUploads = new File(homeDir + "/" + DataStoreCacheUpgradeUtils.UPLOAD_MAP);
+    }
+
+    @Test
+    public void upgradeNoDownloads() throws Exception {
+        setupUploads("1111110", "2222220", "3333330");
+
+        DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true);
+
+        assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+        assertFalse(pendingUploads.exists());
+    }
+
+    @Test
+    public void upgradeNoDownloadsDelPendingFileFalse() throws Exception {
+        setupUploads("1111110", "2222220", "3333330");
+
+        DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, false);
+
+        assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+        assertTrue(pendingUploads.exists());
+    }
+
+    @Test
+    public void upgradeMoveDownloadsFalse() throws Exception {
+        setupUploads("1111110", "2222220", "3333330");
+        setupDownloads("4444440", "5555550", "6666660");
+
+        DataStoreCacheUpgradeUtils.upgrade(homeDir, path, false, true);
+
+        assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+        assertFalse(pendingUploads.exists());
+        assertFilesNoMove(DOWNLOAD_DIR, "4444440", "5555550", "6666660");
+    }
+
+    @Test
+    public void upgradeNoUploads() throws Exception {
+        setupDownloads("1111110", "2222220", "3333330");
+
+        DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true);
+
+        assertFiles(DOWNLOAD_DIR, "1111110", "2222220", "3333330");
+    }
+
+    @Test
+    public void upgradeNoUploadMap() throws Exception {
+        setupUploads("1111110", "2222220", "3333330");
+        FileUtils.deleteQuietly(pendingUploads);
+
+        DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, true);
+
+        assertFiles(DOWNLOAD_DIR, "1111110", "2222220", "3333330");
+        assertFalse(pendingUploads.exists());
+    }
+
+    @Test
+    public void upgrade() throws Exception {
+        upgrade(true);
+    }
+
+    @Test
+    public void upgradeDelPendingFileFalse() throws Exception {
+        upgrade(false);
+    }
+
+    private void upgrade(boolean pendingFileDelete) throws Exception {
+        setupUploads("1111110", "2222220", "3333330");
+        setupDownloads("4444440", "5555550", "6666660");
+
+        DataStoreCacheUpgradeUtils.upgrade(homeDir, path, true, pendingFileDelete);
+
+        assertFiles(UPLOAD_STAGING_DIR, "1111110", "2222220", "3333330");
+        if (pendingFileDelete) {
+            assertFalse(pendingUploads.exists());
+        } else {
+            assertTrue(pendingUploads.exists());
+        }
+        assertFiles(DOWNLOAD_DIR, "4444440", "5555550", "6666660");
+    }
+
+    private void setupUploads(String... ids) throws IOException {
+        Map<String, Long> pendingMap = Maps.newHashMap();
+
+        for (String id : ids) {
+            File f1 = copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), getFile(id, path));
+            pendingMap.put(getFileName(id), System.currentTimeMillis());
+        }
+        serializeMap(pendingMap, pendingUploads);
+    }
+
+    private void setupDownloads(String... ids) throws IOException {
+        for (String id : ids) {
+            copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), getFile(id, path));
+        }
+    }
+
+    private void assertFiles(String moveFolder, String... ids) throws Exception {
+        for (String id : ids) {
+            File file = getFile(id, path);
+            assertFalse(file.exists());
+            file = getFile(id, new File(path, moveFolder));
+            assertTrue(file.exists());
+            assertTrue(Files.equal(file,
+                copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), folder.newFile())));
+        }
+    }
+
+    private void assertFilesNoMove(String moveFolder, String... ids) throws Exception {
+        for (String id : ids) {
+            File file = getFile(id, path);
+            assertTrue(file.exists());
+            assertTrue(Files.equal(file,
+                copyToFile(randomStream(Integer.parseInt(id), 4 * 1024), folder.newFile())));
+            file = getFile(id, new File(path, moveFolder));
+            assertFalse(file.exists());
+        }
+    }
+
+    private static String getFileName(String name) {
+        return name.substring(0, 2) + "/" + name.substring(2, 4) + "/" + name;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/DataStoreCacheUpgradeUtilsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/FileCacheTest.java Thu Dec  1 08:42:02 2016
@@ -64,26 +64,24 @@ public class FileCacheTest extends Abstr
     @Rule
     public TestName testName = new TestName();
 
+    CountDownLatch afterExecuteLatch;
     @Before
     public void setup() throws Exception {
         root = folder.newFolder();
         closer = Closer.create();
-        loader = new TestCacheLoader<String, InputStream>(root);
+        loader = new TestCacheLoader<String, InputStream>(folder.newFolder());
 
-        if (!testName.getMethodName().equals("rebuild")) {
-
-            CountDownLatch beforeLatch = new CountDownLatch(1);
-            CountDownLatch afterLatch = new CountDownLatch(1);
-            CountDownLatch afterExecuteLatch = new CountDownLatch(1);
+        CountDownLatch beforeLatch = new CountDownLatch(1);
+        CountDownLatch afterLatch = new CountDownLatch(1);
+        afterExecuteLatch = new CountDownLatch(1);
 
-            TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
-            beforeLatch.countDown();
-            afterLatch.countDown();
-            cache = FileCache.build(4 * 1024/* KB */, root, loader, executor);
-            Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
+        TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+        beforeLatch.countDown();
+        afterLatch.countDown();
+        cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
+        Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
 
-            closer.register(cache);
-        }
+        closer.register(cache);
     }
 
     @After
@@ -354,30 +352,53 @@ public class FileCacheTest extends Abstr
     @Test
     public void rebuild() throws Exception {
         LOG.info("Started rebuild");
+        afterExecuteLatch.await();
+        LOG.info("Cache built");
+
+        File f = createFile(0, loader, cache, folder);
+        assertCache(0, cache, f);
+        cache.close();
 
-        root = folder.newFolder();
         CountDownLatch beforeLatch = new CountDownLatch(1);
         CountDownLatch afterLatch = new CountDownLatch(1);
-        CountDownLatch afterExecuteLatch = new CountDownLatch(1);
+        afterExecuteLatch = new CountDownLatch(1);
 
         TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
         beforeLatch.countDown();
         afterLatch.countDown();
         cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
-
+        closer.register(cache);
         afterExecuteLatch.await();
         Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get();
-        LOG.info("Cache built");
+        LOG.info("Cache rebuilt");
+
+        assertCacheIfPresent(0, cache, f);
+        assertCacheStats(cache, 1, 4 * 1024, 0, 0);
+
+        LOG.info("Finished rebuild");
+    }
+
+    /**
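+     * Checks that an old-layout file under root is moved into the cache when it is rebuilt.
+     * @throws Exception on any failure while preparing the files or rebuilding the cache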
+     */
+    @Test
+    public void upgrade() throws Exception {
+        LOG.info("Started upgrade");
+
+        afterExecuteLatch.await();
 
         File f = createFile(0, loader, cache, folder);
         assertCache(0, cache, f);
         cache.close();
 
-        beforeLatch = new CountDownLatch(1);
-        afterLatch = new CountDownLatch(1);
+        copyToFile(randomStream(1, 4 * 1024), getFile(ID_PREFIX + 1, root));
+
+        CountDownLatch beforeLatch = new CountDownLatch(1);
+        CountDownLatch afterLatch = new CountDownLatch(1);
         afterExecuteLatch = new CountDownLatch(1);
 
-        executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
+        TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch);
         beforeLatch.countDown();
         afterLatch.countDown();
         cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor);
@@ -387,9 +408,12 @@ public class FileCacheTest extends Abstr
         LOG.info("Cache rebuilt");
 
         assertCacheIfPresent(0, cache, f);
-        assertCacheStats(cache, 1, 4 * 1024, 0, 0);
+        assertCacheIfPresent(1, cache, copyToFile(randomStream(1, 4 * 1024), folder.newFile()));
+        assertFalse(getFile(ID_PREFIX + 1, root).exists());
 
-        LOG.info("Finished rebuild");
+        assertCacheStats(cache, 2, 8 * 1024, 0, 0);
+
+        LOG.info("Finished upgrade");
     }
 
     /**------------------------------ Helper methods --------------------------------------------**/

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java?rev=1772153&r1=1772152&r2=1772153&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCacheTest.java Thu Dec  1 08:42:02 2016
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -34,6 +35,7 @@ import java.util.concurrent.atomic.Atomi
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.Futures;
@@ -86,11 +88,11 @@ public class UploadStagingCacheTest exte
         init(0);
     }
 
-    private void init(int i) {
-        init(i, new TestStagingUploader(root));
+    private void init(int i) throws IOException {
+        init(i, new TestStagingUploader(folder.newFolder()), null);
     }
 
-    private void init(int i, TestStagingUploader testUploader) {
+    private void init(int i, TestStagingUploader testUploader, File homeDir) {
         // uploader
         uploader = testUploader;
 
@@ -110,7 +112,7 @@ public class UploadStagingCacheTest exte
 
         //cache instance
         stagingCache =
-            UploadStagingCache.build(root, 1/*threads*/, 8 * 1024 /* bytes */,
+            UploadStagingCache.build(root, homeDir, 1/*threads*/, 8 * 1024 /* bytes */,
                 uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
         closer.register(stagingCache);
     }
@@ -123,7 +125,7 @@ public class UploadStagingCacheTest exte
     @Test
     public void testZeroCache() throws IOException {
         stagingCache =
-            UploadStagingCache.build(root, 1/*threads*/, 0 /* bytes */,
+            UploadStagingCache.build(root, null, 1/*threads*/, 0 /* bytes */,
                 uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
         closer.register(stagingCache);
 
@@ -160,7 +162,7 @@ public class UploadStagingCacheTest exte
     @Test
     public void testAddUploadException() throws Exception {
         final AtomicInteger count = new AtomicInteger(0);
-        TestStagingUploader secondTimeUploader = new TestStagingUploader(root) {
+        TestStagingUploader secondTimeUploader = new TestStagingUploader(folder.newFolder()) {
             @Override
             public void write(String id, File f) throws DataStoreException {
                 if (count.get() == 0) {
@@ -171,7 +173,7 @@ public class UploadStagingCacheTest exte
         };
 
         // initialize staging cache using the mocked uploader
-        init(2, secondTimeUploader);
+        init(2, secondTimeUploader, null);
 
         // Add load
         List<ListenableFuture<Integer>> futures = put(folder);
@@ -290,7 +292,7 @@ public class UploadStagingCacheTest exte
     public void testCacheFullAdd() throws Exception {
         // initialize cache to have restricted size
         stagingCache =
-            UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */,
+            UploadStagingCache.build(root, null, 1/*threads*/, 4 * 1024 /* bytes */,
                 uploader, null/*cache*/, statsProvider, executor, null, 3000, 6000);
         closer.register(stagingCache);
 
@@ -578,8 +580,58 @@ public class UploadStagingCacheTest exte
         assertCacheStats(stagingCache, 0, 0, 3, 4);
     }
 
+    /**
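+     * Tests that a pre-upgrade pending upload is uploaded, and its map file removed, on start.
+     * @throws Exception on any failure while preparing the load or restarting the cache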
+     */
+    @Test
+    public void testUpgrade() throws Exception {
+        // Add load
+        List<ListenableFuture<Integer>> futures = put(folder);
+        // Close before uploading finished
+        closer.close();
+
+        // Create pre-upgrade load: a file under root plus a serialized pending-uploads map under home
+        File home = folder.newFolder();
+        File pendingUploadsFile = new File(home, DataStoreCacheUpgradeUtils.UPLOAD_MAP);
+        createUpgradeLoad(home, pendingUploadsFile);
+
+        // Start again, passing the home directory that holds the pre-upgrade pending-uploads map
+        init(1, new TestStagingUploader(folder.newFolder()), home);
+
+        taskLatch.countDown();
+        callbackLatch.countDown();
+        afterExecuteLatch.await();
+
+        waitFinish(futures);
+
+        assertNull(stagingCache.getIfPresent(ID_PREFIX + 0));
+        assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()),
+            uploader.read(ID_PREFIX + 0)));
+
+        assertUpgrade(pendingUploadsFile);
+
+        assertCacheStats(stagingCache, 0, 0, 2, 2);
+    }
+
     /** -------------------- Helper methods ----------------------------------------------------**/
 
+    private void createUpgradeLoad(File home, File pendingUploadFile) throws IOException {
+        String id = ID_PREFIX + 1;
+        copyToFile(randomStream(1, 4 * 1024), getFile(id, root));
+        String name = id.substring(0, 2) + "/" + id.substring(2, 4) + "/" + id;
+        Map<String, Long> pendingUploads = Maps.newHashMap();
+        pendingUploads.put(name, System.currentTimeMillis());
+        serializeMap(pendingUploads, pendingUploadFile);
+    }
+
+    private void assertUpgrade(File pendingUploadFile) throws IOException {
+        assertNull(stagingCache.getIfPresent(ID_PREFIX + 1));
+        assertTrue(Files.equal(copyToFile(randomStream(1, 4 * 1024), folder.newFile()),
+            uploader.read(ID_PREFIX + 1)));
+        assertFalse(pendingUploadFile.exists());
+    }
+
     private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor,
         final InputStream fStream, final File temp, final CountDownLatch start) {
         final SettableFuture<File> future = SettableFuture.create();