You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2024/03/28 13:27:16 UTC

(tika) branch main updated: TIKA-4207: Add handling of embedded bytes to tika-pipes (#1699)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fe731233 TIKA-4207: Add handling of embedded bytes to tika-pipes (#1699)
4fe731233 is described below

commit 4fe7312330c430f357012f8d0ff886a0fb344783
Author: Tim Allison <ta...@apache.org>
AuthorDate: Thu Mar 28 09:27:09 2024 -0400

    TIKA-4207: Add handling of embedded bytes to tika-pipes (#1699)
    
    * TIKA-4207 -- add handling of embedded bytes to tika-pipes
---
 .../java/org/apache/tika/cli/TikaCLIAsyncTest.java |  89 +++++++
 .../test/java/org/apache/tika/cli/TikaCLITest.java |  59 +---
 .../AbstractEmbeddedDocumentBytesHandler.java      |  69 +++++
 .../tika/extractor/BasicEmbeddedBytesSelector.java |  77 ++++++
 .../BasicEmbeddedDocumentBytesHandler.java         |  58 ++++
 .../tika/extractor/EmbeddedBytesSelector.java      |  31 +--
 .../EmbeddedDocumentByteStoreExtractorFactory.java |  36 +--
 .../extractor/EmbeddedDocumentBytesHandler.java    |  32 +--
 .../ParsingEmbeddedDocumentExtractor.java          |  10 +-
 .../apache/tika/extractor/RUnpackExtractor.java    | 183 +++++++++++++
 .../tika/extractor/RUnpackExtractorFactory.java    | 111 ++++++++
 .../org/apache/tika/io/BoundedInputStream.java     |   4 +
 .../apache/tika/metadata/TikaCoreProperties.java   |   4 +
 .../org/apache/tika/parser/AutoDetectParser.java   |  11 +-
 .../apache/tika/parser/AutoDetectParserConfig.java |   4 +-
 .../apache/tika/parser/RecursiveParserWrapper.java |   2 +
 .../java/org/apache/tika/pipes/FetchEmitTuple.java |  52 +++-
 .../java/org/apache/tika/pipes/PipesServer.java    | 296 +++++++++++++++------
 .../extractor/EmbeddedDocumentBytesConfig.java     | 167 ++++++++++++
 .../EmittingEmbeddedDocumentBytesHandler.java      |  73 +++++
 .../tika/parser/AutoDetectParserConfigTest.java    |  72 +++++
 .../org/apache/tika/parser/mock/MockParser.java    |  26 +-
 .../org/apache/tika/pipes/PipesServerTest.java     | 120 ++++++++-
 ...rocessorTest.java => AsyncChaosMonkeyTest.java} |   2 +-
 .../config/TIKA-4207-embedded-bytes-config.xml     |  13 +-
 .../apache/tika/pipes/TIKA-4207-limit-bytes.xml    |  19 +-
 .../resources/org/apache/tika/pipes/TIKA-4207.xml  |  19 +-
 .../apache/tika/parser/microsoft/WMFParser.java    |   3 +-
 .../resources/configs/tika-config-no-names.xml     |   2 +-
 .../resources/configs/tika-config-with-names.xml   |   2 +-
 tika-pipes/tika-async-cli/pom.xml                  |   7 +
 .../apache/tika/async/cli/AsyncProcessorTest.java  | 140 ++++++++++
 .../apache/tika/async/cli/TikaAsyncCLITest.java    |   2 +-
 .../test/resources/configs/TIKA-4207-emitter.xml   |  28 +-
 .../resources/{ => configs}/tika-config-broken.xml |   0
 .../basic_embedded.xml}                            |  29 +-
 tika-pipes/tika-pipes-iterators/pom.xml            |   1 +
 .../tika-pipes-iterator-json}/pom.xml              |  43 ++-
 .../pipesiterator/json/JsonPipesIterator.java      |  65 +++++
 .../pipesiterator/json/TestJsonPipesIterator.java  |  85 ++++++
 .../test-documents/test-with-embedded-bytes.json   | 100 +++++++
 .../src/test/resources/test-documents/test.json    | 100 +++++++
 .../metadata/serialization/JsonFetchEmitTuple.java |  71 ++++-
 .../serialization/JsonFetchEmitTupleTest.java      |  20 ++
 .../tika/server/core/resource/AsyncResource.java   |  32 ++-
 .../apache/tika/server/standard/TikaPipesTest.java |  93 +++++++
 46 files changed, 2137 insertions(+), 325 deletions(-)

diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
new file mode 100644
index 000000000..d9f6d053f
--- /dev/null
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.cli;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Runs the TikaCLI async ("-a") option against a config generated in {@link #setUpClass()}. */
+public class TikaCLIAsyncTest extends TikaCLITest {
+    private static Path ASYNC_CONFIG;
+    @TempDir
+    private static Path ASYNC_OUTPUT_DIR;
+
+    @BeforeAll
+    public static void setUpClass() throws Exception {
+        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
+        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
+                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
+                "</async>" + "<fetchers>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
+                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
+                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
+                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
+                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
+                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
+                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
+                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
+                "</pipesIterator>" + "</properties>";
+        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
+    }
+
+    /** Counts the emitted .json files; coffee.xls.json is also checked for pretty print. */
+    @Test
+    public void testAsync() throws Exception {
+        getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
+        File[] emitted = ASYNC_OUTPUT_DIR.toFile().listFiles();
+        assertTrue(emitted != null, "listFiles() returned null for " + ASYNC_OUTPUT_DIR);
+        int json = 0;
+        for (File f : emitted) {
+            if (f.getName().endsWith(".json")) {
+                if (f.getName().equals("coffee.xls.json")) {
+                    checkForPrettyPrint(f);
+                }
+                json++;
+            }
+        }
+        assertEquals(17, json);
+    }
+
+    /** Asserts that each expected key is present and appears after the previous one. */
+    private void checkForPrettyPrint(File f) throws IOException {
+        String json = FileUtils.readFileToString(f, UTF_8);
+        int previous = json.indexOf("Content-Length");
+        assertTrue(previous > -1);
+        for (String k : new String[]{"Content-Type", "dc:creator",
+                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
+            int i = json.indexOf(k);
+            assertTrue(i > -1, "should have found " + k);
+            assertTrue(i > previous, "bad order: " + k + " at " + i + " not greater than " + previous);
+            previous = i;
+        }
+    }
+}
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
index ebd1d90b9..fa16e124a 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
@@ -30,9 +30,7 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -45,11 +43,8 @@ import org.apache.tika.utils.ProcessUtils;
  */
 public class TikaCLITest {
 
-    private static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
+    static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
 
-    private static Path ASYNC_CONFIG;
-    @TempDir
-    private static Path ASYNC_OUTPUT_DIR;
 
     @TempDir
     private Path extractDir;
@@ -61,24 +56,7 @@ public class TikaCLITest {
     private PrintStream stderr = null;
     private String resourcePrefix;
 
-    @BeforeAll
-    public static void setUpClass() throws Exception {
-        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
-        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
-                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
-                "</async>" + "<fetchers>" +
-                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
-                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
-                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
-                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
-                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
-                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
-                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
-                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
-                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
-                "</pipesIterator>" + "</properties>";
-        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
-    }
+
 
     protected static void assertExtracted(Path p, String allFiles) throws IOException {
 
@@ -582,42 +560,11 @@ public class TikaCLITest {
         assertTrue(content.contains("application/vnd.oasis.opendocument.text-web"));
     }
 
-    @Test
-    public void testAsync() throws Exception {
-        String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
-
-        int json = 0;
-        for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
-            if (f.getName().endsWith(".json")) {
-                //check one file for pretty print
-                if (f.getName().equals("coffee.xls.json")) {
-                    checkForPrettyPrint(f);
-                }
-                json++;
-            }
-        }
-        assertEquals(17, json);
-    }
-
-    private void checkForPrettyPrint(File f) throws IOException {
-        String json = FileUtils.readFileToString(f, UTF_8);
-        int previous = json.indexOf("Content-Length");
-        assertTrue(previous > -1);
-        for (String k : new String[]{"Content-Type", "dc:creator",
-                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
-            int i = json.indexOf(k);
-            assertTrue( i > -1, "should have found " + k);
-            assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous);
-            previous = i;
-        }
-    }
-
-
     /**
      * reset outContent and errContent if they are not empty
      * run given params in TikaCLI and return outContent String with UTF-8
      */
-    private String getParamOutContent(String... params) throws Exception {
+    String getParamOutContent(String... params) throws Exception {
         resetContent();
         TikaCLI.main(params);
         return outContent.toString("UTF-8");
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentBytesHandler.java b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentBytesHandler.java
new file mode 100644
index 000000000..3f2f38f94
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentBytesHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.tika.io.FilenameUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.utils.StringUtils;
+
+/** Tracks the ids passed to {@link #add} and builds emit keys for embedded documents. */
+public abstract class AbstractEmbeddedDocumentBytesHandler implements EmbeddedDocumentBytesHandler {
+    List<Integer> ids = new ArrayList<>();
+
+    /**
+     * Builds: containerEmitKey + "/" + name of containerEmitKey + embeddedIdPrefix + id,
+     * zero-padding the id if zeroPadName &gt; 0 and, for the EXISTING suffix strategy,
+     * appending the lower-cased suffix of the embedded file's resource name.
+     */
+    public String getEmitKey(String containerEmitKey, int embeddedId,
+                             EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                             Metadata metadata) {
+        String paddedId = Integer.toString(embeddedId);
+        if (embeddedDocumentBytesConfig.getZeroPadName() > 0) {
+            paddedId = StringUtils.leftPad(paddedId,
+                    embeddedDocumentBytesConfig.getZeroPadName(), "0");
+        }
+        StringBuilder key = new StringBuilder(containerEmitKey);
+        key.append("/").append(FilenameUtils.getName(containerEmitKey));
+        key.append(embeddedDocumentBytesConfig.getEmbeddedIdPrefix()).append(paddedId);
+        if (!embeddedDocumentBytesConfig.getSuffixStrategy().equals(
+                EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.EXISTING)) {
+            return key.toString();
+        }
+        String resourceName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+        String suffix = FilenameUtils.getSuffixFromPath(resourceName).toLowerCase(Locale.US);
+        return key.append(suffix).toString();
+    }
+
+    @Override
+    public void add(int id, Metadata metadata, InputStream bytes) throws IOException {
+        ids.add(id);
+    }
+
+    @Override
+    public List<Integer> getIds() {
+        return ids;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedBytesSelector.java b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedBytesSelector.java
new file mode 100644
index 000000000..1d5a239db
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedBytesSelector.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.util.Set;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.mime.MediaType;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Selects embedded documents by mime type (parameters stripped) and by embedded resource type.
+ * An exclude match always rejects; a non-empty include set rejects everything not in it.
+ */
+public class BasicEmbeddedBytesSelector implements EmbeddedBytesSelector {
+
+    private final Set<String> includeMimes;
+    private final Set<String> excludeMimes;
+    private final Set<String> includeEmbeddedResourceTypes;
+    private final Set<String> excludeEmbeddedResourceTypes;
+
+    public BasicEmbeddedBytesSelector(Set<String> includeMimes, Set<String> excludeMimes,
+                                      Set<String> includeEmbeddedResourceTypes,
+                                      Set<String> excludeEmbeddedResourceTypes) {
+        this.includeMimes = includeMimes;
+        this.excludeMimes = excludeMimes;
+        this.includeEmbeddedResourceTypes = includeEmbeddedResourceTypes;
+        this.excludeEmbeddedResourceTypes = excludeEmbeddedResourceTypes;
+    }
+
+    /** @return true if metadata passes both the mime and the embedded resource type filters */
+    @Override
+    public boolean select(Metadata metadata) {
+        String mime = metadata.get(Metadata.CONTENT_TYPE);
+        if (mime == null) {
+            mime = "";
+        } else if (includeMimes.size() > 0 || excludeMimes.size() > 0) {
+            //if mime matters at all, make sure to get the mime without parameters
+            MediaType mt = MediaType.parse(mime);
+            if (mt != null) {
+                mime = mt.getType() + "/" + mt.getSubtype();
+            }
+        }
+        if (excludeMimes.contains(mime)) {
+            return false;
+        }
+        if (includeMimes.size() > 0 && !includeMimes.contains(mime)) {
+            return false;
+        }
+        String embeddedResourceType = metadata.get(TikaCoreProperties.EMBEDDED_RESOURCE_TYPE);
+        //if a parser doesn't specify the type, treat it as ATTACHMENT
+        embeddedResourceType = StringUtils.isBlank(embeddedResourceType) ? "ATTACHMENT" :
+                embeddedResourceType;
+        if (excludeEmbeddedResourceTypes.contains(embeddedResourceType)) {
+            return false;
+        }
+        //an empty include set means "no restriction", consistent with the includeMimes check;
+        //otherwise a selector configured only with mime filters or excludes selects nothing
+        return includeEmbeddedResourceTypes.isEmpty() ||
+                includeEmbeddedResourceTypes.contains(embeddedResourceType);
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentBytesHandler.java b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentBytesHandler.java
new file mode 100644
index 000000000..cf6441b4f
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentBytesHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.UnsynchronizedBufferedInputStream;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+
+/**
+ * For now, this is an in-memory EmbeddedDocumentBytesHandler that stores
+ * all the bytes in memory. Users can retrieve the documents with {@link #getDocument(int)}.
+ *
+ * We'll need to make this cache to disk at some point if there are many bytes of
+ * embedded documents.
+ */
+public class BasicEmbeddedDocumentBytesHandler extends AbstractEmbeddedDocumentBytesHandler {
+    private final EmbeddedDocumentBytesConfig config;
+    public BasicEmbeddedDocumentBytesHandler(EmbeddedDocumentBytesConfig config) {
+        this.config = config;
+    }
+    //this won't scale, but let's start fully in memory for now;
+    Map<Integer, byte[]> docBytes = new HashMap<>();
+    @Override
+    public void add(int id, Metadata metadata, InputStream is) throws IOException {
+        super.add(id, metadata, is);
+        docBytes.put(id, IOUtils.toByteArray(is));
+    }
+
+    public InputStream getDocument(int id) throws IOException {
+        return new UnsynchronizedBufferedInputStream.Builder().setByteArray(docBytes.get(id)).get();
+    }
+
+    @Override
+    public void close() throws IOException {
+        //delete tmp dir or whatever here
+    }
+}
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedBytesSelector.java
similarity index 52%
copy from tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
copy to tika-core/src/main/java/org/apache/tika/extractor/EmbeddedBytesSelector.java
index fc6694c74..2ec7df667 100644
--- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
+++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedBytesSelector.java
@@ -14,30 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.async.cli;
+package org.apache.tika.extractor;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.tika.metadata.Metadata;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
+public interface EmbeddedBytesSelector {
 
-import org.junit.jupiter.api.Test;
-
-import org.apache.tika.exception.TikaConfigException;
-
-public class TikaAsyncCLITest {
-    @Test
-    public void testCrash() throws Exception {
-        Path config = getPath("/tika-config-broken.xml");
-        assertThrows(TikaConfigException.class,
-                () -> TikaAsyncCLI.main(
-                        new String[] {
-                            config.toAbsolutePath().toString()
-                        })
-        );
+    class AcceptAll implements EmbeddedBytesSelector {
+        @Override
+        public boolean select(Metadata metadata) {
+            return true;
+        }
     }
+    EmbeddedBytesSelector ACCEPT_ALL = new AcceptAll();
 
-    private Path getPath(String file) throws Exception {
-        return Paths.get(this.getClass().getResource(file).toURI());
-    }
+    boolean select(Metadata metadata);
 }
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStoreExtractorFactory.java
similarity index 51%
copy from tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
copy to tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStoreExtractorFactory.java
index fc6694c74..f7237bd6a 100644
--- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
+++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStoreExtractorFactory.java
@@ -14,30 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.async.cli;
+package org.apache.tika.extractor;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import org.junit.jupiter.api.Test;
-
-import org.apache.tika.exception.TikaConfigException;
-
-public class TikaAsyncCLITest {
-    @Test
-    public void testCrash() throws Exception {
-        Path config = getPath("/tika-config-broken.xml");
-        assertThrows(TikaConfigException.class,
-                () -> TikaAsyncCLI.main(
-                        new String[] {
-                            config.toAbsolutePath().toString()
-                        })
-        );
-    }
+/**
+ * Factories that create EmbeddedDocumentExtractors that require an
+ * {@link EmbeddedDocumentBytesHandler} in the
+ * {@link org.apache.tika.parser.ParseContext} should extend this.
+ *
+ * This is a shim interface to signal to {@link org.apache.tika.pipes.PipesServer}
+ * to use the {@link RUnpackExtractor} if the user doesn't configure a custom
+ * EmbeddedDocumentExtractor.
+ *
+ * TODO: Figure out how to simplify this and allow for emitting of the source document.
+ */
+public interface EmbeddedDocumentByteStoreExtractorFactory extends EmbeddedDocumentExtractorFactory {
 
-    private Path getPath(String file) throws Exception {
-        return Paths.get(this.getClass().getResource(file).toURI());
-    }
 }
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentBytesHandler.java
similarity index 51%
copy from tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
copy to tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentBytesHandler.java
index fc6694c74..12357a718 100644
--- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
+++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentBytesHandler.java
@@ -14,30 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.async.cli;
+package org.apache.tika.extractor;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import org.apache.tika.metadata.Metadata;
 
-import org.junit.jupiter.api.Test;
+public interface EmbeddedDocumentBytesHandler extends Closeable {
+    //we need metadata for the emitter store...can we get away without it?
+    void add(int id, Metadata metadata, InputStream inputStream) throws IOException;
 
-import org.apache.tika.exception.TikaConfigException;
-
-public class TikaAsyncCLITest {
-    @Test
-    public void testCrash() throws Exception {
-        Path config = getPath("/tika-config-broken.xml");
-        assertThrows(TikaConfigException.class,
-                () -> TikaAsyncCLI.main(
-                        new String[] {
-                            config.toAbsolutePath().toString()
-                        })
-        );
-    }
-
-    private Path getPath(String file) throws Exception {
-        return Paths.get(this.getClass().getResource(file).toURI());
-    }
+    List<Integer> getIds();
 }
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
index d1b25f17c..edcb78ff1 100644
--- a/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingEmbeddedDocumentExtractor.java
@@ -56,7 +56,7 @@ public class ParsingEmbeddedDocumentExtractor implements EmbeddedDocumentExtract
 
     private boolean writeFileNameToContent = true;
 
-    private final ParseContext context;
+    protected final ParseContext context;
 
     public ParsingEmbeddedDocumentExtractor(ParseContext context) {
         this.context = context;
@@ -99,7 +99,7 @@ public class ParsingEmbeddedDocumentExtractor implements EmbeddedDocumentExtract
         // Use the delegate parser to parse this entry
         try (TemporaryResources tmp = new TemporaryResources()) {
             final TikaInputStream newStream =
-                    TikaInputStream.get(CloseShieldInputStream.wrap(stream), tmp, metadata);
+                    TikaInputStream.get(new CloseShieldInputStream(stream), tmp, metadata);
             if (stream instanceof TikaInputStream) {
                 final Object container = ((TikaInputStream) stream).getOpenContainer();
                 if (container != null) {
@@ -123,7 +123,7 @@ public class ParsingEmbeddedDocumentExtractor implements EmbeddedDocumentExtract
         }
     }
 
-    private void recordException(Exception e, ParseContext context) {
+    void recordException(Exception e, ParseContext context) {
         ParseRecord record = context.get(ParseRecord.class);
         if (record == null) {
             return;
@@ -138,4 +138,8 @@ public class ParsingEmbeddedDocumentExtractor implements EmbeddedDocumentExtract
     public void setWriteFileNameToContent(boolean writeFileNameToContent) {
         this.writeFileNameToContent = writeFileNameToContent;
     }
+
+    public boolean isWriteFileNameToContent() {
+        return writeFileNameToContent;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/RUnpackExtractor.java b/tika-core/src/main/java/org/apache/tika/extractor/RUnpackExtractor.java
new file mode 100644
index 000000000..76b297dd7
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/RUnpackExtractor.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import static org.apache.tika.sax.XHTMLContentHandler.XHTML;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.input.CloseShieldInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.AttributesImpl;
+
+import org.apache.tika.exception.CorruptedFileException;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.BoundedInputStream;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.sax.BodyContentHandler;
+import org.apache.tika.sax.EmbeddedContentHandler;
+
+/**
+ * Recursive unpacker: parses each embedded document for text and metadata and, when an
+ * {@link EmbeddedDocumentBytesHandler} is available in the {@link ParseContext}, also
+ * passes the embedded document's raw bytes to that handler.
+ * <p>
+ * An instance keeps a running count of the bytes it has passed to the handler and stops
+ * storing bytes once {@code maxEmbeddedBytesForExtraction} has been reached.
+ *
+ * @since Apache Tika 3.0.0
+ */
+public class RUnpackExtractor extends ParsingEmbeddedDocumentExtractor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RUnpackExtractor.class);
+
+    private static final File ABSTRACT_PATH = new File("");
+
+    private EmbeddedBytesSelector embeddedBytesSelector = EmbeddedBytesSelector.ACCEPT_ALL;
+
+    //running total of embedded bytes read on behalf of the bytes handler
+    private long bytesExtracted = 0;
+    private final long maxEmbeddedBytesForExtraction;
+
+    /**
+     * @param context parse context; consulted for the {@link EmbeddedDocumentBytesHandler}
+     * @param maxEmbeddedBytesForExtraction maximum total number of embedded bytes that
+     *                                      this instance will pass to the bytes handler
+     */
+    public RUnpackExtractor(ParseContext context, long maxEmbeddedBytesForExtraction) {
+        super(context);
+        this.maxEmbeddedBytesForExtraction = maxEmbeddedBytesForExtraction;
+    }
+
+    /**
+     * Parses the embedded document and, if an {@link EmbeddedDocumentBytesHandler} is
+     * in the parse context, tries to store the embedded document's bytes as well.
+     * Encrypted-document and other Tika exceptions are passed to {@code recordException};
+     * a {@link CorruptedFileException} is rethrown wrapped in an {@link IOException}.
+     */
+    @Override
+    public void parseEmbedded(
+            InputStream stream, ContentHandler handler, Metadata metadata, boolean outputHtml)
+            throws SAXException, IOException {
+        if (outputHtml) {
+            AttributesImpl attributes = new AttributesImpl();
+            attributes.addAttribute("", "class", "class", "CDATA", "package-entry");
+            handler.startElement(XHTML, "div", "div", attributes);
+        }
+
+        String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+        if (isWriteFileNameToContent() && name != null && name.length() > 0 && outputHtml) {
+            handler.startElement(XHTML, "h1", "h1", new AttributesImpl());
+            char[] chars = name.toCharArray();
+            handler.characters(chars, 0, chars.length);
+            handler.endElement(XHTML, "h1", "h1");
+        }
+
+        // Use the delegate parser to parse this entry
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            final TikaInputStream newStream =
+                    TikaInputStream.get(CloseShieldInputStream.wrap(stream), tmp, metadata);
+            if (stream instanceof TikaInputStream) {
+                final Object container = ((TikaInputStream) stream).getOpenContainer();
+                if (container != null) {
+                    newStream.setOpenContainer(container);
+                }
+            }
+            EmbeddedDocumentBytesHandler bytesHandler = context.get(EmbeddedDocumentBytesHandler.class);
+            if (bytesHandler != null) {
+                parseWithBytes(newStream, handler, metadata);
+            } else {
+                parse(newStream, handler, metadata);
+            }
+        } catch (EncryptedDocumentException ede) {
+            recordException(ede, context);
+        } catch (CorruptedFileException e) {
+            //necessary to stop the parse to avoid infinite loops
+            //on corrupt sqlite3 files
+            throw new IOException(e);
+        } catch (TikaException e) {
+            recordException(e, context);
+        }
+
+        if (outputHtml) {
+            handler.endElement(XHTML, "div", "div");
+        }
+    }
+
+    /**
+     * Parses the stream and then, whether or not the parse succeeded, tries to store
+     * the stream's bytes via {@code storeEmbeddedBytes}.
+     */
+    private void parseWithBytes(TikaInputStream stream, ContentHandler handler, Metadata metadata)
+            throws TikaException, IOException, SAXException {
+        //TODO -- improve the efficiency of this so that we're not
+        //literally writing out a file per request
+        Path p = stream.getPath();
+        try {
+            parse(stream, handler, metadata);
+        } finally {
+            storeEmbeddedBytes(p, metadata);
+        }
+    }
+
+    /**
+     * Parses the stream with the delegating parser, writing body content to the handler.
+     */
+    private void parse(TikaInputStream stream, ContentHandler handler, Metadata metadata)
+            throws TikaException, IOException, SAXException {
+        getDelegatingParser().parse(stream,
+                new EmbeddedContentHandler(new BodyContentHandler(handler)),
+                metadata, context);
+    }
+
+    /**
+     * Passes the bytes of the file at {@code p} to the
+     * {@link EmbeddedDocumentBytesHandler} if the selector accepts the metadata and the
+     * byte budget has not been used up. I/O problems are logged, not rethrown.
+     */
+    private void storeEmbeddedBytes(Path p, Metadata metadata) {
+        if (!embeddedBytesSelector.select(metadata)) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("skipping embedded bytes {} <-> {}",
+                        metadata.get(Metadata.CONTENT_TYPE),
+                        metadata.get(TikaCoreProperties.EMBEDDED_RESOURCE_TYPE));
+            }
+            return;
+        }
+        EmbeddedDocumentBytesHandler embeddedDocumentBytesHandler =
+                context.get(EmbeddedDocumentBytesHandler.class);
+        //a missing id must not be unboxed: an NPE thrown here would escape from the
+        //caller's finally block and mask any exception thrown by the parse
+        Integer id = metadata.getInt(TikaCoreProperties.EMBEDDED_ID);
+        if (id == null) {
+            LOGGER.warn("no embedded id in metadata; skipping embedded bytes");
+            return;
+        }
+        try {
+            //check the budget before opening the file
+            if (bytesExtracted >= maxEmbeddedBytesForExtraction) {
+                throw new IOException("Bytes extracted (" + bytesExtracted +
+                        ") >= max allowed (" + maxEmbeddedBytesForExtraction + ")");
+            }
+            long maxToRead = maxEmbeddedBytesForExtraction - bytesExtracted;
+
+            try (InputStream is = Files.newInputStream(p);
+                    BoundedInputStream boundedIs = new BoundedInputStream(maxToRead, is)) {
+                embeddedDocumentBytesHandler.add(id, metadata, boundedIs);
+                bytesExtracted += boundedIs.getPos();
+                if (boundedIs.hasHitBound()) {
+                    throw new IOException("Bytes extracted (" + bytesExtracted +
+                            ") >= max allowed (" + maxEmbeddedBytesForExtraction + "). Truncated " +
+                            "bytes");
+                }
+            }
+        } catch (IOException e) {
+            LOGGER.warn("problem writing out embedded bytes", e);
+            //info in metadata doesn't actually make it back to the metadata list
+            //because we're filtering and cloning the metadata at the end of the parse
+            //which happens before we try to copy out the files.
+            //TODO fix this
+            //metadata.set(TikaCoreProperties.EMBEDDED_BYTES_EXCEPTION,
+              //      ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    public void setEmbeddedBytesSelector(EmbeddedBytesSelector embeddedBytesSelector) {
+        this.embeddedBytesSelector = embeddedBytesSelector;
+    }
+
+    public EmbeddedBytesSelector getEmbeddedBytesSelector() {
+        return embeddedBytesSelector;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/RUnpackExtractorFactory.java b/tika-core/src/main/java/org/apache/tika/extractor/RUnpackExtractorFactory.java
new file mode 100644
index 000000000..a715ed25f
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/RUnpackExtractorFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+/**
+ * Factory for {@link RUnpackExtractor}. Holds the write-file-name flag, the include/exclude
+ * sets (mime types and embedded resource types) used to build the
+ * {@link EmbeddedBytesSelector}, and the maximum number of embedded bytes to extract.
+ */
+public class RUnpackExtractorFactory implements EmbeddedDocumentByteStoreExtractorFactory {
+
+    /** Default cap on embedded bytes per extractor: 10 * 1024^3 bytes (10 GiB). */
+    public static final long DEFAULT_MAX_EMBEDDED_BYTES_FOR_EXTRACTION =
+            10L * 1024L * 1024L * 1024L;
+
+    private boolean writeFileNameToContent = true;
+    private Set<String> embeddedBytesIncludeMimeTypes = Collections.emptySet();
+    private Set<String> embeddedBytesExcludeMimeTypes = Collections.emptySet();
+    private Set<String> embeddedBytesIncludeEmbeddedResourceTypes = Collections.emptySet();
+    private Set<String> embeddedBytesExcludeEmbeddedResourceTypes = Collections.emptySet();
+
+    private long maxEmbeddedBytesForExtraction = DEFAULT_MAX_EMBEDDED_BYTES_FOR_EXTRACTION;
+
+    @Field
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+
+    /** Replaces the include set of mime types with a copy of the given list. */
+    @Field
+    public void setEmbeddedBytesIncludeMimeTypes(List<String> includeMimeTypes) {
+        embeddedBytesIncludeMimeTypes = new HashSet<>(includeMimeTypes);
+    }
+
+    /** Replaces the exclude set of mime types with a copy of the given list. */
+    @Field
+    public void setEmbeddedBytesExcludeMimeTypes(List<String> excludeMimeTypes) {
+        embeddedBytesExcludeMimeTypes = new HashSet<>(excludeMimeTypes);
+    }
+
+    /** Replaces the include set of embedded resource types with a copy of the given list. */
+    @Field
+    public void setEmbeddedBytesIncludeEmbeddedResourceTypes(List<String> includeAttachmentTypes) {
+        embeddedBytesIncludeEmbeddedResourceTypes = new HashSet<>(includeAttachmentTypes);
+    }
+
+    /** Replaces the exclude set of embedded resource types with a copy of the given list. */
+    @Field
+    public void setEmbeddedBytesExcludeEmbeddedResourceTypes(List<String> excludeAttachmentTypes) {
+        embeddedBytesExcludeEmbeddedResourceTypes = new HashSet<>(excludeAttachmentTypes);
+    }
+
+    /**
+     * Total number of bytes to write out. A good zip bomb may contain petabytes
+     * compressed into a few kb. Make sure that you can't fill up a disk!
+     * <p>
+     * This does not include the container file in the count of bytes written out.
+     * This only counts the lengths of the embedded files.
+     *
+     * @param maxEmbeddedBytesForExtraction maximum number of embedded bytes; must be &gt;= 0
+     * @throws TikaConfigException if the value is negative
+     */
+    @Field
+    public void setMaxEmbeddedBytesForExtraction(long maxEmbeddedBytesForExtraction) throws TikaConfigException {
+        if (maxEmbeddedBytesForExtraction < 0) {
+            throw new TikaConfigException("maxEmbeddedBytesForExtraction must be >= 0");
+        }
+        this.maxEmbeddedBytesForExtraction = maxEmbeddedBytesForExtraction;
+    }
+
+    /**
+     * Creates a new {@link RUnpackExtractor} configured with this factory's settings.
+     */
+    @Override
+    public EmbeddedDocumentExtractor newInstance(Metadata metadata, ParseContext parseContext) {
+        RUnpackExtractor ex =
+                new RUnpackExtractor(parseContext,
+                        maxEmbeddedBytesForExtraction);
+        ex.setWriteFileNameToContent(writeFileNameToContent);
+        ex.setEmbeddedBytesSelector(createEmbeddedBytesSelector());
+        return ex;
+    }
+
+    /**
+     * Returns {@link EmbeddedBytesSelector#ACCEPT_ALL} when all four include/exclude sets
+     * are empty; otherwise a {@link BasicEmbeddedBytesSelector} built from the four sets.
+     */
+    private EmbeddedBytesSelector createEmbeddedBytesSelector() {
+        if (embeddedBytesIncludeMimeTypes.isEmpty() &&
+                embeddedBytesExcludeMimeTypes.isEmpty() &&
+                embeddedBytesIncludeEmbeddedResourceTypes.isEmpty() &&
+                embeddedBytesExcludeEmbeddedResourceTypes.isEmpty()) {
+            return EmbeddedBytesSelector.ACCEPT_ALL;
+        }
+        return new BasicEmbeddedBytesSelector(embeddedBytesIncludeMimeTypes,
+                embeddedBytesExcludeMimeTypes, embeddedBytesIncludeEmbeddedResourceTypes,
+                embeddedBytesExcludeEmbeddedResourceTypes);
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/io/BoundedInputStream.java b/tika-core/src/main/java/org/apache/tika/io/BoundedInputStream.java
index a80009cd2..31290cc1a 100644
--- a/tika-core/src/main/java/org/apache/tika/io/BoundedInputStream.java
+++ b/tika-core/src/main/java/org/apache/tika/io/BoundedInputStream.java
@@ -147,4 +147,8 @@ public class BoundedInputStream extends InputStream {
     public long transferTo(OutputStream out) throws IOException {
         return in.transferTo(out);
     }
+
+    public long getPos() {
+        return pos;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/metadata/TikaCoreProperties.java b/tika-core/src/main/java/org/apache/tika/metadata/TikaCoreProperties.java
index 6ff02c1cf..effa4a667 100644
--- a/tika-core/src/main/java/org/apache/tika/metadata/TikaCoreProperties.java
+++ b/tika-core/src/main/java/org/apache/tika/metadata/TikaCoreProperties.java
@@ -98,6 +98,10 @@ public interface TikaCoreProperties {
     Property EMBEDDED_EXCEPTION =
             Property.internalTextBag(TIKA_META_EXCEPTION_PREFIX + "embedded_exception");
 
+    //exception while handling the raw bytes of an embedded file in an EmbeddedDocumentBytesHandler
+    Property EMBEDDED_BYTES_EXCEPTION =
+            Property.internalTextBag(TIKA_META_EXCEPTION_PREFIX + "embedded_bytes_exception");
+
     //warning while parsing in an embedded file
     Property EMBEDDED_WARNING =
             Property.internalTextBag(TIKA_META_EXCEPTION_PREFIX + "embedded_warning");
diff --git a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
index d333c2e9a..86eae692a 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
@@ -28,6 +28,8 @@ import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.exception.ZeroByteFileException;
 import org.apache.tika.extractor.EmbeddedDocumentExtractor;
+import org.apache.tika.extractor.EmbeddedDocumentExtractorFactory;
+import org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.HttpHeaders;
@@ -197,7 +199,6 @@ public class AutoDetectParser extends CompositeParser {
                     createSecureContentHandler(handler, tis, autoDetectParserConfig) : null;
 
             initializeEmbeddedDocumentExtractor(metadata, context);
-
             try {
                 // Parse the document
                 super.parse(tis, sch, metadata, context);
@@ -267,8 +268,12 @@ public class AutoDetectParser extends CompositeParser {
         if (p == null) {
             context.set(Parser.class, this);
         }
-        EmbeddedDocumentExtractor edx = autoDetectParserConfig.getEmbeddedDocumentExtractorFactory()
-                .newInstance(metadata, context);
+        EmbeddedDocumentExtractorFactory edxf =
+                autoDetectParserConfig.getEmbeddedDocumentExtractorFactory();
+        if (edxf == null) {
+            edxf = new ParsingEmbeddedDocumentExtractorFactory();
+        }
+        EmbeddedDocumentExtractor edx = edxf.newInstance(metadata, context);
         context.set(EmbeddedDocumentExtractor.class, edx);
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParserConfig.java b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParserConfig.java
index bc4904367..afe65b07e 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParserConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParserConfig.java
@@ -25,7 +25,6 @@ import org.xml.sax.ContentHandler;
 import org.apache.tika.config.ConfigBase;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.extractor.EmbeddedDocumentExtractorFactory;
-import org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.writefilter.MetadataWriteFilterFactory;
 import org.apache.tika.sax.ContentHandlerDecoratorFactory;
@@ -87,8 +86,7 @@ public class AutoDetectParserConfig extends ConfigBase implements Serializable {
 
     private MetadataWriteFilterFactory metadataWriteFilterFactory = null;
 
-    private EmbeddedDocumentExtractorFactory embeddedDocumentExtractorFactory =
-            new ParsingEmbeddedDocumentExtractorFactory();
+    private EmbeddedDocumentExtractorFactory embeddedDocumentExtractorFactory = null;
 
     private ContentHandlerDecoratorFactory contentHandlerDecoratorFactory =
             NOOP_CONTENT_HANDLER_DECORATOR_FACTORY;
diff --git a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
index e8f029770..3cb78d520 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/RecursiveParserWrapper.java
@@ -223,6 +223,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
         @Override
         public void parse(InputStream stream, ContentHandler ignore, Metadata metadata,
                           ParseContext context) throws IOException, SAXException, TikaException {
+
             //Test to see if we should avoid parsing
             if (parserState.recursiveParserWrapperHandler.hasHitMaximumEmbeddedResources()) {
                 return;
@@ -255,6 +256,7 @@ public class RecursiveParserWrapper extends ParserDecorator {
             //so that you can return it back to its state at the end of this parse
             ContentHandler preContextHandler = secureContentHandler.handler;
             secureContentHandler.updateContentHandler(localHandler);
+
             try {
                 super.parse(stream, secureContentHandler, metadata, context);
             } catch (SAXException e) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 3a8ec2bdd..0c0334fd4 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -21,6 +21,7 @@ import java.util.Objects;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
 public class FetchEmitTuple implements Serializable {
@@ -38,6 +39,7 @@ public class FetchEmitTuple implements Serializable {
     private final ON_PARSE_EXCEPTION onParseException;
     private HandlerConfig handlerConfig;
 
+    private EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) {
         this(id, fetchKey, emitKey, new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
@@ -55,12 +57,20 @@ public class FetchEmitTuple implements Serializable {
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
                           HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException) {
+        this(id, fetchKey, emitKey, metadata, handlerConfig, onParseException,
+                EmbeddedDocumentBytesConfig.SKIP);
+    }
+
+    public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
+                          HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException,
+                          EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig) {
         this.id = id;
         this.fetchKey = fetchKey;
         this.emitKey = emitKey;
         this.metadata = metadata;
         this.handlerConfig = handlerConfig;
         this.onParseException = onParseException;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
     }
 
     public String getId() {
@@ -94,21 +104,40 @@ public class FetchEmitTuple implements Serializable {
         return handlerConfig == null ? HandlerConfig.DEFAULT_HANDLER_CONFIG : handlerConfig;
     }
 
+    public EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig() {
+        return embeddedDocumentBytesConfig;
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         FetchEmitTuple that = (FetchEmitTuple) o;
 
-        if (!Objects.equals(id, that.id)) return false;
-        if (!Objects.equals(fetchKey, that.fetchKey))
+        if (!Objects.equals(id, that.id)) {
+            return false;
+        }
+        if (!Objects.equals(fetchKey, that.fetchKey)) {
+            return false;
+        }
+        if (!Objects.equals(emitKey, that.emitKey)) {
+            return false;
+        }
+        if (!Objects.equals(metadata, that.metadata)) {
+            return false;
+        }
+        if (onParseException != that.onParseException) {
             return false;
-        if (!Objects.equals(emitKey, that.emitKey)) return false;
-        if (!Objects.equals(metadata, that.metadata))
+        }
+        if (!Objects.equals(handlerConfig, that.handlerConfig)) {
             return false;
-        if (onParseException != that.onParseException) return false;
-        return Objects.equals(handlerConfig, that.handlerConfig);
+        }
+        return Objects.equals(embeddedDocumentBytesConfig, that.embeddedDocumentBytesConfig);
     }
 
     @Override
@@ -119,13 +148,16 @@ public class FetchEmitTuple implements Serializable {
         result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
         result = 31 * result + (onParseException != null ? onParseException.hashCode() : 0);
         result = 31 * result + (handlerConfig != null ? handlerConfig.hashCode() : 0);
+        result = 31 * result +
+                (embeddedDocumentBytesConfig != null ? embeddedDocumentBytesConfig.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
         return "FetchEmitTuple{" + "id='" + id + '\'' + ", fetchKey=" + fetchKey + ", emitKey=" +
-            emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
-            ", handlerConfig=" + handlerConfig + '}';
+                emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
+                ", handlerConfig=" + handlerConfig + ", embeddedDocumentBytesConfig=" +
+                embeddedDocumentBytesConfig + '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index ed1e5bb5e..d8957368d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes;
 
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -24,10 +25,12 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
@@ -40,8 +43,16 @@ import org.xml.sax.SAXException;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.BasicEmbeddedDocumentBytesHandler;
 import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.extractor.EmbeddedDocumentByteStoreExtractorFactory;
+import org.apache.tika.extractor.EmbeddedDocumentBytesHandler;
+import org.apache.tika.extractor.EmbeddedDocumentExtractor;
+import org.apache.tika.extractor.EmbeddedDocumentExtractorFactory;
+import org.apache.tika.extractor.RUnpackExtractor;
+import org.apache.tika.extractor.RUnpackExtractorFactory;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
@@ -56,7 +67,9 @@ import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.StreamEmitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.pipes.extractor.EmittingEmbeddedDocumentBytesHandler;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
@@ -70,7 +83,7 @@ import org.apache.tika.utils.StringUtils;
 /**
  * This server is forked from the PipesClient.  This class isolates
  * parsing from the client to protect the primary JVM.
- *
+ * <p>
  * When configuring logging for this class, make absolutely certain
  * not to write to STDOUT.  This class uses STDOUT to communicate with
  * the PipesClient.
@@ -87,22 +100,9 @@ public class PipesServer implements Runnable {
     private Detector detector;
 
     public enum STATUS {
-        READY,
-        CALL,
-        PING,
-        FAILED_TO_START,
-        FETCHER_NOT_FOUND,
-        EMITTER_NOT_FOUND,
-        FETCHER_INITIALIZATION_EXCEPTION,
-        FETCH_EXCEPTION,
-        PARSE_SUCCESS,
-        PARSE_EXCEPTION_NO_EMIT,
-        EMIT_SUCCESS,
-        EMIT_SUCCESS_PARSE_EXCEPTION,
-        EMIT_EXCEPTION,
-        OOM,
-        TIMEOUT,
-        EMPTY_OUTPUT,
+        READY, CALL, PING, FAILED_TO_START, FETCHER_NOT_FOUND, EMITTER_NOT_FOUND,
+        FETCHER_INITIALIZATION_EXCEPTION, FETCH_EXCEPTION, PARSE_SUCCESS, PARSE_EXCEPTION_NO_EMIT,
+        EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, OOM, TIMEOUT, EMPTY_OUTPUT,
         INTERMEDIATE_RESULT;
 
         byte getByte() {
@@ -117,8 +117,8 @@ public class PipesServer implements Runnable {
             STATUS[] statuses = STATUS.values();
 
             if (i >= statuses.length) {
-                throw new IllegalArgumentException("byte with index " +
-                        i + " must be < " + statuses.length);
+                throw new IllegalArgumentException(
+                        "byte with index " + i + " must be < " + statuses.length);
             }
             return statuses[i];
         }
@@ -145,8 +145,8 @@ public class PipesServer implements Runnable {
 
 
     public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
-                       long maxForEmitBatchBytes,
-                       long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
+                       long maxForEmitBatchBytes, long serverParseTimeoutMillis,
+                       long serverWaitTimeoutMillis)
             throws IOException, TikaException, SAXException {
         this.tikaConfigPath = tikaConfigPath;
         this.input = new DataInputStream(in);
@@ -188,7 +188,8 @@ public class PipesServer implements Runnable {
                 synchronized (lock) {
                     long elapsed = System.currentTimeMillis() - since;
                     if (parsing && elapsed > serverParseTimeoutMillis) {
-                        LOG.warn("timeout server; elapsed {}  with {}", elapsed, serverParseTimeoutMillis);
+                        LOG.warn("timeout server; elapsed {}  with {}", elapsed,
+                                serverParseTimeoutMillis);
                         exit(TIMEOUT_EXIT_CODE);
                     } else if (!parsing && serverWaitTimeoutMillis > 0 &&
                             elapsed > serverWaitTimeoutMillis) {
@@ -264,12 +265,13 @@ public class PipesServer implements Runnable {
     /**
      * returns stack trace if there was a container exception or empty string
      * if there was no stacktrace
+     *
      * @param t
      * @param metadataList
      * @return
      */
     private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataList) {
-        if (metadataList == null || metadataList.size() < 1) {
+        if (metadataIsEmpty(metadataList)) {
             return StringUtils.EMPTY;
         }
         String stack = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
@@ -277,11 +279,13 @@ public class PipesServer implements Runnable {
     }
 
 
-    private void emit(String taskId, EmitData emitData, String parseExceptionStack) {
+    private void emit(String taskId, EmitKey emitKey,
+                      boolean isExtractEmbeddedBytes, MetadataListAndEmbeddedBytes parseData,
+                      String parseExceptionStack) {
         Emitter emitter = null;
 
         try {
-            emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+            emitter = emitterManager.getEmitter(emitKey.getEmitterName());
         } catch (IllegalArgumentException e) {
             String noEmitterMsg = getNoEmitterMsg(taskId);
             LOG.warn(noEmitterMsg);
@@ -289,7 +293,12 @@ public class PipesServer implements Runnable {
             return;
         }
         try {
-            emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList());
+            if (isExtractEmbeddedBytes &&
+                    parseData.toBePackagedForStreamEmitter()) {
+                emitContentsAndBytes(emitter, emitKey, parseData);
+            } else {
+                emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList());
+            }
         } catch (IOException | TikaEmitterException e) {
             LOG.warn("emit exception", e);
             String msg = ExceptionUtils.getStackTrace(e);
@@ -306,6 +315,16 @@ public class PipesServer implements Runnable {
         }
     }
 
+    private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
+                                      MetadataListAndEmbeddedBytes parseData) {
+        if (!(emitter instanceof StreamEmitter)) {
+            throw new IllegalArgumentException("The emitter for embedded document byte store must" +
+                    " be a StreamEmitter. I see: " + emitter.getClass());
+        }
+        //TODO: implement this
+        throw new UnsupportedOperationException("this is not yet implemented");
+    }
+
     private void parseOne() {
         synchronized (lock) {
             parsing = true;
@@ -316,7 +335,8 @@ public class PipesServer implements Runnable {
             long start = System.currentTimeMillis();
             t = readFetchEmitTuple();
             if (LOG.isTraceEnabled()) {
-                LOG.trace("timer -- read fetchEmitTuple: {} ms", System.currentTimeMillis() - start);
+                LOG.trace("timer -- read fetchEmitTuple: {} ms",
+                        System.currentTimeMillis() - start);
             }
             start = System.currentTimeMillis();
             actuallyParse(t);
@@ -348,43 +368,62 @@ public class PipesServer implements Runnable {
         }
 
         start = System.currentTimeMillis();
-        List<Metadata> metadataList = parseIt(t, fetcher);
+        MetadataListAndEmbeddedBytes parseData = null;
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
-        }
+        try {
+            //this can be null if there is a fetch exception
+            parseData = parseFromTuple(t, fetcher);
 
-        if (metadataIsEmpty(metadataList)) {
-            write(STATUS.EMPTY_OUTPUT);
-            return;
-        }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
+            }
+
+            if (metadataIsEmpty(parseData.getMetadataList())) {
+                write(STATUS.EMPTY_OUTPUT);
+                return;
+            }
 
-        emitIt(t, metadataList);
+            emitParseData(t, parseData);
+        } finally {
+            if (parseData != null && parseData.hasEmbeddedDocumentByteStore() &&
+                    parseData.getEmbeddedDocumentBytesHandler() instanceof Closeable) {
+                try {
+                    ((Closeable) parseData.getEmbeddedDocumentBytesHandler()).close();
+                } catch (IOException e) {
+                    LOG.warn("problem closing embedded document byte store", e);
+                }
+            }
+        }
     }
 
-    private void emitIt(FetchEmitTuple t, List<Metadata> metadataList) {
+    private void emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseData) {
         long start = System.currentTimeMillis();
-        String stack = getContainerStacktrace(t, metadataList);
+        String stack = getContainerStacktrace(t, parseData.getMetadataList());
         //we need to apply this after we pull out the stacktrace
-        filterMetadata(metadataList);
-        if (StringUtils.isBlank(stack) || t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
-            injectUserMetadata(t.getMetadata(), metadataList);
+        filterMetadata(parseData.getMetadataList());
+        if (StringUtils.isBlank(stack) ||
+                t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
+            injectUserMetadata(t.getMetadata(), parseData.getMetadataList());
             EmitKey emitKey = t.getEmitKey();
             if (StringUtils.isBlank(emitKey.getEmitKey())) {
                 emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
                 t.setEmitKey(emitKey);
             }
-            EmitData emitData = new EmitData(t.getEmitKey(), metadataList, stack);
-            if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
-                emit(t.getId(), emitData, stack);
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
-                }
+            EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack);
+            if (t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes() &&
+                    parseData.toBePackagedForStreamEmitter()) {
+                emit(t.getId(), emitKey, t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes(),
+                        parseData, stack);
+            } else if (maxForEmitBatchBytes >= 0 &&
+                    emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
+                emit(t.getId(), emitKey, t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes(),
+                        parseData, stack);
             } else {
+                //send back to the client
                 write(emitData);
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("timer -- to write data: {} ms", System.currentTimeMillis() - start);
-                }
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
             }
         } else {
             write(STATUS.PARSE_EXCEPTION_NO_EMIT, stack);
@@ -410,25 +449,23 @@ public class PipesServer implements Runnable {
             write(STATUS.FETCHER_NOT_FOUND, noFetcherMsg);
             return null;
         } catch (IOException | TikaException e) {
-            LOG.warn("Couldn't initialize fetcher for fetch id '" +
-                    t.getId() + "'", e);
-            write(STATUS.FETCHER_INITIALIZATION_EXCEPTION,
-                    ExceptionUtils.getStackTrace(e));
+            LOG.warn("Couldn't initialize fetcher for fetch id '" + t.getId() + "'", e);
+            write(STATUS.FETCHER_INITIALIZATION_EXCEPTION, ExceptionUtils.getStackTrace(e));
             return null;
         }
     }
 
-    protected List<Metadata> parseIt(FetchEmitTuple t, Fetcher fetcher) {
+    protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
         FetchKey fetchKey = t.getFetchKey();
         if (fetchKey.hasRange()) {
-            if (! (fetcher instanceof RangeFetcher)) {
+            if (!(fetcher instanceof RangeFetcher)) {
                 throw new IllegalArgumentException(
                         "fetch key has a range, but the fetcher is not a range fetcher");
             }
             Metadata metadata = new Metadata();
-            try (InputStream stream = ((RangeFetcher)fetcher).fetch(fetchKey.getFetchKey(),
+            try (InputStream stream = ((RangeFetcher) fetcher).fetch(fetchKey.getFetchKey(),
                     fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
-                return parse(t, stream, metadata);
+                return parseWithStream(t, stream, metadata);
             } catch (SecurityException e) {
                 LOG.error("security exception " + t.getId(), e);
                 throw e;
@@ -439,7 +476,7 @@ public class PipesServer implements Runnable {
         } else {
             Metadata metadata = new Metadata();
             try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
-                return parse(t, stream, metadata);
+                return parseWithStream(t, stream, metadata);
             } catch (SecurityException e) {
                 LOG.error("security exception " + t.getId(), e);
                 throw e;
@@ -488,20 +525,61 @@ public class PipesServer implements Runnable {
         exit(1);
     }
 
-    private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream,
-                                 Metadata metadata) {
+    private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTuple fetchEmitTuple,
+                                                         InputStream stream, Metadata metadata)
+            throws TikaConfigException {
         HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
+        List<Metadata> metadataList;
+        //sets an EmbeddedDocumentBytesHandler on the ParseContext when bytes are requested
+        ParseContext parseContext = createParseContext(fetchEmitTuple);
         if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
-            return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata);
+            metadataList =
+                    parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
+        } else {
+            metadataList = parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata,
+                    parseContext);
+        }
+
+        return new MetadataListAndEmbeddedBytes(metadataList,
+                parseContext.get(EmbeddedDocumentBytesHandler.class));
+    }
+
+    private ParseContext createParseContext(FetchEmitTuple fetchEmitTuple)
+            throws TikaConfigException {
+        ParseContext parseContext = new ParseContext();
+        if (! fetchEmitTuple.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes()) {
+            return parseContext;
+        }
+        EmbeddedDocumentExtractorFactory factory = ((AutoDetectParser)autoDetectParser)
+                .getAutoDetectParserConfig().getEmbeddedDocumentExtractorFactory();
+        if (factory == null) {
+            parseContext.set(EmbeddedDocumentExtractor.class, new RUnpackExtractor(parseContext,
+                    RUnpackExtractorFactory.DEFAULT_MAX_EMBEDDED_BYTES_FOR_EXTRACTION));
         } else {
-            return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata);
+            if (! (factory instanceof EmbeddedDocumentByteStoreExtractorFactory)) {
+                throw new TikaConfigException("EmbeddedDocumentExtractorFactory must be an " +
+                        "instance of EmbeddedDocumentByteStoreExtractorFactory if you want " +
+                        "to extract embedded bytes! I see this embedded doc factory: " +
+                        factory.getClass() + " and a request: " +
+                        fetchEmitTuple.getEmbeddedDocumentBytesConfig());
+            }
+        }
+        //TODO: especially clean this up. Emitting handler if an emitter is named, else basic.
+        if (!StringUtils.isBlank(fetchEmitTuple.getEmbeddedDocumentBytesConfig().getEmitter())) {
+            parseContext.set(EmbeddedDocumentBytesHandler.class,
+                    new EmittingEmbeddedDocumentBytesHandler(fetchEmitTuple.getEmitKey(),
+                            fetchEmitTuple.getEmbeddedDocumentBytesConfig(), emitterManager));
+        } else {
+            parseContext.set(EmbeddedDocumentBytesHandler.class,
+                    new BasicEmbeddedDocumentBytesHandler(
+                    fetchEmitTuple.getEmbeddedDocumentBytesConfig()));
         }
+        return parseContext;
     }
 
     private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
                                              HandlerConfig handlerConfig, InputStream stream,
-                                             Metadata metadata) {
-        ParseContext parseContext = new ParseContext();
+                                             Metadata metadata, ParseContext parseContext) {
 
         ContentHandlerFactory contentHandlerFactory =
                 new BasicContentHandlerFactory(handlerConfig.getType(),
@@ -512,6 +590,7 @@ public class PipesServer implements Runnable {
         parseContext.set(DocumentSelector.class, new DocumentSelector() {
             final int maxEmbedded = handlerConfig.maxEmbeddedResources;
             int embedded = 0;
+
             @Override
             public boolean select(Metadata metadata) {
                 if (maxEmbedded < 0) {
@@ -552,16 +631,16 @@ public class PipesServer implements Runnable {
 
     private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
                                           HandlerConfig handlerConfig, InputStream stream,
-                                          Metadata metadata) {
-        ParseContext parseContext = new ParseContext();
+                                          Metadata metadata, ParseContext parseContext) {
         //Intentionally do not add the metadata filter here!
         //We need to let stacktraces percolate
         RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
                 new BasicContentHandlerFactory(handlerConfig.getType(),
-                        handlerConfig.getWriteLimit(), handlerConfig.isThrowOnWriteLimitReached(), parseContext),
-                handlerConfig.getMaxEmbeddedResources());
+                        handlerConfig.getWriteLimit(), handlerConfig.isThrowOnWriteLimitReached(),
+                        parseContext), handlerConfig.getMaxEmbeddedResources());
 
         long start = System.currentTimeMillis();
+
         preParse(fetchEmitTuple, stream, metadata, parseContext);
         try {
             rMetaParser.parse(stream, handler, metadata, parseContext);
@@ -590,7 +669,7 @@ public class PipesServer implements Runnable {
             if (tis == null) {
                 tis = TikaInputStream.get(stream, tmp, metadata);
             }
-            _preParse(t.getId(), tis, metadata, parseContext);
+            _preParse(t, tis, metadata, parseContext);
         } finally {
             IOUtils.closeQuietly(tmp);
         }
@@ -598,13 +677,13 @@ public class PipesServer implements Runnable {
         writeIntermediate(t.getEmitKey(), metadata);
     }
 
-    private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+    private void _preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata,
                            ParseContext parseContext) {
         if (digester != null) {
             try {
                 digester.digest(tis, metadata, parseContext);
             } catch (IOException e) {
-                LOG.warn("problem digesting: " + id, e);
+                LOG.warn("problem digesting: " + t.getId(), e);
             }
         }
         try {
@@ -612,7 +691,18 @@ public class PipesServer implements Runnable {
             metadata.set(Metadata.CONTENT_TYPE, mt.toString());
             metadata.set(TikaCoreProperties.CONTENT_TYPE_PARSER_OVERRIDE, mt.toString());
         } catch (IOException e) {
-            LOG.warn("problem detecting: " + id, e);
+            LOG.warn("problem detecting: " + t.getId(), e);
+        }
+
+        if (t.getEmbeddedDocumentBytesConfig() != null &&
+                t.getEmbeddedDocumentBytesConfig().isIncludeOriginal()) {
+            EmbeddedDocumentBytesHandler embeddedDocumentByteStore =
+                    parseContext.get(EmbeddedDocumentBytesHandler.class);
+            try (InputStream is = Files.newInputStream(tis.getPath())) {
+                embeddedDocumentByteStore.add(0, metadata, is);
+            } catch (IOException e) {
+                LOG.warn("problem reading source file into embedded document byte store", e);
+            }
         }
     }
 
@@ -669,14 +759,23 @@ public class PipesServer implements Runnable {
             this.emitterManager = null;
         }
         this.autoDetectParser = new AutoDetectParser(this.tikaConfig);
-        if (((AutoDetectParser)autoDetectParser).getAutoDetectParserConfig().getDigesterFactory() != null) {
-            this.digester = ((AutoDetectParser) autoDetectParser).
-                    getAutoDetectParserConfig().getDigesterFactory().build();
+        if (((AutoDetectParser) autoDetectParser).getAutoDetectParserConfig()
+                .getDigesterFactory() != null) {
+            this.digester = ((AutoDetectParser) autoDetectParser).getAutoDetectParserConfig()
+                    .getDigesterFactory().build();
             //override this value because we'll be digesting before parse
-            ((AutoDetectParser)autoDetectParser).getAutoDetectParserConfig().getDigesterFactory()
+            ((AutoDetectParser) autoDetectParser).getAutoDetectParserConfig().getDigesterFactory()
                     .setSkipContainerDocument(true);
+            //if the user hasn't configured an embedded document extractor, set up the
+            // RUnpackExtractorFactory
+            if (((AutoDetectParser) autoDetectParser).getAutoDetectParserConfig()
+                    .getEmbeddedDocumentExtractorFactory() == null) {
+                ((AutoDetectParser) autoDetectParser)
+                        .getAutoDetectParserConfig().setEmbeddedDocumentExtractorFactory(
+                                new RUnpackExtractorFactory());
+            }
         }
-        this.detector = ((AutoDetectParser)this.autoDetectParser).getDetector();
+        this.detector = ((AutoDetectParser) this.autoDetectParser).getDetector();
         this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
     }
 
@@ -734,4 +833,45 @@ public class PipesServer implements Runnable {
             exit(1);
         }
     }
+
+    class MetadataListAndEmbeddedBytes {
+        final List<Metadata> metadataList;
+        final Optional<EmbeddedDocumentBytesHandler> embeddedDocumentBytesHandler;
+
+        public MetadataListAndEmbeddedBytes(List<Metadata> metadataList,
+                                            EmbeddedDocumentBytesHandler embeddedDocumentBytesHandler) {
+            this.metadataList = metadataList;
+            this.embeddedDocumentBytesHandler = Optional.ofNullable(embeddedDocumentBytesHandler);
+        }
+
+        public List<Metadata> getMetadataList() {
+            return metadataList;
+        }
+
+        public EmbeddedDocumentBytesHandler getEmbeddedDocumentBytesHandler() {
+            return embeddedDocumentBytesHandler.get(); //throws if no handler was supplied
+        }
+
+        /**
+         * This tests whether there's any type of embedded document store
+         * ...that, for example, may require closing at the end of the parse.
+         *
+         * @return true if a non-null EmbeddedDocumentBytesHandler was supplied
+         */
+        public boolean hasEmbeddedDocumentByteStore() {
+            return embeddedDocumentBytesHandler.isPresent();
+        }
+
+        /**
+         * If the intent is that the metadata and byte store be packaged in a zip
+         * or similar and emitted via a single stream emitter.
+         * <p>
+         * Basically, tests that the handler is not an EmittingEmbeddedDocumentBytesHandler.
+         * Call only when a handler is present; Optional.get() fails on an empty Optional.
+         * @return true unless the handler is an EmittingEmbeddedDocumentBytesHandler
+         */
+        public boolean toBePackagedForStreamEmitter() {
+            return !(embeddedDocumentBytesHandler.get() instanceof EmittingEmbeddedDocumentBytesHandler);
+        }
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
new file mode 100644
index 000000000..071de05c4
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.extractor;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class EmbeddedDocumentBytesConfig implements Serializable {
+
+    /**
+     * Serial version UID
+     */
+    private static final long serialVersionUID = -3861669115439125268L;
+
+    /** Shared instance with extraction off; it is still mutable, so never call its setters. */
+    public static final EmbeddedDocumentBytesConfig SKIP = new EmbeddedDocumentBytesConfig(false);
+
+    public enum SUFFIX_STRATEGY {
+        NONE, EXISTING, DETECTED;
+
+        public static SUFFIX_STRATEGY parse(String s) {
+            if (s.equalsIgnoreCase("none")) {
+                return NONE;
+            } else if (s.equalsIgnoreCase("existing")) {
+                return EXISTING;
+            } else if (s.equalsIgnoreCase("detected")) {
+                return DETECTED;
+            }
+            throw new IllegalArgumentException("can't parse " + s);
+        }
+    }
+    private final boolean extractEmbeddedDocumentBytes;
+
+    private int zeroPadName = 0;
+
+    private SUFFIX_STRATEGY suffixStrategy = SUFFIX_STRATEGY.NONE;
+
+    private String embeddedIdPrefix = "-";
+
+    private String emitter;
+
+    private boolean includeOriginal = false;
+
+    /**
+     * Create an EmbeddedDocumentBytesConfig with
+     * {@link EmbeddedDocumentBytesConfig#extractEmbeddedDocumentBytes}
+     * set to <code>true</code>
+     */
+    public EmbeddedDocumentBytesConfig() {
+        this.extractEmbeddedDocumentBytes = true;
+    }
+
+    public EmbeddedDocumentBytesConfig(boolean extractEmbeddedDocumentBytes) {
+        this.extractEmbeddedDocumentBytes = extractEmbeddedDocumentBytes;
+    }
+
+    public static EmbeddedDocumentBytesConfig getSKIP() {
+        return SKIP;
+    }
+
+    public boolean isExtractEmbeddedDocumentBytes() {
+        return extractEmbeddedDocumentBytes;
+    }
+
+    public int getZeroPadName() {
+        return zeroPadName;
+    }
+
+    public SUFFIX_STRATEGY getSuffixStrategy() {
+        return suffixStrategy;
+    }
+
+    public String getEmbeddedIdPrefix() {
+        return embeddedIdPrefix;
+    }
+
+    public String getEmitter() {
+        return emitter;
+    }
+
+    public boolean isIncludeOriginal() {
+        return includeOriginal;
+    }
+
+    public void setZeroPadNameLength(int zeroPadName) {
+        this.zeroPadName = zeroPadName;
+    }
+
+    public void setSuffixStrategy(SUFFIX_STRATEGY suffixStrategy) {
+        this.suffixStrategy = suffixStrategy;
+    }
+
+    public void setEmbeddedIdPrefix(String embeddedIdPrefix) {
+        this.embeddedIdPrefix = embeddedIdPrefix;
+    }
+
+    public void setEmitter(String emitter) {
+        this.emitter = emitter;
+    }
+
+    public void setIncludeOriginal(boolean includeOriginal) {
+        this.includeOriginal = includeOriginal;
+    }
+
+    @Override
+    public String toString() {
+        return "EmbeddedDocumentBytesConfig{" + "extractEmbeddedDocumentBytes=" +
+                extractEmbeddedDocumentBytes + ", zeroPadName=" + zeroPadName +
+                ", suffixStrategy=" + suffixStrategy + ", embeddedIdPrefix='" + embeddedIdPrefix +
+                '\'' + ", emitter='" + emitter + '\'' + ", includeOriginal=" + includeOriginal +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        EmbeddedDocumentBytesConfig that = (EmbeddedDocumentBytesConfig) o;
+
+        if (extractEmbeddedDocumentBytes != that.extractEmbeddedDocumentBytes) {
+            return false;
+        }
+        if (zeroPadName != that.zeroPadName) {
+            return false;
+        }
+        if (includeOriginal != that.includeOriginal) {
+            return false;
+        }
+        if (suffixStrategy != that.suffixStrategy) {
+            return false;
+        }
+        if (!Objects.equals(embeddedIdPrefix, that.embeddedIdPrefix)) {
+            return false;
+        }
+        return Objects.equals(emitter, that.emitter);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (extractEmbeddedDocumentBytes ? 1 : 0);
+        result = 31 * result + zeroPadName;
+        result = 31 * result + (suffixStrategy != null ? suffixStrategy.hashCode() : 0);
+        result = 31 * result + (embeddedIdPrefix != null ? embeddedIdPrefix.hashCode() : 0);
+        result = 31 * result + (emitter != null ? emitter.hashCode() : 0);
+        result = 31 * result + (includeOriginal ? 1 : 0);
+        return result;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
new file mode 100644
index 000000000..1132a4bc6
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOExceptionWithCause;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.extractor.AbstractEmbeddedDocumentBytesHandler;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+public class EmittingEmbeddedDocumentBytesHandler extends AbstractEmbeddedDocumentBytesHandler {
+    private final EmitKey containerEmitKey;
+    private final EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
+    private final StreamEmitter emitter;
+    //one shared Metadata instance is handed to the emitter with every stream
+    private static final Metadata METADATA = new Metadata();
+    public EmittingEmbeddedDocumentBytesHandler(EmitKey containerEmitKey,
+                                                EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                                                EmitterManager emitterManager) throws TikaConfigException {
+        this.containerEmitKey = containerEmitKey;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
+        Emitter tmpEmitter =
+                emitterManager.getEmitter(embeddedDocumentBytesConfig.getEmitter());
+        if (! (tmpEmitter instanceof StreamEmitter)) {
+            throw new TikaConfigException("Emitter " +
+                    embeddedDocumentBytesConfig.getEmitter()
+                    + " must implement a StreamEmitter");
+        }
+        this.emitter = (StreamEmitter) tmpEmitter;
+    }
+
+    @Override
+    public void add(int id, Metadata metadata, InputStream inputStream) throws IOException {
+        //intentionally do not call super.add, because we want the ids list to be empty
+        String emitKey = getEmitKey(containerEmitKey.getEmitKey(),
+                id, embeddedDocumentBytesConfig, metadata);
+        try {
+            emitter.emit(emitKey, inputStream, METADATA);
+        } catch (TikaEmitterException e) {
+            throw new IOExceptionWithCause(e); //surface emitter failures as an IOException
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (emitter instanceof Closeable) {
+            ((Closeable) emitter).close(); //NOTE(review): is this emitter shared? confirm
+        }
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/parser/AutoDetectParserConfigTest.java b/tika-core/src/test/java/org/apache/tika/parser/AutoDetectParserConfigTest.java
new file mode 100644
index 000000000..62b061d98
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/parser/AutoDetectParserConfigTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.parser;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.InputStream;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.extractor.EmbeddedBytesSelector;
+import org.apache.tika.extractor.RUnpackExtractor;
+import org.apache.tika.extractor.RUnpackExtractorFactory;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.utils.StringUtils;
+
+public class AutoDetectParserConfigTest {
+    //checks the EmbeddedBytesSelector built from TIKA-4207-embedded-bytes-config.xml
+    @Test
+    public void testEmbeddedBytesSelector() throws Exception {
+        TikaConfig config;
+        try (InputStream is = TikaConfig.class.getResourceAsStream(
+                "TIKA-4207-embedded-bytes-config.xml")) {
+            config = new TikaConfig(is);
+        }
+        AutoDetectParserConfig c = config.getAutoDetectParserConfig();
+        RUnpackExtractorFactory f =
+                (RUnpackExtractorFactory) c.getEmbeddedDocumentExtractorFactory();
+
+        Metadata metadata = new Metadata();
+        ParseContext parseContext = new ParseContext();
+        RUnpackExtractor ex = (RUnpackExtractor) f.newInstance(metadata, parseContext);
+        EmbeddedBytesSelector selector = ex.getEmbeddedBytesSelector();
+        assertFalse(selector.select(getMetadata("", "")));
+        assertTrue(selector.select(getMetadata("application/pdf", "")));
+        assertTrue(selector.select(getMetadata("application/pdf", "ATTACHMENT")));
+        assertTrue(selector.select(getMetadata("application/pdf", "INLINE")));
+        assertTrue(selector.select(getMetadata("text/plain;charset=UTF-7", "INLINE")));
+        //a MACRO resource type and the application/docx mime are expected to be rejected
+        assertFalse(selector.select(getMetadata("application/pdf", "MACRO")));
+        assertFalse(selector.select(getMetadata("application/docx", "")));
+
+    }
+
+    private Metadata getMetadata(String mime, String embeddedResourceType) {
+        Metadata m = new Metadata();
+        if (!StringUtils.isBlank(mime)) {
+            m.set(Metadata.CONTENT_TYPE, mime);
+        }
+        if (!StringUtils.isBlank(embeddedResourceType)) {
+            m.set(TikaCoreProperties.EMBEDDED_RESOURCE_TYPE, embeddedResourceType);
+        }
+        return m;
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/parser/mock/MockParser.java b/tika-core/src/test/java/org/apache/tika/parser/mock/MockParser.java
index 0051a7740..de464bca5 100644
--- a/tika-core/src/test/java/org/apache/tika/parser/mock/MockParser.java
+++ b/tika-core/src/test/java/org/apache/tika/parser/mock/MockParser.java
@@ -45,6 +45,7 @@ import com.martensigwart.fakeload.FakeLoadBuilder;
 import com.martensigwart.fakeload.FakeLoadExecutor;
 import com.martensigwart.fakeload.FakeLoadExecutors;
 import com.martensigwart.fakeload.MemoryUnit;
+import org.apache.commons.io.input.CloseShieldInputStream;
 import org.w3c.dom.Document;
 import org.w3c.dom.NamedNodeMap;
 import org.w3c.dom.Node;
@@ -54,7 +55,7 @@ import org.xml.sax.SAXException;
 
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.extractor.EmbeddedDocumentExtractor;
-import org.apache.tika.extractor.ParsingEmbeddedDocumentExtractor;
+import org.apache.tika.extractor.EmbeddedDocumentUtil;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.mime.MediaType;
@@ -120,7 +121,7 @@ public class MockParser implements Parser {
         Document doc = null;
         try {
             DocumentBuilder docBuilder = context.getDocumentBuilder();
-            doc = docBuilder.parse(stream);
+            doc = docBuilder.parse(CloseShieldInputStream.wrap(stream));
         } catch (SAXException e) {
             //to distinguish between SAX on read vs SAX while writing
             throw new IOException(e);
@@ -258,29 +259,16 @@ public class MockParser implements Parser {
         }
 
         String embeddedText = action.getTextContent();
-        EmbeddedDocumentExtractor extractor = getEmbeddedDocumentExtractor(context);
+        EmbeddedDocumentExtractor extractor = EmbeddedDocumentUtil.getEmbeddedDocumentExtractor(context);
+
         Metadata m = new Metadata();
         m.set(TikaCoreProperties.RESOURCE_NAME_KEY, fileName);
         if (!"".equals(contentType)) {
             m.set(Metadata.CONTENT_TYPE, contentType);
         }
-        InputStream is = new ByteArrayInputStream(embeddedText.getBytes(UTF_8));
-
-        extractor.parseEmbedded(is, new EmbeddedContentHandler(handler), m, true);
-
-
-    }
-
-    protected EmbeddedDocumentExtractor getEmbeddedDocumentExtractor(ParseContext context) {
-        EmbeddedDocumentExtractor extractor = context.get(EmbeddedDocumentExtractor.class);
-        if (extractor == null) {
-            Parser p = context.get(Parser.class);
-            if (p == null) {
-                context.set(Parser.class, new MockParser());
-            }
-            extractor = new ParsingEmbeddedDocumentExtractor(context);
+        try (InputStream is = new ByteArrayInputStream(embeddedText.getBytes(UTF_8))) {
+            extractor.parseEmbedded(is, new EmbeddedContentHandler(handler), m, true);
         }
-        return extractor;
     }
 
     private void print(Node action, String name) throws IOException {
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
index 53c784796..66f54272b 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
@@ -22,8 +22,8 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
@@ -31,8 +31,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.TikaTest;
+import org.apache.tika.extractor.BasicEmbeddedDocumentBytesHandler;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
@@ -69,8 +71,120 @@ public class PipesServerTest extends TikaTest {
                 new FetchKey("fs", "mock.xml"),
                 new EmitKey("", ""));
         Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
-        List<Metadata> metadataList = pipesServer.parseIt(fetchEmitTuple, fetcher);
+        PipesServer.MetadataListAndEmbeddedBytes
+                parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
         assertEquals("5f3b924303e960ce35d7f705e91d3018dd110a9c3cef0546a91fe013d6dad6fd",
-                metadataList.get(0).get("X-TIKA:digest:SHA-256"));
+                parseData.metadataList.get(0).get("X-TIKA:digest:SHA-256"));
+    }
+
+    @Test
+    public void testEmbeddedStreamEmitter(@TempDir Path tmp) throws Exception {
+        if (Files.isDirectory(tmp)) {
+            FileUtils.deleteDirectory(tmp.toFile());
+        }
+        Files.createDirectories(tmp);
+        Path tikaConfig = tmp.resolve("tika-config.xml");
+
+        String xml = IOUtils.toString(
+                PipesServerTest.class.getResourceAsStream("TIKA-4207.xml"),
+                StandardCharsets.UTF_8);
+        xml = xml.replace("BASE_PATH", tmp.toAbsolutePath().toString());
+        Files.write(tikaConfig, xml.getBytes(StandardCharsets.UTF_8));
+
+        Files.copy(PipesServerTest.class.getResourceAsStream("/test-documents/basic_embedded.xml"),
+                tmp.resolve("mock.xml"));
+
+        PipesServer pipesServer = new PipesServer(tikaConfig,
+                new UnsynchronizedByteArrayInputStream(new byte[0]),
+                new PrintStream(new UnsynchronizedByteArrayOutputStream(), true,
+                        StandardCharsets.UTF_8.name()),
+                -1, 30000, 30000);
+
+        pipesServer.initializeResources();
+        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
+                new EmbeddedDocumentBytesConfig(true);
+        embeddedDocumentBytesConfig.setIncludeOriginal(true);
+
+        FetchEmitTuple fetchEmitTuple = new FetchEmitTuple("id",
+                new FetchKey("fs", "mock.xml"),
+                new EmitKey("", ""), new Metadata(),
+                HandlerConfig.DEFAULT_HANDLER_CONFIG, FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT,
+                embeddedDocumentBytesConfig);
+        Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
+        PipesServer.MetadataListAndEmbeddedBytes
+                parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
+        assertEquals(2, parseData.metadataList.size());
+
+        byte[] bytes0 =
+                IOUtils.toByteArray(
+                        ((BasicEmbeddedDocumentBytesHandler)parseData.getEmbeddedDocumentBytesHandler())
+                        .getDocument(0));
+        byte[] bytes1 =
+                IOUtils.toByteArray(
+                        ((BasicEmbeddedDocumentBytesHandler)parseData.getEmbeddedDocumentBytesHandler())
+                                .getDocument(1));
+
+        assertContains("is to trigger mock on the embedded",
+                new String(bytes0, StandardCharsets.UTF_8));
+
+        assertContains("embeddedAuthor</metadata>",
+                new String(bytes1, StandardCharsets.UTF_8));
+        assertEquals("fdaa937c96d1ed010b8d307ccddf9d11c3b48db732a8771eaafe99d59e076d0a",
+                parseData.metadataList.get(0).get("X-TIKA:digest:SHA-256"));
+    }
+
+    @Test
+    public void testEmbeddedStreamEmitterLimitBytes(@TempDir Path tmp) throws Exception {
+        if (Files.isDirectory(tmp)) {
+            FileUtils.deleteDirectory(tmp.toFile());
+        }
+        Files.createDirectories(tmp);
+        Path tikaConfig = tmp.resolve("tika-config.xml");
+
+        String xml = IOUtils.toString(
+                PipesServerTest.class.getResourceAsStream("TIKA-4207-limit-bytes.xml"),
+                StandardCharsets.UTF_8);
+        xml = xml.replace("BASE_PATH", tmp.toAbsolutePath().toString());
+        Files.write(tikaConfig, xml.getBytes(StandardCharsets.UTF_8));
+
+        Files.copy(PipesServerTest.class.getResourceAsStream("/test-documents/basic_embedded.xml"),
+                tmp.resolve("mock.xml"));
+
+        PipesServer pipesServer = new PipesServer(tikaConfig,
+                new UnsynchronizedByteArrayInputStream(new byte[0]),
+                new PrintStream(new UnsynchronizedByteArrayOutputStream(), true,
+                        StandardCharsets.UTF_8.name()),
+                -1, 30000, 30000);
+
+        pipesServer.initializeResources();
+        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
+                new EmbeddedDocumentBytesConfig(true);
+        embeddedDocumentBytesConfig.setIncludeOriginal(true);
+
+        FetchEmitTuple fetchEmitTuple = new FetchEmitTuple("id",
+                new FetchKey("fs", "mock.xml"),
+                new EmitKey("", ""), new Metadata(),
+                HandlerConfig.DEFAULT_HANDLER_CONFIG, FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT,
+                embeddedDocumentBytesConfig);
+        Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
+        PipesServer.MetadataListAndEmbeddedBytes
+                parseData = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
+        assertEquals(2, parseData.metadataList.size());
+
+        byte[] bytes0 =
+                IOUtils.toByteArray(
+                        ((BasicEmbeddedDocumentBytesHandler)parseData.getEmbeddedDocumentBytesHandler())
+                                .getDocument(0));
+        byte[] bytes1 =
+                IOUtils.toByteArray(
+                        ((BasicEmbeddedDocumentBytesHandler)parseData.getEmbeddedDocumentBytesHandler())
+                                .getDocument(1));
+
+        assertContains("is to trigger mock on the embedded",
+                new String(bytes0, StandardCharsets.UTF_8));
+
+        assertEquals(10, bytes1.length);
+        assertEquals("fdaa937c96d1ed010b8d307ccddf9d11c3b48db732a8771eaafe99d59e076d0a",
+                parseData.metadataList.get(0).get("X-TIKA:digest:SHA-256"));
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java
similarity index 99%
rename from tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
rename to tika-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java
index 0277bc11d..4522a2ea1 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncChaosMonkeyTest.java
@@ -40,7 +40,7 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
 import org.apache.tika.utils.ProcessUtils;
 
-public class AsyncProcessorTest {
+public class AsyncChaosMonkeyTest {
 
     private final String OOM = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
             "<throw class=\"java.lang.OutOfMemoryError\">oom message</throw>\n</mock>";
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml b/tika-core/src/test/resources/org/apache/tika/config/TIKA-4207-embedded-bytes-config.xml
similarity index 75%
copy from tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
copy to tika-core/src/test/resources/org/apache/tika/config/TIKA-4207-embedded-bytes-config.xml
index 0e2f26bd2..5e1339a40 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
+++ b/tika-core/src/test/resources/org/apache/tika/config/TIKA-4207-embedded-bytes-config.xml
@@ -22,8 +22,17 @@
   <autoDetectParserConfig>
     <spoolToDisk>123450</spoolToDisk>
     <outputThreshold>678900</outputThreshold>
-    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory">
+    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.RUnpackExtractorFactory">
       <writeFileNameToContent>false</writeFileNameToContent>
+      <embeddedBytesIncludeMimeTypes>
+        <mime>application/pdf</mime>
+        <mime>application/rtf</mime>
+        <mime>text/plain</mime>
+      </embeddedBytesIncludeMimeTypes>
+      <embeddedBytesIncludeEmbeddedResourceTypes>
+        <type>ATTACHMENT</type>
+        <type>INLINE</type>
+      </embeddedBytesIncludeEmbeddedResourceTypes>
     </embeddedDocumentExtractorFactory>
   </autoDetectParserConfig>
-</properties>
+</properties>
\ No newline at end of file
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml b/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml
similarity index 70%
copy from tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
copy to tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml
index 0e2f26bd2..5e46a09e9 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207-limit-bytes.xml
@@ -16,14 +16,19 @@
   limitations under the License.
 -->
 <properties>
-  <parsers>
-    <parser class="org.apache.tika.parser.DefaultParser"/>
-  </parsers>
   <autoDetectParserConfig>
-    <spoolToDisk>123450</spoolToDisk>
-    <outputThreshold>678900</outputThreshold>
-    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory">
+    <digesterFactory class="org.apache.tika.pipes.async.MockDigesterFactory">
+      <skipContainerDocument>false</skipContainerDocument>
+    </digesterFactory>
+    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.RUnpackExtractorFactory">
       <writeFileNameToContent>false</writeFileNameToContent>
+      <maxEmbeddedBytesForExtraction>10</maxEmbeddedBytesForExtraction>
     </embeddedDocumentExtractorFactory>
   </autoDetectParserConfig>
-</properties>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>BASE_PATH</basePath>
+    </fetcher>
+  </fetchers>
+</properties>
\ No newline at end of file
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml b/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml
similarity index 69%
copy from tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml
copy to tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml
index f54eb9a0a..9f37ad0fe 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/TIKA-4207.xml
@@ -16,14 +16,15 @@
   limitations under the License.
 -->
 <properties>
-  <parsers>
-    <parser class="org.apache.tika.parser.DefaultParser"/>
-  </parsers>
   <autoDetectParserConfig>
-    <spoolToDisk>123450</spoolToDisk>
-    <outputThreshold>678900</outputThreshold>
-    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory">
-      <writeFileNameToContent>true</writeFileNameToContent>
-    </embeddedDocumentExtractorFactory>
+    <digesterFactory class="org.apache.tika.pipes.async.MockDigesterFactory">
+      <skipContainerDocument>false</skipContainerDocument>
+    </digesterFactory>
   </autoDetectParserConfig>
-</properties>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>BASE_PATH</basePath>
+    </fetcher>
+  </fetchers>
+</properties>
\ No newline at end of file
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/WMFParser.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/WMFParser.java
index 73b95b58c..3c55a14b0 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/WMFParser.java
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-microsoft-module/src/main/java/org/apache/tika/parser/microsoft/WMFParser.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.Set;
 
+import org.apache.commons.io.input.CloseShieldInputStream;
 import org.apache.poi.hwmf.record.HwmfFont;
 import org.apache.poi.hwmf.record.HwmfRecord;
 import org.apache.poi.hwmf.record.HwmfRecordType;
@@ -63,7 +64,7 @@ public class WMFParser implements Parser {
         try {
             HwmfPicture picture = null;
             try {
-                picture = new HwmfPicture(stream);
+                picture = new HwmfPicture(CloseShieldInputStream.wrap(stream));
             } catch (ArrayIndexOutOfBoundsException e) {
                 //POI can throw this on corrupt files
                 throw new TikaException(e.getClass().getSimpleName() + ": " + e.getMessage(), e);
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml b/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
index 0e2f26bd2..9cedc9ed4 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
@@ -22,7 +22,7 @@
   <autoDetectParserConfig>
     <spoolToDisk>123450</spoolToDisk>
     <outputThreshold>678900</outputThreshold>
-    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory">
+    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.RUnpackExtractorFactory">
       <writeFileNameToContent>false</writeFileNameToContent>
     </embeddedDocumentExtractorFactory>
   </autoDetectParserConfig>
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml b/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml
index f54eb9a0a..369acafc9 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-with-names.xml
@@ -22,7 +22,7 @@
   <autoDetectParserConfig>
     <spoolToDisk>123450</spoolToDisk>
     <outputThreshold>678900</outputThreshold>
-    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory">
+    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.RUnpackExtractorFactory">
       <writeFileNameToContent>true</writeFileNameToContent>
     </embeddedDocumentExtractorFactory>
   </autoDetectParserConfig>
diff --git a/tika-pipes/tika-async-cli/pom.xml b/tika-pipes/tika-async-cli/pom.xml
index db2966136..239cf22c7 100644
--- a/tika-pipes/tika-async-cli/pom.xml
+++ b/tika-pipes/tika-async-cli/pom.xml
@@ -37,6 +37,13 @@
       <artifactId>tika-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <!-- logging -->
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
new file mode 100644
index 000000000..4bcdacb9e
--- /dev/null
+++ b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/AsyncProcessorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.async.cli;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.tika.TikaTest;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.async.AsyncProcessor;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+
+/**
+ * This should be in tika-core, but we want to avoid a dependency mess with tika-serialization
+ */
+public class AsyncProcessorTest extends TikaTest {
+    //TODO -- integrate json pipes iterator and run with AsyncProcessor.main
+    @TempDir
+    private Path basedir;
+    private Path inputDir;
+
+    private Path bytesDir;
+
+    private Path jsonDir;
+
+    private Path configDir;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        inputDir = basedir.resolve("input");
+
+        bytesDir = basedir.resolve("bytes");
+
+        jsonDir = basedir.resolve("json");
+
+        configDir = basedir.resolve("config");
+        Path tikaConfig = configDir.resolve("tika-config.xml");
+
+        Files.createDirectories(basedir);
+        Files.createDirectories(configDir);
+        Files.createDirectories(inputDir);
+
+        String xml = IOUtils.toString(
+                AsyncProcessorTest.class.getResourceAsStream("/configs/TIKA-4207-emitter.xml"),
+                    StandardCharsets.UTF_8);
+        //do stuff to xml
+        xml = xml.replace("BASE_PATH", inputDir.toAbsolutePath().toString());
+        xml = xml.replace("JSON_PATH", jsonDir.toAbsolutePath().toString());
+        xml = xml.replace("BYTES_PATH", bytesDir.toAbsolutePath().toString());
+
+        Files.writeString(tikaConfig, xml, StandardCharsets.UTF_8);
+
+        Path mock = inputDir.resolve("mock.xml");
+        try (OutputStream os = Files.newOutputStream(mock)) {
+            IOUtils.copy(getClass().getResourceAsStream("/test-documents/basic_embedded.xml"),
+                    os);
+        }
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+//        TikaAsyncCLI cli = new TikaAsyncCLI();
+  //      cli.main(new String[]{ configDir.resolve("tika-config.xml").toAbsolutePath().toString()});
+        AsyncProcessor processor = new AsyncProcessor(configDir.resolve("tika-config.xml"));
+
+        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
+                new EmbeddedDocumentBytesConfig(true);
+        embeddedDocumentBytesConfig.setIncludeOriginal(true);
+        embeddedDocumentBytesConfig.setEmitter("bytes");
+        embeddedDocumentBytesConfig.setSuffixStrategy(EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.NONE);
+        embeddedDocumentBytesConfig.setEmbeddedIdPrefix("-");
+
+        FetchEmitTuple t = new FetchEmitTuple("myId-1",
+                new FetchKey("fs",  "mock.xml"),
+                new EmitKey("json", "emit-1"),
+                new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT, embeddedDocumentBytesConfig);
+
+        processor.offer(t, 1000);
+
+        for (int i = 0; i < 10; i++) {
+            processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000);
+        }
+        //TODO clean this up
+        while (processor.checkActive()) {
+            Thread.sleep(100);
+        }
+        processor.close();
+
+        String container = Files.readString(bytesDir.resolve("emit-1/emit-1-0"));
+        assertContains("\"dc:creator\">Nikolai Lobachevsky", container);
+
+        String xmlEmbedded = Files.readString(bytesDir.resolve("emit-1/emit-1-1"));
+        assertContains("name=\"dc:creator\"", xmlEmbedded);
+        assertContains(">embeddedAuthor</metadata>", xmlEmbedded);
+
+        List<Metadata> metadataList;
+        try (BufferedReader reader = Files.newBufferedReader(jsonDir.resolve("emit-1.json"))) {
+            metadataList = JsonMetadataList.fromJson(reader);
+        }
+        assertEquals(2, metadataList.size());
+        assertContains("main_content", metadataList.get(0).get(TikaCoreProperties.TIKA_CONTENT));
+        assertContains("some_embedded_content",
+                metadataList.get(1).get(TikaCoreProperties.TIKA_CONTENT));
+    }
+}
diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
index fc6694c74..08c962f10 100644
--- a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
+++ b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java
@@ -28,7 +28,7 @@ import org.apache.tika.exception.TikaConfigException;
 public class TikaAsyncCLITest {
     @Test
     public void testCrash() throws Exception {
-        Path config = getPath("/tika-config-broken.xml");
+        Path config = getPath("/configs/tika-config-broken.xml");
         assertThrows(TikaConfigException.class,
                 () -> TikaAsyncCLI.main(
                         new String[] {
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml b/tika-pipes/tika-async-cli/src/test/resources/configs/TIKA-4207-emitter.xml
similarity index 61%
copy from tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
copy to tika-pipes/tika-async-cli/src/test/resources/configs/TIKA-4207-emitter.xml
index 0e2f26bd2..5391c8496 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-package/src/test/resources/configs/tika-config-no-names.xml
+++ b/tika-pipes/tika-async-cli/src/test/resources/configs/TIKA-4207-emitter.xml
@@ -16,14 +16,20 @@
   limitations under the License.
 -->
 <properties>
-  <parsers>
-    <parser class="org.apache.tika.parser.DefaultParser"/>
-  </parsers>
-  <autoDetectParserConfig>
-    <spoolToDisk>123450</spoolToDisk>
-    <outputThreshold>678900</outputThreshold>
-    <embeddedDocumentExtractorFactory class="org.apache.tika.extractor.ParsingEmbeddedDocumentExtractorFactory">
-      <writeFileNameToContent>false</writeFileNameToContent>
-    </embeddedDocumentExtractorFactory>
-  </autoDetectParserConfig>
-</properties>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>BASE_PATH</basePath>
+    </fetcher>
+  </fetchers>
+  <emitters>
+    <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+      <name>json</name>
+      <basePath>JSON_PATH</basePath>
+    </emitter>
+    <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+      <name>bytes</name>
+      <basePath>BYTES_PATH</basePath>
+    </emitter>
+  </emitters>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml b/tika-pipes/tika-async-cli/src/test/resources/configs/tika-config-broken.xml
similarity index 100%
copy from tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml
copy to tika-pipes/tika-async-cli/src/test/resources/configs/tika-config-broken.xml
diff --git a/tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml b/tika-pipes/tika-async-cli/src/test/resources/test-documents/basic_embedded.xml
similarity index 59%
rename from tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml
rename to tika-pipes/tika-async-cli/src/test/resources/test-documents/basic_embedded.xml
index 5ee379e6f..7536a1603 100644
--- a/tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml
+++ b/tika-pipes/tika-async-cli/src/test/resources/test-documents/basic_embedded.xml
@@ -1,4 +1,5 @@
 <?xml version="1.0" encoding="UTF-8" ?>
+
 <!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
@@ -17,16 +18,18 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<properties>
-  <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
-      <name>s3</name>
-      <region>us-east-1</region>
-      <profile><!-- fill in here --></profile>
-    </fetcher>
-  </fetchers>
-  <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
-    <fetcherName>fs</fetcherName>
-    <basePath>basePath</basePath>
-  </pipesIterator>
-</properties>
\ No newline at end of file
+
+<mock>
+
+    <metadata action="add" name="dc:creator">Nikolai Lobachevsky</metadata>
+    <write element="p">main_content</write>
+    <!-- auto detection wasn't working for some reason; add content-type as
+        is to trigger mock on the embedded -->
+    <embedded filename="embed1.xml" content-type="application/mock+xml">
+        &lt;mock&gt;
+            &lt;metadata action=&quot;add&quot; name=&quot;dc:creator&quot;&gt;embeddedAuthor&lt;/metadata&gt;
+            &lt;write element="p"&gt;some_embedded_content&lt;/write&gt;
+        &lt;/mock&gt;
+    </embedded>
+
+</mock>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-iterators/pom.xml b/tika-pipes/tika-pipes-iterators/pom.xml
index 1abdb0782..5cb99fbd1 100644
--- a/tika-pipes/tika-pipes-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -35,6 +35,7 @@
        in tika-core if you want a file system directory crawler -->
   <modules>
     <module>tika-pipes-iterator-csv</module>
+    <module>tika-pipes-iterator-json</module>
     <module>tika-pipes-iterator-jdbc</module>
     <module>tika-pipes-iterator-s3</module>
     <module>tika-pipes-iterator-kafka</module>
diff --git a/tika-pipes/tika-async-cli/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/pom.xml
similarity index 78%
copy from tika-pipes/tika-async-cli/pom.xml
copy to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/pom.xml
index db2966136..7b3307f5e 100644
--- a/tika-pipes/tika-async-cli/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/pom.xml
@@ -20,15 +20,15 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <groupId>org.apache.tika</groupId>
-    <artifactId>tika-pipes</artifactId>
+    <artifactId>tika-pipes-iterators</artifactId>
     <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>tika-async-cli</artifactId>
+  <artifactId>tika-pipes-iterator-json</artifactId>
 
-  <name>Apache Tika Async CLI</name>
+  <name>Apache Tika Pipes Iterator - json</name>
   <url>https://tika.apache.org/</url>
 
   <dependencies>
@@ -36,38 +36,34 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-core</artifactId>
       <version>${project.version}</version>
-    </dependency>
-    <!-- logging -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j2-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>tika-emitter-fs</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-serialization</artifactId>
       <version>${project.version}</version>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
   </dependencies>
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <version>${rat.version}</version>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/test-documents/*.json</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
         <configuration>
           <archive>
             <manifestEntries>
-              <Automatic-Module-Name>org.apache.tika.pipes.reporters.fs.status</Automatic-Module-Name>
+              <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.json</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
@@ -104,12 +100,6 @@
                 </filter>
               </filters>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <mainClass>org.apache.tika.async.cli.TikaAsyncCLI</mainClass>
-                  <manifestEntries>
-                    <Multi-Release>true</Multi-Release>
-                  </manifestEntries>
-                </transformer>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                   <resource>META-INF/LICENSE</resource>
                   <file>target/classes/META-INF/LICENSE</file>
@@ -127,6 +117,7 @@
           </execution>
         </executions>
       </plugin>
+
     </plugins>
   </build>
 
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java
new file mode 100644
index 000000000..4ff338736
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/main/java/org/apache/tika/pipes/pipesiterator/json/JsonPipesIterator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.pipesiterator.json;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Initializable;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+
+/**
+ * Iterates through a UTF-8 text file with one FetchEmitTuple
+ * json object per line. Blank lines are skipped.
+ * <p>
+ * The file is configured through {@link #setJsonPath(String)}; {@link #enqueue()}
+ * reads that path, so it has to be set before iteration begins.
+ */
+public class JsonPipesIterator extends PipesIterator implements Initializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JsonPipesIterator.class);
+
+    private Path jsonPath;
+
+    /**
+     * Reads the configured file line by line, turns every non-blank line into a
+     * {@link FetchEmitTuple} via {@link JsonFetchEmitTuple#fromJson(Reader)} and passes
+     * it to {@code tryToAdd}.
+     *
+     * @throws IOException if the file cannot be opened or read
+     */
+    @Override
+    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+        try (BufferedReader reader = Files.newBufferedReader(jsonPath, StandardCharsets.UTF_8)) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.trim().isEmpty()) {
+                    //a blank line carries no tuple; skip it rather than hand
+                    //empty input to the json parser
+                    continue;
+                }
+                try (Reader r = new StringReader(line)) {
+                    FetchEmitTuple t = JsonFetchEmitTuple.fromJson(r);
+                    //parameterized so the message is only built when INFO is enabled
+                    LOGGER.info("from json: {}", t);
+                    tryToAdd(t);
+                }
+            }
+        }
+    }
+
+    /**
+     * Sets the file to iterate over.
+     *
+     * @param jsonPath location of the UTF-8 file holding one json object per line
+     */
+    public void setJsonPath(String jsonPath) {
+        this.jsonPath = Paths.get(jsonPath);
+    }
+}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/java/org/apache/tika/pipes/pipesiterator/json/TestJsonPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/java/org/apache/tika/pipes/pipesiterator/json/TestJsonPipesIterator.java
new file mode 100644
index 000000000..671fecc5f
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/java/org/apache/tika/pipes/pipesiterator/json/TestJsonPipesIterator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.pipesiterator.json;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.FetchEmitTuple;
+
+/**
+ * Exercises {@link JsonPipesIterator} against the line-delimited json files
+ * under {@code /test-documents}. Disabled for now; see the annotation below.
+ */
+@Disabled("until we can write actual tests")
+public class TestJsonPipesIterator {
+
+    /**
+     * Drains the iterator over test.json. Each tuple has to be pulled with
+     * {@code next()}; calling only {@code hasNext()} would never advance the iterator.
+     */
+    @Test
+    public void testBasic() throws Exception {
+        JsonPipesIterator pipesIterator = new JsonPipesIterator();
+        pipesIterator.setJsonPath(
+                Paths.get(this.getClass().getResource("/test-documents/test.json").toURI())
+                        .toAbsolutePath().toString());
+        Iterator<FetchEmitTuple> it = pipesIterator.iterator();
+        while (it.hasNext()) {
+            it.next();
+        }
+    }
+
+    /**
+     * Drains the iterator over test-with-embedded-bytes.json and checks the tuple count.
+     * The fixture holds 100 lines, one json tuple per line.
+     */
+    @Test
+    public void testWithEmbDocBytes() throws Exception {
+        JsonPipesIterator pipesIterator = new JsonPipesIterator();
+        pipesIterator.setJsonPath(
+                Paths.get(
+                        this.getClass().getResource("/test-documents/test-with-embedded-bytes.json").toURI())
+                        .toAbsolutePath().toString());
+        Iterator<FetchEmitTuple> it = pipesIterator.iterator();
+        int count = 0;
+        while (it.hasNext()) {
+            it.next();
+            count++;
+        }
+        assertEquals(100, count);
+    }
+
+
+    /*
+    //use this to generate test files
+    public static void main(String[] args) throws Exception {
+        Path p = Paths.get("/home/tallison/Intellij/tika-main/tika-pipes/tika-pipes-iterators" +
+                "/tika-pipes-iterator-json/src/test/resources/test-documents/test-with-embedded" +
+                "-bytes.json");
+        try (BufferedWriter writer = Files.newBufferedWriter(p, StandardCharsets.UTF_8)) {
+            HandlerConfig handlerConfig =
+                    new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT,
+                            HandlerConfig.PARSE_MODE.RMETA, -1, -1,
+                            false);
+            EmbeddedDocumentBytesConfig config = new EmbeddedDocumentBytesConfig(true);
+            for (int i = 0; i < 100; i++) {
+                String id = "myid-"+i;
+                FetchEmitTuple t = new FetchEmitTuple(
+                        id,
+                        new FetchKey("fs", i + ".xml"),
+                        new EmitKey("fs", i + ".xml.json"),
+                        new Metadata(),
+                        handlerConfig,
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT,
+                        config);
+                String line = JsonFetchEmitTuple.toJson(t);
+                writer.write(line);
+                writer.newLine();
+            }
+        }
+    }*/
+}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/resources/test-documents/test-with-embedded-bytes.json b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/resources/test-documents/test-with-embedded-bytes.json
new file mode 100644
index 000000000..5e064d2d7
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/resources/test-documents/test-with-embedded-bytes.json
@@ -0,0 +1,100 @@
+{"id":"myid-0","fetcher":"fs","fetchKey":"0.xml","emitter":"fs","emitKey":"0.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-1","fetcher":"fs","fetchKey":"1.xml","emitter":"fs","emitKey":"1.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-2","fetcher":"fs","fetchKey":"2.xml","emitter":"fs","emitKey":"2.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-3","fetcher":"fs","fetchKey":"3.xml","emitter":"fs","emitKey":"3.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-4","fetcher":"fs","fetchKey":"4.xml","emitter":"fs","emitKey":"4.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-5","fetcher":"fs","fetchKey":"5.xml","emitter":"fs","emitKey":"5.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-6","fetcher":"fs","fetchKey":"6.xml","emitter":"fs","emitKey":"6.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-7","fetcher":"fs","fetchKey":"7.xml","emitter":"fs","emitKey":"7.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-8","fetcher":"fs","fetchKey":"8.xml","emitter":"fs","emitKey":"8.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-9","fetcher":"fs","fetchKey":"9.xml","emitter":"fs","emitKey":"9.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-10","fetcher":"fs","fetchKey":"10.xml","emitter":"fs","emitKey":"10.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-11","fetcher":"fs","fetchKey":"11.xml","emitter":"fs","emitKey":"11.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-12","fetcher":"fs","fetchKey":"12.xml","emitter":"fs","emitKey":"12.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-13","fetcher":"fs","fetchKey":"13.xml","emitter":"fs","emitKey":"13.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-14","fetcher":"fs","fetchKey":"14.xml","emitter":"fs","emitKey":"14.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-15","fetcher":"fs","fetchKey":"15.xml","emitter":"fs","emitKey":"15.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-16","fetcher":"fs","fetchKey":"16.xml","emitter":"fs","emitKey":"16.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-17","fetcher":"fs","fetchKey":"17.xml","emitter":"fs","emitKey":"17.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-18","fetcher":"fs","fetchKey":"18.xml","emitter":"fs","emitKey":"18.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-19","fetcher":"fs","fetchKey":"19.xml","emitter":"fs","emitKey":"19.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-20","fetcher":"fs","fetchKey":"20.xml","emitter":"fs","emitKey":"20.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-21","fetcher":"fs","fetchKey":"21.xml","emitter":"fs","emitKey":"21.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-22","fetcher":"fs","fetchKey":"22.xml","emitter":"fs","emitKey":"22.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-23","fetcher":"fs","fetchKey":"23.xml","emitter":"fs","emitKey":"23.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-24","fetcher":"fs","fetchKey":"24.xml","emitter":"fs","emitKey":"24.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-25","fetcher":"fs","fetchKey":"25.xml","emitter":"fs","emitKey":"25.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-26","fetcher":"fs","fetchKey":"26.xml","emitter":"fs","emitKey":"26.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-27","fetcher":"fs","fetchKey":"27.xml","emitter":"fs","emitKey":"27.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-28","fetcher":"fs","fetchKey":"28.xml","emitter":"fs","emitKey":"28.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-29","fetcher":"fs","fetchKey":"29.xml","emitter":"fs","emitKey":"29.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-30","fetcher":"fs","fetchKey":"30.xml","emitter":"fs","emitKey":"30.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-31","fetcher":"fs","fetchKey":"31.xml","emitter":"fs","emitKey":"31.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-32","fetcher":"fs","fetchKey":"32.xml","emitter":"fs","emitKey":"32.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-33","fetcher":"fs","fetchKey":"33.xml","emitter":"fs","emitKey":"33.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-34","fetcher":"fs","fetchKey":"34.xml","emitter":"fs","emitKey":"34.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-35","fetcher":"fs","fetchKey":"35.xml","emitter":"fs","emitKey":"35.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-36","fetcher":"fs","fetchKey":"36.xml","emitter":"fs","emitKey":"36.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-37","fetcher":"fs","fetchKey":"37.xml","emitter":"fs","emitKey":"37.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-38","fetcher":"fs","fetchKey":"38.xml","emitter":"fs","emitKey":"38.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-39","fetcher":"fs","fetchKey":"39.xml","emitter":"fs","emitKey":"39.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-40","fetcher":"fs","fetchKey":"40.xml","emitter":"fs","emitKey":"40.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-41","fetcher":"fs","fetchKey":"41.xml","emitter":"fs","emitKey":"41.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-42","fetcher":"fs","fetchKey":"42.xml","emitter":"fs","emitKey":"42.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-43","fetcher":"fs","fetchKey":"43.xml","emitter":"fs","emitKey":"43.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-44","fetcher":"fs","fetchKey":"44.xml","emitter":"fs","emitKey":"44.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-45","fetcher":"fs","fetchKey":"45.xml","emitter":"fs","emitKey":"45.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-46","fetcher":"fs","fetchKey":"46.xml","emitter":"fs","emitKey":"46.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-47","fetcher":"fs","fetchKey":"47.xml","emitter":"fs","emitKey":"47.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-48","fetcher":"fs","fetchKey":"48.xml","emitter":"fs","emitKey":"48.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-49","fetcher":"fs","fetchKey":"49.xml","emitter":"fs","emitKey":"49.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-50","fetcher":"fs","fetchKey":"50.xml","emitter":"fs","emitKey":"50.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-51","fetcher":"fs","fetchKey":"51.xml","emitter":"fs","emitKey":"51.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-52","fetcher":"fs","fetchKey":"52.xml","emitter":"fs","emitKey":"52.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-53","fetcher":"fs","fetchKey":"53.xml","emitter":"fs","emitKey":"53.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-54","fetcher":"fs","fetchKey":"54.xml","emitter":"fs","emitKey":"54.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-55","fetcher":"fs","fetchKey":"55.xml","emitter":"fs","emitKey":"55.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-56","fetcher":"fs","fetchKey":"56.xml","emitter":"fs","emitKey":"56.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-57","fetcher":"fs","fetchKey":"57.xml","emitter":"fs","emitKey":"57.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-58","fetcher":"fs","fetchKey":"58.xml","emitter":"fs","emitKey":"58.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-59","fetcher":"fs","fetchKey":"59.xml","emitter":"fs","emitKey":"59.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-60","fetcher":"fs","fetchKey":"60.xml","emitter":"fs","emitKey":"60.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-61","fetcher":"fs","fetchKey":"61.xml","emitter":"fs","emitKey":"61.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-62","fetcher":"fs","fetchKey":"62.xml","emitter":"fs","emitKey":"62.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-63","fetcher":"fs","fetchKey":"63.xml","emitter":"fs","emitKey":"63.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-64","fetcher":"fs","fetchKey":"64.xml","emitter":"fs","emitKey":"64.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-65","fetcher":"fs","fetchKey":"65.xml","emitter":"fs","emitKey":"65.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-66","fetcher":"fs","fetchKey":"66.xml","emitter":"fs","emitKey":"66.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-67","fetcher":"fs","fetchKey":"67.xml","emitter":"fs","emitKey":"67.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-68","fetcher":"fs","fetchKey":"68.xml","emitter":"fs","emitKey":"68.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-69","fetcher":"fs","fetchKey":"69.xml","emitter":"fs","emitKey":"69.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-70","fetcher":"fs","fetchKey":"70.xml","emitter":"fs","emitKey":"70.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-71","fetcher":"fs","fetchKey":"71.xml","emitter":"fs","emitKey":"71.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-72","fetcher":"fs","fetchKey":"72.xml","emitter":"fs","emitKey":"72.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-73","fetcher":"fs","fetchKey":"73.xml","emitter":"fs","emitKey":"73.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-74","fetcher":"fs","fetchKey":"74.xml","emitter":"fs","emitKey":"74.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-75","fetcher":"fs","fetchKey":"75.xml","emitter":"fs","emitKey":"75.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-76","fetcher":"fs","fetchKey":"76.xml","emitter":"fs","emitKey":"76.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-77","fetcher":"fs","fetchKey":"77.xml","emitter":"fs","emitKey":"77.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-78","fetcher":"fs","fetchKey":"78.xml","emitter":"fs","emitKey":"78.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-79","fetcher":"fs","fetchKey":"79.xml","emitter":"fs","emitKey":"79.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-80","fetcher":"fs","fetchKey":"80.xml","emitter":"fs","emitKey":"80.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-81","fetcher":"fs","fetchKey":"81.xml","emitter":"fs","emitKey":"81.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-82","fetcher":"fs","fetchKey":"82.xml","emitter":"fs","emitKey":"82.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-83","fetcher":"fs","fetchKey":"83.xml","emitter":"fs","emitKey":"83.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-84","fetcher":"fs","fetchKey":"84.xml","emitter":"fs","emitKey":"84.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-85","fetcher":"fs","fetchKey":"85.xml","emitter":"fs","emitKey":"85.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-86","fetcher":"fs","fetchKey":"86.xml","emitter":"fs","emitKey":"86.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-87","fetcher":"fs","fetchKey":"87.xml","emitter":"fs","emitKey":"87.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-88","fetcher":"fs","fetchKey":"88.xml","emitter":"fs","emitKey":"88.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-89","fetcher":"fs","fetchKey":"89.xml","emitter":"fs","emitKey":"89.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-90","fetcher":"fs","fetchKey":"90.xml","emitter":"fs","emitKey":"90.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-91","fetcher":"fs","fetchKey":"91.xml","emitter":"fs","emitKey":"91.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-92","fetcher":"fs","fetchKey":"92.xml","emitter":"fs","emitKey":"92.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-93","fetcher":"fs","fetchKey":"93.xml","emitter":"fs","emitKey":"93.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-94","fetcher":"fs","fetchKey":"94.xml","emitter":"fs","emitKey":"94.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-95","fetcher":"fs","fetchKey":"95.xml","emitter":"fs","emitKey":"95.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-96","fetcher":"fs","fetchKey":"96.xml","emitter":"fs","emitKey":"96.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-97","fetcher":"fs","fetchKey":"97.xml","emitter":"fs","emitKey":"97.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-98","fetcher":"fs","fetchKey":"98.xml","emitter":"fs","emitKey":"98.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
+{"id":"myid-99","fetcher":"fs","fetchKey":"99.xml","emitter":"fs","emitKey":"99.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit","embeddedDocumentBytesConfig":{"extractEmbeddedDocumentBytes":true,"zeroPadName":0,"suffixStrategy":"NONE","embeddedIdPrefix":"-","includeOriginal":false}}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/resources/test-documents/test.json b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/resources/test-documents/test.json
new file mode 100644
index 000000000..199772ecb
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-json/src/test/resources/test-documents/test.json
@@ -0,0 +1,100 @@
+{"id":"myid-0","fetcher":"fs","fetchKey":"0.xml","emitter":"fs","emitKey":"0.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-1","fetcher":"fs","fetchKey":"1.xml","emitter":"fs","emitKey":"1.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-2","fetcher":"fs","fetchKey":"2.xml","emitter":"fs","emitKey":"2.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-3","fetcher":"fs","fetchKey":"3.xml","emitter":"fs","emitKey":"3.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-4","fetcher":"fs","fetchKey":"4.xml","emitter":"fs","emitKey":"4.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-5","fetcher":"fs","fetchKey":"5.xml","emitter":"fs","emitKey":"5.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-6","fetcher":"fs","fetchKey":"6.xml","emitter":"fs","emitKey":"6.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-7","fetcher":"fs","fetchKey":"7.xml","emitter":"fs","emitKey":"7.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-8","fetcher":"fs","fetchKey":"8.xml","emitter":"fs","emitKey":"8.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-9","fetcher":"fs","fetchKey":"9.xml","emitter":"fs","emitKey":"9.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-10","fetcher":"fs","fetchKey":"10.xml","emitter":"fs","emitKey":"10.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-11","fetcher":"fs","fetchKey":"11.xml","emitter":"fs","emitKey":"11.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-12","fetcher":"fs","fetchKey":"12.xml","emitter":"fs","emitKey":"12.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-13","fetcher":"fs","fetchKey":"13.xml","emitter":"fs","emitKey":"13.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-14","fetcher":"fs","fetchKey":"14.xml","emitter":"fs","emitKey":"14.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-15","fetcher":"fs","fetchKey":"15.xml","emitter":"fs","emitKey":"15.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-16","fetcher":"fs","fetchKey":"16.xml","emitter":"fs","emitKey":"16.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-17","fetcher":"fs","fetchKey":"17.xml","emitter":"fs","emitKey":"17.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-18","fetcher":"fs","fetchKey":"18.xml","emitter":"fs","emitKey":"18.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-19","fetcher":"fs","fetchKey":"19.xml","emitter":"fs","emitKey":"19.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-20","fetcher":"fs","fetchKey":"20.xml","emitter":"fs","emitKey":"20.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-21","fetcher":"fs","fetchKey":"21.xml","emitter":"fs","emitKey":"21.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-22","fetcher":"fs","fetchKey":"22.xml","emitter":"fs","emitKey":"22.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-23","fetcher":"fs","fetchKey":"23.xml","emitter":"fs","emitKey":"23.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-24","fetcher":"fs","fetchKey":"24.xml","emitter":"fs","emitKey":"24.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-25","fetcher":"fs","fetchKey":"25.xml","emitter":"fs","emitKey":"25.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-26","fetcher":"fs","fetchKey":"26.xml","emitter":"fs","emitKey":"26.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-27","fetcher":"fs","fetchKey":"27.xml","emitter":"fs","emitKey":"27.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-28","fetcher":"fs","fetchKey":"28.xml","emitter":"fs","emitKey":"28.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-29","fetcher":"fs","fetchKey":"29.xml","emitter":"fs","emitKey":"29.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-30","fetcher":"fs","fetchKey":"30.xml","emitter":"fs","emitKey":"30.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-31","fetcher":"fs","fetchKey":"31.xml","emitter":"fs","emitKey":"31.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-32","fetcher":"fs","fetchKey":"32.xml","emitter":"fs","emitKey":"32.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-33","fetcher":"fs","fetchKey":"33.xml","emitter":"fs","emitKey":"33.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-34","fetcher":"fs","fetchKey":"34.xml","emitter":"fs","emitKey":"34.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-35","fetcher":"fs","fetchKey":"35.xml","emitter":"fs","emitKey":"35.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-36","fetcher":"fs","fetchKey":"36.xml","emitter":"fs","emitKey":"36.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-37","fetcher":"fs","fetchKey":"37.xml","emitter":"fs","emitKey":"37.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-38","fetcher":"fs","fetchKey":"38.xml","emitter":"fs","emitKey":"38.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-39","fetcher":"fs","fetchKey":"39.xml","emitter":"fs","emitKey":"39.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-40","fetcher":"fs","fetchKey":"40.xml","emitter":"fs","emitKey":"40.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-41","fetcher":"fs","fetchKey":"41.xml","emitter":"fs","emitKey":"41.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-42","fetcher":"fs","fetchKey":"42.xml","emitter":"fs","emitKey":"42.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-43","fetcher":"fs","fetchKey":"43.xml","emitter":"fs","emitKey":"43.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-44","fetcher":"fs","fetchKey":"44.xml","emitter":"fs","emitKey":"44.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-45","fetcher":"fs","fetchKey":"45.xml","emitter":"fs","emitKey":"45.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-46","fetcher":"fs","fetchKey":"46.xml","emitter":"fs","emitKey":"46.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-47","fetcher":"fs","fetchKey":"47.xml","emitter":"fs","emitKey":"47.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-48","fetcher":"fs","fetchKey":"48.xml","emitter":"fs","emitKey":"48.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-49","fetcher":"fs","fetchKey":"49.xml","emitter":"fs","emitKey":"49.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-50","fetcher":"fs","fetchKey":"50.xml","emitter":"fs","emitKey":"50.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-51","fetcher":"fs","fetchKey":"51.xml","emitter":"fs","emitKey":"51.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-52","fetcher":"fs","fetchKey":"52.xml","emitter":"fs","emitKey":"52.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-53","fetcher":"fs","fetchKey":"53.xml","emitter":"fs","emitKey":"53.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-54","fetcher":"fs","fetchKey":"54.xml","emitter":"fs","emitKey":"54.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-55","fetcher":"fs","fetchKey":"55.xml","emitter":"fs","emitKey":"55.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-56","fetcher":"fs","fetchKey":"56.xml","emitter":"fs","emitKey":"56.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-57","fetcher":"fs","fetchKey":"57.xml","emitter":"fs","emitKey":"57.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-58","fetcher":"fs","fetchKey":"58.xml","emitter":"fs","emitKey":"58.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-59","fetcher":"fs","fetchKey":"59.xml","emitter":"fs","emitKey":"59.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-60","fetcher":"fs","fetchKey":"60.xml","emitter":"fs","emitKey":"60.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-61","fetcher":"fs","fetchKey":"61.xml","emitter":"fs","emitKey":"61.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-62","fetcher":"fs","fetchKey":"62.xml","emitter":"fs","emitKey":"62.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-63","fetcher":"fs","fetchKey":"63.xml","emitter":"fs","emitKey":"63.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-64","fetcher":"fs","fetchKey":"64.xml","emitter":"fs","emitKey":"64.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-65","fetcher":"fs","fetchKey":"65.xml","emitter":"fs","emitKey":"65.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-66","fetcher":"fs","fetchKey":"66.xml","emitter":"fs","emitKey":"66.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-67","fetcher":"fs","fetchKey":"67.xml","emitter":"fs","emitKey":"67.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-68","fetcher":"fs","fetchKey":"68.xml","emitter":"fs","emitKey":"68.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-69","fetcher":"fs","fetchKey":"69.xml","emitter":"fs","emitKey":"69.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-70","fetcher":"fs","fetchKey":"70.xml","emitter":"fs","emitKey":"70.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-71","fetcher":"fs","fetchKey":"71.xml","emitter":"fs","emitKey":"71.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-72","fetcher":"fs","fetchKey":"72.xml","emitter":"fs","emitKey":"72.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-73","fetcher":"fs","fetchKey":"73.xml","emitter":"fs","emitKey":"73.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-74","fetcher":"fs","fetchKey":"74.xml","emitter":"fs","emitKey":"74.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-75","fetcher":"fs","fetchKey":"75.xml","emitter":"fs","emitKey":"75.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-76","fetcher":"fs","fetchKey":"76.xml","emitter":"fs","emitKey":"76.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-77","fetcher":"fs","fetchKey":"77.xml","emitter":"fs","emitKey":"77.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-78","fetcher":"fs","fetchKey":"78.xml","emitter":"fs","emitKey":"78.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-79","fetcher":"fs","fetchKey":"79.xml","emitter":"fs","emitKey":"79.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-80","fetcher":"fs","fetchKey":"80.xml","emitter":"fs","emitKey":"80.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-81","fetcher":"fs","fetchKey":"81.xml","emitter":"fs","emitKey":"81.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-82","fetcher":"fs","fetchKey":"82.xml","emitter":"fs","emitKey":"82.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-83","fetcher":"fs","fetchKey":"83.xml","emitter":"fs","emitKey":"83.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-84","fetcher":"fs","fetchKey":"84.xml","emitter":"fs","emitKey":"84.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-85","fetcher":"fs","fetchKey":"85.xml","emitter":"fs","emitKey":"85.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-86","fetcher":"fs","fetchKey":"86.xml","emitter":"fs","emitKey":"86.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-87","fetcher":"fs","fetchKey":"87.xml","emitter":"fs","emitKey":"87.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-88","fetcher":"fs","fetchKey":"88.xml","emitter":"fs","emitKey":"88.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-89","fetcher":"fs","fetchKey":"89.xml","emitter":"fs","emitKey":"89.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-90","fetcher":"fs","fetchKey":"90.xml","emitter":"fs","emitKey":"90.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-91","fetcher":"fs","fetchKey":"91.xml","emitter":"fs","emitKey":"91.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-92","fetcher":"fs","fetchKey":"92.xml","emitter":"fs","emitKey":"92.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-93","fetcher":"fs","fetchKey":"93.xml","emitter":"fs","emitKey":"93.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-94","fetcher":"fs","fetchKey":"94.xml","emitter":"fs","emitKey":"94.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-95","fetcher":"fs","fetchKey":"95.xml","emitter":"fs","emitKey":"95.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-96","fetcher":"fs","fetchKey":"96.xml","emitter":"fs","emitKey":"96.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-97","fetcher":"fs","fetchKey":"97.xml","emitter":"fs","emitKey":"97.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-98","fetcher":"fs","fetchKey":"98.xml","emitter":"fs","emitKey":"98.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
+{"id":"myid-99","fetcher":"fs","fetchKey":"99.xml","emitter":"fs","emitKey":"99.xml.json","handlerConfig":{"type":"text","parseMode":"rmeta","writeLimit":-1,"maxEmbeddedResources":-1},"onParseException":"emit"}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3fbd67c0c..ed5931932 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -33,6 +33,7 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.utils.StringUtils;
@@ -54,6 +55,13 @@ public class JsonFetchEmitTuple {
     private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
     private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode";
 
+    private static final String EMBEDDED_DOCUMENT_BYTES_CONFIG = "embeddedDocumentBytesConfig";
+    private static final String ZERO_PAD_NAME = "zeroPadName";
+    private static final String EXTRACT_EMBEDDED_DOCUMENT_BYTES = "extractEmbeddedDocumentBytes";
+    private static final String SUFFIX_STRATEGY = "suffixStrategy";
+    private static final String EMBEDDED_ID_PREFIX = "embeddedIdPrefix";
+    private static final String INCLUDE_ORIGINAL = "includeOriginal";
+
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
         try (JsonParser jParser = new JsonFactory().setStreamReadConstraints(StreamReadConstraints.builder()
@@ -84,6 +92,8 @@ public class JsonFetchEmitTuple {
                 FetchEmitTuple.DEFAULT_ON_PARSE_EXCEPTION;
         HandlerConfig handlerConfig = HandlerConfig.DEFAULT_HANDLER_CONFIG;
         Metadata metadata = new Metadata();
+        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = EmbeddedDocumentBytesConfig.SKIP;
+
         while (token != JsonToken.END_OBJECT) {
             if (token != JsonToken.FIELD_NAME) {
                 throw new IOException("required field name, but see: " + token.name());
@@ -120,6 +130,8 @@ public class JsonFetchEmitTuple {
                 fetchRangeStart = getLong(jParser);
             } else if (FETCH_RANGE_END.equals(name)) {
                 fetchRangeEnd = getLong(jParser);
+            } else if (EMBEDDED_DOCUMENT_BYTES_CONFIG.equals(name)) {
+                embeddedDocumentBytesConfig = getEmbeddedDocumentBytesConfig(jParser);
             }
             token = jParser.nextToken();
         }
@@ -127,7 +139,48 @@ public class JsonFetchEmitTuple {
             id = fetchKey;
         }
         return new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey, fetchRangeStart, fetchRangeEnd),
-                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException);
+                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException,
+                embeddedDocumentBytesConfig);
+    }
+
+    /**
+     * Reads the JSON object that follows the {@code embeddedDocumentBytesConfig} field name
+     * and converts it to an {@link EmbeddedDocumentBytesConfig}.
+     * <p>
+     * Every field of the object is consumed, even when {@code extractEmbeddedDocumentBytes}
+     * is {@code false}. Returning as soon as that flag is seen would leave the remaining
+     * fields of the nested object unread in the stream.
+     *
+     * @param jParser parser whose next token must be START_OBJECT
+     * @return the populated config, or a fresh {@code EmbeddedDocumentBytesConfig(false)}
+     *         if extraction was explicitly switched off
+     * @throws IOException if the next token is not START_OBJECT, or if a boolean field
+     *                     does not hold a JSON boolean
+     * @throws IllegalArgumentException if the object contains an unrecognized field
+     */
+    private static EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig(JsonParser jParser) throws IOException {
+        JsonToken token = jParser.nextToken();
+        if (token != JsonToken.START_OBJECT) {
+            // nextToken() yields null at end of input; do not dereference it blindly
+            throw new IOException("required start object, but see: " +
+                    (token == null ? "end of input" : token.name()));
+        }
+        EmbeddedDocumentBytesConfig config = new EmbeddedDocumentBytesConfig(true);
+        // once switched off, extraction stays off, but we keep reading to drain the object
+        boolean extract = true;
+        String fieldName = jParser.nextFieldName();
+        while (fieldName != null) {
+            switch (fieldName) {
+                case EXTRACT_EMBEDDED_DOCUMENT_BYTES:
+                    if (! getRequiredBoolean(jParser, fieldName)) {
+                        extract = false;
+                    }
+                    break;
+                case INCLUDE_ORIGINAL:
+                    config.setIncludeOriginal(getRequiredBoolean(jParser, fieldName));
+                    break;
+                case EMITTER:
+                    config.setEmitter(jParser.nextTextValue());
+                    break;
+                case ZERO_PAD_NAME:
+                    config.setZeroPadNameLength(jParser.nextIntValue(0));
+                    break;
+                case SUFFIX_STRATEGY:
+                    config.setSuffixStrategy(EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.parse(
+                            jParser.nextTextValue()));
+                    break;
+                case EMBEDDED_ID_PREFIX:
+                    config.setEmbeddedIdPrefix(jParser.nextTextValue());
+                    break;
+                default:
+                    throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
+                            "' in the context of an embeddedDocumentBytesConfig");
+            }
+            fieldName = jParser.nextFieldName();
+        }
+        return extract ? config : new EmbeddedDocumentBytesConfig(false);
+    }
+
+    /**
+     * Advances the parser and returns the boolean it lands on.
+     *
+     * @param jParser   parser positioned just before the value
+     * @param fieldName name of the field being read; used only in the error message
+     * @return the boolean value
+     * @throws IOException if the next token is not a JSON boolean
+     */
+    private static boolean getRequiredBoolean(JsonParser jParser, String fieldName)
+            throws IOException {
+        // nextBooleanValue() returns null for anything other than true/false
+        Boolean value = jParser.nextBooleanValue();
+        if (value == null) {
+            throw new IOException("required boolean value for '" + fieldName +
+                    "', but see: " + jParser.currentToken());
+        }
+        return value;
     }
 
     private static HandlerConfig getHandlerConfig(JsonParser jParser) throws IOException {
@@ -231,6 +284,22 @@ public class JsonFetchEmitTuple {
         }
         jsonGenerator.writeStringField(ON_PARSE_EXCEPTION,
                 t.getOnParseException().name().toLowerCase(Locale.US));
+        if (t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes()) {
+            EmbeddedDocumentBytesConfig edbc = t.getEmbeddedDocumentBytesConfig();
+            jsonGenerator.writeFieldName(EMBEDDED_DOCUMENT_BYTES_CONFIG);
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(EXTRACT_EMBEDDED_DOCUMENT_BYTES,
+                    edbc.isExtractEmbeddedDocumentBytes());
+            jsonGenerator.writeNumberField(ZERO_PAD_NAME, edbc.getZeroPadName());
+            jsonGenerator.writeStringField(SUFFIX_STRATEGY,
+                    edbc.getSuffixStrategy().toString());
+            jsonGenerator.writeStringField(EMBEDDED_ID_PREFIX, edbc.getEmbeddedIdPrefix());
+            if (! StringUtils.isBlank(edbc.getEmitter())) {
+                jsonGenerator.writeStringField(EMITTER, edbc.getEmitter());
+            }
+            jsonGenerator.writeBooleanField(INCLUDE_ORIGINAL, edbc.isIncludeOriginal());
+            jsonGenerator.writeEndObject();
+        }
         jsonGenerator.writeEndObject();
 
     }
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
index aeb4fefd4..4484478dc 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
@@ -28,6 +28,7 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 
@@ -77,4 +78,23 @@ public class JsonFetchEmitTupleTest {
         FetchEmitTuple deserialized = JsonFetchEmitTuple.fromJson(reader);
         assertEquals(t, deserialized);
     }
+
+    /**
+     * Serializes a tuple that carries an enabled {@link EmbeddedDocumentBytesConfig}
+     * to JSON, reads it back, and expects the result to equal the original tuple.
+     */
+    @Test
+    public void testBytes() throws Exception {
+        EmbeddedDocumentBytesConfig embeddedBytes = new EmbeddedDocumentBytesConfig(true);
+        embeddedBytes.setEmitter("emitter");
+
+        FetchKey fetchKey = new FetchKey("my_fetcher", "fetchKey1", 10, 1000);
+        EmitKey emitKey = new EmitKey("my_emitter", "emitKey1");
+        Metadata metadata = new Metadata();
+        HandlerConfig handlerConfig =
+                new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML,
+                        HandlerConfig.PARSE_MODE.CONCATENATE, 10000, 10, true);
+        FetchEmitTuple original = new FetchEmitTuple("my_id", fetchKey, emitKey, metadata,
+                handlerConfig, FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP, embeddedBytes);
+
+        // serialize, then parse the serialized form straight back
+        StringWriter json = new StringWriter();
+        JsonFetchEmitTuple.toJson(original, json);
+        Reader jsonReader = new StringReader(json.toString());
+        FetchEmitTuple roundTripped = JsonFetchEmitTuple.fromJson(jsonReader);
+
+        assertEquals(original, roundTripped);
+    }
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 2cc7b1294..a4d4ed489 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +34,7 @@ import jakarta.ws.rs.Produces;
 import jakarta.ws.rs.core.Context;
 import jakarta.ws.rs.core.HttpHeaders;
 import jakarta.ws.rs.core.UriInfo;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -45,8 +45,8 @@ import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.async.AsyncProcessor;
+import org.apache.tika.pipes.async.OfferLargerThanQueueSize;
 import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
@@ -107,14 +107,25 @@ public class AsyncResource {
                 return badFetcher(t.getFetchKey());
             }
             if (!emitterManager.getSupported().contains(t.getEmitKey().getEmitterName())) {
-                return badEmitter(t.getEmitKey());
+                return badEmitter(t.getEmitKey().getEmitterName());
+            }
+            if (t.getEmbeddedDocumentBytesConfig().isExtractEmbeddedDocumentBytes() &&
+                    !StringUtils.isAllBlank(t.getEmbeddedDocumentBytesConfig().getEmitter())) {
+                String bytesEmitter = t.getEmbeddedDocumentBytesConfig().getEmitter();
+                if (!emitterManager.getSupported().contains(bytesEmitter)) {
+                    return badEmitter(bytesEmitter);
+                }
             }
         }
-        Instant start = Instant.now();
-        boolean offered = asyncProcessor.offer(request.getTuples(), maxQueuePauseMs);
-        if (offered) {
-            return ok(request.getTuples().size());
-        } else {
+        //Instant start = Instant.now();
+        try {
+            boolean offered = asyncProcessor.offer(request.getTuples(), maxQueuePauseMs);
+            if (offered) {
+                return ok(request.getTuples().size());
+            } else {
+                return throttle(request.getTuples().size());
+            }
+        } catch (OfferLargerThanQueueSize e) {
             return throttle(request.getTuples().size());
         }
     }
@@ -130,11 +141,12 @@ public class AsyncResource {
         Map<String, Object> map = new HashMap<>();
         map.put("status", "throttled");
         map.put("msg", "not able to receive request of size " + requestSize + " at this time");
+        map.put("capacity", asyncProcessor.getCapacity());
         return map;
     }
 
-    private Map<String, Object> badEmitter(EmitKey emitKey) {
-        throw new BadRequestException("can't find emitter for " + emitKey.getEmitterName());
+    private Map<String, Object> badEmitter(String emitterName) {
+        throw new BadRequestException("can't find emitter for " + emitterName);
     }
 
     private Map<String, Object> badFetcher(FetchKey fetchKey) {
diff --git a/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java b/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
index 7f41e065c..391e67fee 100644
--- a/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
+++ b/tika-server/tika-server-standard/src/test/java/org/apache/tika/server/standard/TikaPipesTest.java
@@ -25,11 +25,16 @@ import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitResult;
+import java.nio.file.FileVisitor;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import jakarta.ws.rs.core.Response;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
@@ -49,6 +54,7 @@ import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -72,6 +78,7 @@ public class TikaPipesTest extends CXFTestBase {
     private static Path TMP_WORKING_DIR;
     private static Path TMP_OUTPUT_DIR;
     private static Path TMP_OUTPUT_FILE;
+    private static Path TMP_BYTES_DIR;
     private static Path TIKA_PIPES_LOG4j2_PATH;
     private static Path TIKA_CONFIG_PATH;
     private static String TIKA_CONFIG_XML;
@@ -81,6 +88,7 @@ public class TikaPipesTest extends CXFTestBase {
     public static void setUpBeforeClass() throws Exception {
         Path inputDir = TMP_WORKING_DIR.resolve("input");
         TMP_OUTPUT_DIR = TMP_WORKING_DIR.resolve("output");
+        TMP_BYTES_DIR = TMP_WORKING_DIR.resolve("bytes");
         TMP_OUTPUT_FILE = TMP_OUTPUT_DIR.resolve(TEST_RECURSIVE_DOC + ".json");
 
         Files.createDirectories(inputDir);
@@ -103,6 +111,9 @@ public class TikaPipesTest extends CXFTestBase {
                         "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
                         "<params>" + "<name>fse</name>" + "<basePath>" +
                         TMP_OUTPUT_DIR.toAbsolutePath() + "</basePath>" + "</params>" +
+                        "</emitter>" + "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                        "<params>" + "<name>bytes</name>" + "<basePath>" +
+                        TMP_BYTES_DIR.toAbsolutePath() + "</basePath>" + "</params>" +
                         "</emitter>" + "</emitters>" + "<pipes><params><tikaConfig>" +
                         ProcessUtils.escapeCommandLine(
                                 TIKA_CONFIG_PATH.toAbsolutePath().toString()) +
@@ -203,4 +214,86 @@ public class TikaPipesTest extends CXFTestBase {
         assertContains("When in the Course",
                 metadataList.get(0).get(TikaCoreProperties.TIKA_CONTENT));
     }
+
+    /**
+     * Posts a single tuple with embedded-byte extraction enabled (emitter "bytes",
+     * original included) to the pipes endpoint, then checks the emitted metadata
+     * list and the names and sizes of the files found under TMP_BYTES_DIR.
+     */
+    @Test
+    public void testBytes() throws Exception {
+        EmbeddedDocumentBytesConfig bytesConfig = new EmbeddedDocumentBytesConfig(true);
+        bytesConfig.setEmitter("bytes");
+        bytesConfig.setIncludeOriginal(true);
+        bytesConfig.setEmbeddedIdPrefix("-");
+        bytesConfig.setZeroPadNameLength(10);
+        bytesConfig.setSuffixStrategy(EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.EXISTING);
+
+        FetchKey fetchKey = new FetchKey("fsf", "test_recursive_embedded.docx");
+        EmitKey emitKey = new EmitKey("fse", "test_recursive_embedded.docx");
+        Metadata userMetadata = new Metadata();
+        HandlerConfig handlerConfig =
+                new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT,
+                        HandlerConfig.PARSE_MODE.RMETA, -1, -1, false);
+        FetchEmitTuple tuple = new FetchEmitTuple("myId", fetchKey, emitKey, userMetadata,
+                handlerConfig, FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT, bytesConfig);
+
+        StringWriter requestJson = new StringWriter();
+        JsonFetchEmitTuple.toJson(tuple, requestJson);
+
+        Response response = WebClient.create(endPoint + PIPES_PATH)
+                .accept("application/json")
+                .post(requestJson.toString());
+        assertEquals(200, response.getStatus());
+
+        // the metadata list is read back from the emitter's output file
+        List<Metadata> emitted;
+        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
+            emitted = JsonMetadataList.fromJson(reader);
+        }
+        assertEquals(12, emitted.size());
+        assertContains("When in the Course",
+                emitted.get(6).get(TikaCoreProperties.TIKA_CONTENT));
+
+        // compare the expected name -> size map against what is found in the bytes directory
+        Map<String, Long> expectedSizes = loadExpected();
+        Map<String, Long> actualSizes = getFileNames(TMP_BYTES_DIR);
+        assertEquals(expectedSizes, actualSizes);
+    }
+
+    /**
+     * Builds the map that {@code testBytes} compares against the bytes directory:
+     * file name mapped to file size in bytes. Entries are listed in ascending
+     * order of the zero-padded index in the file name.
+     */
+    private Map<String, Long> loadExpected() {
+        Map<String, Long> sizesByName = new HashMap<>();
+        sizesByName.put("test_recursive_embedded.docx-0000000000.docx", 27082L);
+        sizesByName.put("test_recursive_embedded.docx-0000000001.emf", 4992L);
+        sizesByName.put("test_recursive_embedded.docx-0000000002.zip", 4827L);
+        sizesByName.put("test_recursive_embedded.docx-0000000003.txt", 8L);
+        sizesByName.put("test_recursive_embedded.docx-0000000004.txt", 8L);
+        sizesByName.put("test_recursive_embedded.docx-0000000005.zip", 4492L);
+        sizesByName.put("test_recursive_embedded.docx-0000000006.txt", 8L);
+        sizesByName.put("test_recursive_embedded.docx-0000000007.txt", 8L);
+        sizesByName.put("test_recursive_embedded.docx-0000000008.zip", 4048L);
+        sizesByName.put("test_recursive_embedded.docx-0000000009.txt", 8151L);
+        sizesByName.put("test_recursive_embedded.docx-0000000010.zip", 163L);
+        sizesByName.put("test_recursive_embedded.docx-0000000011.txt", 7L);
+        return sizesByName;
+    }
+
+    /**
+     * Walks the file tree rooted at {@code p} and records each file visited.
+     * <p>
+     * Keys are the bare file names (no parent directories), so files with the same
+     * name in different subdirectories overwrite one another. Files that cannot be
+     * visited are skipped without failing the walk.
+     *
+     * @param p root of the tree to walk
+     * @return file name mapped to file size in bytes
+     * @throws Exception if the walk itself fails
+     */
+    private Map<String, Long> getFileNames(Path p) throws Exception {
+        final Map<String, Long> ret = new HashMap<>();
+        // walk the directory that was passed in, not a hard-coded one
+        Files.walkFileTree(p, new FileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+                    throws IOException {
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+                    throws IOException {
+                ret.put(file.getFileName().toString(), Files.size(file));
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+                // best effort: an unreadable entry is left out of the result
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+                    throws IOException {
+                return FileVisitResult.CONTINUE;
+            }
+        });
+        return ret;
+    }
 }