You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/15 13:19:59 UTC

svn commit: r1691167 - in /jackrabbit/oak/trunk/oak-lucene/src: main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ test/java/org/apache/jackrabbit/oak/plugins/index/lucene/

Author: chetanm
Date: Wed Jul 15 11:19:59 2015
New Revision: 1691167

URL: http://svn.apache.org/r1691167
Log:
OAK-3098 - CopyOnWrite might block Async indexer thread indefinitely

Modified the logic to enable better coordination between IndexCopier close and the CopyOnWrite directory. The latch is now also awaited with a timeout, which ensures that a bug cannot cause the async indexer thread to get stuck indefinitely.

Added:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Wed Jul 15 11:19:59 2015
@@ -19,6 +19,7 @@
 
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,13 +70,14 @@ import org.apache.lucene.store.NoLockFac
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Maps.newConcurrentMap;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 
-public class IndexCopier implements CopyOnReadStatsMBean {
+public class IndexCopier implements CopyOnReadStatsMBean, Closeable {
     private static final Set<String> REMOTE_ONLY = ImmutableSet.of("segments.gen");
     private static final int MAX_FAILURE_ENTRIES = 10000;
     private static final AtomicInteger UNIQUE_COUNTER = new AtomicInteger();
@@ -112,6 +115,7 @@ public class IndexCopier implements Copy
     private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
     private final boolean prefetchEnabled;
+    private volatile boolean closed;
 
     public IndexCopier(Executor executor, File indexRootDir) throws IOException {
         this(executor, indexRootDir, false);
@@ -135,6 +139,11 @@ public class IndexCopier implements Copy
         return new CopyOnWriteDirectory(remote, local, reindexMode);
     }
 
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+    }
+
     File getIndexWorkDir() {
         return indexWorkDir;
     }
@@ -457,6 +466,11 @@ public class IndexCopier implements Copy
             });
         }
 
+        @Override
+        public String toString() {
+            return String.format("[COR] Local %s, Remote %s", local, remote);
+        }
+
         private void removeDeletedFiles() throws IOException {
             //Files present in dest but not present in source have to be deleted
             Set<String> filesToBeDeleted = Sets.difference(
@@ -567,7 +581,12 @@ public class IndexCopier implements Copy
             @Override
             public void run() {
                 currentTask = new NotifyingFutureTask(task);
-                executor.execute(currentTask);
+                try {
+                    executor.execute(currentTask);
+                } catch (RejectedExecutionException e){
+                    checkIfClosed(false);
+                    throw e;
+                }
             }
         };
 
@@ -645,7 +664,16 @@ public class IndexCopier implements Copy
             //Wait for all pending copy task to finish
             try {
                 long start = PERF_LOGGER.start();
-                copyDone.await();
+
+                //Loop until queue finished or IndexCopier
+                //found to be closed. Doing it with timeout to
+                //prevent any bug causing the thread to wait indefinitely
+                while (!copyDone.await(10, TimeUnit.SECONDS)) {
+                    if (closed) {
+                        throw new IndexCopierClosedException("IndexCopier found to be closed " +
+                                "while processing copy task for" + remote.toString());
+                    }
+                }
                 PERF_LOGGER.end(start, -1, "Completed pending copying task {}", pendingCopies);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -657,6 +685,10 @@ public class IndexCopier implements Copy
                 throw new IOException("Error occurred while copying files", t);
             }
 
+            //Sanity check
+            checkArgument(queue.isEmpty(), "Copy queue still " +
+                    "has pending task left [%d]. %s", queue.size(), queue);
+
             long skippedFilesSize = getSkippedFilesSize();
 
             for (String fileName : deletedFilesLocal){
@@ -680,6 +712,11 @@ public class IndexCopier implements Copy
             remote.close();
         }
 
+        @Override
+        public String toString() {
+            return String.format("[COW] Local %s, Remote %s", local, remote);
+        }
+
         private long getSkippedFilesSize() {
             long size = 0;
             for (String name : skippedFiles){
@@ -757,10 +794,24 @@ public class IndexCopier implements Copy
         }
 
         private void addTask(Callable<Void> task){
+            checkIfClosed(true);
             queue.add(task);
             currentTask.onComplete(completionHandler);
         }
 
+        private void checkIfClosed(boolean throwException) {
+            if (closed) {
+                IndexCopierClosedException e = new IndexCopierClosedException("IndexCopier found to be closed " +
+                        "while processing" +remote.toString());
+                errorInCopy.set(e);
+                copyDone.countDown();
+
+                if (throwException) {
+                    throw e;
+                }
+            }
+        }
+
         private abstract class COWFileReference {
             protected final String name;
 
@@ -1000,6 +1051,11 @@ public class IndexCopier implements Copy
                 }
             }
         }
