You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2019/02/08 13:41:35 UTC

svn commit: r1853216 - in /jackrabbit/oak/branches/1.8: ./ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ oak-lucene/src/test/java/org/apache/j...

Author: catholicon
Date: Fri Feb  8 13:41:35 2019
New Revision: 1853216

URL: http://svn.apache.org/viewvc?rev=1853216&view=rev
Log:
OAK-7246: Improve cleanup of locally copied index files (backport r1836548, r1840769 from trunk)
OAK-7751: CopyOnReadDirectory#removeDeletedFiles asks IndexCopier to check timestamp for (remote only) segments.gen leading to failure to clean up local files

OAK-7246: Improve cleanup of locally copied index files
Implement what was discussed except for s/creation time/modified time/

Update IndexCopierTest to use FSDirectory for CoR and CoW and also added
synthetic update of last modified timestamp delayes to simulate reality
without really sleep (this test would still pass with old impl... except
for constants of course)

Added IndexCopierCleanupTest to test new logic.


Added:
    jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
      - copied, changed from r1836548, jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java
Modified:
    jackrabbit/oak/branches/1.8/   (props changed)
    jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
    jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
    jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
    jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Propchange: jackrabbit/oak/branches/1.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb  8 13:41:35 2019
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1820660-1820661,1820729,1820734,1820859,1820861,1820878,1820888,1820947,1821027,1821130,1821140-1821141,1821178,1821237,1821240,1821249,1821258,1821325,1821358,1821361-1821362,1821370,1821375,1821393,1821477,1821487,1821516,1821617,1821663,1821665,1821668,1821681,1821847,1821975-1821983,1822121,1822201,1822207,1822527,1822642,1822723,1822808,1822850,1822934,1823135,1823163,1823169,1823172,1823655,1823669,1824196,1824198,1824253,1824255,1824896,1824962,1825065,1825362,1825381,1825442,1825448,1825466,1825470-1825471,1825475,1825523,1825525,1825561,1825619-1825621,1825651,1825654,1825992,1826079,1826090,1826096,1826216,1826237,1826338,1826516,1826532,1826551,1826560,1826638,1826640,1826730,1826833,1826932,1826957,1827423,1827472,1827486,1827816,1827977,1828349,1828439,1828502,1828529,1828948,1829527,1829534,1829546,1829569,1829587,1829665,1829854,1829864,1829978,1829985,1829987,1829998,1830019,1830048,1830160,1830171,1830197,1830209,1830239,1830347,1830748,1830911
 ,1830923,1831157-1831158,1831163,1831190,1831374,1831560,1831689,1832258,1832376,1832379,1832535,1833308,1833347,1833833,1834112,1834117,1834287,1834291,1834302,1834326,1834328,1834336,1834428,1834468,1834483,1834610,1834648-1834649,1834681,1834823,1834857-1834858,1835060,1835518,1835521,1835635,1835642,1835780,1835819,1836082,1836121,1836167-1836168,1836170-1836187,1836189-1836196,1836206,1836487,1836493,1837057,1837274,1837296,1837326,1837475,1837503,1837547,1837569,1837600,1837657,1837718,1837998,1838076,1838637,1839549,1839570,1839637,1839746,1840019,1840024,1840031,1840226,1840455,1840462,1840574,1841314,1841352,1842089,1842677,1843175,1843222,1843231,1843398,1843618,1843652,1843911,1844325,1844549,1844625,1844627,1844642,1844728,1844775,1844932,1845135,1845336,1845405,1845415,1845730-1845731,1845863,1845865,1846057,1846617,1848073,1848181-1848182,1848191,1848217,1848822-1848823,1850837,1851533-1851535,1851619,1852451,1852492
