You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/16 10:27:44 UTC

svn commit: r1691333 - in /jackrabbit/oak/trunk/oak-lucene/src: main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Author: chetanm
Date: Thu Jul 16 08:27:43 2015
New Revision: 1691333

URL: http://svn.apache.org/r1691333
Log:
OAK-3110 - AsyncIndexer fails due to FileNotFoundException thrown by CopyOnWrite logic

COR and COW now share a working set which would allow COW to inform COR about new files it has created. COR would then ignore such files

Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691333&r1=1691332&r2=1691333&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Thu Jul 16 08:27:43 2015
@@ -75,6 +75,7 @@ import static com.google.common.base.Pre
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 
 public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
@@ -111,6 +112,7 @@ public class IndexCopier implements Copy
 
 
     private final Map<String, String> indexPathMapping = newConcurrentMap();
+    private final Map<String, Set<String>> sharedWorkingSetMap = newHashMap();
     private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
     private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
@@ -131,12 +133,13 @@ public class IndexCopier implements Copy
     public Directory wrapForRead(String indexPath, IndexDefinition definition,
             Directory remote) throws IOException {
         Directory local = createLocalDirForIndexReader(indexPath, definition);
-        return new CopyOnReadDirectory(remote, local, prefetchEnabled, indexPath);
+        return new CopyOnReadDirectory(remote, local, prefetchEnabled, indexPath, getSharedWorkingSet(definition));
     }
 
     public Directory wrapForWrite(IndexDefinition definition, Directory remote, boolean reindexMode) throws IOException {
         Directory local = createLocalDirForIndexWriter(definition);
-        return new CopyOnWriteDirectory(remote, local, reindexMode, getIndexPathForLogging(definition));
+        return new CopyOnWriteDirectory(remote, local, reindexMode,
+                getIndexPathForLogging(definition), getSharedWorkingSet(definition));
     }
 
     @Override
@@ -238,6 +241,34 @@ public class IndexCopier implements Copy
     }
 
     /**
+     * Provide the corresponding shared state to enable COW inform COR
+     * about new files it is creating while indexing. This would allow COR to ignore
+     * such files while determining the deletion candidates.
+     *
+     * @param defn index definition for which the directory is being created
+     * @return a set to maintain the state of new files being created by the COW Directory
+     */
+    private Set<String> getSharedWorkingSet(IndexDefinition defn){
+        String indexPath = defn.getIndexPathFromConfig();
+
+        if (indexPath == null){
+            //With indexPath null the working directory would not
+            //be shared between COR and COW. So just return a new set
+            return new HashSet<String>();
+        }
+
+        Set<String> sharedSet;
+        synchronized (sharedWorkingSetMap){
+            sharedSet = sharedWorkingSetMap.get(indexPath);
+            if (sharedSet == null){
+                sharedSet = Sets.newConcurrentHashSet();
+                sharedWorkingSetMap.put(indexPath, sharedSet);
+            }
+        }
+        return sharedSet;
+    }
+
+    /**
      * Creates the workDir. If it exists then it is cleaned
      *
      * @param indexRootDir root directory under which all indexing related files are managed
@@ -274,12 +305,17 @@ public class IndexCopier implements Copy
          */
         private final Set<String> localFileNames = Sets.newConcurrentHashSet();
 
-        public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch, String indexPath) throws IOException {
+        public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch,
+                                   String indexPath, Set<String> sharedWorkingSet) throws IOException {
             super(remote);
             this.remote = remote;
             this.local = local;
             this.indexPath = indexPath;
+
             this.localFileNames.addAll(Arrays.asList(local.listAll()));
+            //Remove files which are being worked upon by COW
+            this.localFileNames.removeAll(sharedWorkingSet);
+
             if (prefetch) {
                 prefetchIndexFiles();
             }
@@ -549,6 +585,7 @@ public class IndexCopier implements Copy
         private final CountDownLatch copyDone = new CountDownLatch(1);
         private final boolean reindexMode;
         private final String indexPathForLogging;
+        private final Set<String> sharedWorkingSet;
 
         /**
          * Current background task
@@ -602,12 +639,13 @@ public class IndexCopier implements Copy
         };
 
         public CopyOnWriteDirectory(Directory remote, Directory local, boolean reindexMode,
-                                    String indexPathForLogging) throws IOException {
+                                    String indexPathForLogging, Set<String> sharedWorkingSet) throws IOException {
             super(local);
             this.remote = remote;
             this.local = local;
             this.indexPathForLogging = indexPathForLogging;
             this.reindexMode = reindexMode;
+            this.sharedWorkingSet = sharedWorkingSet;
             initialize();
         }
 
@@ -647,6 +685,7 @@ public class IndexCopier implements Copy
             }
             ref = new COWLocalFileReference(name);
             fileMap.put(name, ref);
+            sharedWorkingSet.add(name);
             return ref.createOutput(context);
         }
 
@@ -723,6 +762,7 @@ public class IndexCopier implements Copy
 
             local.close();
             remote.close();
+            sharedWorkingSet.clear();
         }
 
         @Override
@@ -994,7 +1034,7 @@ public class IndexCopier implements Copy
         } catch (IOException e) {
             failedToDelete(file);
             log.debug("Error occurred while removing deleted file {} from Local {}. " +
-                    "Attempt would be maid to delete it on next run ", fileName, dir, e);
+                    "Attempt would be made to delete it on next run ", fileName, dir, e);
         }
         return successFullyDeleted;
     }

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691333&r1=1691332&r2=1691333&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Thu Jul 16 08:27:43 2015
@@ -896,10 +896,15 @@ public class IndexCopierTest {
         assertNotNull("Close should have thrown an exception", error.get());
     }
 
-    @Ignore("OAK-3110")
+    /**
+     * Test the interaction between COR and COW using same underlying directory
+     */
     @Test
-    public void cowAndCorConcurrentAccess() throws Exception{
+    public void cowConcurrentAccess() throws Exception{
         CollectingExecutor executor = new CollectingExecutor();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        executor.setForwardingExecutor(executorService);
+
         Directory baseDir = new CloseSafeDir();
         String indexPath = "/foo";
         builder.setProperty(LuceneIndexConstants.INDEX_PATH, indexPath);
@@ -948,6 +953,7 @@ public class IndexCopierTest {
         cow1.close();
         assertTrue("f2 should exist", remote.fileExists("f2"));
 
+        executorService.shutdown();
     }
 
     private byte[] writeFile(Directory dir, String name) throws IOException {
@@ -1028,20 +1034,22 @@ public class IndexCopierTest {
 
     private static class CollectingExecutor implements Executor {
         final BlockingQueue<Runnable> commands = new LinkedBlockingQueue<Runnable>();
-        private boolean immediateExecution = false;
+        private volatile boolean immediateExecution = false;
         private volatile Executor forwardingExecutor;
 
         @Override
         public void execute(Runnable command) {
+            if (immediateExecution){
+                command.run();
+                return;
+            }
+
             if (forwardingExecutor != null){
                 forwardingExecutor.execute(command);
                 return;
             }
-            if (immediateExecution){
-                command.run();
-            } else {
-                commands.add(command);
-            }
+
+            commands.add(command);
         }
 
         void executeAll(){