Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/18 23:32:24 UTC

svn commit: r1603640 - /manifoldcf/branches/CONNECTORS-954/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformer/tika/TikaExtractor.java

Author: kwright
Date: Wed Jun 18 21:32:24 2014
New Revision: 1603640

URL: http://svn.apache.org/r1603640
Log:
Finish first rev of tika extractor

Modified:
    manifoldcf/branches/CONNECTORS-954/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformer/tika/TikaExtractor.java

Modified: manifoldcf/branches/CONNECTORS-954/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformer/tika/TikaExtractor.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformer/tika/TikaExtractor.java?rev=1603640&r1=1603639&r2=1603640&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformer/tika/TikaExtractor.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/tika/connector/src/main/java/org/apache/manifoldcf/agents/transformer/tika/TikaExtractor.java Wed Jun 18 21:32:24 2014
@@ -20,10 +20,20 @@ package org.apache.manifoldcf.agents.tra
 
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.agents.system.Logging;
 
 import java.io.*;
 import java.util.*;
 
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.sax.BodyContentHandler;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+
 /** This connector works as a transformation connector, but does nothing other than logging.
 *
 */
@@ -35,6 +45,9 @@ public class TikaExtractor extends org.a
 
   protected static final String[] activitiesList = new String[]{ACTIVITY_EXTRACT};
   
+  /** We handle up to a megabyte in memory; after that we go to disk. */
+  protected static final long inMemoryMaximumFile = 1000000;
+  
   /** Return a list of activities that this connector generates.
   * The connector does NOT need to be connected before this method is called.
   *@return the set of activities.
@@ -65,45 +78,291 @@ public class TikaExtractor extends org.a
   public int addOrReplaceDocumentWithException(String documentURI, String pipelineDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    long startTime = System.currentTimeMillis();
-    String resultCode = "OK";
-    String description = null;
-    Long length = null;
+    // Tika's API reads from an input stream and writes to an output Writer.
+    // Since a RepositoryDocument exposes only readers and input streams, AND all downstream
+    // processing needs to occur in a ManifoldCF thread, we have some constraints on the architecture we need to get this done:
+    // (1) The principal worker thread must call the downstream pipeline send() method.
+    // (2) The callee of the send() method must read from a Reader in the RepositoryDocument.
+    // (3) The Reader, when its data buffer is empty, must pull more data from the original input stream and hand it to Tika, which repopulates the Reader's data buffer.
+    // All of this could be done in one thread, with some work and a special InputStream or Reader implementation.  Where that approach fails, though, is the
+    // requirement that Tika-extracted metadata be included in the RepositoryDocument right from the beginning.  Effectively this means that the entire document
+    // must be parsed before it is handed downstream -- so a temporary file (or in-memory buffer, if the document is small enough) must be created.
+    // Instead of the elegant flow above, we have the following:
+    // (1) Create a temporary file (or in-memory buffer, if the document is small enough)
+    // (2) Run Tika to completion, streaming content output to the temporary file
+    // (3) Modify the RepositoryDocument to read from the temporary file, and include the Tika-extracted metadata
+    // (4) Call downstream document processing
+      
+    DestinationStorage ds;
+      
+    if (document.getBinaryLength() <= inMemoryMaximumFile)
+    {
+      ds = new MemoryDestinationStorage((int)document.getBinaryLength());
+    }
+    else
+    {
+      ds = new FileDestinationStorage();
+    }
     try
     {
-      // MHL to actually hook up tika
-      long binaryLength = document.getBinaryLength();
-      int rval = activities.sendDocument(documentURI,document,authorityNameString);
-      length =  new Long(binaryLength);
-      resultCode = (rval == DOCUMENTSTATUS_ACCEPTED)?"ACCEPTED":"REJECTED";
-      return rval;
-    }
-    catch (ServiceInterruption e)
-    {
-      resultCode = "SERVICEINTERRUPTION";
-      description = e.getMessage();
-      throw e;
-    }
-    catch (ManifoldCFException e)
-    {
-      resultCode = "EXCEPTION";
-      description = e.getMessage();
-      throw e;
-    }
-    catch (IOException e)
-    {
-      resultCode = "IOEXCEPTION";
-      description = e.getMessage();
-      throw e;
+      Metadata metadata = new Metadata();
+      // We only log the extraction
+      long startTime = System.currentTimeMillis();
+      String resultCode = "OK";
+      String description = null;
+      Long length = null;
+      try
+      {
+        OutputStream os = ds.getOutputStream();
+        try
+        {
+          Writer w = new OutputStreamWriter(os,"utf-8");
+          try
+          {
+            // Use tika to parse stuff
+            Parser parser = new AutoDetectParser();
+            ContentHandler handler = new BodyContentHandler(w);
+            ParseContext pc = new ParseContext();
+            try
+            {
+              parser.parse(document.getBinaryStream(), handler, metadata, pc);
+            }
+            catch (TikaException e)
+            {
+              resultCode = "TIKAEXCEPTION";
+              description = e.getMessage();
+              return handleTikaException(e);
+            }
+            catch (SAXException e)
+            {
+              resultCode = "SAXEXCEPTION";
+              description = e.getMessage();
+              return handleSaxException(e);
+            }
+            catch (IOException e)
+            {
+              resultCode = "IOEXCEPTION";
+              description = e.getMessage();
+              throw e;
+            }
+          }
+          finally
+          {
+            w.flush();
+          }
+        }
+        finally
+        {
+          os.close();
+        }
+      }
+      finally
+      {
+        // Log the extraction processing
+        activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT, length, documentURI,
+          resultCode, description);
+      }
+        
+      // Parsing complete!
+      // Create a copy of Repository Document
+      RepositoryDocument docCopy = document.duplicate();
+        
+      // Get new stream length
+      long newBinaryLength = ds.getBinaryLength();
+      // Open new input stream
+      InputStream is = ds.getInputStream();
+      try
+      {
+        docCopy.setBinary(is,newBinaryLength);
+
+        // Set up all metadata from Tika.  We may want to run this through a mapper eventually...
+        String[] metaNames = metadata.names();
+        for (String mName : metaNames) {
+          String value = metadata.get(mName);
+          docCopy.addField(mName,value);
+        }
+
+        // Send new document downstream
+        int rval = activities.sendDocument(documentURI,docCopy,authorityNameString);
+        length =  new Long(newBinaryLength);
+        resultCode = (rval == DOCUMENTSTATUS_ACCEPTED)?"ACCEPTED":"REJECTED";
+        return rval;
+      }
+      finally
+      {
+        is.close();
+      }
     }
     finally
     {
-      activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT, length, documentURI,
-        resultCode, description);
+      ds.close();
+    }
+
+  }
+
+  protected static int handleTikaException(TikaException e)
+    throws IOException, ManifoldCFException, ServiceInterruption
+  {
+    // MHL - what does Tika throw if it gets an IOException reading the stream??
+    Logging.ingest.warn("Tika: Tika exception extracting: "+e.getMessage(),e);
+    return DOCUMENTSTATUS_REJECTED;
+  }
+  
+  protected static int handleSaxException(SAXException e)
+    throws IOException, ManifoldCFException, ServiceInterruption
+  {
+    // MHL - what does this mean?
+    Logging.ingest.warn("Tika: SAX exception extracting: "+e.getMessage(),e);
+    return DOCUMENTSTATUS_REJECTED;
+  }
+  
+  protected static int handleIOException(IOException e)
+    throws ManifoldCFException
+  {
+    // IOException reading from our local storage...
+    if (e instanceof InterruptedIOException)
+      throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    throw new ManifoldCFException(e.getMessage(),e);
+  }
+  
+  protected static interface DestinationStorage
+  {
+    /** Get the output stream to write to.  Caller should explicitly close this stream when done writing.
+    */
+    public OutputStream getOutputStream()
+      throws ManifoldCFException;
+    
+    /** Get new binary length.
+    */
+    public long getBinaryLength()
+      throws ManifoldCFException;
+
+    /** Get the input stream to read from.  Caller should explicitly close this stream when done reading.
+    */
+    public InputStream getInputStream()
+      throws ManifoldCFException;
+    
+    /** Close the object and clean up everything.
+    * This should be called when the data is no longer needed.
+    */
+    public void close()
+      throws ManifoldCFException;
+  }
+  
+  protected static class FileDestinationStorage implements DestinationStorage
+  {
+    protected final File outputFile;
+    protected final OutputStream outputStream;
+
+    public FileDestinationStorage()
+      throws ManifoldCFException
+    {
+      File outputFile;
+      OutputStream outputStream;
+      try
+      {
+        outputFile = File.createTempFile("mcftika","tmp");
+        outputStream = new FileOutputStream(outputFile);
+      }
+      catch (IOException e)
+      {
+        handleIOException(e);
+        outputFile = null;
+        outputStream = null;
+      }
+      this.outputFile = outputFile;
+      this.outputStream = outputStream;
+    }
+    
+    @Override
+    public OutputStream getOutputStream()
+      throws ManifoldCFException
+    {
+      return outputStream;
+    }
+    
+    /** Get new binary length.
+    */
+    @Override
+    public long getBinaryLength()
+      throws ManifoldCFException
+    {
+      return outputFile.length();
+    }
+
+    /** Get the input stream to read from.  Caller should explicitly close this stream when done reading.
+    */
+    @Override
+    public InputStream getInputStream()
+      throws ManifoldCFException
+    {
+      try
+      {
+        return new FileInputStream(outputFile);
+      }
+      catch (IOException e)
+      {
+        handleIOException(e);
+        return null;
+      }
+    }
+    
+    /** Close the object and clean up everything.
+    * This should be called when the data is no longer needed.
+    */
+    @Override
+    public void close()
+      throws ManifoldCFException
+    {
+      outputFile.delete();
     }
 
   }
+  
+  protected static class MemoryDestinationStorage implements DestinationStorage
+  {
+    protected final ByteArrayOutputStream outputStream;
+    
+    public MemoryDestinationStorage(int sizeHint)
+    {
+      outputStream = new ByteArrayOutputStream(sizeHint);
+    }
+    
+    @Override
+    public OutputStream getOutputStream()
+      throws ManifoldCFException
+    {
+      return outputStream;
+    }
 
+    /** Get new binary length.
+    */
+    @Override
+    public long getBinaryLength()
+      throws ManifoldCFException
+    {
+      return outputStream.size();
+    }
+    
+    /** Get the input stream to read from.  Caller should explicitly close this stream when done reading.
+    */
+    @Override
+    public InputStream getInputStream()
+      throws ManifoldCFException
+    {
+      return new ByteArrayInputStream(outputStream.toByteArray());
+    }
+    
+    /** Close the object and clean up everything.
+    * This should be called when the data is no longer needed.
+    */
+    @Override
+    public void close()
+      throws ManifoldCFException
+    {
+    }
+
+  }
+  
 }
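
For readers following the change, here is a minimal standalone sketch of the flow described in the header comment of addOrReplaceDocumentWithException: run Tika to completion into a bounded buffer (memory below a size threshold, a temporary file above it), then re-read the extracted text together with the Tika metadata. The class name, the main() harness, and the constant value below are illustrative assumptions for the sketch, not part of this commit or of the ManifoldCF API; only the Tika calls (AutoDetectParser, BodyContentHandler, Metadata, ParseContext) mirror what the connector uses. The real connector wraps the buffering in its DestinationStorage abstraction and hands the result to a RepositoryDocument copy; the sketch substitutes plain JDK streams and console output.

// Sketch only: mirrors the connector's buffer-then-send approach outside ManifoldCF.
import java.io.*;
import java.nio.charset.StandardCharsets;

import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.sax.BodyContentHandler;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;

public class TikaBufferSketch
{
  /** Documents at or below this size are buffered in memory; larger ones go to a temp file. */
  protected static final long inMemoryMaximumFile = 1000000;

  public static void main(String[] args) throws Exception
  {
    File input = new File(args[0]);
    Metadata metadata = new Metadata();

    // (1) Choose memory or a temporary file based on the (known) source length.
    boolean inMemory = input.length() <= inMemoryMaximumFile;
    ByteArrayOutputStream memoryBuffer = inMemory ? new ByteArrayOutputStream((int)input.length()) : null;
    File tempFile = inMemory ? null : File.createTempFile("mcftika", "tmp");

    try
    {
      // (2) Run Tika to completion, streaming extracted body text into the buffer.
      try (InputStream is = new FileInputStream(input);
           OutputStream os = inMemory ? memoryBuffer : new FileOutputStream(tempFile);
           Writer w = new OutputStreamWriter(os, StandardCharsets.UTF_8))
      {
        Parser parser = new AutoDetectParser();
        ContentHandler handler = new BodyContentHandler(w);
        parser.parse(is, handler, metadata, new ParseContext());
      }
      catch (TikaException | SAXException e)
      {
        // The connector rejects the document in these cases; here we just report and stop.
        System.err.println("Extraction failed: " + e.getMessage());
        return;
      }

      // (3) The metadata is now complete and can accompany the buffered text.
      for (String name : metadata.names())
        System.out.println(name + " = " + metadata.get(name));

      // (4) Re-open the buffered text for downstream processing.
      long newLength = inMemory ? memoryBuffer.size() : tempFile.length();
      try (InputStream extracted = inMemory
             ? new ByteArrayInputStream(memoryBuffer.toByteArray())
             : new FileInputStream(tempFile))
      {
        System.out.println("Extracted " + newLength + " bytes of body text");
      }
    }
    finally
    {
      if (tempFile != null)
        tempFile.delete();
    }
  }
}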