+/jackrabbit/oak/trunk:1820660-1820661,1820729,1820734,1820859,1820861,1820878,1820888,1820947,1821027,1821130,1821140-1821141,1821178,1821237,1821240,1821249,1821258,1821325,1821358,1821361-1821362,1821370,1821375,1821393,1821477,1821487,1821516,1821617,1821663,1821665,1821668,1821681,1821847,1821975-1821983,1822121,1822201,1822207,1822527,1822642,1822723,1822808,1822850,1822934,1823135,1823163,1823169,1823172,1823655,1823669,1824196,1824198,1824253,1824255,1824896,1824962,1825065,1825362,1825381,1825442,1825448,1825466,1825470-1825471,1825475,1825523,1825525,1825561,1825619-1825621,1825651,1825654,1825992,1826079,1826090,1826096,1826216,1826237,1826338,1826516,1826532,1826551,1826560,1826638,1826640,1826730,1826833,1826932,1826957,1827423,1827472,1827486,1827816,1827977,1828349,1828439,1828502,1828529,1828948,1829527,1829534,1829546,1829569,1829587,1829665,1829854,1829864,1829978,1829985,1829987,1829998,1830019,1830048,1830160,1830171,1830197,1830209,1830239,1830347,1830748,1830911
 ,1830923,1831157-1831158,1831163,1831190,1831374,1831560,1831689,1832258,1832376,1832379,1832535,1833308,1833347,1833833,1834112,1834117,1834287,1834291,1834302,1834326,1834328,1834336,1834428,1834468,1834483,1834610,1834648-1834649,1834681,1834823,1834857-1834858,1835060,1835518,1835521,1835635,1835642,1835780,1835819,1836082,1836121,1836167-1836168,1836170-1836187,1836189-1836196,1836206,1836487,1836493,1836548,1837057,1837274,1837296,1837326,1837475,1837503,1837547,1837569,1837600,1837657,1837718,1837998,1838076,1838637,1839549,1839570,1839637,1839746,1840019,1840024,1840031,1840226,1840455,1840462,1840574,1840769,1841314,1841352,1842089,1842677,1843175,1843222,1843231,1843398,1843618,1843652,1843911,1844325,1844549,1844625,1844627,1844642,1844728,1844775,1844932,1845135,1845336,1845405,1845415,1845730-1845731,1845863,1845865,1846057,1846617,1848073,1848181-1848182,1848191,1848217,1848822-1848823,1850837,1851533-1851535,1851619,1852451,1852492
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1853216&r1=1853215&r2=1853216&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Fri Feb  8 13:41:35 2019
@@ -70,7 +70,7 @@ public class IndexCopier implements Copy
     private static final int MAX_FAILURE_ENTRIES = 10000;
     private static final String WORK_DIR_NAME = "indexWriterDir";
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final Logger log = LoggerFactory.getLogger(IndexCopier.class);
     private final Executor executor;
     private final File indexWorkDir;
 
@@ -95,7 +95,6 @@ public class IndexCopier implements Copy
     private final AtomicLong uploadTime = new AtomicLong();
 
 
-    private final Map<String, Set<String>> sharedWorkingSetMap = newHashMap();
     private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
     private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
@@ -174,18 +173,6 @@ public class IndexCopier implements Copy
         return indexRootDirectory.getIndexDir(definition, indexPath, dirName);
     }
 
-    public void addIndexFileBeingWritten(String indexPath, String name) {
-        getSharedWorkingSet(indexPath).add(name);
-    }
-
-    public void clearIndexFilesBeingWritten(String indexPath) {
-        getSharedWorkingSet(indexPath).clear();
-    }
-
-    public Set<String> getIndexFilesBeingWritten(String indexPath) {
-        return getSharedWorkingSet(indexPath);
-    }
-
     Map<String, LocalIndexFile> getFailedToDeleteFiles() {
         return Collections.unmodifiableMap(failedToDeleteFiles);
     }
@@ -216,26 +203,6 @@ public class IndexCopier implements Copy
         }
     }
 
