You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tika.apache.org by ju...@apache.org on 2011/01/17 19:50:37 UTC

svn commit: r1060042 - in /tika/trunk/tika-core/src/main/java/org/apache/tika: fork/ parser/

Author: jukka
Date: Mon Jan 17 18:50:37 2011
New Revision: 1060042

URL: http://svn.apache.org/viewvc?rev=1060042&view=rev
Log:
TIKA-416: Out-of-process text extraction

Various cleanups and fixes.

Most of the forked parser functionality is now there, though it still needs some polish.
The most notable missing bit is proper metadata handling across process boundaries.

Modified:
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ClassLoaderResource.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkProxy.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkResource.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkSerializer.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
    tika/trunk/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ClassLoaderResource.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ClassLoaderResource.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ClassLoaderResource.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ClassLoaderResource.java Mon Jan 17 18:50:37 2011
@@ -45,7 +45,6 @@ class ClassLoaderResource implements For
             throws IOException {
         byte type = input.readByte();
         String name = input.readUTF();
-        System.out.println(name);
         if (type == 1) {
             InputStream stream = loader.getResourceAsStream(name);
             if (stream != null) {
@@ -86,7 +85,6 @@ class ClassLoaderResource implements For
             byte[] buffer = new byte[0x10000 - 1];
             int n;
             while ((n = stream.read(buffer)) != -1) {
-                System.out.println(n);
                 output.writeShort(n);
                 output.write(buffer, 0, n);
             }

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkClient.java Mon Jan 17 18:50:37 2011
@@ -16,29 +16,25 @@
  */
 package org.apache.tika.fork;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.URL;
 import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.List;
 
-import org.apache.tika.io.IOExceptionWithCause;
 import org.apache.tika.io.IOUtils;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
 import org.xml.sax.ContentHandler;
 
 class ForkClient {
 
+    private final String java = "java"; // TODO: Make configurable
+
     private final ClassLoader loader;
 
     private final File directory;
@@ -54,7 +50,7 @@ class ForkClient {
     public ForkClient(ClassLoader loader) throws IOException {
         this.loader = loader;
 
-        this.directory = File.createTempFile("apache-tika-", "-oop");
+        this.directory = File.createTempFile("apache-tika-", "-fork");
         directory.delete();
         directory.mkdir();
 
@@ -65,31 +61,18 @@ class ForkClient {
             copyClassToDirectory(ForkProxy.class);
             copyClassToDirectory(ClassLoaderProxy.class);
 
-            System.out.println(directory);
-            //ProcessBuilder builder = new ProcessBuilder();
-            //builder.directory(directory);
-            //builder.command("java", ForkServer.class.getName());
-            //this.process = builder.start();
-            //this.output = new DataOutputStream(process.getOutputStream());
-            //this.input = new DataInputStream(process.getInputStream());
-            //this.error = process.getErrorStream();
-            this.process = null;
-            this.error = System.in;
-            PipedInputStream in = new PipedInputStream();
-            PipedOutputStream out = new PipedOutputStream();
-            this.input = new DataInputStream(new PipedInputStream(out));
-            this.output = new DataOutputStream(new PipedOutputStream(in));
-            final ForkServer server = new ForkServer(in, out);
-            new Thread() {
-                public void run() {
-                    try {server.run();} catch (Exception e) {}
-                }
-            }.start();
+            ProcessBuilder builder = new ProcessBuilder();
+            builder.directory(directory);
+            builder.command(java, ForkServer.class.getName());
+            this.process = builder.start();
+            this.output = new DataOutputStream(process.getOutputStream());
+            this.input = new DataInputStream(process.getInputStream());
+            this.error = process.getErrorStream();
 
             ok = true;
         } finally {
             if (!ok) {
-                // delete(directory);
+                delete(directory);
             }
         }
     }
@@ -121,31 +104,52 @@ class ForkClient {
         }
     }
 
-    public synchronized void parse(
-            Parser parser, InputStream input, ContentHandler handler)
+    public synchronized void call(
+            Object object, String method, Object... args)
             throws IOException {
         List<ForkResource> resources = new ArrayList<ForkResource>();
-        resources.add(new ClassLoaderResource(loader));
-        resources.add(new InputStreamResource(input));
-        resources.add(new ContentHandlerResource(handler));
-        consumeErrorStream();
-        output.write(ForkServer.CALL);
-        ForkSerializer.serialize(output, new ClassLoaderProxy(0));
-        waitForResponse(resources);
-        ForkSerializer.serialize(output, parser);
-        waitForResponse(resources);
+        sendObject(loader, resources);
+        sendObject(object, resources);
         output.writeUTF("parse");
-        ForkSerializer.serialize(output, new InputStreamProxy(1));
-        waitForResponse(resources);
-        ForkSerializer.serialize(output, new ContentHandlerProxy(2));
-        waitForResponse(resources);
-        ForkSerializer.serialize(output, new Metadata());
-        waitForResponse(resources);
-        ForkSerializer.serialize(output, new ParseContext());
-        waitForResponse(resources);
+        for (int i = 0; i < args.length; i++) {
+            sendObject(args[i], resources);
+        }
         waitForResponse(resources);
     }
 
+    /**
+     * Serializes the object first into an in-memory buffer and then
+     * writes it to the output stream with a preceding size integer.
+     *
+     * @param object object to be serialized
+     * @param resources list of fork resources, used when adding proxies
+     * @throws IOException if the object could not be serialized
+     */
+    private void sendObject(Object object, List<ForkResource> resources)
+            throws IOException {
+        int n = resources.size();
+        if (object instanceof InputStream) {
+            resources.add(new InputStreamResource((InputStream) object));
+            object = new InputStreamProxy(n);
+        } else if (object instanceof ContentHandler) {
+            resources.add(new ContentHandlerResource((ContentHandler) object));
+            object = new ContentHandlerProxy(n);
+        } else if (object instanceof ClassLoader) {
+            resources.add(new ClassLoaderResource((ClassLoader) object));
+            object = new ClassLoaderProxy(n);
+        }
+
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ObjectOutputStream serializer = new ObjectOutputStream(buffer);
+        serializer.writeObject(object);
+        serializer.close();
+
+        byte[] data = buffer.toByteArray();
+        output.writeInt(data.length);
+        output.write(data);
+
+        waitForResponse(resources);
+    }
 
     public synchronized void close() {
         try {
@@ -154,8 +158,8 @@ class ForkClient {
             error.close();
         } catch (IOException ignore) {
         }
-        // process.destroy();
-        // delete(directory);
+        process.destroy();
+        delete(directory);
     }
 
     private byte waitForResponse(List<ForkResource> resources)

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkParser.java Mon Jan 17 18:50:37 2011
@@ -21,21 +21,24 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.Set;
 
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AutoDetectParser;
-import org.apache.tika.parser.DelegatingParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.sax.WriteOutContentHandler;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 
-public class ForkParser extends DelegatingParser {
+public class ForkParser implements Parser {
 
     private final ClassLoader loader;
 
+    private final Parser parser;
+
     private final Queue<ForkClient> pool =
         new LinkedList<ForkClient>();
 
@@ -43,12 +46,12 @@ public class ForkParser extends Delegati
 
     public static void main(String[] args) throws Exception {
         ForkParser parser = new ForkParser(
-                Thread.currentThread().getContextClassLoader());
+                Thread.currentThread().getContextClassLoader(),
+                new AutoDetectParser());
         try {
             InputStream stream =
                 new ByteArrayInputStream("Hello, World!".getBytes());
             ParseContext context = new ParseContext();
-            context.set(Parser.class, new AutoDetectParser());
             parser.parse(
                     stream, new WriteOutContentHandler(System.out),
                     new Metadata(), context);
@@ -57,26 +60,33 @@ public class ForkParser extends Delegati
         }
     }
 
-    public ForkParser(ClassLoader loader) {
+    public ForkParser(ClassLoader loader, Parser parser) {
         this.loader = loader;
+        this.parser = parser;
+    }
+
+    public Set<MediaType> getSupportedTypes(ParseContext context) {
+        return parser.getSupportedTypes(context);
     }
 
-    /**
-     * 
-     */
-    @Override
     public void parse(
             InputStream stream, ContentHandler handler,
             Metadata metadata, ParseContext context)
             throws IOException, SAXException, TikaException {
         ForkClient client = acquireClient();
         try {
-            client.parse(getDelegateParser(context), stream, handler);
+            client.call(parser, "parse", stream, handler, metadata, context);
         } finally {
             releaseClient(client);
         }
     }
 
+    public void parse(
+            InputStream stream, ContentHandler handler, Metadata metadata)
+            throws IOException, SAXException, TikaException {
+        parse(stream, handler, metadata, new ParseContext());
+    }
+
     public synchronized void close() {
         for (ForkClient client : pool) {
             client.close();

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkProxy.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkProxy.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkProxy.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkProxy.java Mon Jan 17 18:50:37 2011
@@ -20,7 +20,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.Serializable;
 
-interface ForkProxy extends Serializable {
+public interface ForkProxy extends Serializable {
 
     void init(DataInputStream input, DataOutputStream output);
 

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkResource.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkResource.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkResource.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkResource.java Mon Jan 17 18:50:37 2011
@@ -21,7 +21,7 @@ import java.io.DataOutputStream;
 
 import java.io.IOException;
 
-interface ForkResource {
+public interface ForkResource {
 
     Throwable process(DataInputStream input, DataOutputStream output)
         throws IOException;

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkSerializer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkSerializer.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkSerializer.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkSerializer.java Mon Jan 17 18:50:37 2011
@@ -54,7 +54,7 @@ class ForkSerializer extends ObjectInput
      * @param loader class loader used when deserializing objects
      * @throws IOException if this stream could not be initiated
      */
-    private ForkSerializer(InputStream input, ClassLoader loader)
+    public ForkSerializer(InputStream input, ClassLoader loader)
             throws IOException {
         super(input);
         this.loader = loader;
@@ -73,55 +73,4 @@ class ForkSerializer extends ObjectInput
         return Class.forName(desc.getName(), false, loader);
     }
 
-    /**
-     * Serializes the object first into an in-memory buffer and then
-     * writes it to the output stream with a preceding size integer.
-     *
-     * @param output output stream to which the serialized object is written
-     * @param object object to be serialized
-     * @throws IOException if the object could not be serialized
-     */
-    static void serialize(DataOutputStream output, Object object)
-            throws IOException {
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
-        ObjectOutputStream serializer = new ObjectOutputStream(buffer);
-        serializer.writeObject(object);
-        serializer.close();
-
-        byte[] data = buffer.toByteArray();
-        output.writeInt(data.length);
-        output.write(data);
-    }
-
-    /**
-     * Deserializes an object from the given stream. The serialized object
-     * is expected to be preceded by a size integer, that is used for reading
-     * the entire serialization into a memory before deserializing it.
-     *
-     * @param input input stream from which the serialized object is read
-     * @param loader class loader to be used for loading referenced classes
-     * @throws IOException if the object could not be deserialized
-     * @throws ClassNotFoundException if a referenced class is not found
-     */
-    static Object deserialize(
-            DataInputStream input, DataOutputStream output, ClassLoader loader)
-            throws IOException, ClassNotFoundException {
-        int n = input.readInt();
-        byte[] data = new byte[n];
-        input.readFully(data);
-
-        ObjectInputStream deserializer =
-            new ForkSerializer(new ByteArrayInputStream(data), loader);
-        Object object = deserializer.readObject();
-        if (object instanceof ForkProxy) {
-            ((ForkProxy) object).init(input, output);
-        }
-
-        output.writeByte(ForkServer.REPLY);
-        output.flush();
-
-        return object;
-    }
-
 }

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/fork/ForkServer.java Mon Jan 17 18:50:37 2011
@@ -17,22 +17,15 @@
 package org.apache.tika.fork;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
 
-class ForkServer extends ClassLoader {
+class ForkServer implements Runnable {
 
     public static final byte ERROR = -1;
 
@@ -43,30 +36,25 @@ class ForkServer extends ClassLoader {
     public static final byte RESOURCE = 2;
 
     /**
-     * Starts a forked server process.
+     * Starts a forked server process using the standard input and output
+     * streams for communication with the parent process. Any attempts by
+     * stray code to read from standard input or write to standard output
+     * is redirected to avoid interfering with the communication channel.
      * 
      * @param args command line arguments, ignored
      * @throws Exception if the server could not be started
      */
     public static void main(String[] args) throws Exception {
-        ForkServer server =
-            new ForkServer(System.in, System.out);
-
-        // Set the server instance as the context class loader
-        // to make classes from the parent process available
-        Thread.currentThread().setContextClassLoader(server);
-
-        // Redirect standard input and output streams to prevent
-        // stray code from interfering with the message stream
+        ForkServer server = new ForkServer(System.in, System.out);
         System.setIn(new ByteArrayInputStream(new byte[0]));
         System.setOut(System.err);
-
-        // Start processing request
         server.run();
     }
 
+    /** Input stream for reading from the parent process */
     private final DataInputStream input;
 
+    /** Output stream for writing to the parent process */
     private final DataOutputStream output;
 
     /**
@@ -83,39 +71,29 @@ class ForkServer extends ClassLoader {
         this.output = new DataOutputStream(output);
     }
 
-    public void run() throws IOException {
-        int b;
-        while ((b = input.read()) != -1) {
-            if (b == CALL) {
-                try {
-                    call();
-                } catch (Exception e) {
-                    output.write(ERROR);
-                    ForkSerializer.serialize(output, e);
+    public void run() {
+        try {
+            while (true) {
+                ClassLoader loader = (ClassLoader) readObject(
+                        ForkServer.class.getClassLoader());
+                Thread.currentThread().setContextClassLoader(loader);
+
+                Object object = readObject(loader);
+                Method method = getMethod(object, input.readUTF());
+                Object[] args = new Object[method.getParameterTypes().length];
+                for (int i = 0; i < args.length; i++) {
+                    args[i] = readObject(loader);
                 }
+                method.invoke(object, args);
+
+                output.write(REPLY);
                 output.flush();
             }
+        } catch (Throwable t) {
+            t.printStackTrace();
         }
     }
 
-    private void call() throws Exception {
-        ClassLoader loader = (ClassLoader) ForkSerializer.deserialize(
-                input, output, ForkServer.class.getClassLoader());
-        System.err.println("Loader loaded");
-        Object object = ForkSerializer.deserialize(input, output, loader);
-        System.err.println("Object loaded");
-        Method method = getMethod(object, input.readUTF());
-        System.err.println("Method loaded");
-        int n = method.getParameterTypes().length;
-        Object[] args = new Object[n];
-        for (int i = 0; i < n; i++) {
-            args[i] = ForkSerializer.deserialize(input, output, loader);
-        }
-        method.invoke(object, args);
-        output.write(REPLY);
-        output.flush();
-    }
-
     private Method getMethod(Object object, String name) {
         Class<?> klass = object.getClass();
         for (Method method : klass.getMethods()) {
@@ -126,4 +104,34 @@ class ForkServer extends ClassLoader {
         return null;
     }
 
+    /**
+     * Deserializes an object from the given stream. The serialized object
+     * is expected to be preceded by a size integer, that is used for reading
+     * the entire serialization into a memory before deserializing it.
+     *
+     * @param input input stream from which the serialized object is read
+     * @param loader class loader to be used for loading referenced classes
+     * @throws IOException if the object could not be deserialized
+     * @throws ClassNotFoundException if a referenced class is not found
+     */
+    private Object readObject(ClassLoader loader)
+            throws IOException, ClassNotFoundException {
+        int n = input.readInt();
+        byte[] data = new byte[n];
+        input.readFully(data);
+
+        ObjectInputStream deserializer =
+            new ForkSerializer(new ByteArrayInputStream(data), loader);
+        Object object = deserializer.readObject();
+        if (object instanceof ForkProxy) {
+            ((ForkProxy) object).init(input, output);
+        }
+
+        // Tell the parent process that we successfully received this object
+        output.writeByte(ForkServer.REPLY);
+        output.flush();
+
+        return object;
+    }
+
 }

Modified: tika/trunk/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java?rev=1060042&r1=1060041&r2=1060042&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java (original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java Mon Jan 17 18:50:37 2011
@@ -19,8 +19,6 @@ package org.apache.tika.parser;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.detect.DefaultDetector;
@@ -31,7 +29,6 @@ import org.apache.tika.io.TikaInputStrea
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.mime.MediaType;
 import org.apache.tika.mime.MediaTypeRegistry;
-import org.apache.tika.mime.MimeTypes;
 import org.apache.tika.sax.SecureContentHandler;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;