You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/15 13:19:59 UTC
svn commit: r1691167 - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/
Author: chetanm
Date: Wed Jul 15 11:19:59 2015
New Revision: 1691167
URL: http://svn.apache.org/r1691167
Log:
OAK-3098 - CopyOnWrite might block Async indexer thread indefinitely
Modified logic to enable better coordination between IndexCopier close and CopyOnWrite directory. Also the latch would be awaited with a timeout. This would ensure that any bug should not cause async indexer thread to get stuck indefinitely
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java (with props)
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Wed Jul 15 11:19:59 2015
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.index.lucene;
+import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,13 +70,14 @@ import org.apache.lucene.store.NoLockFac
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-public class IndexCopier implements CopyOnReadStatsMBean {
+public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
private static final Set<String> REMOTE_ONLY = ImmutableSet.of("segments.gen");
private static final int MAX_FAILURE_ENTRIES = 10000;
private static final AtomicInteger UNIQUE_COUNTER = new AtomicInteger();
@@ -112,6 +115,7 @@ public class IndexCopier implements Copy
private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
private final boolean prefetchEnabled;
+ private volatile boolean closed;
public IndexCopier(Executor executor, File indexRootDir) throws IOException {
this(executor, indexRootDir, false);
@@ -135,6 +139,11 @@ public class IndexCopier implements Copy
return new CopyOnWriteDirectory(remote, local, reindexMode);
}
+ @Override
+ public void close() throws IOException {
+ this.closed = true;
+ }
+
File getIndexWorkDir() {
return indexWorkDir;
}
@@ -457,6 +466,11 @@ public class IndexCopier implements Copy
});
}
+ @Override
+ public String toString() {
+ return String.format("[COR] Local %s, Remote %s", local, remote);
+ }
+
private void removeDeletedFiles() throws IOException {
//Files present in dest but not present in source have to be deleted
Set<String> filesToBeDeleted = Sets.difference(
@@ -567,7 +581,12 @@ public class IndexCopier implements Copy
@Override
public void run() {
currentTask = new NotifyingFutureTask(task);
- executor.execute(currentTask);
+ try {
+ executor.execute(currentTask);
+ } catch (RejectedExecutionException e){
+ checkIfClosed(false);
+ throw e;
+ }
}
};
@@ -645,7 +664,16 @@ public class IndexCopier implements Copy
//Wait for all pending copy task to finish
try {
long start = PERF_LOGGER.start();
- copyDone.await();
+
+ //Loop untill queue finished or IndexCopier
+ //found to be closed. Doing it with timeout to
+ //prevent any bug causing the thread to wait indefinitely
+ while (!copyDone.await(10, TimeUnit.SECONDS)) {
+ if (closed) {
+ throw new IndexCopierClosedException("IndexCopier found to be closed " +
+ "while processing copy task for" + remote.toString());
+ }
+ }
PERF_LOGGER.end(start, -1, "Completed pending copying task {}", pendingCopies);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -657,6 +685,10 @@ public class IndexCopier implements Copy
throw new IOException("Error occurred while copying files", t);
}
+ //Sanity check
+ checkArgument(queue.isEmpty(), "Copy queue still " +
+ "has pending task left [%d]. %s", queue.size(), queue);
+
long skippedFilesSize = getSkippedFilesSize();
for (String fileName : deletedFilesLocal){
@@ -680,6 +712,11 @@ public class IndexCopier implements Copy
remote.close();
}
+ @Override
+ public String toString() {
+ return String.format("[COW] Local %s, Remote %s", local, remote);
+ }
+
private long getSkippedFilesSize() {
long size = 0;
for (String name : skippedFiles){
@@ -757,10 +794,24 @@ public class IndexCopier implements Copy
}
private void addTask(Callable<Void> task){
+ checkIfClosed(true);
queue.add(task);
currentTask.onComplete(completionHandler);
}
+ private void checkIfClosed(boolean throwException) {
+ if (closed) {
+ IndexCopierClosedException e = new IndexCopierClosedException("IndexCopier found to be closed " +
+ "while processing" +remote.toString());
+ errorInCopy.set(e);
+ copyDone.countDown();
+
+ if (throwException) {
+ throw e;
+ }
+ }
+ }
+
private abstract class COWFileReference {
protected final String name;
@@ -1000,6 +1051,11 @@ public class IndexCopier implements Copy
}
}
}
+
+ @Override
+ public String toString() {
+ return "DeleteOldDirOnClose wrapper for " + getDelegate();
+ }
}
static final class LocalIndexFile {
Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java?rev=1691167&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java Wed Jul 15 11:19:59 2015
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+public class IndexCopierClosedException extends RuntimeException{
+ public IndexCopierClosedException(String message) {
+ super(message);
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Wed Jul 15 11:19:59 2015
@@ -184,7 +184,7 @@ public class LuceneIndexProviderService
}
@Deactivate
- private void deactivate() throws InterruptedException {
+ private void deactivate() throws InterruptedException, IOException {
for (ServiceRegistration reg : regs) {
reg.unregister();
}
@@ -202,6 +202,11 @@ public class LuceneIndexProviderService
indexProvider = null;
}
+ //Close the copier first i.e. before executorService
+ if (indexCopier != null){
+ indexCopier.close();
+ }
+
if (executorService != null){
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java Wed Jul 15 11:19:59 2015
@@ -166,6 +166,11 @@ class OakDirectory extends Directory {
return lockFactory;
}
+ @Override
+ public String toString() {
+ return "Directory for " + definition.getIndexName();
+ }
+
private Set<String> getListing(){
long start = PERF_LOGGER.start();
Iterable<String> fileNames = null;
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Wed Jul 15 11:19:59 2015
@@ -24,7 +24,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -37,6 +36,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.TabularData;
@@ -69,6 +70,7 @@ import static org.apache.jackrabbit.oak.
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@@ -838,8 +840,58 @@ public class IndexCopierTest {
}
@Test
- public void cowIndexPathNotDefined() throws Exception{
+ public void cowPoolClosedWithTaskInQueue() throws Exception{
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ Directory baseDir = new CloseSafeDir();
+ IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+ IndexCopier copier = new RAMIndexCopier(baseDir, executorService, getWorkDir());
+
+ final Set<String> toPause = Sets.newHashSet();
+ final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
+ Directory remote = new CloseSafeDir() {
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ if (toPause.contains(name)){
+ try {
+ pauseCopyLatch.await();
+ } catch (InterruptedException ignore) {
+
+ }
+ }
+ return super.createOutput(name, context);
+ }
+ };
+
+ final Directory local = copier.wrapForWrite(defn, remote, false);
+ toPause.add("t2");
+ byte[] t1 = writeFile(local, "t1");
+ byte[] t2 = writeFile(local, "t2");
+ byte[] t3 = writeFile(local, "t3");
+ byte[] t4 = writeFile(local, "t4");
+
+ final AtomicReference<Throwable> error =
+ new AtomicReference<Throwable>();
+ Thread closer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ local.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ error.set(e);
+ }
+ }
+ });
+
+ closer.start();
+
+ copier.close();
+ executorService.shutdown();
+ executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+ pauseCopyLatch.countDown();
+ closer.join();
+ assertNotNull("Close should have thrown an exception", error.get());
}
private byte[] writeFile(Directory dir, String name) throws IOException {