You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2010/09/09 17:01:05 UTC
svn commit: r995451 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: AbstractIndex.java DynamicPooledExecutor.java
Author: jukka
Date: Thu Sep 9 15:01:04 2010
New Revision: 995451
URL: http://svn.apache.org/viewvc?rev=995451&view=rev
Log:
JCR-2089: Use java.util.concurrent
Use java.util.concurrent constructs in DynamicPooledExecutor.
Modified:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java?rev=995451&r1=995450&r2=995451&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java Thu Sep 9 15:01:04 2010
@@ -16,26 +16,30 @@
*/
package org.apache.jackrabbit.core.query.lucene;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.store.Directory;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.tika.io.IOExceptionWithCause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.BitSet;
-import java.util.List;
-
/**
* Implements common functionality for a lucene index.
* <p/>
@@ -171,44 +175,48 @@ abstract class AbstractIndex {
* @throws IOException if an error occurs while writing to the index.
*/
void addDocuments(Document[] docs) throws IOException {
+ final List<IOException> exceptions =
+ Collections.synchronizedList(new ArrayList<IOException>());
+ final CountDownLatch latch = new CountDownLatch(docs.length);
+
final IndexWriter writer = getIndexWriter();
- DynamicPooledExecutor.Command[] commands =
- new DynamicPooledExecutor.Command[docs.length];
- for (int i = 0; i < docs.length; i++) {
- // check if text extractor completed its work
- final Document doc = getFinishedDocument(docs[i]);
- // create a command for inverting the document
- commands[i] = new DynamicPooledExecutor.Command() {
- public Object call() throws Exception {
- long time = System.currentTimeMillis();
- writer.addDocument(doc);
- return System.currentTimeMillis() - time;
- }
- };
- }
- DynamicPooledExecutor.Result[] results = EXECUTOR.executeAndWait(commands);
- invalidateSharedReader();
- IOException ex = null;
- for (DynamicPooledExecutor.Result result : results) {
- if (result.getException() != null) {
- Throwable cause = result.getException().getCause();
- if (ex == null) {
- // only throw the first exception
- if (cause instanceof IOException) {
- ex = (IOException) cause;
- } else {
- throw Util.createIOException(cause);
+ for (final Document doc : docs) {
+ EXECUTOR.execute(new Runnable() {
+ public void run() {
+ try {
+ // check if text extractor completed its work
+ Document document = getFinishedDocument(doc);
+ if (log.isDebugEnabled()) {
+ long start = System.nanoTime();
+ writer.addDocument(document);
+ log.debug("Inverted a document in {}us",
+ (System.nanoTime() - start) / 1000);
+ } else {
+ writer.addDocument(document);
+ }
+ } catch (IOException e) {
+ log.warn("Exception while inverting a document", e);
+ exceptions.add(e);
+ } finally {
+ latch.countDown();
}
- } else {
- // all others are logged
- log.warn("Exception while inverting document", cause);
}
- } else {
- log.debug("Inverted document in {} ms", result.get());
- }
+ });
+ }
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new IOExceptionWithCause(
+ "Wait for background indexing tasks was interrupted", e);
+ } finally {
+ invalidateSharedReader();
}
- if (ex != null) {
- throw ex;
+
+ if (!exceptions.isEmpty()) {
+ throw new IOExceptionWithCause(
+ exceptions.size() + " of " + docs.length
+ + " background indexer tasks failed", exceptions.get(0));
}
}
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java?rev=995451&r1=995450&r2=995451&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java Thu Sep 9 15:01:04 2010
@@ -16,41 +16,51 @@
*/
package org.apache.jackrabbit.core.query.lucene;
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import EDU.oswego.cs.dl.util.concurrent.FutureResult;
-import EDU.oswego.cs.dl.util.concurrent.Callable;
-
-import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* <code>DynamicPooledExecutor</code> implements an executor, which dynamically
* adjusts its maximum number of threads according to the number of available
* processors returned by {@link Runtime#availableProcessors()}.
*/
-public class DynamicPooledExecutor {
+public class DynamicPooledExecutor implements Executor {
/**
* The underlying pooled executor.
*/
- private final PooledExecutor executor;
+ private final ThreadPoolExecutor executor;
/**
- * Timestamp when the pool size was last checked.
+ * The time (in milliseconds) when the pool size was last checked.
*/
- private volatile long lastCheck;
+ private long lastCheck;
/**
- * The number of processors.
+ * Creates a new DynamicPooledExecutor.
*/
- private volatile int numProcessors;
+ public DynamicPooledExecutor() {
+ this.executor = new ThreadPoolExecutor(
+ 0, Runtime.getRuntime().availableProcessors(),
+ 500, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ this.lastCheck = System.currentTimeMillis();
+ }
/**
- * Creates a new DynamicPooledExecutor.
+ * Adjusts the pool size at most once every second.
*/
- public DynamicPooledExecutor() {
- executor = new PooledExecutor();
- executor.setKeepAliveTime(500);
- adjustPoolSize();
+ private synchronized void adjustPoolSize() {
+ long now = System.currentTimeMillis();
+ if (lastCheck + 1000 < now) {
+ int n = Runtime.getRuntime().availableProcessors();
+ if (n != executor.getMaximumPoolSize()) {
+ executor.setMaximumPoolSize(n);
+ }
+ lastCheck = now;
+ }
}
/**
@@ -64,139 +74,12 @@ public class DynamicPooledExecutor {
*/
public void execute(Runnable command) {
adjustPoolSize();
- if (numProcessors == 1) {
+ if (executor.getMaximumPoolSize() == 1) {
// if there is only one processor execute with current thread
command.run();
} else {
- try {
- executor.execute(command);
- } catch (InterruptedException e) {
- // run with current thread instead
- command.run();
- }
+ executor.execute(command);
}
}
- /**
- * Executes a set of commands and waits until all commands have been
- * executed. The results of the commands are returned in the same order as
- * the commands.
- *
- * @param commands the commands to execute.
- * @return the results.
- */
- public Result[] executeAndWait(Command[] commands) {
- Result[] results = new Result[commands.length];
- if (numProcessors == 1) {
- // optimize for one processor
- for (int i = 0; i < commands.length; i++) {
- Object obj = null;
- InvocationTargetException ex = null;
- try {
- obj = commands[i].call();
- } catch (Exception e) {
- ex = new InvocationTargetException(e);
- }
- results[i] = new Result(obj, ex);
- }
- } else {
- FutureResult[] futures = new FutureResult[commands.length];
- for (int i = 0; i < commands.length; i++) {
- final Command c = commands[i];
- futures[i] = new FutureResult();
- Runnable r = futures[i].setter(new Callable() {
- public Object call() throws Exception {
- return c.call();
- }
- });
- try {
- executor.execute(r);
- } catch (InterruptedException e) {
- // run with current thread instead
- r.run();
- }
- }
- // wait for all results
- boolean interrupted = false;
- for (int i = 0; i < futures.length; i++) {
- Object obj = null;
- InvocationTargetException ex = null;
- for (;;) {
- try {
- obj = futures[i].get();
- } catch (InterruptedException e) {
- interrupted = true;
- // reset interrupted status and try again
- Thread.interrupted();
- continue;
- } catch (InvocationTargetException e) {
- ex = e;
- }
- results[i] = new Result(obj, ex);
- break;
- }
- }
- if (interrupted) {
- // restore interrupt status again
- Thread.currentThread().interrupt();
- }
- }
- return results;
- }
-
- /**
- * Adjusts the pool size at most once every second.
- */
- private void adjustPoolSize() {
- if (lastCheck + 1000 < System.currentTimeMillis()) {
- int n = Runtime.getRuntime().availableProcessors();
- if (numProcessors != n) {
- executor.setMaximumPoolSize(n);
- numProcessors = n;
- }
- lastCheck = System.currentTimeMillis();
- }
- }
-
- public interface Command {
-
- /**
- * Perform some action that returns a result or throws an exception
- */
- Object call() throws Exception;
- }
-
- public static class Result {
-
- /**
- * The result object or <code>null</code> if an exception was thrown.
- */
- private final Object object;
-
- /**
- * The exception or <code>null</code> if no exception was thrown.
- */
- private final InvocationTargetException exception;
-
- private Result(Object object, InvocationTargetException exception) {
- this.object = object;
- this.exception = exception;
- }
-
- /**
- * @return the result object or <code>null</code> if an exception was
- * thrown.
- */
- public Object get() {
- return object;
- }
-
- /**
- * @return the exception or <code>null</code> if no exception was
- * thrown.
- */
- public InvocationTargetException getException() {
- return exception;
- }
- }
}