You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/06/07 20:24:05 UTC

[tika] branch branch_1x updated: TIKA-2662 add streaming json serializer

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch branch_1x
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/branch_1x by this push:
     new 90e3387  TIKA-2662 add streaming json serializer
90e3387 is described below

commit 90e3387883ae36eedc0a1375132949f234b20749
Author: tballison <ta...@mitre.org>
AuthorDate: Thu Jun 7 15:58:26 2018 -0400

    TIKA-2662 add streaming json serializer
---
 .../src/main/resources/tika-app-batch-config.xml   |   1 +
 .../tika/cli/TikaCLIBatchIntegrationTest.java      |  74 +++++++++++
 .../batch/fs/RecursiveParserWrapperFSConsumer.java |  46 ++-----
 ...FSConsumer.java => StreamOutRPWFSConsumer.java} | 141 ++++++++++-----------
 .../fs/builders/BasicTikaFSConsumersBuilder.java   |  24 +++-
 .../tika/batch/fs/default-tika-batch-config.xml    |   4 +
 .../RecursiveParserWrapperFSConsumerTest.java      |   5 +-
 .../apache/tika/parser/RecursiveParserWrapper.java |  60 ++++-----
 .../sax/AbstractRecursiveParserWrapperHandler.java |   2 +-
 .../java/org/apache/tika/utils/ParserUtils.java    |   7 +-
 .../tika/parser/RecursiveParserWrapperTest.java    |   7 +-
 .../metadata/serialization/JsonMetadataList.java   |  15 +++
 .../serialization/JsonStreamingSerializer.java     |  65 ++++++++++
 .../serialization/JsonMetadataListTest.java        |  46 +++++++
 14 files changed, 341 insertions(+), 156 deletions(-)

diff --git a/tika-app/src/main/resources/tika-app-batch-config.xml b/tika-app/src/main/resources/tika-app-batch-config.xml
index 99651a1..12556c7 100644
--- a/tika-app/src/main/resources/tika-app-batch-config.xml
+++ b/tika-app/src/main/resources/tika-app-batch-config.xml
@@ -63,6 +63,7 @@
                 description="output directory for output"/> <!-- do we want to make this mandatory -->
         <option opt="recursiveParserWrapper"
                 description="use the RecursiveParserWrapper or not (default = false)"/>
+        <option opt="streamOut" description="stream the output of the RecursiveParserWrapper (default = false)"/>
         <option opt="handleExisting" hasArg="true"
                 description="if an output file already exists, do you want to: overwrite, rename or skip"/>
         <option opt="basicHandlerType" hasArg="true"
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java
index 60c6d6b..50663f1 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIBatchIntegrationTest.java
@@ -20,25 +20,41 @@ package org.apache.tika.cli;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.Reader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.logging.Handler;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.metadata.serialization.JsonStreamingSerializer;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
 
 public class TikaCLIBatchIntegrationTest {
 
@@ -114,6 +130,27 @@ public class TikaCLIBatchIntegrationTest {
     }
 
     @Test
+    public void testStreamingJsonRecursiveBatchIntegration() throws Exception {
+        String[] params = {"-i", testInputDirForCommandLine,
+                "-o", tempOutputDirForCommandLine,
+                "-numConsumers", "10",
+                "-J", //recursive Json
+                "-t", //plain text in content
+                "-streamOut"
+        };
+        TikaCLI.main(params);
+
+        Path jsonFile = tempOutputDir.resolve("test_recursive_embedded.docx.json");
+        try (Reader reader = Files.newBufferedReader(jsonFile, UTF_8)) {
+            List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
+            assertEquals(12, metadataList.size());
+            assertTrue(metadataList.get(6).get(AbstractRecursiveParserWrapperHandler.TIKA_CONTENT).contains("human events"));
+            //test that the last written object has been bumped to the first by JsonMetadataList.fromJson()
+            assertNull( metadataList.get(0).get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH));
+        }
+    }
+
+    @Test
     public void testProcessLogFileConfig() throws Exception {
         String[] params = {"-i", testInputDirForCommandLine,
                 "-o", tempOutputDirForCommandLine,
@@ -171,5 +208,42 @@ public class TikaCLIBatchIntegrationTest {
                 Files.isRegularFile(path));
     }
 
+    @Test
+    public void oneOff() throws Exception {
+        Parser p = new AutoDetectParser();
+        RecursiveParserWrapper w = new RecursiveParserWrapper(p);
+        try (JsonStreamingSerializer writer = new JsonStreamingSerializer(new BufferedWriter(new OutputStreamWriter(
+                Files.newOutputStream(Paths.get("C:/data/tika_tmp.json")), StandardCharsets.UTF_8)))) {
+            ContentHandler contentHandler = new WriteoutRPWHandler(
+                    new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
+                    writer);
+            try (InputStream is = getClass().getResourceAsStream("/test-data/test_recursive_embedded.docx")) {
+                w.parse(is, contentHandler, new Metadata(), new ParseContext());
+            }
+        }
+    }
+
+    private class WriteoutRPWHandler extends AbstractRecursiveParserWrapperHandler {
+        private final JsonStreamingSerializer jsonWriter;
 
+        public WriteoutRPWHandler(ContentHandlerFactory contentHandlerFactory, JsonStreamingSerializer writer) {
+            super(contentHandlerFactory);
+            this.jsonWriter = writer;
+        }
+
+        @Override
+        public void endEmbeddedDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+            metadata.add(RecursiveParserWrapperHandler.TIKA_CONTENT, contentHandler.toString());
+            try {
+                jsonWriter.add(metadata);
+            } catch (IOException e) {
+                throw new SAXException(e);
+            }
+        }
+
+        @Override
+        public void endDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+            endEmbeddedDocument(contentHandler, metadata);
+        }
+    }
 }
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java b/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
index 259157f..56b8b58 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
@@ -42,11 +42,8 @@ import org.apache.tika.utils.ExceptionUtils;
 import org.xml.sax.helpers.DefaultHandler;
 
 /**
- * Basic FileResourceConsumer that reads files from an input
- * directory and writes content to the output directory.
- * <p/>
- * This tries to catch most of the common exceptions, log them and
- * store them in the metadata list output.
+ * This runs a RecursiveParserWrapper against an input file
+ * and outputs the json metadata to an output file.
  */
 public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
 