-    /**
-     * Provide the corresponding shared state to enable COW inform COR
-     * about new files it is creating while indexing. This would allow COR to ignore
-     * such files while determining the deletion candidates.
-     *
-     * @param defn index definition for which the directory is being created
-     * @return a set to maintain the state of new files being created by the COW Directory
-     */
-    private Set<String> getSharedWorkingSet(String indexPath){
-        Set<String> sharedSet;
-        synchronized (sharedWorkingSetMap){
-            sharedSet = sharedWorkingSetMap.get(indexPath);
-            if (sharedSet == null){
-                sharedSet = Sets.newConcurrentHashSet();
-                sharedWorkingSetMap.put(indexPath, sharedSet);
-            }
-        }
-        return sharedSet;
-    }
-
     private void checkIntegrity(String indexPath, Directory local, Directory remote) throws IOException {
         if (validatedIndexPaths.contains(indexPath)){
             return;
@@ -291,6 +258,77 @@ public class IndexCopier implements Copy
         return successFullyDeleted;
     }
 
+    /**
+     * This method would return the latest modification timestamp from the set of file{@code names}
+     * on the file system.
+     * The parameter {@code localDir} is expected to be an instance of {@link FSDirectory} (or wrapped one in
+     * {@link FilterDirectory}. If this assumption doesn't hold, the method would return -1.
+     * Each of file names are expected to be existing in {@code localDir}. If this fails the method shall return -1.
+     * In case of any error while computing modified timestamps on the file system, the method shall return -1.
+     * @param names file names to evaluate on local FS
+     * @param localDir {@link Directory} implementation to be used to get the files
+     * @return latest timestamp or -1 (with logs) in case of any doubt
+     */
+    public static long getNewestLocalFSTimestampFor(Set<String> names, Directory localDir) {
+        File localFSDir = LocalIndexFile.getFSDir(localDir);
+
+        if (localFSDir == null) {
+            log.warn("Couldn't get FSDirectory instance for {}.", localDir);
+            return -1;
+        }
+
+        long maxTS = 0L;
+        for (String  name : names) {
+            File f = new File(localFSDir, name);
+
+            if (!f.exists()) {
+                log.warn("File {} doesn't exist in {}", name, localFSDir);
+                return -1;
+            }
+
+            long modTS = f.lastModified();
+            if (modTS == 0L) {
+                log.warn("Couldn't get lastModification timestamp for {} in {}", name, localFSDir);
+                return -1;
+            }
+
+            if (modTS > maxTS) {
+                maxTS  = modTS;
+            }
+        }
+
+        return maxTS;
+    }
+
+    /**
+     * @param name file name to evaluate on local FS
+     * @param localDir {@link Directory} implementation to be used to get the file
+     * @param millis timestamp to compare file's modified timestamp against
+     * @return {@code true} if file referred to be {@code name} is modified before {@code millis}; false otherwise
+     */
+    public static boolean isFileModifiedBefore(String name, Directory localDir, long millis) {
+        File localFSDir = LocalIndexFile.getFSDir(localDir);
+
+        if (localFSDir == null) {
+            log.warn("Couldn't get FSDirectory instance for {}.", localDir);
+            return false;
+        }
+
+        File f = new File(localFSDir, name);
+        if (!f.exists()) {
+            log.warn("File {} doesn't exist in {}", name, localFSDir);
+            return false;
+        }
+
+        long modTS = f.lastModified();
+        if (modTS == 0L) {
+            log.warn("Couldn't get lastModification timestamp for {} in {}", name, localFSDir);
+            return false;
+        }
+
+        return modTS < millis;
+    }
+
     public long startCopy(LocalIndexFile file) {
         updateMaxInProgress(copyInProgressCount.incrementAndGet());
         copyInProgressSize.addAndGet(file.getSize());

Modified: jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java?rev=1853216&r1=1853215&r2=1853216&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java (original)
+++ jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnReadDirectory.java Fri Feb  8 13:41:35 2019
@@ -27,7 +27,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -44,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.collect.Maps.newConcurrentMap;
+import static java.util.Arrays.stream;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 
 /**
@@ -53,6 +56,10 @@ import static org.apache.jackrabbit.oak.
 public class CopyOnReadDirectory extends FilterDirectory {
     private static final Logger log = LoggerFactory.getLogger(CopyOnReadDirectory.class);
     private static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
+
+    public static final String DELETE_MARGIN_MILLIS_NAME = "oak.lucene.delete.margin";
+    public final long DELETE_MARGIN_MILLIS = Long.getLong(DELETE_MARGIN_MILLIS_NAME, TimeUnit.MINUTES.toMillis(5));
+
     private final IndexCopier indexCopier;
     private final Directory remote;
     private final Directory local;
@@ -61,11 +68,6 @@ public class CopyOnReadDirectory extends
     private final AtomicBoolean closed = new AtomicBoolean();
 
     private final ConcurrentMap<String, CORFileReference> files = newConcurrentMap();
-    /**
-     * Set of fileNames bound to current local dir. It is updated with any new file
-     * which gets added by this directory
-     */
-    private final Set<String> localFileNames = Sets.newConcurrentHashSet();
 
     public CopyOnReadDirectory(IndexCopier indexCopier, Directory remote, Directory local, boolean prefetch,
                                String indexPath, Executor executor) throws IOException {
@@ -76,10 +78,6 @@ public class CopyOnReadDirectory extends
         this.local = local;
         this.indexPath = indexPath;
 
-        this.localFileNames.addAll(Arrays.asList(local.listAll()));
-        //Remove files which are being worked upon by COW
-        this.localFileNames.removeAll(indexCopier.getIndexFilesBeingWritten(indexPath));
-
         if (prefetch) {
             prefetchIndexFiles();
         }
@@ -309,11 +307,29 @@ public class CopyOnReadDirectory extends
     }
 
     private void removeDeletedFiles() throws IOException {
-        //Files present in dest but not present in source have to be deleted
-        Set<String> filesToBeDeleted = Sets.difference(
-                ImmutableSet.copyOf(localFileNames),
-                ImmutableSet.copyOf(remote.listAll())
-        );
+        Set<String> remoteFiles = stream(remote.listAll())
+                .filter(name -> !IndexCopier.REMOTE_ONLY.contains(name))
+                .collect(Collectors.toSet());
+
+        long maxTS = IndexCopier.getNewestLocalFSTimestampFor(remoteFiles, local);
+        if (maxTS == -1) {
+            log.warn("Couldn't compute safe timestamp to delete files from {}", local);
+            return;
+        }
+
+        // subtract DELETE_MARGIN_MILLIS from maxTS for safety (you can never be too careful with time)
+        final long deleteBeforeTS = maxTS - DELETE_MARGIN_MILLIS;
+
+        Set<String> filesToBeDeleted =
+                // Files present locally
+                ImmutableSet.copyOf(local.listAll()).stream()
+                // but not in my view
+                .filter(name -> !remoteFiles.contains(name))
+                // and also older than a safe timestamp (deleteBeforeTS)
+                .filter(name -> IndexCopier.isFileModifiedBefore(name, local, deleteBeforeTS))
+                // can be deleted
+                .collect(Collectors.toSet())
+        ;
 
         Set<String> failedToDelete = Sets.newHashSet();
 
@@ -352,7 +368,6 @@ public class CopyOnReadDirectory extends
 
         void markValid(){
             this.valid = true;
-            localFileNames.add(name);
         }
     }
 }

Modified: jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java?rev=1853216&r1=1853215&r2=1853216&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java (original)
+++ jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/CopyOnWriteDirectory.java Fri Feb  8 13:41:35 2019
@@ -143,7 +143,6 @@ public class CopyOnWriteDirectory extend
         this.executor = executor;
         this.indexPath = indexPath;
         this.reindexMode = reindexMode;
-        indexCopier.clearIndexFilesBeingWritten(indexPath);
         initialize();
     }
 
