You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by al...@apache.org on 2011/11/15 14:53:42 UTC
svn commit: r1202192 - in /jackrabbit/trunk/jackrabbit-core/src:
main/java/org/apache/jackrabbit/core/
main/java/org/apache/jackrabbit/core/query/lucene/
test/java/org/apache/jackrabbit/core/
test/java/org/apache/jackrabbit/core/query/ test/java/org/ap...
Author: alexparvulescu
Date: Tue Nov 15 13:53:42 2011
New Revision: 1202192
URL: http://svn.apache.org/viewvc?rev=1202192&view=rev
Log:
JCR-3146 Text extraction may congest thread pool in the repository
Added:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java (with props)
Modified:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java Tue Nov 15 13:53:42 2011
@@ -17,17 +17,29 @@
package org.apache.jackrabbit.core;
import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Thread pool used by the repository.
*/
class JackrabbitThreadPool extends ScheduledThreadPoolExecutor {
/**
+ * The logger instance for this class.
+ */
+ private static final Logger log = LoggerFactory
+ .getLogger(JackrabbitThreadPool.class);
+
+ /**
* Size of the per-repository thread pool.
*/
private static final int size =
@@ -64,14 +76,145 @@ class JackrabbitThreadPool extends Sched
/**
* Handler for tasks for which no free thread is found within the pool.
*/
- private static final RejectedExecutionHandler handler =
- new ThreadPoolExecutor.CallerRunsPolicy();
+ private static final RejectedExecutionHandler handler = new CallerRunsPolicy();
+
+ /**
+ * Property to control the value at which the thread pool starts to schedule
+ * the {@link LowPriorityTask} tasks for later execution.
+ *
+ * Set to <code>0</code> to disable the check
+ *
+ * Default value is 75 (also used when the configured value is outside 0-100).
+ *
+ */
+ public static final String MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY = "org.apache.jackrabbit.core.JackrabbitThreadPool.maxLoadForLowPriorityTasks";
+
+ /**
+ * @see #MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY
+ */
+ private final static Integer maxLoadForLowPriorityTasks = getMaxLoadForLowPriorityTasks();
+
+ private static int getMaxLoadForLowPriorityTasks() {
+ final int defaultMaxLoad = 75;
+ int max = Integer.getInteger(MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY,
+ defaultMaxLoad);
+ if (max < 0 || max > 100) {
+ return defaultMaxLoad;
+ }
+ return max;
+ }
+
+ /**
+ * Queue where all the {@link LowPriorityTask} tasks go for later execution
+ */
+ private final BlockingQueue<Runnable> lowPriorityTasksQueue = new LinkedBlockingQueue<Runnable>();
+
+ /**
+ * Task that handles the scheduling and the execution of
+ * {@link LowPriorityTask} tasks
+ */
+ private final RetryLowPriorityTask retryTask;
/**
* Creates a new thread pool.
*/
public JackrabbitThreadPool() {
super(size, factory, handler);
+ retryTask = new RetryLowPriorityTask(this, lowPriorityTasksQueue);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (command instanceof LowPriorityTask) {
+ scheduleLowPriority(command);
+ return;
+ }
+ super.execute(command);
}
+ private void scheduleLowPriority(Runnable command) {
+ if (isOverDefinedMaxLoad()) {
+ lowPriorityTasksQueue.add(command);
+ retryTask.retryLater();
+ return;
+ }
+ super.execute(command);
+ }
+
+ /**
+ * compares the current load of the executor with the defined
+ * <code>{@link #maxLoadForLowPriorityTasks}</code> parameter.
+ *
+ * Used to determine if the executor can handle additional
+ * {@link LowPriorityTask} tasks.
+ *
+ * @return true if the load is over the
+ * <code>{@link #maxLoadForLowPriorityTasks}</code> parameter
+ */
+ private boolean isOverDefinedMaxLoad() {
+ if (maxLoadForLowPriorityTasks == 0) {
+ return false;
+ }
+ double currentLoad = ((double) getActiveCount()) / getPoolSize() * 100;
+ return currentLoad > maxLoadForLowPriorityTasks;
+ }
+
+ /**
+ * TEST ONLY
+ *
+ * @return the number of low priority tasks that are waiting in the queue
+ */
+ int getPendingLowPriorityTaskCount() {
+ return lowPriorityTasksQueue.size();
+ }
+
+ private static final class RetryLowPriorityTask implements Runnable {
+
+ /**
+ * schedule interval in ms for delayed tasks
+ */
+ private static final int LATER_MS = 50;
+
+ private final JackrabbitThreadPool executor;
+ private final BlockingQueue<Runnable> lowPriorityTasksQueue;
+
+ /**
+ * flag to indicate that another execute has been scheduled or is
+ * currently running.
+ */
+ private final AtomicBoolean retryPending;
+
+ public RetryLowPriorityTask(JackrabbitThreadPool executor,
+ BlockingQueue<Runnable> lowPriorityTasksQueue) {
+ this.executor = executor;
+ this.lowPriorityTasksQueue = lowPriorityTasksQueue;
+ this.retryPending = new AtomicBoolean(false);
+ }
+
+ public void retryLater() {
+ if (!retryPending.getAndSet(true)) {
+ executor.schedule(this, LATER_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void run() {
+ int count = 0;
+ while (!executor.isOverDefinedMaxLoad()) {
+ Runnable r = lowPriorityTasksQueue.poll();
+ if (r == null) {
+ log.debug("Executed {} low priority tasks.", count);
+ break;
+ }
+ count++;
+ executor.execute(r);
+ }
+ retryPending.set(false);
+ if (!lowPriorityTasksQueue.isEmpty()) {
+ log.debug(
+ "Executor is under load, will schedule {} remaining tasks for {} ms later",
+ lowPriorityTasksQueue.size(), LATER_MS);
+ retryLater();
+ }
+ }
+ }
}
Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java?rev=1202192&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java Tue Nov 15 13:53:42 2011
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core;
+
+/**
+ * Interface for low priority tasks (like text extraction) that can be scheduled
+ * later based on the executor's load
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/JCR-3146">JCR-3146</a>.
+ */
+public interface LowPriorityTask extends Runnable {
+}
\ No newline at end of file
Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/LowPriorityTask.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java Tue Nov 15 13:53:42 2011
@@ -20,6 +20,7 @@ import java.io.InputStream;
import java.io.Reader;
import java.util.concurrent.Executor;
+import org.apache.jackrabbit.core.LowPriorityTask;
import org.apache.jackrabbit.core.value.InternalValue;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.AbstractField;
@@ -147,7 +148,7 @@ public class LazyTextExtractorField exte
/**
* The background task for extracting text from a binary value.
*/
- private class ParsingTask extends DefaultHandler implements Runnable {
+ private class ParsingTask extends DefaultHandler implements LowPriorityTask {
private final Parser parser;
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Tue Nov 15 13:53:42 2011
@@ -1011,7 +1011,9 @@ public class MultiIndex {
synchronized (iq) {
while (iq.getNumPendingDocuments() > 0 || indexingQueueCommitPending) {
try {
- log.debug("waiting for indexing queue to become empty");
+ log.debug(
+ "waiting for indexing queue to become empty. {} pending docs.",
+ iq.getNumPendingDocuments());
iq.wait();
log.debug("notified");
} catch (InterruptedException e) {
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java Tue Nov 15 13:53:42 2011
@@ -52,8 +52,7 @@ public class Util {
* @param old the document to dispose.
*/
public static void disposeDocument(Document old) {
- for (Object o : old.getFields()) {
- Fieldable f = (Fieldable) o;
+ for (Fieldable f : old.getFields()) {
try {
if (f.readerValue() != null) {
f.readerValue().close();
@@ -76,8 +75,7 @@ public class Util {
* otherwise.
*/
public static boolean isDocumentReady(Document doc) {
- for (Object o : doc.getFields()) {
- Fieldable f = (Fieldable) o;
+ for (Fieldable f : doc.getFields()) {
if (f instanceof LazyTextExtractorField) {
LazyTextExtractorField field = (LazyTextExtractorField) f;
if (!field.isExtractorFinished()) {
Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/TestHelper.java Tue Nov 15 13:53:42 2011
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.core;
+import java.util.concurrent.TimeUnit;
+
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
@@ -97,4 +99,17 @@ public class TestHelper {
}
}
}
+
+ /**
+ * wait for async text-extraction tasks to finish
+ */
+ public static void waitForTextExtractionTasksToFinish(Session session) throws Exception {
+ final RepositoryContext context = JackrabbitRepositoryStub
+ .getRepositoryContext(session.getRepository());
+ JackrabbitThreadPool jtp = ((JackrabbitThreadPool) context
+ .getExecutor());
+ while (jtp.getPendingLowPriorityTaskCount() != 0) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
}
Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java Tue Nov 15 13:53:42 2011
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.core.query
import javax.jcr.Node;
import javax.jcr.Session;
+import org.apache.jackrabbit.core.TestHelper;
+
/**
* <code>AbstractIndexingTest</code> is a base class for all indexing
* configuration tests.
@@ -52,4 +54,12 @@ public class AbstractIndexingTest extend
protected String getWorkspaceName() {
return WORKSPACE_NAME;
}
+
+ /**
+ * wait for async text-extraction tasks to finish
+ */
+ protected void waitForTextExtractionTasksToFinish() throws Exception {
+ TestHelper.waitForTextExtractionTasksToFinish(session);
+ getSearchIndex().flush();
+ }
}
Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java Tue Nov 15 13:53:42 2011
@@ -40,7 +40,7 @@ import org.apache.jackrabbit.core.query.
public class IndexingQueueTest extends AbstractIndexingTest {
private static final File TEMP_DIR =
- new File(System.getProperty("java.io.tmpdir"));
+ new File(System.getProperty("java.io.tmpdir"));
public void testQueue() throws Exception {
SearchIndex index = getSearchIndex();
@@ -63,7 +63,7 @@ public class IndexingQueueTest extends A
assertFalse(nodes.hasNext());
BlockingParser.unblock();
- index.flush();
+ waitForTextExtractionTasksToFinish();
assertEquals(0, queue.getNumPendingDocuments());
q = qm.createQuery(testPath + "/*[jcr:contains(., 'fox')]", Query.XPATH);
@@ -121,7 +121,7 @@ public class IndexingQueueTest extends A
}
qm = session.getWorkspace().getQueryManager();
- getSearchIndex().flush();
+ waitForTextExtractionTasksToFinish();
String stmt = testPath + "//element(*, nt:resource)[jcr:contains(., 'fox')] order by @jcr:score descending";
Query q = qm.createQuery(stmt, Query.XPATH);
Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java?rev=1202192&r1=1202191&r2=1202192&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java Tue Nov 15 13:53:42 2011
@@ -131,6 +131,7 @@ public class SQL2IndexingAggregateTest2
.createBinary(new ByteArrayInputStream(out.toByteArray())));
testRootNode.getSession().save();
+ waitForTextExtractionTasksToFinish();
executeSQL2Query(sqlDog, expectedNodes.toArray(new Node[] {}));
// update jcr:data
@@ -140,6 +141,7 @@ public class SQL2IndexingAggregateTest2
resource.setProperty("jcr:data", session.getValueFactory()
.createBinary(new ByteArrayInputStream(out.toByteArray())));
testRootNode.getSession().save();
+ waitForTextExtractionTasksToFinish();
executeSQL2Query(sqlDog, new Node[] {});
executeSQL2Query(sqlCat, expectedNodes.toArray(new Node[] {}));
@@ -149,12 +151,14 @@ public class SQL2IndexingAggregateTest2
Node foo = unstrContent.addNode("foo");
foo.setProperty("text", "the quick brown fox jumps over the lazy dog.");
testRootNode.getSession().save();
+ waitForTextExtractionTasksToFinish();
executeSQL2Query(sqlDog, expectedNodes.toArray(new Node[] {}));
executeSQL2Query(sqlCat, new Node[] {});
// remove foo
foo.remove();
testRootNode.getSession().save();
+ waitForTextExtractionTasksToFinish();
executeSQL2Query(sqlDog, new Node[] {});
executeSQL2Query(sqlCat, new Node[] {});