@@ -55,34 +52,21 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
     private final OutputStreamFactory fsOSFactory;
     private String outputEncoding = "UTF-8";
 
-
     /**
-     * @deprecated use {@link RecursiveParserWrapperFSConsumer#RecursiveParserWrapperFSConsumer(ArrayBlockingQueue, Parser, ContentHandlerFactory, OutputStreamFactory)}
+     *
      * @param queue
-     * @param parserFactory
+     * @param parser -- must be a RecursiveParserWrapper or a ForkParser that wraps a RecursiveParserWrapper
      * @param contentHandlerFactory
      * @param fsOSFactory
-     * @param config
      */
     public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
-                                            ParserFactory parserFactory,
-                                            ContentHandlerFactory contentHandlerFactory,
-                                            OutputStreamFactory fsOSFactory, TikaConfig config) {
-        super(queue);
-        this.contentHandlerFactory = contentHandlerFactory;
-        this.fsOSFactory = fsOSFactory;
-        Parser parserToWrap = parserFactory.getParser(config);
-        this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
-    }
-
-    public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
-                                            Parser parserToWrap,
+                                            Parser parser,
                                             ContentHandlerFactory contentHandlerFactory,
                                             OutputStreamFactory fsOSFactory) {
         super(queue);
         this.contentHandlerFactory = contentHandlerFactory;
         this.fsOSFactory = fsOSFactory;
-        this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
+        this.parser = parser;
     }
 
     @Override
@@ -115,24 +99,10 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
         try {
             parse(fileResource.getResourceId(), parser, is, handler,
                     containerMetadata, context);
-            metadataList = handler.getMetadataList();
         } catch (Throwable t) {
             thrown = t;
-            metadataList = handler.getMetadataList();
-            if (metadataList == null) {
-                metadataList = new LinkedList<>();
-            }
-            Metadata m = null;
-            if (metadataList.size() == 0) {
-                m = containerMetadata;
-            } else {
-                //take the top metadata item
-                m = metadataList.remove(0);
-            }
-            String stackTrace = ExceptionUtils.getFilteredStackTrace(t);
-            m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
-            metadataList.add(0, m);
         } finally {
+            metadataList = handler.getMetadataList();
             IOUtils.closeQuietly(is);
         }
 
