You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2016/01/27 08:48:49 UTC

svn commit: r1726948 - /manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java

Author: kwright
Date: Wed Jan 27 07:48:49 2016
New Revision: 1726948

URL: http://svn.apache.org/viewvc?rev=1726948&view=rev
Log:
Restructure the connector to allow for proper memory bounding; not yet completed (need rolling buffer implementation first).

Modified:
    manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java

Modified: manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java?rev=1726948&r1=1726947&r2=1726948&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java (original)
+++ manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java Wed Jan 27 07:48:49 2016
@@ -16,8 +16,8 @@
  */
 package org.apache.manifoldcf.agents.transformation.opennlp;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
+import java.io.*;
+
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,6 +59,9 @@ public class OpenNlpExtractor extends Ba
 
   protected static final String[] activitiesList = new String[] { ACTIVITY_EXTRACT };
 
+  /** We handle up to 64K in memory; after that we go to disk. */
+  protected static final long inMemoryMaximumFile = 65536;
+
   /**
    * Return a list of activities that this connector generates. The connector
    * does NOT need to be connected before this method is called.
@@ -133,63 +136,128 @@ public class OpenNlpExtractor extends Ba
    */
   @Override
   public int addOrReplaceDocumentWithException(String documentURI, VersionContext pipelineDescription,
-      RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
-          throws ManifoldCFException, ServiceInterruption, IOException {
+    RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+    throws ManifoldCFException, ServiceInterruption, IOException {
     // assumes use of Tika extractor before using this connector
     Logging.agents.debug("Starting OpenNlp extraction");
 
     SpecPacker sp = new SpecPacker(pipelineDescription.getSpecification());
 
-    byte[] bytes = IOUtils.toByteArray(document.getBinaryStream());
-
-    SentenceDetector sentenceDetector = OpenNlpExtractorConfig.sentenceDetector(sp.getSModelPath());
-    Tokenizer tokenizer = OpenNlpExtractorConfig.tokenizer(sp.getTModelPath());
-    NameFinderME peopleFinder = OpenNlpExtractorConfig.peopleFinder(sp.getPModelPath());
-    NameFinderME locationFinder = OpenNlpExtractorConfig.locationFinder(sp.getLModelPath());
-    NameFinderME organizationFinder = OpenNlpExtractorConfig.organizationFinder(sp.getOModelPath());
-
-    // create a duplicate
-    RepositoryDocument docCopy = document.duplicate();
-    Map<String, List<String>> nerMap = new HashMap<>();
-
-    if (document.getBinaryLength() > 0) {
-      String textContent = new String(bytes, StandardCharsets.UTF_8);
-      List<String> peopleList = new ArrayList<>();
-      List<String> locationsList = new ArrayList<>();
-      List<String> organizationsList = new ArrayList<>();
-
-      String[] sentences = sentenceDetector.sentDetect(textContent);
-      for (String sentence : sentences) {
-        String[] tokens = tokenizer.tokenize(sentence);
-
-        Span[] spans = peopleFinder.find(tokens);
-        peopleList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+    // In order to be able to replay the input stream both for extraction and for downstream use,
+    // we need to page through it, some number of characters at a time, and write those into a local buffer.
+    // We can do this at the same time we're extracting, if we're clever.
+      
+    // Set up to spool back the original content, using either memory or disk, whichever makes sense.
+    DestinationStorage ds;
+    if (document.getBinaryLength() <= inMemoryMaximumFile) {
+      ds = new MemoryDestinationStorage((int)document.getBinaryLength());
+    } else {
+      ds = new FileDestinationStorage();
+    }
+    
+    try {
 
-        spans = locationFinder.find(tokens);
-        locationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+      // For logging, we'll need all of this
+      long startTime = System.currentTimeMillis();
+      String resultCode = "OK";
+      String description = null;
+      Long length = null;
+
+      final MetadataAccumulator ma = new MetadataAccumulator(sp);
+      
+      try {
+
+        // Page through document content, saving it aside into destination storage, while also extracting the content
+        final OutputStream os = ds.getOutputStream();
+        try {
+          // We presume that the content is utf-8!!  Thus it has to have been run through the TikaExtractor, or equivalent.
+          //
+          // We're going to be paging through the input stream by chunks of characters.  Each chunk will then be passed to the
+          // output stream (os) via a writer, as well as to the actual code that invokes the nlp sentence extraction.  
+          
+          // We need an output writer that converts the input into characters.  
+          // 
+          Writer w = new OutputStreamWriter(os, "utf-8");
+          try {
+            Reader r = new InputStreamReader(document.getBinaryStream(), "utf-8");
+            try {
+              // Now, page through!
+              // It's too bad we have to convert FROM utf-8 and then back TO utf-8, but that can't be helped.
+              char[] characterBuffer = new char[65536];
+              while (true) {
+                int amt = r.read(characterBuffer);
+                if (amt == -1) {
+                  break;
+                }
+                // Write into the copy buffer
+                w.write(characterBuffer,0,amt);
+                // Also do the processing
+                ma.acceptCharacters(characterBuffer,amt);
+              }
+              // Do not close the reader; the underlying stream will be closed by our caller when the RepositoryDocument is done with
+            } catch (IOException e) {
+              // These are errors from reading the RepositoryDocument input stream; we handle them accordingly.
+              resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
+              description = e.getMessage();
+              throw e;
+            }
+          } finally {
+            w.flush();
+          }
+        }
+        finally
+        {
+          os.close();
+          length = new Long(ds.getBinaryLength());
+        }
 
-        spans = organizationFinder.find(tokens);
-        organizationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+        // Check to be sure downstream pipeline will accept document of specified length
+        if (!activities.checkLengthIndexable(ds.getBinaryLength()))
+        {
+          activities.noDocument();
+          resultCode = activities.EXCLUDED_LENGTH;
+          description = "Downstream pipeline rejected document with length "+ds.getBinaryLength();
+          return DOCUMENTSTATUS_REJECTED;
+        }
 
       }
+      finally
+      {
+        // Log the extraction processing
+        activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT, length, documentURI,
+          resultCode, description);
+      }
+      
+      // Parsing complete!
+      // Create a copy of Repository Document
+      RepositoryDocument docCopy = document.duplicate();
+        
+      // Get new stream length
+      long newBinaryLength = ds.getBinaryLength();
+      // Open new input stream
+      InputStream is = ds.getInputStream();
+      try
+      {
+        docCopy.setBinary(is,newBinaryLength);
+
+        // add named entity meta-data
+        Map<String,List<String>> nerMap = ma.getMetadata();
+        if (!nerMap.isEmpty()) {
+          for (Entry<String, List<String>> entry : nerMap.entrySet()) {
+            List<String> neList = entry.getValue();
+            String[] neArray = neList.toArray(new String[neList.size()]);
+            docCopy.addField(entry.getKey(), neArray);
+          }
+        }
 
-      nerMap.put(PERSONS, peopleList);
-      nerMap.put(LOCATIONS, locationsList);
-      nerMap.put(ORGANIZATIONS, organizationsList);
-    }
-    // reset original stream
-    docCopy.setBinary(new ByteArrayInputStream(bytes), bytes.length);
-
-    // add named entity meta-data
-    if (!nerMap.isEmpty()) {
-      for (Entry<String, List<String>> entry : nerMap.entrySet()) {
-        List<String> neList = entry.getValue();
-        String[] neArray = neList.toArray(new String[neList.size()]);
-        docCopy.addField(entry.getKey(), neArray);
+        // Send new document downstream
+        return activities.sendDocument(documentURI,docCopy);
+      } finally {
+        is.close();
       }
+    } finally {
+      ds.close();
     }
-
-    return activities.sendDocument(documentURI, docCopy);
   }
 
   // ////////////////////////
@@ -428,6 +496,218 @@ public class OpenNlpExtractor extends Ba
     paramMap.put("OMODELPATH", oModelPath);
   }
 
+  protected static int handleIOException(IOException e)
+    throws ManifoldCFException
+  {
+    // IOException reading from our local storage...
+    if (e instanceof InterruptedIOException)
+      throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    throw new ManifoldCFException(e.getMessage(),e);
+  }
+
+  /** An instance of this class receives characters in 64K chunks, and needs to accumulate
+  * extracted metadata that this transformer will pass down.
+  */
+  protected class MetadataAccumulator {
+
+    final SentenceDetector sentenceDetector;
+    final Tokenizer tokenizer;
+    final NameFinderME peopleFinder;
+    final NameFinderME locationFinder;
+    final NameFinderME organizationFinder;
+    
+    final List<String> peopleList = new ArrayList<>();
+    final List<String> locationsList = new ArrayList<>();
+    final List<String> organizationsList = new ArrayList<>();
+    
+    public MetadataAccumulator(final SpecPacker sp)
+      throws ManifoldCFException {
+      try {
+        sentenceDetector = OpenNlpExtractorConfig.sentenceDetector(sp.getSModelPath());
+        tokenizer = OpenNlpExtractorConfig.tokenizer(sp.getTModelPath());
+        peopleFinder = OpenNlpExtractorConfig.peopleFinder(sp.getPModelPath());
+        locationFinder = OpenNlpExtractorConfig.locationFinder(sp.getLModelPath());
+        organizationFinder = OpenNlpExtractorConfig.organizationFinder(sp.getOModelPath());
+      } catch (IOException e) {
+        throw new ManifoldCFException(e.getMessage(), e);
+      }
+    }
+    
+    /** Accept characters, including actual count.
+    */
+    public void acceptCharacters(final char[] buffer, int amt) {
+      // MHL
+    }
+    
+    public Map<String,List<String>> getMetadata() {
+      final Map<String, List<String>> nerMap = new HashMap<>();
+      nerMap.put(PERSONS, peopleList);
+      nerMap.put(LOCATIONS, locationsList);
+      nerMap.put(ORGANIZATIONS, organizationsList);
+      return nerMap;
+    }
+    
+  }
+  
+  /*
+      The following logic needs to be added back in, but with rolling character buffers and duplicate sentence detection...
+  
+      List<String> peopleList = new ArrayList<>();
+      List<String> locationsList = new ArrayList<>();
+      List<String> organizationsList = new ArrayList<>();
+
+      String[] sentences = sentenceDetector.sentDetect(textContent);
+      for (String sentence : sentences) {
+        String[] tokens = tokenizer.tokenize(sentence);
+
+        Span[] spans = peopleFinder.find(tokens);
+        peopleList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+
+        spans = locationFinder.find(tokens);
+        locationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+
+        spans = organizationFinder.find(tokens);
+        organizationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+
+      }
+
+  */
+  
+  protected static interface DestinationStorage {
+    /** Get the output stream to write to.  Caller should explicitly close this stream when done writing.
+    */
+    public OutputStream getOutputStream()
+      throws ManifoldCFException;
+    
+    /** Get new binary length.
+    */
+    public long getBinaryLength()
+      throws ManifoldCFException;
+
+    /** Get the input stream to read from.  Caller should explicitly close this stream when done reading.
+    */
+    public InputStream getInputStream()
+      throws ManifoldCFException;
+    
+    /** Close the object and clean up everything.
+    * This should be called when the data is no longer needed.
+    */
+    public void close()
+      throws ManifoldCFException;
+  }
+  
+  protected static class FileDestinationStorage implements DestinationStorage {
+    protected final File outputFile;
+    protected final OutputStream outputStream;
+
+    public FileDestinationStorage()
+      throws ManifoldCFException
+    {
+      File outputFile;
+      OutputStream outputStream;
+      try
+      {
+        outputFile = File.createTempFile("mcftika","tmp");
+        outputStream = new FileOutputStream(outputFile);
+      }
+      catch (IOException e)
+      {
+        handleIOException(e);
+        outputFile = null;
+        outputStream = null;
+      }
+      this.outputFile = outputFile;
+      this.outputStream = outputStream;
+    }
+    
+    @Override
+    public OutputStream getOutputStream()
+      throws ManifoldCFException
+    {
+      return outputStream;
+    }
+    
+    /** Get new binary length.
+    */
+    @Override
+    public long getBinaryLength()
+      throws ManifoldCFException
+    {
+      return outputFile.length();
+    }
+
+    /** Get the input stream to read from.  Caller should explicitly close this stream when done reading.
+    */
+    @Override
+    public InputStream getInputStream()
+      throws ManifoldCFException
+    {
+      try
+      {
+        return new FileInputStream(outputFile);
+      }
+      catch (IOException e)
+      {
+        handleIOException(e);
+        return null;
+      }
+    }
+    
+    /** Close the object and clean up everything.
+    * This should be called when the data is no longer needed.
+    */
+    @Override
+    public void close()
+      throws ManifoldCFException
+    {
+      outputFile.delete();
+    }
+
+  }
+  
+  protected static class MemoryDestinationStorage implements DestinationStorage {
+    protected final ByteArrayOutputStream outputStream;
+    
+    public MemoryDestinationStorage(int sizeHint)
+    {
+      outputStream = new ByteArrayOutputStream(sizeHint);
+    }
+    
+    @Override
+    public OutputStream getOutputStream()
+      throws ManifoldCFException
+    {
+      return outputStream;
+    }
+
+    /** Get new binary length.
+    */
+    @Override
+    public long getBinaryLength()
+      throws ManifoldCFException
+    {
+      return outputStream.size();
+    }
+    
+    /** Get the input stream to read from.  Caller should explicitly close this stream when done reading.
+    */
+    @Override
+    public InputStream getInputStream()
+      throws ManifoldCFException
+    {
+      return new ByteArrayInputStream(outputStream.toByteArray());
+    }
+    
+    /** Close the object and clean up everything.
+    * This should be called when the data is no longer needed.
+    */
+    public void close()
+      throws ManifoldCFException
+    {
+    }
+
+  }
+
   protected static class SpecPacker {
 
     private final String sModelPath;