You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/16 10:27:44 UTC
svn commit: r1691333 - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Author: chetanm
Date: Thu Jul 16 08:27:43 2015
New Revision: 1691333
URL: http://svn.apache.org/r1691333
Log:
OAK-3110 - AsyncIndexer fails due to FileNotFoundException thrown by CopyOnWrite logic
COR and COW now share a working set which would allow COW to inform COR about new files it has created. COR would then ignore such files
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691333&r1=1691332&r2=1691333&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Thu Jul 16 08:27:43 2015
@@ -75,6 +75,7 @@ import static com.google.common.base.Pre
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
+import static com.google.common.collect.Maps.newHashMap;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
@@ -111,6 +112,7 @@ public class IndexCopier implements Copy
private final Map<String, String> indexPathMapping = newConcurrentMap();
+ private final Map<String, Set<String>> sharedWorkingSetMap = newHashMap();
private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
@@ -131,12 +133,13 @@ public class IndexCopier implements Copy
public Directory wrapForRead(String indexPath, IndexDefinition definition,
Directory remote) throws IOException {
Directory local = createLocalDirForIndexReader(indexPath, definition);
- return new CopyOnReadDirectory(remote, local, prefetchEnabled, indexPath);
+ return new CopyOnReadDirectory(remote, local, prefetchEnabled, indexPath, getSharedWorkingSet(definition));
}
public Directory wrapForWrite(IndexDefinition definition, Directory remote, boolean reindexMode) throws IOException {
Directory local = createLocalDirForIndexWriter(definition);
- return new CopyOnWriteDirectory(remote, local, reindexMode, getIndexPathForLogging(definition));
+ return new CopyOnWriteDirectory(remote, local, reindexMode,
+ getIndexPathForLogging(definition), getSharedWorkingSet(definition));
}
@Override
@@ -238,6 +241,34 @@ public class IndexCopier implements Copy
}
/**
+ * Provide the corresponding shared state to enable COW inform COR
+ * about new files it is creating while indexing. This would allow COR to ignore
+ * such files while determining the deletion candidates.
+ *
+ * @param defn index definition for which the directory is being created
+ * @return a set to maintain the state of new files being created by the COW Directory
+ */
+ private Set<String> getSharedWorkingSet(IndexDefinition defn){
+ String indexPath = defn.getIndexPathFromConfig();
+
+ if (indexPath == null){
+ //With indexPath null the working directory would not
+ //be shared between COR and COW. So just return a new set
+ return new HashSet<String>();
+ }
+
+ Set<String> sharedSet;
+ synchronized (sharedWorkingSetMap){
+ sharedSet = sharedWorkingSetMap.get(indexPath);
+ if (sharedSet == null){
+ sharedSet = Sets.newConcurrentHashSet();
+ sharedWorkingSetMap.put(indexPath, sharedSet);
+ }
+ }
+ return sharedSet;
+ }
+
+ /**
* Creates the workDir. If it exists then it is cleaned
*
* @param indexRootDir root directory under which all indexing related files are managed
@@ -274,12 +305,17 @@ public class IndexCopier implements Copy
*/
private final Set<String> localFileNames = Sets.newConcurrentHashSet();
- public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch, String indexPath) throws IOException {
+ public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch,
+ String indexPath, Set<String> sharedWorkingSet) throws IOException {
super(remote);
this.remote = remote;
this.local = local;
this.indexPath = indexPath;
+
this.localFileNames.addAll(Arrays.asList(local.listAll()));
+ //Remove files which are being worked upon by COW
+ this.localFileNames.removeAll(sharedWorkingSet);
+
if (prefetch) {
prefetchIndexFiles();
}
@@ -549,6 +585,7 @@ public class IndexCopier implements Copy
private final CountDownLatch copyDone = new CountDownLatch(1);
private final boolean reindexMode;
private final String indexPathForLogging;
+ private final Set<String> sharedWorkingSet;
/**
* Current background task
@@ -602,12 +639,13 @@ public class IndexCopier implements Copy
};
public CopyOnWriteDirectory(Directory remote, Directory local, boolean reindexMode,
- String indexPathForLogging) throws IOException {
+ String indexPathForLogging, Set<String> sharedWorkingSet) throws IOException {
super(local);
this.remote = remote;
this.local = local;
this.indexPathForLogging = indexPathForLogging;
this.reindexMode = reindexMode;
+ this.sharedWorkingSet = sharedWorkingSet;
initialize();
}
@@ -647,6 +685,7 @@ public class IndexCopier implements Copy
}
ref = new COWLocalFileReference(name);
fileMap.put(name, ref);
+ sharedWorkingSet.add(name);
return ref.createOutput(context);
}
@@ -723,6 +762,7 @@ public class IndexCopier implements Copy
local.close();
remote.close();
+ sharedWorkingSet.clear();
}
@Override
@@ -994,7 +1034,7 @@ public class IndexCopier implements Copy
} catch (IOException e) {
failedToDelete(file);
log.debug("Error occurred while removing deleted file {} from Local {}. " +
- "Attempt would be maid to delete it on next run ", fileName, dir, e);
+ "Attempt would be made to delete it on next run ", fileName, dir, e);
}
return successFullyDeleted;
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691333&r1=1691332&r2=1691333&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Thu Jul 16 08:27:43 2015
@@ -896,10 +896,15 @@ public class IndexCopierTest {
assertNotNull("Close should have thrown an exception", error.get());
}
- @Ignore("OAK-3110")
+ /**
+ * Test the interaction between COR and COW using same underlying directory
+ */
@Test
- public void cowAndCorConcurrentAccess() throws Exception{
+ public void cowConcurrentAccess() throws Exception{
CollectingExecutor executor = new CollectingExecutor();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ executor.setForwardingExecutor(executorService);
+
Directory baseDir = new CloseSafeDir();
String indexPath = "/foo";
builder.setProperty(LuceneIndexConstants.INDEX_PATH, indexPath);
@@ -948,6 +953,7 @@ public class IndexCopierTest {
cow1.close();
assertTrue("f2 should exist", remote.fileExists("f2"));
+ executorService.shutdown();
}
private byte[] writeFile(Directory dir, String name) throws IOException {
@@ -1028,20 +1034,22 @@ public class IndexCopierTest {
private static class CollectingExecutor implements Executor {
final BlockingQueue<Runnable> commands = new LinkedBlockingQueue<Runnable>();
- private boolean immediateExecution = false;
+ private volatile boolean immediateExecution = false;
private volatile Executor forwardingExecutor;
@Override
public void execute(Runnable command) {
+ if (immediateExecution){
+ command.run();
+ return;
+ }
+
if (forwardingExecutor != null){
forwardingExecutor.execute(command);
return;
}
- if (immediateExecution){
- command.run();
- } else {
- commands.add(command);
- }
+
+ commands.add(command);
}
void executeAll(){