@@ -152,6 +122,8 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
         if (thrown != null) {
             if (thrown instanceof Error) {
                 throw (Error) thrown;
+            } else if (thrown instanceof SecurityException) {
+                throw (SecurityException)thrown;
             } else {
                 return false;
             }
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java b/tika-batch/src/main/java/org/apache/tika/batch/fs/StreamOutRPWFSConsumer.java
similarity index 53%
copy from tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
copy to tika-batch/src/main/java/org/apache/tika/batch/fs/StreamOutRPWFSConsumer.java
index 259157f..018c1a9 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/fs/StreamOutRPWFSConsumer.java
@@ -1,5 +1,3 @@
-package org.apache.tika.batch.fs;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,13 +15,9 @@ package org.apache.tika.batch.fs;
  * limitations under the License.
  */
 
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+package org.apache.tika.batch.fs;
+
+
 
 import org.apache.commons.io.IOUtils;
 import org.apache.tika.batch.FileResource;
@@ -32,23 +26,29 @@ import org.apache.tika.batch.ParserFactory;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.metadata.serialization.JsonStreamingSerializer;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
 import org.apache.tika.sax.ContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
 import org.apache.tika.utils.ExceptionUtils;
-import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
 
 /**
- * Basic FileResourceConsumer that reads files from an input
- * directory and writes content to the output directory.
- * <p/>
- * This tries to catch most of the common exceptions, log them and
- * store them in the metadata list output.
+ * This uses the {@link JsonStreamingSerializer} to write out a
+ * single metadata object at a time.
  */
-public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
+public class StreamOutRPWFSConsumer extends AbstractFSConsumer {
 
     private final Parser parser;
     private final ContentHandlerFactory contentHandlerFactory;
@@ -56,33 +56,14 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
     private String outputEncoding = "UTF-8";
 
 
-    /**
-     * @deprecated use {@link RecursiveParserWrapperFSConsumer#RecursiveParserWrapperFSConsumer(ArrayBlockingQueue, Parser, ContentHandlerFactory, OutputStreamFactory)}
-     * @param queue
-     * @param parserFactory
-     * @param contentHandlerFactory
-     * @param fsOSFactory
-     * @param config
-     */
-    public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
-                                            ParserFactory parserFactory,
-                                            ContentHandlerFactory contentHandlerFactory,
-                                            OutputStreamFactory fsOSFactory, TikaConfig config) {
-        super(queue);
-        this.contentHandlerFactory = contentHandlerFactory;
-        this.fsOSFactory = fsOSFactory;
-        Parser parserToWrap = parserFactory.getParser(config);
-        this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
-    }
-
-    public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
-                                            Parser parserToWrap,
-                                            ContentHandlerFactory contentHandlerFactory,
-                                            OutputStreamFactory fsOSFactory) {
+    public StreamOutRPWFSConsumer(ArrayBlockingQueue<FileResource> queue,
+                                  Parser parser,
+                                  ContentHandlerFactory contentHandlerFactory,
+                                  OutputStreamFactory fsOSFactory) {
         super(queue);
         this.contentHandlerFactory = contentHandlerFactory;
         this.fsOSFactory = fsOSFactory;
-        this.parser = new RecursiveParserWrapper(parserToWrap, contentHandlerFactory);
+        this.parser = parser;
     }
 
     @Override
@@ -108,55 +89,37 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
             return false;
         }
 
-        Throwable thrown = null;
-        List<Metadata> metadataList = null;
         Metadata containerMetadata = fileResource.getMetadata();
-        RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(contentHandlerFactory, -1);
+        JsonStreamingSerializer writer = new JsonStreamingSerializer(
+                new OutputStreamWriter(os, StandardCharsets.UTF_8));
+
+        WriteoutRPWHandler handler = new WriteoutRPWHandler(contentHandlerFactory, writer);
+        Throwable thrown = null;
         try {
             parse(fileResource.getResourceId(), parser, is, handler,
                     containerMetadata, context);
-            metadataList = handler.getMetadataList();
         } catch (Throwable t) {
             thrown = t;
-            metadataList = handler.getMetadataList();
-            if (metadataList == null) {
-                metadataList = new LinkedList<>();
-            }
-            Metadata m = null;
-            if (metadataList.size() == 0) {
-                m = containerMetadata;
-            } else {
-                //take the top metadata item
-                m = metadataList.remove(0);
-            }
-            String stackTrace = ExceptionUtils.getFilteredStackTrace(t);
-            m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
-            metadataList.add(0, m);
         } finally {
-            IOUtils.closeQuietly(is);
-        }
-
-        Writer writer = null;
-
-        try {
-            writer = new OutputStreamWriter(os, getOutputEncoding());
-            JsonMetadataList.toJson(metadataList, writer);
-        } catch (Exception e) {
-            //this is a stop the world kind of thing
-            LOG.error("{}", getXMLifiedLogMsg(IO_OS + "json", fileResource.getResourceId(), e));
-            throw new RuntimeException(e);
-        } finally {
-            flushAndClose(writer);
+            try {
+                writer.close();
+            } catch (IOException e) {
+                //this is a stop the world kind of thing
+                LOG.error("{}", getXMLifiedLogMsg(IO_OS + "json", fileResource.getResourceId(), e));
+                throw new RuntimeException(e);
+            } finally {
+                IOUtils.closeQuietly(is);
+            }
         }
