You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/15 13:35:16 UTC
svn commit: r1691172 - in /jackrabbit/oak/branches/1.2: ./
oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/
oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/
Author: chetanm
Date: Wed Jul 15 11:35:16 2015
New Revision: 1691172
URL: http://svn.apache.org/r1691172
Log:
OAK-3098 - CopyOnWrite might block Async indexer thread indefinitely
Merged 1691167
Added:
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java
- copied unchanged from r1691167, jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java
Modified:
jackrabbit/oak/branches/1.2/ (props changed)
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jul 15 11:35:16 2015
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685370,1685552
,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691151
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820,1684868,1685023,1685370,1685552
,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691151,1691167
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691172&r1=1691171&r2=1691172&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Wed Jul 15 11:35:16 2015
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.plugins.index.lucene;
+import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,13 +70,14 @@ import org.apache.lucene.store.NoLockFac
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-public class IndexCopier implements CopyOnReadStatsMBean {
+public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
private static final Set<String> REMOTE_ONLY = ImmutableSet.of("segments.gen");
private static final int MAX_FAILURE_ENTRIES = 10000;
private static final AtomicInteger UNIQUE_COUNTER = new AtomicInteger();
@@ -112,6 +115,7 @@ public class IndexCopier implements Copy
private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
private final boolean prefetchEnabled;
+ private volatile boolean closed;
public IndexCopier(Executor executor, File indexRootDir) throws IOException {
this(executor, indexRootDir, false);
@@ -135,6 +139,11 @@ public class IndexCopier implements Copy
return new CopyOnWriteDirectory(remote, local, reindexMode);
}
+ @Override
+ public void close() throws IOException {
+ this.closed = true;
+ }
+
File getIndexWorkDir() {
return indexWorkDir;
}
@@ -457,6 +466,11 @@ public class IndexCopier implements Copy
});
}
+ @Override
+ public String toString() {
+ return String.format("[COR] Local %s, Remote %s", local, remote);
+ }
+
private void removeDeletedFiles() throws IOException {
//Files present in dest but not present in source have to be deleted
Set<String> filesToBeDeleted = Sets.difference(
@@ -567,7 +581,12 @@ public class IndexCopier implements Copy
@Override
public void run() {
currentTask = new NotifyingFutureTask(task);
- executor.execute(currentTask);
+ try {
+ executor.execute(currentTask);
+ } catch (RejectedExecutionException e){
+ checkIfClosed(false);
+ throw e;
+ }
}
};
@@ -645,7 +664,16 @@ public class IndexCopier implements Copy
//Wait for all pending copy task to finish
try {
long start = PERF_LOGGER.start();
- copyDone.await();
+
+ //Loop untill queue finished or IndexCopier
+ //found to be closed. Doing it with timeout to
+ //prevent any bug causing the thread to wait indefinitely
+ while (!copyDone.await(10, TimeUnit.SECONDS)) {
+ if (closed) {
+ throw new IndexCopierClosedException("IndexCopier found to be closed " +
+ "while processing copy task for" + remote.toString());
+ }
+ }
PERF_LOGGER.end(start, -1, "Completed pending copying task {}", pendingCopies);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -657,6 +685,10 @@ public class IndexCopier implements Copy
throw new IOException("Error occurred while copying files", t);
}
+ //Sanity check
+ checkArgument(queue.isEmpty(), "Copy queue still " +
+ "has pending task left [%d]. %s", queue.size(), queue);
+
long skippedFilesSize = getSkippedFilesSize();
for (String fileName : deletedFilesLocal){
@@ -680,6 +712,11 @@ public class IndexCopier implements Copy
remote.close();
}
+ @Override
+ public String toString() {
+ return String.format("[COW] Local %s, Remote %s", local, remote);
+ }
+
private long getSkippedFilesSize() {
long size = 0;
for (String name : skippedFiles){
@@ -757,10 +794,24 @@ public class IndexCopier implements Copy
}
private void addTask(Callable<Void> task){
+ checkIfClosed(true);
queue.add(task);
currentTask.onComplete(completionHandler);
}
+ private void checkIfClosed(boolean throwException) {
+ if (closed) {
+ IndexCopierClosedException e = new IndexCopierClosedException("IndexCopier found to be closed " +
+ "while processing" +remote.toString());
+ errorInCopy.set(e);
+ copyDone.countDown();
+
+ if (throwException) {
+ throw e;
+ }
+ }
+ }
+
private abstract class COWFileReference {
protected final String name;
@@ -1000,6 +1051,11 @@ public class IndexCopier implements Copy
}
}
}
+
+ @Override
+ public String toString() {
+ return "DeleteOldDirOnClose wrapper for " + getDelegate();
+ }
}
static final class LocalIndexFile {
Modified: jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1691172&r1=1691171&r2=1691172&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Wed Jul 15 11:35:16 2015
@@ -184,7 +184,7 @@ public class LuceneIndexProviderService
}
@Deactivate
- private void deactivate() throws InterruptedException {
+ private void deactivate() throws InterruptedException, IOException {
for (ServiceRegistration reg : regs) {
reg.unregister();
}
@@ -202,6 +202,11 @@ public class LuceneIndexProviderService
indexProvider = null;
}
+ //Close the copier first i.e. before executorService
+ if (indexCopier != null){
+ indexCopier.close();
+ }
+
if (executorService != null){
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
Modified: jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1691172&r1=1691171&r2=1691172&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java (original)
+++ jackrabbit/oak/branches/1.2/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java Wed Jul 15 11:35:16 2015
@@ -166,6 +166,11 @@ class OakDirectory extends Directory {
return lockFactory;
}
+ @Override
+ public String toString() {
+ return "Directory for " + definition.getIndexName();
+ }
+
private Set<String> getListing(){
long start = PERF_LOGGER.start();
Iterable<String> fileNames = null;
Modified: jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691172&r1=1691171&r2=1691172&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Wed Jul 15 11:35:16 2015
@@ -24,7 +24,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@@ -37,6 +36,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.TabularData;
@@ -69,6 +70,7 @@ import static org.apache.jackrabbit.oak.
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@@ -838,8 +840,58 @@ public class IndexCopierTest {
}
@Test
- public void cowIndexPathNotDefined() throws Exception{
+ public void cowPoolClosedWithTaskInQueue() throws Exception{
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ Directory baseDir = new CloseSafeDir();
+ IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+ IndexCopier copier = new RAMIndexCopier(baseDir, executorService, getWorkDir());
+
+ final Set<String> toPause = Sets.newHashSet();
+ final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
+ Directory remote = new CloseSafeDir() {
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ if (toPause.contains(name)){
+ try {
+ pauseCopyLatch.await();
+ } catch (InterruptedException ignore) {
+
+ }
+ }
+ return super.createOutput(name, context);
+ }
+ };
+
+ final Directory local = copier.wrapForWrite(defn, remote, false);
+ toPause.add("t2");
+ byte[] t1 = writeFile(local, "t1");
+ byte[] t2 = writeFile(local, "t2");
+ byte[] t3 = writeFile(local, "t3");
+ byte[] t4 = writeFile(local, "t4");
+
+ final AtomicReference<Throwable> error =
+ new AtomicReference<Throwable>();
+ Thread closer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ local.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ error.set(e);
+ }
+ }
+ });
+
+ closer.start();
+
+ copier.close();
+ executorService.shutdown();
+ executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+ pauseCopyLatch.countDown();
+ closer.join();
+ assertNotNull("Close should have thrown an exception", error.get());
}
private byte[] writeFile(Directory dir, String name) throws IOException {