You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by al...@apache.org on 2011/11/15 14:53:42 UTC

svn commit: r1202192 - in /jackrabbit/trunk/jackrabbit-core/src: main/java/org/apache/jackrabbit/core/ main/java/org/apache/jackrabbit/core/query/lucene/ test/java/org/apache/jackrabbit/core/ test/java/org/apache/jackrabbit/core/query/ test/java/org/ap...

Author: alexparvulescu
Date: Tue Nov 15 13:53:42 2011
New Revision: 1202192

URL: http://svn.apache.org/viewvc?rev=1202192&view=rev
Log:
JCR-3146 Text extraction may congest thread pool in the repository

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java   (with props)
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java Tue Nov 15 13:53:42 2011
@@ -17,17 +17,29 @@
 package org.apache.jackrabbit.core;
 
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Thread pool used by the repository.
  */
 class JackrabbitThreadPool extends ScheduledThreadPoolExecutor {
 
     /**
+     * The logger instance for this class.
+     */
+    private static final Logger log = LoggerFactory
+            .getLogger(JackrabbitThreadPool.class);
+
+    /**
      * Size of the per-repository thread pool.
      */
     private static final int size =
@@ -64,14 +76,145 @@ class JackrabbitThreadPool extends Sched
     /**
      * Handler for tasks for which no free thread is found within the pool.
      */
-    private static final RejectedExecutionHandler handler =
-            new ThreadPoolExecutor.CallerRunsPolicy();
+    private static final RejectedExecutionHandler handler = new CallerRunsPolicy();
+
+    /**
+     * Name of the system property that sets the load percentage above which
+     * the thread pool queues {@link LowPriorityTask} tasks for later execution.
+     *
+     * Set to <code>0</code> to disable the check.
+     *
+     * Default value is 75; values below 0 or above 100 fall back to the default.
+     *
+     */
+    public static final String MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY = "org.apache.jackrabbit.core.JackrabbitThreadPool.maxLoadForLowPriorityTasks";
+
+    /**
+     * Load threshold in percent, resolved once; see {@link #MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY}.
+     */
+    private final static Integer maxLoadForLowPriorityTasks = getMaxLoadForLowPriorityTasks();
+
+    private static int getMaxLoadForLowPriorityTasks() {
+        final int defaultMaxLoad = 75; // used when the property is unset, unparsable or out of range
+        int max = Integer.getInteger(MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY,
+                defaultMaxLoad);
+        if (max < 0 || max > 100) { // the value is a percentage, so only 0..100 is accepted
+            return defaultMaxLoad;
+        }
+        return max;
+    }
+
+    /**
+     * Queue of {@link LowPriorityTask} tasks that were deferred for later execution
+     */
+    private final BlockingQueue<Runnable> lowPriorityTasksQueue = new LinkedBlockingQueue<Runnable>();
+
+    /**
+     * Task that handles the scheduling and the execution of the deferred
+     * {@link LowPriorityTask} tasks
+     */
+    private final RetryLowPriorityTask retryTask;
 
     /**
      * Creates a new thread pool.
      */
     public JackrabbitThreadPool() {
         super(size, factory, handler);
+        retryTask = new RetryLowPriorityTask(this, lowPriorityTasksQueue); // re-submits deferred low priority tasks
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        if (command instanceof LowPriorityTask) { // low priority tasks may be deferred when the pool is loaded
+            scheduleLowPriority(command);
+            return;
+        }
+        super.execute(command); // every other task is handed to the pool unchanged
     }
 
+    private void scheduleLowPriority(Runnable command) { // queue the task when over the load limit, else run it now
+        if (isOverDefinedMaxLoad()) {
+            lowPriorityTasksQueue.add(command);
+            retryTask.retryLater(); // make sure a retry is scheduled to drain the queue
+            return;
+        }
+        super.execute(command);
+    }
+
+    /**
+     * Compares the current load of the executor with the defined
+     * <code>{@link #maxLoadForLowPriorityTasks}</code> parameter.
+     *
+     * Used to determine if the executor can handle additional
+     * {@link LowPriorityTask} tasks.
+     *
+     * @return true if the load is over the
+     *         <code>{@link #maxLoadForLowPriorityTasks}</code> parameter
+     */
+    private boolean isOverDefinedMaxLoad() {
+        if (maxLoadForLowPriorityTasks == 0) { // 0 disables the check
+            return false;
+        }
+        double currentLoad = ((double) getActiveCount()) / getPoolSize() * 100; // active threads as a percentage of the pool size
+        return currentLoad > maxLoadForLowPriorityTasks; // NOTE(review): an empty pool gives 0.0/0 = NaN, which compares as not over the limit - confirm intended
+    }
+
+    /**
+     * TEST ONLY
+     *
+     * @return the number of low priority tasks that are waiting in the deferred queue; tasks already handed to the pool are not counted
+     */
+    int getPendingLowPriorityTaskCount() {
+        return lowPriorityTasksQueue.size();
+    }
+
+    private static final class RetryLowPriorityTask implements Runnable {
+        // Re-submits the deferred low priority tasks while the executor is not over the load limit.
+        /**
+         * schedule interval in ms for delayed tasks
+         */
+        private static final int LATER_MS = 50;
+
+        private final JackrabbitThreadPool executor;
+        private final BlockingQueue<Runnable> lowPriorityTasksQueue;
+
+        /**
+         * flag to indicate that another execute has been scheduled or is
+         * currently running.
+         */
+        private final AtomicBoolean retryPending;
+
+        public RetryLowPriorityTask(JackrabbitThreadPool executor,
+                BlockingQueue<Runnable> lowPriorityTasksQueue) {
+            this.executor = executor;
+            this.lowPriorityTasksQueue = lowPriorityTasksQueue;
+            this.retryPending = new AtomicBoolean(false);
+        }
+
+        public void retryLater() {
+            if (!retryPending.getAndSet(true)) { // schedule only if no retry is already pending
+                executor.schedule(this, LATER_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        public void run() {
+            int count = 0;
+            while (!executor.isOverDefinedMaxLoad()) {
+                Runnable r = lowPriorityTasksQueue.poll();
+                if (r == null) {
+                    log.debug("Executed {} low priority tasks.", count);
+                    break;
+                }
+                count++;
+                executor.execute(r); // the pool's execute() checks the load again and may put r back in the queue
+            }
+            retryPending.set(false); // NOTE(review): not reached if the loop throws, which would block further retries - consider try/finally
+            if (!lowPriorityTasksQueue.isEmpty()) {
+                log.debug(
+                        "Executor is under load, will schedule {} remaining tasks for {} ms later",
+                        lowPriorityTasksQueue.size(), LATER_MS);
+                retryLater();
+            }
+        }
+    }
 }

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java?rev=1202192&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java Tue Nov 15 13:53:42 2011
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core;
+
+/**
+ * Marker interface for low priority tasks (like text extraction) whose
+ * execution can be deferred based on the executor's load
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/JCR-3146">JCR-3146</a>.
+ */
+public interface LowPriorityTask extends Runnable {
+}
\ No newline at end of file

Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java Tue Nov 15 13:53:42 2011
@@ -20,6 +20,7 @@ import java.io.InputStream;
 import java.io.Reader;
 import java.util.concurrent.Executor;
 
+import org.apache.jackrabbit.core.LowPriorityTask;
 import org.apache.jackrabbit.core.value.InternalValue;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.document.AbstractField;
@@ -147,7 +148,7 @@ public class LazyTextExtractorField exte
     /**
      * The background task for extracting text from a binary value.
      */
-    private class ParsingTask extends DefaultHandler implements Runnable {
+    private class ParsingTask extends DefaultHandler implements LowPriorityTask {
 
         private final Parser parser;
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Tue Nov 15 13:53:42 2011
@@ -1011,7 +1011,9 @@ public class MultiIndex {
         synchronized (iq) {
             while (iq.getNumPendingDocuments() > 0 || indexingQueueCommitPending) {
                 try {
-                    log.debug("waiting for indexing queue to become empty");
+                    log.debug(
+                            "waiting for indexing queue to become empty. {} pending docs.",
+                            iq.getNumPendingDocuments());
                     iq.wait();
                     log.debug("notified");
                 } catch (InterruptedException e) {

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java Tue Nov 15 13:53:42 2011
@@ -52,8 +52,7 @@ public class Util {
      * @param old the document to dispose.
      */
     public static void disposeDocument(Document old) {
-        for (Object o : old.getFields()) {
-            Fieldable f = (Fieldable) o;
+        for (Fieldable f : old.getFields()) {
             try {
                 if (f.readerValue() != null) {
                     f.readerValue().close();
@@ -76,8 +75,7 @@ public class Util {
      *         otherwise.
      */
     public static boolean isDocumentReady(Document doc) {
-        for (Object o : doc.getFields()) {
-            Fieldable f = (Fieldable) o;
+        for (Fieldable f : doc.getFields()) {
             if (f instanceof LazyTextExtractorField) {
                 LazyTextExtractorField field = (LazyTextExtractorField) f;
                 if (!field.isExtractorFinished()) {

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java Tue Nov 15 13:53:42 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.core;
 
+import java.util.concurrent.TimeUnit;
+
 import javax.jcr.Repository;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
@@ -97,4 +99,17 @@ public class TestHelper {
             }
         }
     }
+
+    /**
+     * Polls until the thread pool has no deferred low priority (text-extraction) tasks left in its queue.
+     */
+    public static void waitForTextExtractionTasksToFinish(Session session) throws Exception {
+        final RepositoryContext context = JackrabbitRepositoryStub
+                .getRepositoryContext(session.getRepository());
+        JackrabbitThreadPool jtp = ((JackrabbitThreadPool) context
+                .getExecutor());
+        while (jtp.getPendingLowPriorityTaskCount() != 0) { // NOTE(review): no timeout; tasks already running in the pool are not counted
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+    }
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java Tue Nov 15 13:53:42 2011
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.core.query
 import javax.jcr.Node;
 import javax.jcr.Session;
 
+import org.apache.jackrabbit.core.TestHelper;
+
 /**
  * <code>AbstractIndexingTest</code> is a base class for all indexing
  * configuration tests.
@@ -52,4 +54,12 @@ public class AbstractIndexingTest extend
     protected String getWorkspaceName() {
         return WORKSPACE_NAME;
     }
+
+    /**
+     * Waits for async text-extraction tasks via {@link TestHelper}, then flushes the search index.
+     */
+    protected void waitForTextExtractionTasksToFinish() throws Exception {
+        TestHelper.waitForTextExtractionTasksToFinish(session);
+        getSearchIndex().flush();
+    }
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java Tue Nov 15 13:53:42 2011
@@ -40,7 +40,7 @@ import org.apache.jackrabbit.core.query.
 public class IndexingQueueTest extends AbstractIndexingTest {
 
     private static final File TEMP_DIR =
-        new File(System.getProperty("java.io.tmpdir")); 
+        new File(System.getProperty("java.io.tmpdir"));
 
     public void testQueue() throws Exception {
         SearchIndex index = getSearchIndex();
@@ -63,7 +63,7 @@ public class IndexingQueueTest extends A
         assertFalse(nodes.hasNext());
 
         BlockingParser.unblock();
-        index.flush();
+        waitForTextExtractionTasksToFinish();
         assertEquals(0, queue.getNumPendingDocuments());
 
         q = qm.createQuery(testPath + "/*[jcr:contains(., 'fox')]", Query.XPATH);
@@ -121,7 +121,7 @@ public class IndexingQueueTest extends A
         }
 
         qm = session.getWorkspace().getQueryManager();
-        getSearchIndex().flush();
+        waitForTextExtractionTasksToFinish();
 
         String stmt = testPath + "//element(*, nt:resource)[jcr:contains(., 'fox')] order by @jcr:score descending";
         Query q = qm.createQuery(stmt, Query.XPATH);

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java Tue Nov 15 13:53:42 2011
@@ -131,6 +131,7 @@ public class SQL2IndexingAggregateTest2 
                 .createBinary(new ByteArrayInputStream(out.toByteArray())));
 
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, expectedNodes.toArray(new Node[] {}));
 
         // update jcr:data
@@ -140,6 +141,7 @@ public class SQL2IndexingAggregateTest2 
         resource.setProperty("jcr:data", session.getValueFactory()
                 .createBinary(new ByteArrayInputStream(out.toByteArray())));
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, new Node[] {});
         executeSQL2Query(sqlCat, expectedNodes.toArray(new Node[] {}));
 
@@ -149,12 +151,14 @@ public class SQL2IndexingAggregateTest2 
         Node foo = unstrContent.addNode("foo");
         foo.setProperty("text", "the quick brown fox jumps over the lazy dog.");
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, expectedNodes.toArray(new Node[] {}));
         executeSQL2Query(sqlCat, new Node[] {});
 
         // remove foo
         foo.remove();
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, new Node[] {});
         executeSQL2Query(sqlCat, new Node[] {});