-
         if (thrown != null) {
             if (thrown instanceof Error) {
                 throw (Error) thrown;
+            } else if (thrown instanceof SecurityException) {
+                throw (SecurityException)thrown;
             } else {
                 return false;
             }
         }
-
         return true;
     }
 
@@ -167,4 +130,32 @@ public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
     public void setOutputEncoding(String outputEncoding) {
         this.outputEncoding = outputEncoding;
     }
+
+    //Extend AbstractRecursiveParserWrapperHandler rather than
+    //RecursiveParserWrapperHandler so that, if we use the ForkParser,
+    //the output does not have to be streamed back to the proxy
+    //but can be written straight to disk.
+    private class WriteoutRPWHandler extends AbstractRecursiveParserWrapperHandler {
+        private final JsonStreamingSerializer jsonWriter;
+
+        public WriteoutRPWHandler(ContentHandlerFactory contentHandlerFactory, JsonStreamingSerializer writer) {
+            super(contentHandlerFactory);
+            this.jsonWriter = writer;
+        }
+
+        @Override
+        public void endEmbeddedDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+            metadata.add(RecursiveParserWrapperHandler.TIKA_CONTENT, contentHandler.toString());
+            try {
+                jsonWriter.add(metadata);
+            } catch (IOException e) {
+                throw new SAXException(e);
+            }
+        }
+
+        @Override
+        public void endDocument(ContentHandler contentHandler, Metadata metadata) throws SAXException {
+            endEmbeddedDocument(contentHandler, metadata);
+        }
+    }
 }
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java b/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
index d55f3be..88171ee 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
@@ -40,8 +40,10 @@ import org.apache.tika.batch.fs.FSConsumersManager;
 import org.apache.tika.batch.fs.FSOutputStreamFactory;
 import org.apache.tika.batch.fs.FSUtil;
 import org.apache.tika.batch.fs.RecursiveParserWrapperFSConsumer;
+import org.apache.tika.batch.fs.StreamOutRPWFSConsumer;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.ContentHandlerFactory;
 import org.apache.tika.util.ClassLoaderUtil;
@@ -68,6 +70,17 @@ public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder {
             }
         }
 
+        boolean streamOut = false;
+        String streamOutString = runtimeAttributes.get("streamOut");
+        if (streamOutString != null){
+            streamOut = PropsUtil.getBoolean(streamOutString, streamOut);
+        } else {
+            Node streamOutNode = node.getAttributes().getNamedItem("streamout");
+            if (streamOutNode != null) {
+                streamOut = PropsUtil.getBoolean(streamOutNode.getNodeValue(), streamOut);
+            }
+        }
+
         //how long to let the consumersManager run on init() and shutdown()
         Long consumersManagerMaxMillis = null;
         String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis");
