You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2010/09/09 17:01:05 UTC

svn commit: r995451 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: AbstractIndex.java DynamicPooledExecutor.java

Author: jukka
Date: Thu Sep  9 15:01:04 2010
New Revision: 995451

URL: http://svn.apache.org/viewvc?rev=995451&view=rev
Log:
JCR-2089: Use java.util.concurrent

Use java.util.concurrent constructs in DynamicPooledExecutor.

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java?rev=995451&r1=995450&r2=995451&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java Thu Sep  9 15:01:04 2010
@@ -16,26 +16,30 @@
  */
 package org.apache.jackrabbit.core.query.lucene;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.store.Directory;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.tika.io.IOExceptionWithCause;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.BitSet;
-import java.util.List;
-
 /**
  * Implements common functionality for a lucene index.
  * <p/>
@@ -171,44 +175,48 @@ abstract class AbstractIndex {
      * @throws IOException if an error occurs while writing to the index.
      */
     void addDocuments(Document[] docs) throws IOException {
+        final List<IOException> exceptions =
+            Collections.synchronizedList(new ArrayList<IOException>());
+        final CountDownLatch latch = new CountDownLatch(docs.length);
+
         final IndexWriter writer = getIndexWriter();
-        DynamicPooledExecutor.Command[] commands =
-                new DynamicPooledExecutor.Command[docs.length];
-        for (int i = 0; i < docs.length; i++) {
-            // check if text extractor completed its work
-            final Document doc = getFinishedDocument(docs[i]);
-            // create a command for inverting the document
-            commands[i] = new DynamicPooledExecutor.Command() {
-                public Object call() throws Exception {
-                    long time = System.currentTimeMillis();
-                    writer.addDocument(doc);
-                    return System.currentTimeMillis() - time;
-                }
-            };
-        }
-        DynamicPooledExecutor.Result[] results = EXECUTOR.executeAndWait(commands);
-        invalidateSharedReader();
-        IOException ex = null;
-        for (DynamicPooledExecutor.Result result : results) {
-            if (result.getException() != null) {
-                Throwable cause = result.getException().getCause();
-                if (ex == null) {
-                    // only throw the first exception
-                    if (cause instanceof IOException) {
-                        ex = (IOException) cause;
-                    } else {
-                        throw Util.createIOException(cause);
+        for (final Document doc : docs) {
+            EXECUTOR.execute(new Runnable() {
+                public void run() {
+                    try {
+                        // check if text extractor completed its work
+                        Document document = getFinishedDocument(doc);
+                        if (log.isDebugEnabled()) {
+                            long start = System.nanoTime();
+                            writer.addDocument(document);
+                            log.debug("Inverted a document in {}us",
+                                    (System.nanoTime() - start) / 1000);
+                        } else {
+                            writer.addDocument(document);
+                        }
+                    } catch (IOException e) {
+                        log.warn("Exception while inverting a document", e);
+                        exceptions.add(e);
+                    } finally {
+                        latch.countDown();
                     }
-                } else {
-                    // all others are logged
-                    log.warn("Exception while inverting document", cause);
                 }
-            } else {
-                log.debug("Inverted document in {} ms", result.get());
-            }
+            });
+        }
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new IOExceptionWithCause(
+                    "Wait for background indexing tasks was interrupted", e);
+        } finally {
+            invalidateSharedReader();
         }
-        if (ex != null) {
-            throw ex;
+
+        if (!exceptions.isEmpty()) {
+            throw new IOExceptionWithCause(
+                    exceptions.size() + " of " + docs.length
+                    + " background indexer tasks failed", exceptions.get(0));
         }
     }
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java?rev=995451&r1=995450&r2=995451&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java Thu Sep  9 15:01:04 2010
@@ -16,41 +16,51 @@
  */
 package org.apache.jackrabbit.core.query.lucene;
 
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import EDU.oswego.cs.dl.util.concurrent.FutureResult;
-import EDU.oswego.cs.dl.util.concurrent.Callable;
-
-import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <code>DynamicPooledExecutor</code> implements an executor, which dynamically
  * adjusts its maximum number of threads according to the number of available
  * processors returned by {@link Runtime#availableProcessors()}.
  */
-public class DynamicPooledExecutor {
+public class DynamicPooledExecutor implements Executor {
 
     /**
      * The underlying pooled executor.
      */
-    private final PooledExecutor executor;
+    private final ThreadPoolExecutor executor;
 
     /**
-     * Timestamp when the pool size was last checked.
+     * The time (in milliseconds) when the pool size was last checked.
      */
-    private volatile long lastCheck;
+    private long lastCheck;
 
     /**
-     * The number of processors.
+     * Creates a new DynamicPooledExecutor.
      */
-    private volatile int numProcessors;
+    public DynamicPooledExecutor() {
+        this.executor = new ThreadPoolExecutor(
+                0, Runtime.getRuntime().availableProcessors(),
+                500, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>());
+        this.lastCheck = System.currentTimeMillis();
+    }
 
     /**
-     * Creates a new DynamicPooledExecutor.
+     * Adjusts the pool size at most once every second.
      */
-    public DynamicPooledExecutor() {
-        executor = new PooledExecutor();
-        executor.setKeepAliveTime(500);
-        adjustPoolSize();
+    private synchronized void adjustPoolSize() {
+        long now = System.currentTimeMillis();
+        if (lastCheck + 1000 < now) {
+            int n = Runtime.getRuntime().availableProcessors();
+            if (n != executor.getMaximumPoolSize()) {
+                executor.setMaximumPoolSize(n);
+            }
+            lastCheck = now;
+        }
     }
 
     /**
@@ -64,139 +74,12 @@ public class DynamicPooledExecutor {
      */
     public void execute(Runnable command) {
         adjustPoolSize();
-        if (numProcessors == 1) {
+        if (executor.getMaximumPoolSize() == 1) {
             // if there is only one processor execute with current thread
             command.run();
         } else {
-            try {
-                executor.execute(command);
-            } catch (InterruptedException e) {
-                // run with current thread instead
-                command.run();
-            }
+            executor.execute(command);
         }
     }
 
-    /**
-     * Executes a set of commands and waits until all commands have been
-     * executed. The results of the commands are returned in the same order as
-     * the commands.
-     *
-     * @param commands the commands to execute.
-     * @return the results.
-     */
-    public Result[] executeAndWait(Command[] commands) {
-        Result[] results = new Result[commands.length];
-        if (numProcessors == 1) {
-            // optimize for one processor
-            for (int i = 0; i < commands.length; i++) {
-                Object obj = null;
-                InvocationTargetException ex = null;
-                try {
-                    obj = commands[i].call();
-                } catch (Exception e) {
-                    ex = new InvocationTargetException(e);
-                }
-                results[i] = new Result(obj, ex);
-            }
-        } else {
-            FutureResult[] futures = new FutureResult[commands.length];
-            for (int i = 0; i < commands.length; i++) {
-                final Command c = commands[i];
-                futures[i] = new FutureResult();
-                Runnable r = futures[i].setter(new Callable() {
-                    public Object call() throws Exception {
-                        return c.call();
-                    }
-                });
-                try {
-                    executor.execute(r);
-                } catch (InterruptedException e) {
-                    // run with current thread instead
-                    r.run();
-                }
-            }
-            // wait for all results
-            boolean interrupted = false;
-            for (int i = 0; i < futures.length; i++) {
-                Object obj = null;
-                InvocationTargetException ex = null;
-                for (;;) {
-                    try {
-                        obj = futures[i].get();
-                    } catch (InterruptedException e) {
-                        interrupted = true;
-                        // reset interrupted status and try again
-                        Thread.interrupted();
-                        continue;
-                    } catch (InvocationTargetException e) {
-                        ex = e;
-                    }
-                    results[i] = new Result(obj, ex);
-                    break;
-                }
-            }
-            if (interrupted) {
-                // restore interrupt status again
-                Thread.currentThread().interrupt();
-            }
-        }
-        return results;
-    }
-
-    /**
-     * Adjusts the pool size at most once every second.
-     */
-    private void adjustPoolSize() {
-        if (lastCheck + 1000 < System.currentTimeMillis()) {
-            int n = Runtime.getRuntime().availableProcessors();
-            if (numProcessors != n) {
-                executor.setMaximumPoolSize(n);
-                numProcessors = n;
-            }
-            lastCheck = System.currentTimeMillis();
-        }
-    }
-
-    public interface Command {
-
-        /**
-         * Perform some action that returns a result or throws an exception
-         */
-        Object call() throws Exception;
-    }
-
-    public static class Result {
-
-        /**
-         * The result object or <code>null</code> if an exception was thrown.
-         */
-        private final Object object;
-
-        /**
-         * The exception or <code>null</code> if no exception was thrown.
-         */
-        private final InvocationTargetException exception;
-
-        private Result(Object object, InvocationTargetException exception) {
-            this.object = object;
-            this.exception = exception;
-        }
-
-        /**
-         * @return the result object or <code>null</code> if an exception was
-         *         thrown.
-         */
-        public Object get() {
-            return object;
-        }
-
-        /**
-         * @return the exception or <code>null</code> if no exception was
-         *         thrown.
-         */
-        public InvocationTargetException getException() {
-            return exception;
-        }
-    }
 }