Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2016/01/27 08:48:49 UTC
svn commit: r1726948 -
/manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java
Author: kwright
Date: Wed Jan 27 07:48:49 2016
New Revision: 1726948
URL: http://svn.apache.org/viewvc?rev=1726948&view=rev
Log:
Restructure the connector to allow for proper memory bounding; not yet completed (need rolling buffer implementation first).
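The memory bounding works by spooling the incoming document either into memory or onto disk, depending on its size. A minimal sketch of that decision, using the DestinationStorage abstraction and the inMemoryMaximumFile threshold introduced in the diff below (the copy-and-extract step is elided here):

    final DestinationStorage ds;
    if (document.getBinaryLength() <= inMemoryMaximumFile) {
      // Small document: buffer the spooled copy in memory.
      ds = new MemoryDestinationStorage((int)document.getBinaryLength());
    } else {
      // Large document: spool the copy to a temporary file instead.
      ds = new FileDestinationStorage();
    }
    try {
      // ... copy the binary stream into ds while feeding the same characters to the extractor ...
    } finally {
      ds.close();
    }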
Modified:
manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java
Modified: manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java?rev=1726948&r1=1726947&r2=1726948&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java (original)
+++ manifoldcf/branches/CONNECTORS-1270/connectors/opennlp/connector/src/main/java/org/apache/manifoldcf/agents/transformation/opennlp/OpenNlpExtractor.java Wed Jan 27 07:48:49 2016
@@ -16,8 +16,8 @@
*/
package org.apache.manifoldcf.agents.transformation.opennlp;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
+import java.io.*;
+
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -59,6 +59,9 @@ public class OpenNlpExtractor extends Ba
protected static final String[] activitiesList = new String[] { ACTIVITY_EXTRACT };
+ /** We handle up to 64K in memory; after that we go to disk. */
+ protected static final long inMemoryMaximumFile = 65536;
+
/**
* Return a list of activities that this connector generates. The connector
* does NOT need to be connected before this method is called.
@@ -133,63 +136,128 @@ public class OpenNlpExtractor extends Ba
*/
@Override
public int addOrReplaceDocumentWithException(String documentURI, VersionContext pipelineDescription,
- RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
- throws ManifoldCFException, ServiceInterruption, IOException {
+ RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+ throws ManifoldCFException, ServiceInterruption, IOException {
// assumes use of Tika extractor before using this connector
Logging.agents.debug("Starting OpenNlp extraction");
SpecPacker sp = new SpecPacker(pipelineDescription.getSpecification());
- byte[] bytes = IOUtils.toByteArray(document.getBinaryStream());
-
- SentenceDetector sentenceDetector = OpenNlpExtractorConfig.sentenceDetector(sp.getSModelPath());
- Tokenizer tokenizer = OpenNlpExtractorConfig.tokenizer(sp.getTModelPath());
- NameFinderME peopleFinder = OpenNlpExtractorConfig.peopleFinder(sp.getPModelPath());
- NameFinderME locationFinder = OpenNlpExtractorConfig.locationFinder(sp.getLModelPath());
- NameFinderME organizationFinder = OpenNlpExtractorConfig.organizationFinder(sp.getOModelPath());
-
- // create a duplicate
- RepositoryDocument docCopy = document.duplicate();
- Map<String, List<String>> nerMap = new HashMap<>();
-
- if (document.getBinaryLength() > 0) {
- String textContent = new String(bytes, StandardCharsets.UTF_8);
- List<String> peopleList = new ArrayList<>();
- List<String> locationsList = new ArrayList<>();
- List<String> organizationsList = new ArrayList<>();
-
- String[] sentences = sentenceDetector.sentDetect(textContent);
- for (String sentence : sentences) {
- String[] tokens = tokenizer.tokenize(sentence);
-
- Span[] spans = peopleFinder.find(tokens);
- peopleList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+ // In order to be able to replay the input stream both for extraction and for downstream use,
+ // we need to page through it, some number of characters at a time, and write those into a local buffer.
+ // We can do this at the same time we're extracting, if we're clever.
+
+ // Set up to spool back the original content, using either memory or disk, whichever makes sense.
+ DestinationStorage ds;
+ if (document.getBinaryLength() <= inMemoryMaximumFile) {
+ ds = new MemoryDestinationStorage((int)document.getBinaryLength());
+ } else {
+ ds = new FileDestinationStorage();
+ }
+
+ try {
- spans = locationFinder.find(tokens);
- locationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+ // For logging, we'll need all of this
+ long startTime = System.currentTimeMillis();
+ String resultCode = "OK";
+ String description = null;
+ Long length = null;
+
+ final MetadataAccumulator ma = new MetadataAccumulator(sp);
+
+ try {
+
+ // Page through document content, saving it aside into destination storage, while also extracting the content
+ final OutputStream os = ds.getOutputStream();
+ try {
+ // We presume that the content is utf-8!! Thus it has to have been run through the TikaExtractor, or equivalent.
+ //
+ // We're going to be paging through the input stream by chunks of characters. Each chunk will then be passed to the
+ // output stream (os) via a writer, as well as to the actual code that invokes the nlp sentence extraction.
+
+ // We need an output writer that converts the input into characters.
+ //
+ Writer w = new OutputStreamWriter(os, "utf-8");
+ try {
+ Reader r = new InputStreamReader(document.getBinaryStream(), "utf-8");
+ try {
+ // Now, page through!
+ // It's too bad we have to convert FROM utf-8 and then back TO utf-8, but that can't be helped.
+ char[] characterBuffer = new char[65536];
+ while (true) {
+ int amt = r.read(characterBuffer);
+ if (amt == -1) {
+ break;
+ }
+ // Write into the copy buffer
+ w.write(characterBuffer,0,amt);
+ // Also do the processing
+ ma.acceptCharacters(characterBuffer,amt);
+ }
+ // Do not close the reader; the underlying stream will be closed by our caller when the RepositoryDocument is done with
+ } catch (IOException e) {
+ // These are errors from reading the RepositoryDocument input stream; we handle them accordingly.
+ resultCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
+ description = e.getMessage();
+ throw e;
+ }
+ } finally {
+ w.flush();
+ }
+ }
+ finally
+ {
+ os.close();
+ length = new Long(ds.getBinaryLength());
+ }
- spans = organizationFinder.find(tokens);
- organizationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+ // Check to be sure downstream pipeline will accept document of specified length
+ if (!activities.checkLengthIndexable(ds.getBinaryLength()))
+ {
+ activities.noDocument();
+ resultCode = activities.EXCLUDED_LENGTH;
+ description = "Downstream pipeline rejected document with length "+ds.getBinaryLength();
+ return DOCUMENTSTATUS_REJECTED;
+ }
}
+ finally
+ {
+ // Log the extraction processing
+ activities.recordActivity(new Long(startTime), ACTIVITY_EXTRACT, length, documentURI,
+ resultCode, description);
+ }
+
+ // Parsing complete!
+ // Create a copy of Repository Document
+ RepositoryDocument docCopy = document.duplicate();
+
+ // Get new stream length
+ long newBinaryLength = ds.getBinaryLength();
+ // Open new input stream
+ InputStream is = ds.getInputStream();
+ try
+ {
+ docCopy.setBinary(is,newBinaryLength);
+
+ // add named entity meta-data
+ Map<String,List<String>> nerMap = ma.getMetadata();
+ if (!nerMap.isEmpty()) {
+ for (Entry<String, List<String>> entry : nerMap.entrySet()) {
+ List<String> neList = entry.getValue();
+ String[] neArray = neList.toArray(new String[neList.size()]);
+ docCopy.addField(entry.getKey(), neArray);
+ }
+ }
- nerMap.put(PERSONS, peopleList);
- nerMap.put(LOCATIONS, locationsList);
- nerMap.put(ORGANIZATIONS, organizationsList);
- }
- // reset original stream
- docCopy.setBinary(new ByteArrayInputStream(bytes), bytes.length);
-
- // add named entity meta-data
- if (!nerMap.isEmpty()) {
- for (Entry<String, List<String>> entry : nerMap.entrySet()) {
- List<String> neList = entry.getValue();
- String[] neArray = neList.toArray(new String[neList.size()]);
- docCopy.addField(entry.getKey(), neArray);
+ // Send new document downstream
+ return activities.sendDocument(documentURI,docCopy);
+ } finally {
+ is.close();
}
+ } finally {
+ ds.close();
}
-
- return activities.sendDocument(documentURI, docCopy);
}
// ////////////////////////
@@ -428,6 +496,218 @@ public class OpenNlpExtractor extends Ba
paramMap.put("OMODELPATH", oModelPath);
}
+ protected static int handleIOException(IOException e)
+ throws ManifoldCFException
+ {
+ // IOException reading from our local storage...
+ if (e instanceof InterruptedIOException)
+ throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ throw new ManifoldCFException(e.getMessage(),e);
+ }
+
+ /** An instance of this class receives characters in 64K chunks, and needs to accumulate
+ * extracted metadata that this transformer will pass down.
+ */
+ protected class MetadataAccumulator {
+
+ final SentenceDetector sentenceDetector;
+ final Tokenizer tokenizer;
+ final NameFinderME peopleFinder;
+ final NameFinderME locationFinder;
+ final NameFinderME organizationFinder;
+
+ final List<String> peopleList = new ArrayList<>();
+ final List<String> locationsList = new ArrayList<>();
+ final List<String> organizationsList = new ArrayList<>();
+
+ public MetadataAccumulator(final SpecPacker sp)
+ throws ManifoldCFException {
+ try {
+ sentenceDetector = OpenNlpExtractorConfig.sentenceDetector(sp.getSModelPath());
+ tokenizer = OpenNlpExtractorConfig.tokenizer(sp.getTModelPath());
+ peopleFinder = OpenNlpExtractorConfig.peopleFinder(sp.getPModelPath());
+ locationFinder = OpenNlpExtractorConfig.locationFinder(sp.getLModelPath());
+ organizationFinder = OpenNlpExtractorConfig.organizationFinder(sp.getOModelPath());
+ } catch (IOException e) {
+ throw new ManifoldCFException(e.getMessage(), e);
+ }
+ }
+
+ /** Accept characters, including actual count.
+ */
+ public void acceptCharacters(final char[] buffer, int amt) {
+ // MHL
+ }
+
+ public Map<String,List<String>> getMetadata() {
+ final Map<String, List<String>> nerMap = new HashMap<>();
+ nerMap.put(PERSONS, peopleList);
+ nerMap.put(LOCATIONS, locationsList);
+ nerMap.put(ORGANIZATIONS, organizationsList);
+ return nerMap;
+ }
+
+ }
+
+ /*
+ The following logic needs to be added back in, but with rolling character buffers and duplicate sentence detection...
+
+ List<String> peopleList = new ArrayList<>();
+ List<String> locationsList = new ArrayList<>();
+ List<String> organizationsList = new ArrayList<>();
+
+ String[] sentences = sentenceDetector.sentDetect(textContent);
+ for (String sentence : sentences) {
+ String[] tokens = tokenizer.tokenize(sentence);
+
+ Span[] spans = peopleFinder.find(tokens);
+ peopleList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+
+ spans = locationFinder.find(tokens);
+ locationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+
+ spans = organizationFinder.find(tokens);
+ organizationsList.addAll(Arrays.asList(Span.spansToStrings(spans, tokens)));
+
+ }
+
+ */
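
One way the rolling character buffer mentioned in the log message could take shape: accumulate incoming chunks in a pending buffer, run sentence detection over it, hand every sentence that is known to be complete to the finders, and carry only the unfinished tail into the next call. The sketch below is illustrative only and is not part of this commit; the pending field and the processSentence helper are hypothetical names, and a real implementation would also cap the pending buffer and handle duplicate sentences at chunk boundaries.

    // Hypothetical rolling-buffer version of acceptCharacters(); illustration only.
    private final StringBuilder pending = new StringBuilder();

    public void acceptCharacters(final char[] buffer, final int amt) {
      pending.append(buffer, 0, amt);
      final String[] sentences = sentenceDetector.sentDetect(pending.toString());
      if (sentences.length > 1) {
        // Everything except the last detected sentence is presumed complete.
        for (int i = 0; i < sentences.length - 1; i++) {
          processSentence(sentences[i]);
        }
        // Retain only the unfinished tail so memory stays bounded between chunks.
        final String tail = sentences[sentences.length - 1];
        pending.setLength(0);
        pending.append(tail);
      }
    }

    private void processSentence(final String sentence) {
      final String[] tokens = tokenizer.tokenize(sentence);
      peopleList.addAll(Arrays.asList(Span.spansToStrings(peopleFinder.find(tokens), tokens)));
      locationsList.addAll(Arrays.asList(Span.spansToStrings(locationFinder.find(tokens), tokens)));
      organizationsList.addAll(Arrays.asList(Span.spansToStrings(organizationFinder.find(tokens), tokens)));
    }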
+
+ protected static interface DestinationStorage {
+ /** Get the output stream to write to. Caller should explicitly close this stream when done writing.
+ */
+ public OutputStream getOutputStream()
+ throws ManifoldCFException;
+
+ /** Get new binary length.
+ */
+ public long getBinaryLength()
+ throws ManifoldCFException;
+
+ /** Get the input stream to read from. Caller should explicitly close this stream when done reading.
+ */
+ public InputStream getInputStream()
+ throws ManifoldCFException;
+
+ /** Close the object and clean up everything.
+ * This should be called when the data is no longer needed.
+ */
+ public void close()
+ throws ManifoldCFException;
+ }
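
The intended lifecycle of a DestinationStorage is write once, then read back: obtain the output stream, spool the content, close the stream, then open the input stream for downstream delivery, and finally close() the storage to release any temporary file. A small usage sketch, not part of the commit (the literal text and the println are placeholders):

    DestinationStorage spool = new MemoryDestinationStorage(1024);
    try {
      final OutputStream out = spool.getOutputStream();
      try {
        out.write("already-extracted text".getBytes(StandardCharsets.UTF_8));
      } finally {
        out.close();
      }
      // In the connector the replay goes to docCopy.setBinary(is, ds.getBinaryLength());
      // here we simply confirm the round trip.
      final InputStream in = spool.getInputStream();
      try {
        System.out.println("spooled " + spool.getBinaryLength() + " bytes");
      } finally {
        in.close();
      }
    } finally {
      spool.close(); // deletes the temporary file in the FileDestinationStorage case; a no-op for memory
    }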
+
+ protected static class FileDestinationStorage implements DestinationStorage {
+ protected final File outputFile;
+ protected final OutputStream outputStream;
+
+ public FileDestinationStorage()
+ throws ManifoldCFException
+ {
+ File outputFile;
+ OutputStream outputStream;
+ try
+ {
+ outputFile = File.createTempFile("mcftika","tmp");
+ outputStream = new FileOutputStream(outputFile);
+ }
+ catch (IOException e)
+ {
+ handleIOException(e);
+ outputFile = null;
+ outputStream = null;
+ }
+ this.outputFile = outputFile;
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public OutputStream getOutputStream()
+ throws ManifoldCFException
+ {
+ return outputStream;
+ }
+
+ /** Get new binary length.
+ */
+ @Override
+ public long getBinaryLength()
+ throws ManifoldCFException
+ {
+ return outputFile.length();
+ }
+
+ /** Get the input stream to read from. Caller should explicitly close this stream when done reading.
+ */
+ @Override
+ public InputStream getInputStream()
+ throws ManifoldCFException
+ {
+ try
+ {
+ return new FileInputStream(outputFile);
+ }
+ catch (IOException e)
+ {
+ handleIOException(e);
+ return null;
+ }
+ }
+
+ /** Close the object and clean up everything.
+ * This should be called when the data is no longer needed.
+ */
+ @Override
+ public void close()
+ throws ManifoldCFException
+ {
+ outputFile.delete();
+ }
+
+ }
+
+ protected static class MemoryDestinationStorage implements DestinationStorage {
+ protected final ByteArrayOutputStream outputStream;
+
+ public MemoryDestinationStorage(int sizeHint)
+ {
+ outputStream = new ByteArrayOutputStream(sizeHint);
+ }
+
+ @Override
+ public OutputStream getOutputStream()
+ throws ManifoldCFException
+ {
+ return outputStream;
+ }
+
+ /** Get new binary length.
+ */
+ @Override
+ public long getBinaryLength()
+ throws ManifoldCFException
+ {
+ return outputStream.size();
+ }
+
+ /** Get the input stream to read from. Caller should explicitly close this stream when done reading.
+ */
+ @Override
+ public InputStream getInputStream()
+ throws ManifoldCFException
+ {
+ return new ByteArrayInputStream(outputStream.toByteArray());
+ }
+
+ /** Close the object and clean up everything.
+ * This should be called when the data is no longer needed.
+ */
+ public void close()
+ throws ManifoldCFException
+ {
+ }
+
+ }
+
protected static class SpecPacker {
private final String sModelPath;