@@ -183,7 +182,6 @@ public class CopyOnWriteDirectory extend
         }
         ref = new COWLocalFileReference(name);
         fileMap.put(name, ref);
-        indexCopier.addIndexFileBeingWritten(indexPath, name);
         return ref.createOutput(context);
     }
 
@@ -260,7 +258,6 @@ public class CopyOnWriteDirectory extend
 
         local.close();
         remote.close();
-        indexCopier.clearIndexFilesBeingWritten(indexPath);
     }
 
     @Override

Modified: jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java?rev=1853216&r1=1853215&r2=1853216&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java (original)
+++ jackrabbit/oak/branches/1.8/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/LocalIndexFile.java Fri Feb  8 13:41:35 2019
@@ -116,7 +116,7 @@ public final class LocalIndexFile {
         return dir != null ? new File(dir, name).length() : 0;
     }
 
-    static File getFSDir(Directory dir) {
+    public static File getFSDir(Directory dir) {
         if (dir instanceof FilterDirectory){
             dir = ((FilterDirectory) dir).getDelegate();
         }

Copied: jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java (from r1836548, jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java?p2=jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java&p1=jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java&r1=1836548&r2=1853216&rev=1853216&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java (original)
+++ jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierCleanupTest.java Fri Feb  8 13:41:35 2019
@@ -45,7 +45,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
-import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT;
+import static org.apache.jackrabbit.oak.InitialContent.INITIAL_CONTENT;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory.DELETE_MARGIN_MILLIS_NAME;
 import static org.junit.Assert.assertEquals;
@@ -352,6 +352,98 @@ public class IndexCopierCleanupTest {
                 Sets.newHashSet(new SimpleFSDirectory(localFSDir).listAll()));
     }
 
+    @Test
+    public void remoteOnlyFilesDontAvoidDeletion() throws Exception {
+        writeFile(remote, "a");
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            writeFile(remote, name);
+        }
+        remote.close();
+
+        // get files to get locally copied
+        copier.getCoRDir().close();
+
+        remote.deleteFile("a");
+        writeFile(remote, "b");
+        remote.close();
+
+        assertTrue(existsLocally("a"));
+
+        copier.getCoRDir().close();
+
+        assertFalse(existsLocally("a"));
+        assertTrue(existsLocally("b"));
+    }
+
+    @Test
+    public void remoteOnlyFilesIfExistingGetDeleted() throws Exception {
+        Directory cow = copier.getCoWDir();
+        writeFile(cow, "a");
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            writeFile(cow, name);
+        }
+        cow.close();
+
+        remote.deleteFile("a");
+        writeFile(remote, "b");
+        remote.close();
+
+        assertTrue(existsLocally("a"));
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            assertTrue(existsLocally(name));
+        }
+
+        copier.getCoRDir().close();
+
+        assertFalse(existsLocally("a"));
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            assertFalse(existsLocally(name));
+        }
+        assertTrue(existsLocally("b"));
+    }
+
+    @Test
+    public void remoteOnlyFilesNotCleanedIfUpdatedRecently() throws Exception {
+        // Create remote_only files (and a normal one)
+        Directory cow = copier.getCoWDir();
+        writeFile(cow, "a");
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            writeFile(cow, name);
+        }
+        cow.close();
+
+        // delete all existing files and create a new normal one
+        remote.deleteFile("a");
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            remote.deleteFile(name);
+        }
+        writeFile(remote, "b");
+        remote.close();
+
+        // get a CoR at this state (only sees "b" in list of files)
+        Directory cor = copier.getCoRDir();
+
+        // re-create remote_only files
+        cow = copier.getCoWDir();
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            writeFile(cow, name);
+        }
+        cow.close();
+
+        assertTrue(existsLocally("a"));
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            assertTrue(existsLocally(name));
+        }
+
+        cor.close();
+
+        assertFalse(existsLocally("a"));
+        for (String name : IndexCopier.REMOTE_ONLY) {
+            assertTrue(existsLocally(name));
+        }
+        assertTrue(existsLocally("b"));
+    }
+
     private boolean existsLocally(String fileName) {
         return new File(localFSDir, fileName).exists();
     }

