You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ju...@apache.org on 2011/04/12 14:48:17 UTC
svn commit: r1091388 - in /tika/trunk/tika-core/src:
main/java/org/apache/tika/fork/ForkParser.java
main/java/org/apache/tika/fork/ForkServer.java
test/java/org/apache/tika/fork/ForkParserTest.java
test/java/org/apache/tika/fork/ForkTestParser.java
Author: jukka
Date: Tue Apr 12 12:48:17 2011
New Revision: 1091388
URL: http://svn.apache.org/viewvc?rev=1091388&view=rev
Log:
TIKA-639: Maximum pool size for ForkParser
Make the poolSize parameter specify the maximum number of concurrent parser processes.
Modified:
tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java
Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java Tue Apr 12 12:48:17 2011
@@ -47,6 +47,8 @@ public class ForkParser extends Abstract
/** Process pool size */
private int poolSize = 5;
+ private int currentlyInUse = 0;
+
private final Queue<ForkClient> pool =
new LinkedList<ForkClient>();
@@ -148,17 +150,40 @@ public class ForkParser extends Abstract
}
private synchronized ForkClient acquireClient()
- throws IOException {
- ForkClient client = pool.poll();
- if (client == null || !client.ping()) {
- client = new ForkClient(loader, parser, java);
+ throws IOException, TikaException {
+ while (true) {
+ ForkClient client = pool.poll();
+
+ // Create a new process if there's room in the pool
+ if (client == null && currentlyInUse < poolSize) {
+ client = new ForkClient(loader, parser, java);
+ }
+
+ // Ping the process, and get rid of it if it's inactive
+ if (client != null && !client.ping()) {
+ client.close();
+ client = null;
+ }
+
+ if (client != null) {
+ currentlyInUse++;
+ return client;
+ } else if (currentlyInUse >= poolSize) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new TikaException(
+ "Interrupted while waiting for a fork parser", e);
+ }
+ }
}
- return client;
}
private synchronized void releaseClient(ForkClient client, boolean alive) {
- if (pool.size() < poolSize && alive) {
+ currentlyInUse--;
+ if (currentlyInUse + pool.size() < poolSize && alive) {
pool.offer(client);
+ notifyAll();
} else {
client.close();
}
Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java Tue Apr 12 12:48:17 2011
@@ -180,7 +180,7 @@ class ForkServer implements Runnable, Ch
return object;
}
- //-------------------------------------------------------------< Checsum >
+ //------------------------------------------------------------< Checksum >
public void update(int b) {
active = true;
Modified: tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java (original)
+++ tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkParserTest.java Tue Apr 12 12:48:17 2011
@@ -18,6 +18,8 @@ package org.apache.tika.fork;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import junit.framework.TestCase;
@@ -25,6 +27,7 @@ import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.BodyContentHandler;
import org.xml.sax.ContentHandler;
+import org.xml.sax.helpers.DefaultHandler;
public class ForkParserTest extends TestCase {
@@ -95,4 +98,59 @@ public class ForkParserTest extends Test
}
}
+ public void testPoolSizeReached() throws Exception {
+ final ForkParser parser = new ForkParser(
+ ForkParserTest.class.getClassLoader(),
+ new ForkTestParser());
+ try {
+ final ParseContext context = new ParseContext();
+
+ Thread[] threads = new Thread[parser.getPoolSize()];
+ PipedOutputStream[] pipes = new PipedOutputStream[threads.length];
+ for (int i = 0; i < threads.length; i++) {
+ final PipedInputStream input = new PipedInputStream();
+ pipes[i] = new PipedOutputStream(input);
+ threads[i] = new Thread() {
+ public void run() {
+ try {
+ ContentHandler o = new DefaultHandler();
+ parser.parse(input, o, new Metadata(), context);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ threads[i].start();
+ }
+
+ Thread.sleep(1000);
+ final ContentHandler o = new BodyContentHandler();
+ Thread blocked = new Thread() {
+ public void run() {
+ try {
+ InputStream stream =
+ new ByteArrayInputStream(new byte[0]);
+ parser.parse(stream, o, new Metadata(), context);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ blocked.start();
+
+ Thread.sleep(1000);
+ assertEquals("", o.toString());
+
+ for (int i = 0; i < threads.length; i++) {
+ pipes[i].close();
+ threads[i].join();
+ }
+
+ blocked.join();
+ assertEquals("Hello, World!", o.toString().trim());
+ } finally {
+ parser.close();
+ }
+ }
+
}
Modified: tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java?rev=1091388&r1=1091387&r2=1091388&view=diff
==============================================================================
--- tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java (original)
+++ tika/trunk/tika-core/src/test/java/org/apache/tika/fork/ForkTestParser.java Tue Apr 12 12:48:17 2011
@@ -26,7 +26,6 @@ import org.apache.tika.metadata.Metadata
import org.apache.tika.mime.MediaType;
import org.apache.tika.parser.AbstractParser;
import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
import org.apache.tika.sax.XHTMLContentHandler;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
@@ -44,6 +43,8 @@ class ForkTestParser extends AbstractPar
InputStream stream, ContentHandler handler,
Metadata metadata, ParseContext context)
throws IOException, SAXException, TikaException {
+ stream.read();
+
XHTMLContentHandler xhtml = new XHTMLContentHandler(handler, metadata);
xhtml.startDocument();
char[] ch = "Hello, World!".toCharArray();