You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/06/07 20:24:05 UTC
[tika] branch branch_1x updated: TIKA-2662 add streaming json
serializer
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch branch_1x
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/branch_1x by this push:
new 90e3387 TIKA-2662 add streaming json serializer
90e3387 is described below
commit 90e3387883ae36eedc0a1375132949f234b20749
Author: tballison <ta...@mitre.org>
AuthorDate: Thu Jun 7 15:58:26 2018 -0400
TIKA-2662 add streaming json serializer
---
.../src/main/resources/tika-app-batch-config.xml | 1 +
.../tika/cli/TikaCLIBatchIntegrationTest.java | 74 +++++++++++
.../batch/fs/RecursiveParserWrapperFSConsumer.java | 46 ++-----
...FSConsumer.java => StreamOutRPWFSConsumer.java} | 141 ++++++++++-----------
.../fs/builders/BasicTikaFSConsumersBuilder.java | 24 +++-
.../tika/batch/fs/default-tika-batch-config.xml | 4 +
.../RecursiveParserWrapperFSConsumerTest.java | 5 +-
.../apache/tika/parser/RecursiveParserWrapper.java | 60 ++++-----
.../sax/AbstractRecursiveParserWrapperHandler.java | 2 +-
.../java/org/apache/tika/utils/ParserUtils.java | 7 +-
.../tika/parser/RecursiveParserWrapperTest.java | 7 +-
.../metadata/serialization/JsonMetadataList.java | 15 +++
.../serialization/JsonStreamingSerializer.java | 65 ++++++++++
.../serialization/JsonMetadataListTest.java | 46 +++++++
14 files changed, 341 insertions(+), 156 deletions(-)
diff --git a/tika-app/src/main/resources/tika-app-batch-config.xml b/tika-app/src/main/resources/tika-app-batch-config.xml
index 99651a1..12556c7 100644
--- a/tika-app/src/main/resources/tika-app-batch-config.xml
+++ b/tika-app/src/main/resources/tika-app-batch-config.xml
@@ -63,6 +63,7 @@
description="output directory for output"/> <!-- do we want to make this mandatory -->
<option opt="recursiveParserWrapper"
description="use the RecursiveParserWrapper or not (default = false)"/>
+ <option opt="streamOut" description="stream the output of the RecursiveParserWrapper (default = false)"/>
<option opt="handleExisting" hasArg="true"
description="if an output file already exists, do you want to: overwrite, rename or skip"/>
<option opt="basicHandlerType" hasArg="true"
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java
index 60c6d6b..50663f1 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java
@@ -20,25 +20,41 @@ package org.apache.tika.cli;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
+import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.Reader;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
+import java.util.logging.Handler;
import org.apache.commons.io.FileUtils;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.metadata.serialization.JsonStreamingSerializer;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
public class TikaCLIBatchIntegrationTest {
@@ -114,6 +130,27 @@ public class TikaCLIBatchIntegrationTest {
}
@Test
+ public void testStreamingJsonRecursiveBatchIntegration() throws Exception {
+ String[] params = {"-i", testInputDirForCommandLine,
+ "-o", tempOutputDirForCommandLine,
+ "-numConsumers", "10",
+ "-J", //recursive Json
+ "-t", //plain text in content
+ "-streamOut"
+ };
+ TikaCLI.main(params);
+
+ Path jsonFile = tempOutputDir.resolve("test_recursive_embedded.docx.json");
+ try (Reader reader = Files.newBufferedReader(jsonFile, UTF_8)) {
+ List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
+ assertEquals(12, metadataList.size());
+ assertTrue(metadataList.get(6).get(AbstractRecursiveParserWrapperHandler.TIKA_CONTENT).contains("human events"));
+ //test that the last written object has been bumped to the first by JsonMetadataList.fromJson()
+ assertNull( metadataList.get(0).get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH));
+ }
+ }
+
+ @Test
public void testProcessLogFileConfig() throws Exception {
String[] params = {"-i", testInputDirForCommandLine,
"-o", tempOutputDirForCommandLine,
@@ -171,5 +208,42 @@ public class TikaCLIBatchIntegrationTest {
Files.isRegularFile(path));
}
+ @Test
+ public void oneOff() throws Exception {
+ Parser p = new AutoDetectParser();
+ RecursiveParserWrapper w = new RecursiveParserWrapper(p);
+ try (JsonStreamingSerializer writer = new JsonStreamingSerializer(new BufferedWriter(new OutputStreamWriter(
+ Files.newOutputStream(Paths.get("C:/data/tika_tmp.json")), StandardCharsets.UTF_8)))) {
+ ContentHandler contentHandler = new WriteoutRPWHandler(
+ new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
+ writer);
+ try (InputStream is = getClass().getResourceAsStream("/test-data/test_recursive_embedded.docx")) {
+ w.parse(is, contentHandler, new Metadata(), new ParseContext());
+ }
+ }
+ }
+
+ private class WriteoutRPWHandler extends AbstractRecursiveParserWrapperHandler {
+ private final JsonStreamingSerializer jsonWriter;
+ public WriteoutRPWHandler(ContentHandlerFactory contentHandlerFactory, JsonStreamingSerializer writer) {
+ super(contentHandlerFactory);
+ this.jsonWriter = writer;
+ }
+
+ @Override
+ public void endEmbeddedDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+ metadata.add(RecursiveParserWrapperHandler.TIKA_CONTENT, contentHandler.toString());
+ try {
+ jsonWriter.add(metadata);
+ } catch (IOException e) {
+ throw new SAXException(e);
+ }
+ }
+
+ @Override
+ public void endDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+ endEmbeddedDocument(contentHandler, metadata);
+ }
+ }
}
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java b/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
index 259157f..56b8b58 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
@@ -42,11 +42,8 @@ import org.apache.tika.utils.ExceptionUtils;
import org.xml.sax.helpers.DefaultHandler;
/**
- * Basic FileResourceConsumer that reads files from an input
- * directory and writes content to the output directory.
- * <p/>
- * This tries to catch most of the common exceptions, log them and
- * store them in the metadata list output.
+ * This runs a RecursiveParserWrapper against an input file
+ * and outputs the json metadata to an output file.
*/
public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
@@ -55,34 +52,21 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
private final OutputStreamFactory fsOSFactory;
private String outputEncoding = "UTF-8";
-
/**
- * @deprecated use {@link RecursiveParserWrapperFSConsumer#RecursiveParserWrapperFSConsumer(ArrayBlockingQueue, Parser, ContentHandlerFactory, OutputStreamFactory)}
+ *
* @param queue
- * @param parserFactory
+ * @param parser -- must be RecursiveParserWrapper or a ForkParser that wraps a RecursiveParserWrapper
* @param contentHandlerFactory
* @param fsOSFactory
- * @param config
*/
public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
- ParserFactory parserFactory,
- ContentHandlerFactory contentHandlerFactory,
- OutputStreamFactory fsOSFactory, TikaConfig config) {
- super(queue);
- this.contentHandlerFactory = contentHandlerFactory;
- this.fsOSFactory = fsOSFactory;
- Parser parserToWrap = parserFactory.getParser(config);
- this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
- }
-
- public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
- Parser parserToWrap,
+ Parser parser,
ContentHandlerFactory contentHandlerFactory,
OutputStreamFactory fsOSFactory) {
super(queue);
this.contentHandlerFactory = contentHandlerFactory;
this.fsOSFactory = fsOSFactory;
- this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
+ this.parser = parser;
}
@Override
@@ -115,24 +99,10 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
try {
parse(fileResource.getResourceId(), parser, is, handler,
containerMetadata, context);
- metadataList = handler.getMetadataList();
} catch (Throwable t) {
thrown = t;
- metadataList = handler.getMetadataList();
- if (metadataList == null) {
- metadataList = new LinkedList<>();
- }
- Metadata m = null;
- if (metadataList.size() == 0) {
- m = containerMetadata;
- } else {
- //take the top metadata item
- m = metadataList.remove(0);
- }
- String stackTrace = ExceptionUtils.getFilteredStackTrace(t);
- m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
- metadataList.add(0, m);
} finally {
+ metadataList = handler.getMetadataList();
IOUtils.closeQuietly(is);
}
@@ -152,6 +122,8 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
if (thrown != null) {
if (thrown instanceof Error) {
throw (Error) thrown;
+ } else if (thrown instanceof SecurityException) {
+ throw (SecurityException)thrown;
} else {
return false;
}
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java b/tika-batch/src/main/java/org/apache/tika/batch/fs/StreamOutRPWFSConsumer.java
similarity index 53%
copy from tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
copy to tika-batch/src/main/java/org/apache/tika/batch/fs/StreamOutRPWFSConsumer.java
index 259157f..018c1a9 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/fs/StreamOutRPWFSConsumer.java
@@ -1,5 +1,3 @@
-package org.apache.tika.batch.fs;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,13 +15,9 @@ package org.apache.tika.batch.fs;
* limitations under the License.
*/
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+package org.apache.tika.batch.fs;
+
+
import org.apache.commons.io.IOUtils;
import org.apache.tika.batch.FileResource;
@@ -32,23 +26,29 @@ import org.apache.tika.batch.ParserFactory;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.metadata.serialization.JsonStreamingSerializer;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.apache.tika.utils.ExceptionUtils;
-import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
/**
- * Basic FileResourceConsumer that reads files from an input
- * directory and writes content to the output directory.
- * <p/>
- * This tries to catch most of the common exceptions, log them and
- * store them in the metadata list output.
+ * This uses the {@link JsonStreamingSerializer} to write out a
+ * single metadata object at a time.
*/
-public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
+public class StreamOutRPWFSConsumer extends AbstractFSConsumer {
private final Parser parser;
private final ContentHandlerFactory contentHandlerFactory;
@@ -56,33 +56,14 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
private String outputEncoding = "UTF-8";
- /**
- * @deprecated use {@link RecursiveParserWrapperFSConsumer#RecursiveParserWrapperFSConsumer(ArrayBlockingQueue, Parser, ContentHandlerFactory, OutputStreamFactory)}
- * @param queue
- * @param parserFactory
- * @param contentHandlerFactory
- * @param fsOSFactory
- * @param config
- */
- public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
- ParserFactory parserFactory,
- ContentHandlerFactory contentHandlerFactory,
- OutputStreamFactory fsOSFactory, TikaConfig config) {
- super(queue);
- this.contentHandlerFactory = contentHandlerFactory;
- this.fsOSFactory = fsOSFactory;
- Parser parserToWrap = parserFactory.getParser(config);
- this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
- }
-
- public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
- Parser parserToWrap,
- ContentHandlerFactory contentHandlerFactory,
- OutputStreamFactory fsOSFactory) {
+ public StreamOutRPWFSConsumer(ArrayBlockingQueue<FileResource> queue,
+ Parser parser,
+ ContentHandlerFactory contentHandlerFactory,
+ OutputStreamFactory fsOSFactory) {
super(queue);
this.contentHandlerFactory = contentHandlerFactory;
this.fsOSFactory = fsOSFactory;
- this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
+ this.parser = parser;
}
@Override
@@ -108,55 +89,37 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
return false;
}
- Throwable thrown = null;
- List<Metadata> metadataList = null;
Metadata containerMetadata = fileResource.getMetadata();
- RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(contentHandlerFactory, -1);
+ JsonStreamingSerializer writer = new JsonStreamingSerializer(
+ new OutputStreamWriter(os, StandardCharsets.UTF_8));
+
+ WriteoutRPWHandler handler = new WriteoutRPWHandler(contentHandlerFactory, writer);
+ Throwable thrown = null;
try {
parse(fileResource.getResourceId(), parser, is, handler,
containerMetadata, context);
- metadataList = handler.getMetadataList();
} catch (Throwable t) {
thrown = t;
- metadataList = handler.getMetadataList();
- if (metadataList == null) {
- metadataList = new LinkedList<>();
- }
- Metadata m = null;
- if (metadataList.size() == 0) {
- m = containerMetadata;
- } else {
- //take the top metadata item
- m = metadataList.remove(0);
- }
- String stackTrace = ExceptionUtils.getFilteredStackTrace(t);
- m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
- metadataList.add(0, m);
} finally {
- IOUtils.closeQuietly(is);
- }
-
- Writer writer = null;
-
- try {
- writer = new OutputStreamWriter(os, getOutputEncoding());
- JsonMetadataList.toJson(metadataList, writer);
- } catch (Exception e) {
- //this is a stop the world kind of thing
- LOG.error("{}", getXMLifiedLogMsg(IO_OS + "json", fileResource.getResourceId(), e));
- throw new RuntimeException(e);
- } finally {
- flushAndClose(writer);
+ try {
+ writer.close();
+ } catch (IOException e) {
+ //this is a stop the world kind of thing
+ LOG.error("{}", getXMLifiedLogMsg(IO_OS + "json", fileResource.getResourceId(), e));
+ throw new RuntimeException(e);
+ } finally {
+ IOUtils.closeQuietly(is);
+ }
}
-
if (thrown != null) {
if (thrown instanceof Error) {
throw (Error) thrown;
+ } else if (thrown instanceof SecurityException) {
+ throw (SecurityException)thrown;
} else {
return false;
}
}
-
return true;
}
@@ -167,4 +130,32 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
public void setOutputEncoding(String outputEncoding) {
this.outputEncoding = outputEncoding;
}
+
+ //extend AbstractRPWH instead of RecursiveParserWrapperHandler so that
+ //if we use the ForkParser, the output will not have to be streamed
+ //back to the proxy, but can
+ //be written straight to disk.
+ private class WriteoutRPWHandler extends AbstractRecursiveParserWrapperHandler {
+ private final JsonStreamingSerializer jsonWriter;
+
+ public WriteoutRPWHandler(ContentHandlerFactory contentHandlerFactory, JsonStreamingSerializer writer) {
+ super(contentHandlerFactory);
+ this.jsonWriter = writer;
+ }
+
+ @Override
+ public void endEmbeddedDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+ metadata.add(RecursiveParserWrapperHandler.TIKA_CONTENT, contentHandler.toString());
+ try {
+ jsonWriter.add(metadata);
+ } catch (IOException e) {
+ throw new SAXException(e);
+ }
+ }
+
+ @Override
+ public void endDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+ endEmbeddedDocument(contentHandler, metadata);
+ }
+ }
}
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java b/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
index d55f3be..88171ee 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
@@ -40,8 +40,10 @@ import org.apache.tika.batch.fs.FSConsumersManager;
import org.apache.tika.batch.fs.FSOutputStreamFactory;
import org.apache.tika.batch.fs.FSUtil;
import org.apache.tika.batch.fs.RecursiveParserWrapperFSConsumer;
+import org.apache.tika.batch.fs.StreamOutRPWFSConsumer;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.util.ClassLoaderUtil;
@@ -68,6 +70,17 @@ public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder {
}
}
+ boolean streamOut = false;
+ String streamOutString = runtimeAttributes.get("streamOut");
+ if (streamOutString != null){
+ streamOut = PropsUtil.getBoolean(streamOutString, streamOut);
+ } else {
+ Node streamOutNode = node.getAttributes().getNamedItem("streamout");
+ if (streamOutNode != null) {
+ streamOut = PropsUtil.getBoolean(streamOutNode.getNodeValue(), streamOut);
+ }
+ }
+
//how long to let the consumersManager run on init() and shutdown()
Long consumersManagerMaxMillis = null;
String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis");
@@ -132,9 +145,16 @@ public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder {
contentHandlerFactory, recursiveParserWrapper);
Parser parser = parserFactory.getParser(config);
if (recursiveParserWrapper) {
+ parser = new RecursiveParserWrapper(parser);
for (int i = 0; i < numConsumers; i++) {
- FileResourceConsumer c = new RecursiveParserWrapperFSConsumer(queue,
- parser, contentHandlerFactory, outputStreamFactory);
+ FileResourceConsumer c = null;
+ if (streamOut){
+ c = new StreamOutRPWFSConsumer(queue,
+ parser, contentHandlerFactory, outputStreamFactory);
+ } else {
+ c = new RecursiveParserWrapperFSConsumer(queue,
+ parser, contentHandlerFactory, outputStreamFactory);
+ }
consumers.add(c);
}
} else {
diff --git a/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml b/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
index 1b71152..51cbe69 100644
--- a/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
+++ b/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
@@ -63,6 +63,7 @@
description="output directory for output"/> <!-- do we want to make this mandatory -->
<option opt="recursiveParserWrapper"
description="use the RecursiveParserWrapper or not (default = false)"/>
+ <option opt="streamOut" description="stream the output of the RecursiveParserWrapper (default = false)"/>
<option opt="handleExisting" hasArg="true"
description="if an output file already exists, do you want to: overwrite, rename or skip"/>
<option opt="basicHandlerType" hasArg="true"
@@ -77,6 +78,7 @@
description="regex that specifies which files to avoid processing"/>
<option opt="reporterSleepMillis" hasArg="true"
description="millisecond between reports by the reporter"/>
+
</commandline>
@@ -108,6 +110,8 @@
<!--
To wrap parser in RecursiveParserWrapper (tika-app's -J or tika-server's /rmeta),
add attribute recursiveParserWrapper="true" to consumers element.
+ To stream the output of the RecursiveParserWrapper set "streamout" = true
+ in consumers element.
-->
<consumers builderClass="org.apache.tika.batch.fs.builders.BasicTikaFSConsumersBuilder"
recursiveParserWrapper="false" consumersManagerMaxMillis="60000">
diff --git a/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java b/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java
index 32128b2..44629ac 100644
--- a/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java
+++ b/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java
@@ -35,7 +35,6 @@ import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
-import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
@@ -72,7 +71,7 @@ public class RecursiveParserWrapperFSConsumerTest extends TikaTest {
queue.add(new PoisonFileResource());
MockOSFactory mockOSFactory = new MockOSFactory();
- Parser p = new AutoDetectParserFactory().getParser(new TikaConfig());
+ Parser p = new RecursiveParserWrapper(new AutoDetectParserFactory().getParser(new TikaConfig()));
RecursiveParserWrapperFSConsumer consumer = new RecursiveParserWrapperFSConsumer(
queue, p, new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
mockOSFactory);
@@ -120,7 +119,7 @@ public class RecursiveParserWrapperFSConsumerTest extends TikaTest {
queue.add(new PoisonFileResource());
MockOSFactory mockOSFactory = new MockOSFactory();
- Parser p = new AutoDetectParserFactory().getParser(new TikaConfig());
+ Parser p = new RecursiveParserWrapper(new AutoDetectParserFactory().getParser(new TikaConfig()));
RecursiveParserWrapperFSConsumer consumer = new RecursiveParserWrapperFSConsumer(
queue, p, new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
mockOSFactory);
diff --git a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
index be0bf0a..434bb4e 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
@@ -26,6 +26,7 @@ import org.apache.tika.mime.MediaType;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.ParserUtils;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
@@ -37,10 +38,10 @@ import java.util.Set;
/**
* This is a helper class that wraps a parser in a recursive handler.
- * It takes care of setting the embedded parser in the ParseContext
+ * It takes care of setting the embedded parser in the ParseContext
* and handling the embedded path calculations.
* <p>
- * After parsing a document, call getMetadata() to retrieve a list of
+ * After parsing a document, call getMetadata() to retrieve a list of
* Metadata objects, one for each embedded resource. The first item
* in the list will contain the Metadata for the outer container file.
* <p>
@@ -50,18 +51,18 @@ import java.util.Set;
* <p>
* If a WriteLimitReachedException is encountered, the wrapper will stop
* processing the current resource, and it will not process
- * any of the child resources for the given resource. However, it will try to
- * parse as much as it can. If a WLRE is reached in the parent document,
+ * any of the child resources for the given resource. However, it will try to
+ * parse as much as it can. If a WLRE is reached in the parent document,
* no child resources will be parsed.
* <p>
* The implementation is based on Jukka's RecursiveMetadataParser
- * and Nick's additions. See:
+ * and Nick's additions. See:
* <a href="http://wiki.apache.org/tika/RecursiveMetadata#Jukka.27s_RecursiveMetadata_Parser">RecursiveMetadataParser</a>.
* <p>
* Note that this wrapper holds all data in memory and is not appropriate
* for files with content too large to be held in memory.
* <p>
- * Note, too, that this wrapper is not thread safe because it stores state.
+ * Note, too, that this wrapper is not thread safe because it stores state.
* The client must initialize a new wrapper for each thread, and the client
* is responsible for calling {@link #reset()} after each parse.
* <p>
@@ -69,7 +70,7 @@ import java.util.Set;
* </p>
*/
public class RecursiveParserWrapper extends ParserDecorator {
-
+
/**
* Generated serial version
*/
@@ -192,7 +193,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
/**
* Acts like a regular parser except it ignores the ContentHandler
- * and it automatically sets/overwrites the embedded Parser in the
+ * and it automatically sets/overwrites the embedded Parser in the
* ParseContext object.
* <p>
* To retrieve the results of the parse, use {@link #getMetadata()}.
@@ -201,7 +202,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
*/
@Override
public void parse(InputStream stream, ContentHandler recursiveParserWrapperHandler,
- Metadata metadata, ParseContext context) throws IOException,
+ Metadata metadata, ParseContext context) throws IOException,
SAXException, TikaException {
//this tracks the state of the parent parser, per call to #parse
//in future versions, we can remove lastParseState, and this will be thread-safe
@@ -225,6 +226,12 @@ public class RecursiveParserWrapper extends ParserDecorator {
throw e;
}
metadata.set(RecursiveParserWrapperHandler.WRITE_LIMIT_REACHED, "true");
+ } catch (Throwable e) {
+ //try our best to record the problem in the metadata object
+ //then rethrow
+ String stackTrace = ExceptionUtils.getFilteredStackTrace(e);
+ metadata.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
+ throw e;
} finally {
long elapsedMillis = System.currentTimeMillis() - started;
metadata.set(RecursiveParserWrapperHandler.PARSE_TIME_MILLIS, Long.toString(elapsedMillis));
@@ -259,7 +266,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
* Set the maximum number of embedded resources to store.
* If the max is hit during parsing, the {@link #EMBEDDED_RESOURCE_LIMIT_REACHED}
* property will be added to the container document's Metadata.
- *
+ *
* <p>
* If this value is < 0 (the default), the wrapper will store all Metadata.
* @deprecated set this on a {@link RecursiveParserWrapperHandler}
@@ -269,7 +276,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
public void setMaxEmbeddedResources(int max) {
maxEmbeddedResources = max;
}
-
+
/**
* This clears the last parser state (metadata list, unknown count, hit embeddedresource count)
@@ -286,10 +293,10 @@ public class RecursiveParserWrapper extends ParserDecorator {
throw new IllegalStateException("This is deprecated; please use a RecursiveParserWrapperHandler instead");
}
}
-
+
/**
- * Copied/modified from WriteOutContentHandler. Couldn't make that
- * static, and we need to have something that will work
+ * Copied/modified from WriteOutContentHandler. Couldn't make that
+ * static, and we need to have something that will work
* with exceptions thrown from both BodyContentHandler and WriteOutContentHandler
* @param t
* @return
@@ -318,27 +325,27 @@ public class RecursiveParserWrapper extends ParserDecorator {
return objectName;
}
-
+
private class EmbeddedParserDecorator extends ParserDecorator {
-
+
private static final long serialVersionUID = 207648200464263337L;
-
+
private String location = null;
private final ParserState parserState;
-
+
private EmbeddedParserDecorator(Parser parser, String location, ParserState parseState) {
super(parser);
this.location = location;
if (! this.location.endsWith("/")) {
- this.location += "/";
+ this.location += "/";
}
this.parserState = parseState;
}
@Override
public void parse(InputStream stream, ContentHandler ignore,
- Metadata metadata, ParseContext context) throws IOException,
+ Metadata metadata, ParseContext context) throws IOException,
SAXException, TikaException {
//Test to see if we should avoid parsing
if (parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
@@ -347,7 +354,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
// Work out what this thing is
String objectName = getResourceName(metadata, parserState);
String objectLocation = this.location + objectName;
-
+
metadata.add(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH, objectLocation);
@@ -380,16 +387,9 @@ public class RecursiveParserWrapper extends ParserDecorator {
} finally {
context.set(Parser.class, preContextParser);
long elapsedMillis = System.currentTimeMillis() - started;
- metadata.set(PARSE_TIME_MILLIS, Long.toString(elapsedMillis));
- }
-
- //Because of recursion, we need
- //to re-test to make sure that we limit the
- //number of stored resources
- if (parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
- return;
+ metadata.set(RecursiveParserWrapperHandler.PARSE_TIME_MILLIS, Long.toString(elapsedMillis));
+ parserState.recursiveParserWrapperHandler.endEmbeddedDocument(localHandler, metadata);
}
- parserState.recursiveParserWrapperHandler.endEmbeddedDocument(localHandler, metadata);
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
index 11f7ff6..d53f18e 100644
--- a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
+++ b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
@@ -111,7 +111,7 @@ public abstract class AbstractRecursiveParserWrapperHandler extends DefaultHandl
* @return whether this handler has hit the maximum embedded resources during the parse
*/
public boolean hasHitMaximumEmbeddedResources() {
- if (maxEmbeddedResources > -1 && embeddedResources > maxEmbeddedResources) {
+ if (maxEmbeddedResources > -1 && embeddedResources >= maxEmbeddedResources) {
return true;
}
return false;
diff --git a/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java b/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java
index 2598c99..5e238c3 100644
--- a/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java
+++ b/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.utils;
+
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.Property;
import org.apache.tika.metadata.TikaCoreProperties;
@@ -78,13 +79,11 @@ public class ParserUtils {
* {@link Exception} wasn't immediately thrown (eg when several different
* Parsers are used)
*/
- public static void recordParserFailure(Parser parser, Exception failure,
+ public static void recordParserFailure(Parser parser, Throwable failure,
Metadata metadata) {
String trace = ExceptionUtils.getStackTrace(failure);
metadata.add(EMBEDDED_EXCEPTION, trace);
metadata.add(EMBEDDED_PARSER, getParserClassname(parser));
}
-
-
-}
+ }
diff --git a/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java b/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java
index 6acd190..ff6f8ef 100644
--- a/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java
+++ b/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java
@@ -151,7 +151,7 @@ public class RecursiveParserWrapperTest {
list = wrapper.getMetadata();
//add 1 for outer container file
- assertEquals(maxEmbedded, list.size());
+ assertEquals(maxEmbedded+1, list.size());
limitReached = list.get(0).get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED);
assertEquals("true", limitReached);
@@ -205,9 +205,8 @@ public class RecursiveParserWrapperTest {
new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1), maxEmbedded);
wrapper.parse(stream, handler, metadata, context);
list = handler.getMetadataList();
-
//add 1 for outer container file
- assertEquals(maxEmbedded, list.size());
+ assertEquals(maxEmbedded+1, list.size());
limitReached = list.get(0).get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED);
assertEquals("true", limitReached);
@@ -282,7 +281,7 @@ public class RecursiveParserWrapperTest {
//Composite parser swallows caught TikaExceptions, IOExceptions and SAXExceptions
//and just doesn't bother to report that there was an exception.
- assertEquals(12, list.size());
+ assertEquals(13, list.size());
}
@Test
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
index a832231..0fd8098 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
@@ -28,6 +28,7 @@ import com.google.gson.JsonIOException;
import com.google.gson.reflect.TypeToken;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
public class JsonMetadataList extends JsonMetadataBase {
@@ -70,6 +71,20 @@ public class JsonMetadataList extends JsonMetadataBase {
//covers both io and parse exceptions
throw new TikaException(e.getMessage());
}
+ if (ms == null) {
+ return null;
+ }
+ //if the last object is the main document,
+ //as happens with the streaming serializer,
+ //flip it to be the first element.
+ if (ms.size() > 1) {
+ Metadata last = ms.get(ms.size()-1);
+ String embResourcePath = last.get(RecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH);
+ if (embResourcePath == null &&
+ ms.get(0).get(RecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH) != null) {
+ ms.add(0, ms.remove(ms.size()-1));
+ }
+ }
return ms;
}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonStreamingSerializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonStreamingSerializer.java
new file mode 100644
index 0000000..f01df6f
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonStreamingSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import com.google.gson.stream.JsonWriter;
+import org.apache.tika.metadata.Metadata;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Arrays;
+
+
+public class JsonStreamingSerializer implements AutoCloseable {
+
+ private final JsonWriter jsonWriter;
+ boolean hasStartedArray = false;
+ public JsonStreamingSerializer(Writer writer) {
+ this.jsonWriter = new JsonWriter(writer);
+ }
+
+ public void add(Metadata metadata) throws IOException {
+ if (!hasStartedArray) {
+ jsonWriter.beginArray();
+ hasStartedArray = true;
+ }
+ String[] names = metadata.names();
+ Arrays.sort(names);
+ jsonWriter.beginObject();
+ for (String n : names) {
+ jsonWriter.name(n);
+ String[] values = metadata.getValues(n);
+ if (values.length == 1) {
+ jsonWriter.value(values[0]);
+ } else {
+ jsonWriter.beginArray();
+ for (String v : values) {
+ jsonWriter.value(v);
+ }
+ jsonWriter.endArray();
+ }
+ }
+ jsonWriter.endObject();
+ }
+
+ @Override
+ public void close() throws IOException {
+ jsonWriter.endArray();
+ jsonWriter.flush();
+ jsonWriter.close();
+ }
+}
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
index 0553f78..427e598 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
@@ -21,12 +21,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.junit.Test;
public class JsonMetadataListTest {
@@ -57,6 +60,16 @@ public class JsonMetadataListTest {
JsonMetadataList.toJson(metadataList, writer);
List<Metadata> deserialized = JsonMetadataList.fromJson(new StringReader(writer.toString()));
assertEquals(metadataList, deserialized);
+
+ //now test streaming serializer
+ writer = new StringWriter();
+ try(JsonStreamingSerializer streamingSerializer = new JsonStreamingSerializer(writer)) {
+ streamingSerializer.add(m1);
+ streamingSerializer.add(m2);
+ }
+ deserialized = JsonMetadataList.fromJson(new StringReader(writer.toString()));
+ assertEquals(metadataList, deserialized);
+
}
@Test
@@ -120,4 +133,37 @@ public class JsonMetadataListTest {
JsonMetadataList.toJson(metadataList, writer);
assertTrue(writer.toString().startsWith("[{\"tika:content\":\"this is the content\",\"zk1\":[\"v1\",\"v2\","));
}
+
+ @Test
+ public void testSwitchingOrderOfMainDoc() throws Exception {
+ Metadata m1 = new Metadata();
+ m1.add("k1", "v1");
+ m1.add("k1", "v2");
+ m1.add("k1", "v3");
+ m1.add("k1", "v4");
+ m1.add("k1", "v4");
+ m1.add("k2", "v1");
+ m1.add(RecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH, "/embedded-1");
+
+ Metadata m2 = new Metadata();
+ m2.add("k3", "v1");
+ m2.add("k3", "v2");
+ m2.add("k3", "v3");
+ m2.add("k3", "v4");
+ m2.add("k3", "v4");
+ m2.add("k4", "v1");
+
+ List<Metadata> truth = new ArrayList<>();
+ truth.add(m2);
+ truth.add(m1);
+ StringWriter stringWriter = new StringWriter();
+ try(JsonStreamingSerializer serializer = new JsonStreamingSerializer(stringWriter)) {
+ serializer.add(m1);
+ serializer.add(m2);
+ }
+ Reader reader = new StringReader(stringWriter.toString());
+ List<Metadata> deserialized = JsonMetadataList.fromJson(reader);
+ assertEquals(truth, deserialized);
+
+ }
}
--
To stop receiving notification emails like this one, please contact
tallison@apache.org.