Modified: jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1853216&r1=1853215&r2=1853216&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/branches/1.8/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Fri Feb  8 13:41:35 2019
@@ -44,6 +44,7 @@ import javax.management.openmbean.Tabula
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ForwardingListeningExecutorService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -54,12 +55,15 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,6 +74,7 @@ import static com.google.common.collect.
 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static org.apache.jackrabbit.oak.InitialContent.INITIAL_CONTENT;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory.DELETE_MARGIN_MILLIS_NAME;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -79,7 +84,6 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 public class IndexCopierTest {
     private Random rnd = new Random();
@@ -87,6 +91,8 @@ public class IndexCopierTest {
 
     private NodeState root = INITIAL_CONTENT;
 
+    private static final Clock CLOCK = new Clock.Virtual();
+
     @Rule
     public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
 
@@ -94,9 +100,26 @@ public class IndexCopierTest {
 
     private String indexPath = "/oak:index/test";
 
+    private final Closer closer = Closer.create();
+    static {
+        try {
+            CLOCK.waitUntil(Clock.SIMPLE.getTime());
+        } catch (InterruptedException e) {
+            // ignored
+        }
+    }
+
     @Before
     public void setUp(){
+        System.setProperty(DELETE_MARGIN_MILLIS_NAME, String.valueOf(TimeUnit.SECONDS.toMillis(1)));
         LuceneIndexEditorContext.configureUniqueId(builder);
+        DelayCopyingSimpleFSDirectory.temporaryFolder = temporaryFolder;
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        closer.close();
+        DelayCopyingSimpleFSDirectory.temporaryFolder = null;
     }
 
     @Test
@@ -277,7 +300,7 @@ public class IndexCopierTest {
 
         IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
 
-        TestRAMDirectory remote = new TestRAMDirectory();
+        FileTrackingDirectory remote = new FileTrackingDirectory();
         Directory wrapped = c1.wrapForRead("/foo", defn, remote, INDEX_DATA_CHILD_NAME);
 
         byte[] t1 = writeFile(remote , "t1");
@@ -329,7 +352,7 @@ public class IndexCopierTest {
 
         final CountDownLatch copyProceed = new CountDownLatch(1);
         final CountDownLatch copyRequestArrived = new CountDownLatch(1);
-        TestRAMDirectory remote = new TestRAMDirectory(){
+        FileTrackingDirectory remote = new FileTrackingDirectory(){
             @Override
             public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
                 copyRequestArrived.countDown();
@@ -384,7 +407,7 @@ public class IndexCopierTest {
         IndexDefinition defn = new IndexDefinition(root, builder.getNodeState(), "/foo");
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
 
-        TestRAMDirectory remote = new TestRAMDirectory();
+        FileTrackingDirectory remote = new FileTrackingDirectory();
         Directory wrapped = c1.wrapForRead("/foo", defn, remote, INDEX_DATA_CHILD_NAME);
 
         byte[] t1 = writeFile(remote, "t1");
@@ -442,18 +465,14 @@ public class IndexCopierTest {
     }
 
     @Test
-    public void deletesOnClose() throws Exception{
-        //Use a close safe dir. In actual case the FSDir would
-        //be opened on same file system hence it can retain memory
-        //but RAMDirectory does not retain memory hence we simulate
-        //that by not closing the RAMDir and reuse it
+    public void deletesOnClose() throws Exception {
         Directory baseDir = new CloseSafeDir();
 
 
         IndexDefinition defn = new IndexDefinition(root, builder.getNodeState(), "/foo");
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
 
-        Directory r1 = new RAMDirectory();
+        Directory r1 = new DelayCopyingSimpleFSDirectory();
 
         byte[] t1 = writeFile(r1, "t1");
         byte[] t2 = writeFile(r1 , "t2");
@@ -496,7 +515,7 @@ public class IndexCopierTest {
         IndexDefinition defn = new IndexDefinition(root, builder.getNodeState(), "/foo");
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
 
-        Directory r1 = new RAMDirectory();
+        Directory r1 = new DelayCopyingSimpleFSDirectory();
 
         byte[] t1 = writeFile(r1, "t1");
         byte[] t2 = writeFile(r1 , "t2");
@@ -974,10 +993,15 @@ public class IndexCopierTest {
 
         Directory remote = new CloseSafeDir();
         byte[] f1 = writeFile(remote, "f1");
+        byte[] f1a = writeFile(remote, "f1a");
 
         Directory cor1 = copier.wrapForRead(indexPath, defn, remote, INDEX_DATA_CHILD_NAME);
         readAndAssert(cor1, "f1", f1);
+        readAndAssert(cor1, "f1a", f1a);
+        //Ensure that deletion task submitted to executor get processed immediately
+        executor.enableImmediateExecution();
         cor1.close();
+        executor.enableDelayedExecution();
 
         final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
         Directory remote2 = new FilterDirectory(remote) {
@@ -999,8 +1023,13 @@ public class IndexCopierTest {
         //Before copy is done to remote lets delete f1 from remote and
         //open a COR and close it such that it triggers delete of f1
         remote.deleteFile("f1");
+        writeFile(remote, "f1b");
         Directory cor2 = copier.wrapForRead(indexPath, defn, remote, INDEX_DATA_CHILD_NAME);
 
+        // Since we're talking of hypothetical stuck cow1 stuck and are running next CoW cycle on 'remote', let's
+        // update timestamps on file that cow1 created to maintain some temporal sanity
+        updateLastModified(cow1, "f2");
+
         //Ensure that deletion task submitted to executor get processed immediately
         executor.enableImmediateExecution();
         cor2.close();
@@ -1062,9 +1091,16 @@ public class IndexCopierTest {
         IndexOutput o = dir.createOutput(name, IOContext.DEFAULT);
         o.writeBytes(data, data.length);
         o.close();
+
+        updateLastModified(dir, name);
+
         return data;
     }
 
+    private static void updateLastModified(Directory dir, String name) throws IOException {
+        DelayCopyingSimpleFSDirectory.updateLastModified(dir, name);
+    }
+
     private byte[] randomBytes(int size) {
         byte[] data = new byte[size];
         rnd.nextBytes(data);
@@ -1113,9 +1149,57 @@ public class IndexCopierTest {
         }
     }
 
-    private static class TestRAMDirectory extends RAMDirectory {
+    private static class DelayCopyingSimpleFSDirectory extends SimpleFSDirectory {
+        private static TemporaryFolder temporaryFolder;
+
+        public DelayCopyingSimpleFSDirectory() throws IOException {
+            super(temporaryFolder.newFolder());
+        }
+
+        public static void updateLastModified(Directory dir, String name) {
+            DelayCopyingSimpleFSDirectory d = null;
+            if (dir instanceof DelayCopyingSimpleFSDirectory) {
+                d = (DelayCopyingSimpleFSDirectory)dir;
+            } else if (dir instanceof FilterDirectory) {
+                Directory delegate = ((FilterDirectory)dir).getDelegate();
+                if (delegate instanceof DelayCopyingSimpleFSDirectory) {
+                    d = (DelayCopyingSimpleFSDirectory)delegate;
+                }
+            }
+
+            if (d != null) {
+                d.updateLastModified(name);
+            }
+        }
+
+        private void updateLastModified(String name) {
+            try {
+                // Update file timestamp manually to mimic last updated time updates without sleeping
+                CLOCK.waitUntil(CLOCK.getTime() + TimeUnit.SECONDS.toMillis(2));
+
+                File f = new File(directory, name);
+                f.setLastModified(CLOCK.getTimeIncreasing());
+            } catch (InterruptedException ie) {
+                // ignored
+            }
+        }
+
+        @Override
+        public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+            super.copy(to, src, dest, context);
+
+            if (to instanceof DelayCopyingSimpleFSDirectory) {
+                ((DelayCopyingSimpleFSDirectory)to).updateLastModified(dest);
+            }
+        }
+    }
+
+    private class FileTrackingDirectory extends DelayCopyingSimpleFSDirectory {
         final List<String> openedFiles = newArrayList();
 
+        public FileTrackingDirectory() throws IOException {
+        }
+
         @Override
         public IndexInput openInput(String name, IOContext context) throws IOException {
             openedFiles.add(name);
@@ -1127,10 +1211,17 @@ public class IndexCopierTest {
         }
     }
 
-    private static class CloseSafeDir extends RAMDirectory {
+    private class CloseSafeDir extends DelayCopyingSimpleFSDirectory {
+        public CloseSafeDir() throws IOException {
+        }
+
         @Override
         public void close() {
+            closer.register(this::close0);
+        }
 
+        private void close0() {
+            super.close();
         }
     }