+
+        @Override
+        public String toString() {
+            return "DeleteOldDirOnClose wrapper for " + getDelegate();
+        }
     }
     
     static final class LocalIndexFile {

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java?rev=1691167&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java Wed Jul 15 11:19:59 2015
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+public class IndexCopierClosedException extends RuntimeException{
+    public IndexCopierClosedException(String message) {
+        super(message);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierClosedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Wed Jul 15 11:19:59 2015
@@ -184,7 +184,7 @@ public class LuceneIndexProviderService
     }
 
     @Deactivate
-    private void deactivate() throws InterruptedException {
+    private void deactivate() throws InterruptedException, IOException {
         for (ServiceRegistration reg : regs) {
             reg.unregister();
         }
@@ -202,6 +202,11 @@ public class LuceneIndexProviderService
             indexProvider = null;
         }
 
+        //Close the copier first i.e. before executorService
+        if (indexCopier != null){
+            indexCopier.close();
+        }
+
         if (executorService != null){
             executorService.shutdown();
             executorService.awaitTermination(1, TimeUnit.MINUTES);

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java Wed Jul 15 11:19:59 2015
@@ -166,6 +166,11 @@ class OakDirectory extends Directory {
         return lockFactory;
     }
 
+    @Override
+    public String toString() {
+        return "Directory for " + definition.getIndexName();
+    }
+
     private Set<String> getListing(){
         long start = PERF_LOGGER.start();
         Iterable<String> fileNames = null;

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1691167&r1=1691166&r2=1691167&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Wed Jul 15 11:19:59 2015
@@ -24,7 +24,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
@@ -37,6 +36,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.openmbean.TabularData;
 
@@ -69,6 +70,7 @@ import static org.apache.jackrabbit.oak.
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
@@ -838,8 +840,58 @@ public class IndexCopierTest {
     }
 
     @Test
-    public void cowIndexPathNotDefined() throws Exception{
+    public void cowPoolClosedWithTaskInQueue() throws Exception{
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        Directory baseDir = new CloseSafeDir();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, executorService, getWorkDir());
+
+        final Set<String> toPause = Sets.newHashSet();
+        final CountDownLatch pauseCopyLatch = new CountDownLatch(1);
+        Directory remote = new CloseSafeDir() {
+            @Override
+            public IndexOutput createOutput(String name, IOContext context) throws IOException {
+                if (toPause.contains(name)){
+                    try {
+                        pauseCopyLatch.await();
+                    } catch (InterruptedException ignore) {
+
+                    }
+                }
+                return super.createOutput(name, context);
+            }
+        };
+
+        final Directory local = copier.wrapForWrite(defn, remote, false);
+        toPause.add("t2");
+        byte[] t1 = writeFile(local, "t1");
+        byte[] t2 = writeFile(local, "t2");
+        byte[] t3 = writeFile(local, "t3");
+        byte[] t4 = writeFile(local, "t4");
+
+        final AtomicReference<Throwable> error =
+                new AtomicReference<Throwable>();
+        Thread closer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    local.close();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                    error.set(e);
+                }
+            }
+        });
+
+        closer.start();
+
+        copier.close();
+        executorService.shutdown();
+        executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
 
+        pauseCopyLatch.countDown();
+        closer.join();
+        assertNotNull("Close should have thrown an exception", error.get());
     }
 
     private byte[] writeFile(Directory dir, String name) throws IOException {