You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ju...@apache.org on 2011/04/12 14:48:17 UTC

svn commit: r1091388 - in /tika/trunk/tika-core/src: main/java/org/apache/tika/fork/ForkParser.java main/java/org/apache/tika/fork/ForkServer.java test/java/org/apache/tika/fork/ForkParserTest.java test/java/org/apache/tika/fork/ForkTestParser.java

Author: jukka
Date: Tue Apr 12 12:48:17 2011
New Revision: 1091388

URL: http://svn.apache.org/viewvc?rev=1091388&view=rev
Log:
TIKA-639: Maximum pool size for ForkParser

Make the poolSize parameter specify the maximum number of concurrent parser processes.

Modified:
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
    tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
    tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java Tue Apr 12 12:48:17 2011
@@ -47,6 +47,8 @@ public class ForkParser extends Abstract
     /** Process pool size */
     private int poolSize = 5;
 
+    /** Number of clients currently checked out via acquireClient() */
+    private int currentlyInUse = 0;
     private final Queue<ForkClient> pool =
         new LinkedList<ForkClient>();
 
@@ -148,17 +150,57 @@ public class ForkParser extends Abstract
     }
 
+    /**
+     * Checks out a fork client for the caller. An idle pooled client is
+     * reused if available; otherwise a new one is started as long as fewer
+     * than poolSize clients are in use. Clients that fail a ping are closed.
+     * Blocks while poolSize clients are already checked out.
+     *
+     * @throws TikaException if interrupted while waiting for a free client
+     */
     private synchronized ForkClient acquireClient()
-            throws IOException {
-        ForkClient client = pool.poll();
-        if (client == null || !client.ping()) {
-            client = new ForkClient(loader, parser, java);
+            throws IOException, TikaException {
+        while (true) {
+            ForkClient client = pool.poll();
+
+            // Create a new process if there's room in the pool
+            if (client == null && currentlyInUse < poolSize) {
+                client = new ForkClient(loader, parser, java);
+            }
+
+            // Ping the process, and get rid of it if it's inactive
+            if (client != null && !client.ping()) {
+                client.close();
+                client = null;
+            }
+
+            if (client != null) {
+                currentlyInUse++;
+                return client;
+            } else if (currentlyInUse >= poolSize) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status for code further up the stack
+                    Thread.currentThread().interrupt();
+                    throw new TikaException(
+                            "Interrupted while waiting for a fork parser", e);
+                }
+            }
         }
-        return client;
     }
 
+    /**
+     * Returns a client obtained from acquireClient(). If the caller reports
+     * it alive it goes back to the pool, otherwise it is closed.
+     */
     private synchronized void releaseClient(ForkClient client, boolean alive) {
-        if (pool.size() < poolSize && alive) {
+        currentlyInUse--;
+        // A slot frees up whether the client is pooled or closed, so always
+        // wake threads blocked in acquireClient(); notifying only when the
+        // client is pooled would strand waiters after a dead client is closed
+        notifyAll();
+        if (currentlyInUse + pool.size() < poolSize && alive) {
             pool.offer(client);
         } else {
             client.close();
         }

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java Tue Apr 12 12:48:17 2011
@@ -180,7 +180,7 @@ class ForkServer implements Runnable, Ch
         return object;
     }
 
-    //-------------------------------------------------------------< Checsum >
+    //------------------------------------------------------------< Checksum >
 
     public void update(int b) {
         active = true;

Modified: tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java (original)
+++ tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java Tue Apr 12 12:48:17 2011
@@ -18,6 +18,8 @@ package org.apache.tika.fork;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 
 import junit.framework.TestCase;
 
@@ -25,6 +27,7 @@ import org.apache.tika.metadata.Metadata
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.sax.BodyContentHandler;
 import org.xml.sax.ContentHandler;
+import org.xml.sax.helpers.DefaultHandler;
 
 public class ForkParserTest extends TestCase {
 
@@ -95,4 +98,76 @@ public class ForkParserTest extends Test
         }
     }
 
+    /**
+     * Checks that once getPoolSize() parses are in progress, one more
+     * parse waits for a free fork client and completes after one is released.
+     */
+    public void testPoolSizeReached() throws Exception {
+        final ForkParser parser = new ForkParser(
+                ForkParserTest.class.getClassLoader(),
+                new ForkTestParser());
+        try {
+            final ParseContext context = new ParseContext();
+
+            // Occupy every slot in the pool with a parse whose input pipe
+            // stays open (and empty) until the pipes are closed below
+            Thread[] threads = new Thread[parser.getPoolSize()];
+            PipedOutputStream[] pipes = new PipedOutputStream[threads.length];
+            for (int i = 0; i < threads.length; i++) {
+                final PipedInputStream input = new PipedInputStream();
+                pipes[i] = new PipedOutputStream(input);
+                threads[i] = new Thread() {
+                    public void run() {
+                        try {
+                            ContentHandler o = new DefaultHandler();
+                            parser.parse(input, o, new Metadata(), context);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                };
+                threads[i].start();
+            }
+
+            // Give the workers time to take every client, then start one
+            // more parse, which has to wait for a client to be released
+            Thread.sleep(1000);
+            final ContentHandler o = new BodyContentHandler();
+            Thread blocked = new Thread() {
+                public void run() {
+                    try {
+                        InputStream stream =
+                            new ByteArrayInputStream(new byte[0]);
+                        parser.parse(stream, o, new Metadata(), context);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            blocked.start();
+
+            // While the pool is exhausted the extra parse produces no output
+            Thread.sleep(1000);
+            assertEquals("", o.toString());
+
+            // Closing the pipes lets the pending parses finish. The joins are
+            // bounded so a pool accounting regression fails the test instead
+            // of hanging the build.
+            final long timeout = 10000; // milliseconds per join
+            for (int i = 0; i < threads.length; i++) {
+                pipes[i].close();
+                threads[i].join(timeout);
+                assertFalse("Parser thread " + i + " did not finish",
+                        threads[i].isAlive());
+            }
+
+            blocked.join(timeout);
+            assertFalse("Waiting parse never obtained a fork client",
+                    blocked.isAlive());
+            assertEquals("Hello, World!", o.toString().trim());
+        } finally {
+            parser.close();
+        }
+    }
+
 }

Modified: tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java (original)
+++ tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java Tue Apr 12 12:48:17 2011
@@ -26,7 +26,6 @@ import org.apache.tika.metadata.Metadata
 import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AbstractParser;
 import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
 import org.apache.tika.sax.XHTMLContentHandler;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
@@ -44,6 +43,8 @@ class ForkTestParser extends AbstractPar
             InputStream stream, ContentHandler handler,
             Metadata metadata, ParseContext context)
             throws IOException, SAXException, TikaException {
+        stream.read();
+
         XHTMLContentHandler xhtml = new XHTMLContentHandler(handler, metadata);
         xhtml.startDocument();
         char[] ch = "Hello, World!".toCharArray();