@@ -132,9 +145,16 @@ public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder {
                 contentHandlerFactory, recursiveParserWrapper);
         Parser parser = parserFactory.getParser(config);
         if (recursiveParserWrapper) {
+            parser = new RecursiveParserWrapper(parser);
             for (int i = 0; i < numConsumers; i++) {
-                FileResourceConsumer c = new RecursiveParserWrapperFSConsumer(queue,
-                        parser, contentHandlerFactory, outputStreamFactory);
+                FileResourceConsumer c = null;
+                if (streamOut){
+                    c = new StreamOutRPWFSConsumer(queue,
+                            parser, contentHandlerFactory, outputStreamFactory);
+                } else {
+                    c = new RecursiveParserWrapperFSConsumer(queue,
+                            parser, contentHandlerFactory, outputStreamFactory);
+                }
                 consumers.add(c);
             }
         } else {
diff --git a/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml b/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
index 1b71152..51cbe69 100644
--- a/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
+++ b/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
@@ -63,6 +63,7 @@
                 description="output directory for output"/> <!-- do we want to make this mandatory -->
         <option opt="recursiveParserWrapper"
                 description="use the RecursiveParserWrapper or not (default = false)"/>
+        <option opt="streamOut" description="stream the output of the RecursiveParserWrapper (default = false)"/>
         <option opt="handleExisting" hasArg="true"
                 description="if an output file already exists, do you want to: overwrite, rename or skip"/>
         <option opt="basicHandlerType" hasArg="true"
@@ -77,6 +78,7 @@
                 description="regex that specifies which files to avoid processing"/>
         <option opt="reporterSleepMillis" hasArg="true"
                 description="millisecond between reports by the reporter"/>
+
     </commandline>
 
 
@@ -108,6 +110,8 @@
     <!--
         To wrap parser in RecursiveParserWrapper (tika-app's -J or tika-server's /rmeta),
         add attribute recursiveParserWrapper="true" to consumers element.
+        To stream the output of the RecursiveParserWrapper, also add attribute
+        streamout="true" to the consumers element.
         -->
     <consumers builderClass="org.apache.tika.batch.fs.builders.BasicTikaFSConsumersBuilder"
                recursiveParserWrapper="false" consumersManagerMaxMillis="60000">
diff --git a/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java b/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java
index 32128b2..44629ac 100644
--- a/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java
+++ b/tika-batch/src/test/java/org/apache/tika/batch/RecursiveParserWrapperFSConsumerTest.java
@@ -35,7 +35,6 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
-import org.apache.tika.parser.AutoDetectParser;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
@@ -72,7 +71,7 @@ public class RecursiveParserWrapperFSConsumerTest extends TikaTest {
         queue.add(new PoisonFileResource());
 
         MockOSFactory mockOSFactory = new MockOSFactory();
-        Parser p = new AutoDetectParserFactory().getParser(new TikaConfig());
+        Parser p = new RecursiveParserWrapper(new AutoDetectParserFactory().getParser(new TikaConfig()));
         RecursiveParserWrapperFSConsumer consumer = new RecursiveParserWrapperFSConsumer(
                 queue, p, new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
                 mockOSFactory);
@@ -120,7 +119,7 @@ public class RecursiveParserWrapperFSConsumerTest extends TikaTest {
         queue.add(new PoisonFileResource());
 
         MockOSFactory mockOSFactory = new MockOSFactory();
-        Parser p = new AutoDetectParserFactory().getParser(new TikaConfig());
+        Parser p = new RecursiveParserWrapper(new AutoDetectParserFactory().getParser(new TikaConfig()));
         RecursiveParserWrapperFSConsumer consumer = new RecursiveParserWrapperFSConsumer(
                 queue, p, new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1),
                 mockOSFactory);
diff --git a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
index be0bf0a..434bb4e 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
@@ -26,6 +26,7 @@ import org.apache.tika.mime.MediaType;
 import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
 import org.apache.tika.sax.ContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.ParserUtils;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
@@ -37,10 +38,10 @@ import java.util.Set;
 
 /**
  * This is a helper class that wraps a parser in a recursive handler.
- * It takes care of setting the embedded parser in the ParseContext
+ * It takes care of setting the embedded parser in the ParseContext 
  * and handling the embedded path calculations.
  * <p>
- * After parsing a document, call getMetadata() to retrieve a list of
+ * After parsing a document, call getMetadata() to retrieve a list of 
  * Metadata objects, one for each embedded resource.  The first item
  * in the list will contain the Metadata for the outer container file.
  * <p>
@@ -50,18 +51,18 @@ import java.util.Set;
  * <p>
  * If a WriteLimitReachedException is encountered, the wrapper will stop
  * processing the current resource, and it will not process
- * any of the child resources for the given resource.  However, it will try to
- * parse as much as it can.  If a WLRE is reached in the parent document,
+ * any of the child resources for the given resource.  However, it will try to 
+ * parse as much as it can.  If a WLRE is reached in the parent document, 
  * no child resources will be parsed.
  * <p>
  * The implementation is based on Jukka's RecursiveMetadataParser
- * and Nick's additions. See:
+ * and Nick's additions. See: 
  * <a href="http://wiki.apache.org/tika/RecursiveMetadata#Jukka.27s_RecursiveMetadata_Parser">RecursiveMetadataParser</a>.
  * <p>
  * Note that this wrapper holds all data in memory and is not appropriate
  * for files with content too large to be held in memory.
  * <p>
- * Note, too, that this wrapper is not thread safe because it stores state.
+ * Note, too, that this wrapper is not thread safe because it stores state.  
  * The client must initialize a new wrapper for each thread, and the client
  * is responsible for calling {@link #reset()} after each parse.
  * <p>
@@ -69,7 +70,7 @@ import java.util.Set;
  * </p>
  */
 public class RecursiveParserWrapper extends ParserDecorator {
-
+    
     /**
      * Generated serial version
      */
@@ -192,7 +193,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
 
     /**
      * Acts like a regular parser except it ignores the ContentHandler
-     * and it automatically sets/overwrites the embedded Parser in the
+     * and it automatically sets/overwrites the embedded Parser in the 
      * ParseContext object.
      * <p>
      * To retrieve the results of the parse, use {@link #getMetadata()}.
@@ -201,7 +202,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
      */
     @Override
     public void parse(InputStream stream, ContentHandler recursiveParserWrapperHandler,
-                      Metadata metadata, ParseContext context) throws IOException,
+            Metadata metadata, ParseContext context) throws IOException,
             SAXException, TikaException {
         //this tracks the state of the parent parser, per call to #parse
         //in future versions, we can remove lastParseState, and this will be thread-safe
@@ -225,6 +226,12 @@ public class RecursiveParserWrapper extends ParserDecorator {
                 throw e;
             }
             metadata.set(RecursiveParserWrapperHandler.WRITE_LIMIT_REACHED, "true");
+        } catch (Throwable e) {
+            //try our best to record the problem in the metadata object
+            //then rethrow
+            String stackTrace = ExceptionUtils.getFilteredStackTrace(e);
+            metadata.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
+            throw e;
         } finally {
             long elapsedMillis = System.currentTimeMillis() - started;
             metadata.set(RecursiveParserWrapperHandler.PARSE_TIME_MILLIS, Long.toString(elapsedMillis));
@@ -259,7 +266,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
      * Set the maximum number of embedded resources to store.
      * If the max is hit during parsing, the {@link #EMBEDDED_RESOURCE_LIMIT_REACHED}
      * property will be added to the container document's Metadata.
-     *
+     * 
      * <p>
      * If this value is < 0 (the default), the wrapper will store all Metadata.
      * @deprecated set this on a {@link RecursiveParserWrapperHandler}
@@ -269,7 +276,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
     public void setMaxEmbeddedResources(int max) {
         maxEmbeddedResources = max;
     }
-
+    
 
     /**
      * This clears the last parser state (metadata list, unknown count, hit embeddedresource count)
@@ -286,10 +293,10 @@ public class RecursiveParserWrapper extends ParserDecorator {
             throw new IllegalStateException("This is deprecated; please use a RecursiveParserWrapperHandler instead");
         }
     }
-
+    
     /**
-     * Copied/modified from WriteOutContentHandler.  Couldn't make that
-     * static, and we need to have something that will work
+     * Copied/modified from WriteOutContentHandler.  Couldn't make that 
+     * static, and we need to have something that will work 
      * with exceptions thrown from both BodyContentHandler and WriteOutContentHandler
      * @param t
      * @return
@@ -318,27 +325,27 @@ public class RecursiveParserWrapper extends ParserDecorator {
         return objectName;
     }
 
-
+    
     private class EmbeddedParserDecorator extends ParserDecorator {
-
+        
         private static final long serialVersionUID = 207648200464263337L;
-
+        
         private String location = null;
         private final ParserState parserState;
 
-
+        
         private EmbeddedParserDecorator(Parser parser, String location, ParserState parseState) {
             super(parser);
             this.location = location;
             if (! this.location.endsWith("/")) {
-                this.location += "/";
+               this.location += "/";
             }
             this.parserState = parseState;
         }
 
         @Override
         public void parse(InputStream stream, ContentHandler ignore,
-                          Metadata metadata, ParseContext context) throws IOException,
+                Metadata metadata, ParseContext context) throws IOException,
                 SAXException, TikaException {
             //Test to see if we should avoid parsing
             if (parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
@@ -347,7 +354,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
             // Work out what this thing is
             String objectName = getResourceName(metadata, parserState);
             String objectLocation = this.location + objectName;
-
+      
             metadata.add(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH, objectLocation);
 
 
@@ -380,16 +387,9 @@ public class RecursiveParserWrapper extends ParserDecorator {
             } finally {
                 context.set(Parser.class, preContextParser);
                 long elapsedMillis = System.currentTimeMillis() - started;
-                metadata.set(PARSE_TIME_MILLIS, Long.toString(elapsedMillis));
-            }
-
-            //Because of recursion, we need
-            //to re-test to make sure that we limit the
-            //number of stored resources
-            if (parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
-                return;
+                metadata.set(RecursiveParserWrapperHandler.PARSE_TIME_MILLIS, Long.toString(elapsedMillis));
+                parserState.recursiveParserWrapperHandler.endEmbeddedDocument(localHandler, metadata);
             }
-            parserState.recursiveParserWrapperHandler.endEmbeddedDocument(localHandler, metadata);
         }
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
index 11f7ff6..d53f18e 100644
--- a/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
+++ b/tika-core/src/main/java/org/apache/tika/sax/AbstractRecursiveParserWrapperHandler.java
@@ -111,7 +111,7 @@ public abstract class AbstractRecursiveParserWrapperHandler extends DefaultHandl
      * @return whether this handler has hit the maximum embedded resources during the parse
      */
     public boolean hasHitMaximumEmbeddedResources() {
-        if (maxEmbeddedResources > -1 && embeddedResources > maxEmbeddedResources) {
+        if (maxEmbeddedResources > -1 && embeddedResources >= maxEmbeddedResources) {
             return true;
         }
         return false;
diff --git a/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java b/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java
index 2598c99..5e238c3 100644
--- a/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java
+++ b/tika-core/src/main/java/org/apache/tika/utils/ParserUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.utils;
 
+
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.Property;
 import org.apache.tika.metadata.TikaCoreProperties;
@@ -78,13 +79,11 @@ public class ParserUtils {
      *  {@link Exception} wasn't immediately thrown (eg when several different
      *  Parsers are used)
      */
-    public static void recordParserFailure(Parser parser, Exception failure,
+    public static void recordParserFailure(Parser parser, Throwable failure,
                                            Metadata metadata) {
         String trace = ExceptionUtils.getStackTrace(failure);
         metadata.add(EMBEDDED_EXCEPTION, trace);
         metadata.add(EMBEDDED_PARSER, getParserClassname(parser));
     }
 
-
-
-}
+  }
diff --git a/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java b/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java
index 6acd190..ff6f8ef 100644
--- a/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java
+++ b/tika-parsers/src/test/java/org/apache/tika/parser/RecursiveParserWrapperTest.java
@@ -151,7 +151,7 @@ public class RecursiveParserWrapperTest {
         list = wrapper.getMetadata();
 
         //add 1 for outer container file
-        assertEquals(maxEmbedded, list.size());
+        assertEquals(maxEmbedded+1, list.size());
 
         limitReached = list.get(0).get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED);
         assertEquals("true", limitReached);
@@ -205,9 +205,8 @@ public class RecursiveParserWrapperTest {
                 new BasicContentHandlerFactory(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1), maxEmbedded);
         wrapper.parse(stream, handler, metadata, context);
         list = handler.getMetadataList();
-
         //add 1 for outer container file
-        assertEquals(maxEmbedded, list.size());
+        assertEquals(maxEmbedded+1, list.size());
 
         limitReached = list.get(0).get(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED);
         assertEquals("true", limitReached);
@@ -282,7 +281,7 @@ public class RecursiveParserWrapperTest {
 
         //Composite parser swallows caught TikaExceptions, IOExceptions and SAXExceptions
         //and just doesn't bother to report that there was an exception.
-        assertEquals(12, list.size());
+        assertEquals(13, list.size());
     }
 
     @Test
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
index a832231..0fd8098 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
@@ -28,6 +28,7 @@ import com.google.gson.JsonIOException;
 import com.google.gson.reflect.TypeToken;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
 
 public class JsonMetadataList extends JsonMetadataBase {
     
@@ -70,6 +71,20 @@ public class JsonMetadataList extends JsonMetadataBase {
             //covers both io and parse exceptions
             throw new TikaException(e.getMessage());
         }
+        if (ms == null) {
+            return null;
+        }
+        //if the last object is the main document,
+        //as happens with the streaming serializer,
+        //flip it to be the first element.
+        if (ms.size() > 1) {
+            Metadata last = ms.get(ms.size()-1);
+            String embResourcePath = last.get(RecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH);
+            if (embResourcePath == null &&
+                    ms.get(0).get(RecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH) != null) {
+                ms.add(0, ms.remove(ms.size()-1));
+            }
+        }
         return ms;
     }
 
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonStreamingSerializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonStreamingSerializer.java
new file mode 100644
index 0000000..f01df6f
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonStreamingSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import com.google.gson.stream.JsonWriter;
+import org.apache.tika.metadata.Metadata;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Arrays;
+
+
+/**
+ * Streams {@link Metadata} objects to a {@link Writer} as a single JSON array,
+ * one JSON object per call to {@link #add(Metadata)}.
+ * <p>
+ * The enclosing array is opened lazily on the first {@link #add(Metadata)} and
+ * closed by {@link #close()}; closing without having added anything writes an
+ * empty array rather than failing.
+ */
+public class JsonStreamingSerializer implements AutoCloseable {
+
+    private final JsonWriter jsonWriter;
+    //true once the opening bracket of the enclosing array has been written
+    private boolean hasStartedArray = false;
+
+    /**
+     * @param writer destination for the JSON; it is closed by {@link #close()}
+     */
+    public JsonStreamingSerializer(Writer writer) {
+        this.jsonWriter = new JsonWriter(writer);
+    }
+
+    /**
+     * Writes one metadata object as the next element of the array.
+     * Keys are written in sorted order; a key with exactly one value is
+     * written as a JSON string, any other key as a JSON array of strings.
+     *
+     * @param metadata metadata to serialize
+     * @throws IOException if the underlying writer fails
+     */
+    public void add(Metadata metadata) throws IOException {
+        startArrayIfNecessary();
+        String[] names = metadata.names();
+        Arrays.sort(names);
+        jsonWriter.beginObject();
+        for (String n : names) {
+            jsonWriter.name(n);
+            String[] values = metadata.getValues(n);
+            if (values.length == 1) {
+                jsonWriter.value(values[0]);
+            } else {
+                jsonWriter.beginArray();
+                for (String v : values) {
+                    jsonWriter.value(v);
+                }
+                jsonWriter.endArray();
+            }
+        }
+        jsonWriter.endObject();
+    }
+
+    /**
+     * Ends the array, then flushes and closes the underlying writer.
+     *
+     * @throws IOException if the underlying writer fails
+     */
+    @Override
+    public void close() throws IOException {
+        //JsonWriter#endArray throws IllegalStateException if no array is open,
+        //so make sure a serializer with no entries still emits "[]"
+        startArrayIfNecessary();
+        jsonWriter.endArray();
+        jsonWriter.flush();
+        jsonWriter.close();
+    }
+
+    /** Opens the enclosing array if it has not been opened yet. */
+    private void startArrayIfNecessary() throws IOException {
+        if (!hasStartedArray) {
+            jsonWriter.beginArray();
+            hasStartedArray = true;
+        }
+    }
+}
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
index 0553f78..427e598 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
@@ -21,12 +21,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
 import org.junit.Test;
 
 public class JsonMetadataListTest {
@@ -57,6 +60,16 @@ public class JsonMetadataListTest {
         JsonMetadataList.toJson(metadataList, writer);
         List<Metadata> deserialized = JsonMetadataList.fromJson(new StringReader(writer.toString()));
         assertEquals(metadataList, deserialized);
+
+        //now test streaming serializer
+        writer = new StringWriter();
+        try(JsonStreamingSerializer streamingSerializer = new JsonStreamingSerializer(writer)) {
+            streamingSerializer.add(m1);
+            streamingSerializer.add(m2);
+        }
+        deserialized = JsonMetadataList.fromJson(new StringReader(writer.toString()));
+        assertEquals(metadataList, deserialized);
+
     }
 
     @Test
@@ -120,4 +133,37 @@ public class JsonMetadataListTest {
         JsonMetadataList.toJson(metadataList, writer);
         assertTrue(writer.toString().startsWith("[{\"tika:content\":\"this is the content\",\"zk1\":[\"v1\",\"v2\","));
     }
+
+    @Test
+    public void testSwitchingOrderOfMainDoc() throws Exception {
+        //values added to each multi-valued key; note that "v4" appears twice
+        String[] vals = new String[]{"v1", "v2", "v3", "v4", "v4"};
+        //embedded document: has an embedded resource path
+        Metadata embedded = new Metadata();
+        for (String v : vals) {
+            embedded.add("k1", v);
+        }
+        embedded.add("k2", "v1");
+        embedded.add(RecursiveParserWrapperHandler.EMBEDDED_RESOURCE_PATH, "/embedded-1");
+
+        //main document: no embedded resource path
+        Metadata mainDoc = new Metadata();
+        for (String v : vals) {
+            mainDoc.add("k3", v);
+        }
+        mainDoc.add("k4", "v1");
+
+        //stream the embedded document first and the main document last
+        StringWriter stringWriter = new StringWriter();
+        try (JsonStreamingSerializer serializer = new JsonStreamingSerializer(stringWriter)) {
+            serializer.add(embedded);
+            serializer.add(mainDoc);
+        }
+        Reader reader = new StringReader(stringWriter.toString());
+        //the expected list has the main document first
+        List<Metadata> truth = new ArrayList<>();
+        truth.add(mainDoc);
+        truth.add(embedded);
+        assertEquals(truth, JsonMetadataList.fromJson(reader));
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
tallison@apache.org.