You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2024/03/09 01:14:04 UTC

(tika) branch TIKA-4207 created (now 0674ea469)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-4207
in repository https://gitbox.apache.org/repos/asf/tika.git


      at 0674ea469 TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D

This branch includes the following new commits:

     new 0674ea469 TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(tika) 01/01: TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4207
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 0674ea4693d57cbc30f8a59417f469a48ce53c2f
Author: tallison <ta...@apache.org>
AuthorDate: Fri Mar 8 20:13:49 2024 -0500

    TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D
---
 .../java/org/apache/tika/cli/TikaCLIAsyncTest.java |  73 ++++++++
 .../test/java/org/apache/tika/cli/TikaCLITest.java |  57 +------
 .../AbstractEmbeddedDocumentByteStore.java         |  63 +++++++
 .../extractor/BasicEmbeddedDocumentByteStore.java  |  46 +++++
 .../tika/extractor/EmbeddedDocumentByteStore.java  |  32 ++++
 .../extractor/ParsingAndEmbeddedDocExtractor.java  | 162 ++++++++++++++++++
 .../ParsingAndEmbeddedDocExtractorFactory.java     |  40 +++++
 .../java/org/apache/tika/pipes/FetchEmitTuple.java |  52 ++++--
 .../java/org/apache/tika/pipes/PipesServer.java    | 188 +++++++++++++++++----
 .../extractor/EmbeddedDocumentBytesConfig.java     |  93 ++++++++++
 .../extractor/EmbeddedDocumentEmitterStore.java    |  63 +++++++
 .../org/apache/tika/pipes/PipesServerTest.java     |   2 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |  41 ++++-
 13 files changed, 811 insertions(+), 101 deletions(-)

diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
new file mode 100644
index 000000000..1f6c8fc2c
--- /dev/null
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
@@ -0,0 +1,73 @@
+package org.apache.tika.cli;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TikaCLIAsyncTest extends TikaCLITest {
+
+    private static Path ASYNC_CONFIG;
+    @TempDir
+    private static Path ASYNC_OUTPUT_DIR;
+
+    @BeforeAll
+    public static void setUpClass() throws Exception {
+        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
+        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
+                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
+                "</async>" + "<fetchers>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
+                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
+                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
+                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
+                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
+                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
+                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
+                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
+                "</pipesIterator>" + "</properties>";
+        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
+    }
+
+    @Test
+    public void testAsync() throws Exception {
+        String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
+
+        int json = 0;
+        for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
+            if (f.getName().endsWith(".json")) {
+                //check one file for pretty print
+                if (f.getName().equals("coffee.xls.json")) {
+                    checkForPrettyPrint(f);
+                }
+                json++;
+            }
+        }
+        assertEquals(17, json);
+    }
+
+    private void checkForPrettyPrint(File f) throws IOException {
+        String json = FileUtils.readFileToString(f, UTF_8);
+        int previous = json.indexOf("Content-Length");
+        assertTrue(previous > -1);
+        for (String k : new String[]{"Content-Type", "dc:creator",
+                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
+            int i = json.indexOf(k);
+            assertTrue( i > -1, "should have found " + k);
+            assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous);
+            previous = i;
+        }
+    }
+
+
+}
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
index ebd1d90b9..c160db396 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
@@ -45,11 +45,8 @@ import org.apache.tika.utils.ProcessUtils;
  */
 public class TikaCLITest {
 
-    private static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
+    static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
 
-    private static Path ASYNC_CONFIG;
-    @TempDir
-    private static Path ASYNC_OUTPUT_DIR;
 
     @TempDir
     private Path extractDir;
@@ -61,24 +58,7 @@ public class TikaCLITest {
     private PrintStream stderr = null;
     private String resourcePrefix;
 
-    @BeforeAll
-    public static void setUpClass() throws Exception {
-        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
-        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
-                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
-                "</async>" + "<fetchers>" +
-                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
-                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
-                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
-                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
-                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
-                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
-                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
-                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
-                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
-                "</pipesIterator>" + "</properties>";
-        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
-    }
+
 
     protected static void assertExtracted(Path p, String allFiles) throws IOException {
 
@@ -582,42 +562,11 @@ public class TikaCLITest {
         assertTrue(content.contains("application/vnd.oasis.opendocument.text-web"));
     }
 
-    @Test
-    public void testAsync() throws Exception {
-        String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
-
-        int json = 0;
-        for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
-            if (f.getName().endsWith(".json")) {
-                //check one file for pretty print
-                if (f.getName().equals("coffee.xls.json")) {
-                    checkForPrettyPrint(f);
-                }
-                json++;
-            }
-        }
-        assertEquals(17, json);
-    }
-
-    private void checkForPrettyPrint(File f) throws IOException {
-        String json = FileUtils.readFileToString(f, UTF_8);
-        int previous = json.indexOf("Content-Length");
-        assertTrue(previous > -1);
-        for (String k : new String[]{"Content-Type", "dc:creator",
-                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
-            int i = json.indexOf(k);
-            assertTrue( i > -1, "should have found " + k);
-            assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous);
-            previous = i;
-        }
-    }
-
-
     /**
      * reset outContent and errContent if they are not empty
      * run given params in TikaCLI and return outContent String with UTF-8
      */
-    private String getParamOutContent(String... params) throws Exception {
+    String getParamOutContent(String... params) throws Exception {
         resetContent();
         TikaCLI.main(params);
         return outContent.toString("UTF-8");
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..c435a3e6e
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.io.FilenameUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.utils.StringUtils;
+
+public abstract class AbstractEmbeddedDocumentByteStore implements EmbeddedDocumentByteStore {
+
+    List<Integer> ids = new ArrayList<>();
+
+    public String getFetchKey(String containerFetchKey, int embeddedId,
+                              EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                              Metadata metadata) {
+        String embeddedIdString = embeddedDocumentBytesConfig.getZeroPadName() > 0 ?
+                StringUtils.leftPad(Integer.toString(embeddedId),
+                        embeddedDocumentBytesConfig.getZeroPadName(), "0") :
+                Integer.toString(embeddedId);
+
+        StringBuilder fetchKey = new StringBuilder(containerFetchKey)
+                .append(embeddedDocumentBytesConfig.getEmbeddedIdPrefix())
+                .append(embeddedIdString);
+
+        if (embeddedDocumentBytesConfig.getSuffixStrategy().equals(
+                EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.EXISTING)) {
+            String fName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            String suffix = FilenameUtils.getSuffixFromPath(fName);
+            fetchKey.append(suffix);
+        }
+        return fetchKey.toString();
+    }
+
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+           ids.add(id);
+    }
+
+    @Override
+    public List<Integer> getIds() {
+        return ids;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..b41285eb0
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+
+public class BasicEmbeddedDocumentByteStore extends AbstractEmbeddedDocumentByteStore {
+    private final EmbeddedDocumentBytesConfig config;
+    public BasicEmbeddedDocumentByteStore(EmbeddedDocumentBytesConfig config) {
+        this.config = config;
+    }
+    //this won't scale, but let's start fully in memory for now;
+    Map<Integer, byte[]> docBytes = new HashMap<>();
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        super.add(id, metadata, bytes);
+        docBytes.put(id, bytes);
+    }
+
+    public byte[] getDocument(int id) {
+        return docBytes.get(id);
+    }
+
+    @Override
+    public void close() throws IOException {
+        //delete tmp dir or whatever here
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..ad1bb81f3
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tika.metadata.Metadata;
+
+public interface EmbeddedDocumentByteStore extends Closeable {
+    //we need metadata for the emitter store...can we get away without it?
+    void add(int id, Metadata metadata, byte[] bytes) throws IOException;
+
+    byte[] getDocument(int id);
+
+    List<Integer> getIds();
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java
new file mode 100644
index 000000000..d88ec94c4
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import static org.apache.tika.sax.XHTMLContentHandler.XHTML;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.input.CloseShieldInputStream;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.AttributesImpl;
+
+import org.apache.tika.exception.CorruptedFileException;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.DelegatingParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.ParseRecord;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.sax.BodyContentHandler;
+import org.apache.tika.sax.EmbeddedContentHandler;
+
+/**
+ * Helper class for parsers of package archives or other compound document
+ * formats that support embedded or attached component documents.
+ *
+ * This is intended to both parse the embedded documents and extract
+ * the raw bytes from the embedded attachments when possible.
+ *
+ * See also {@link ParsingEmbeddedDocumentExtractor} and {@link ParserContainerExtractor}.
+ *
+ * @since 3.0.0
+ */
+public class ParsingAndEmbeddedDocExtractor implements EmbeddedDocumentExtractor {
+
+    private static final File ABSTRACT_PATH = new File("");
+
+    private static final Parser DELEGATING_PARSER = new DelegatingParser();
+
+    private boolean writeFileNameToContent = true;
+
+    private final ParseContext context;
+
+    public ParsingAndEmbeddedDocExtractor(ParseContext context) {
+        this.context = context;
+    }
+
+    public boolean shouldParseEmbedded(Metadata metadata) {
+        DocumentSelector selector = context.get(DocumentSelector.class);
+        if (selector != null) {
+            return selector.select(metadata);
+        }
+
+        FilenameFilter filter = context.get(FilenameFilter.class);
+        if (filter != null) {
+            String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            if (name != null) {
+                return filter.accept(ABSTRACT_PATH, name);
+            }
+        }
+
+        return true;
+    }
+
+    public void parseEmbedded(
+            InputStream stream, ContentHandler handler, Metadata metadata, boolean outputHtml)
+            throws SAXException, IOException {
+        if (outputHtml) {
+            AttributesImpl attributes = new AttributesImpl();
+            attributes.addAttribute("", "class", "class", "CDATA", "package-entry");
+            handler.startElement(XHTML, "div", "div", attributes);
+        }
+
+        String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+        if (writeFileNameToContent && name != null && name.length() > 0 && outputHtml) {
+            handler.startElement(XHTML, "h1", "h1", new AttributesImpl());
+            char[] chars = name.toCharArray();
+            handler.characters(chars, 0, chars.length);
+            handler.endElement(XHTML, "h1", "h1");
+        }
+
+        // Use the delegate parser to parse this entry
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            final TikaInputStream newStream =
+                    TikaInputStream.get(CloseShieldInputStream.wrap(stream), tmp, metadata);
+            if (stream instanceof TikaInputStream) {
+                final Object container = ((TikaInputStream) stream).getOpenContainer();
+                if (container != null) {
+                    newStream.setOpenContainer(container);
+                }
+            }
+            Path p = newStream.getPath();
+            storeEmbeddedBytes(p, metadata);
+
+            DELEGATING_PARSER.parse(newStream, new EmbeddedContentHandler(new BodyContentHandler(handler)),
+                    metadata, context);
+        } catch (EncryptedDocumentException ede) {
+            recordException(ede, context);
+        } catch (CorruptedFileException e) {
+            //necessary to stop the parse to avoid infinite loops
+            //on corrupt sqlite3 files
+            throw new IOException(e);
+        } catch (TikaException e) {
+            recordException(e, context);
+        }
+
+        if (outputHtml) {
+            handler.endElement(XHTML, "div", "div");
+        }
+    }
+
+    private void storeEmbeddedBytes(Path p, Metadata metadata) {
+        EmbeddedDocumentByteStore embeddedDocumentByteStore =
+                context.get(EmbeddedDocumentByteStore.class);
+        int id = metadata.getInt(TikaCoreProperties.EMBEDDED_ID);
+        try {
+            embeddedDocumentByteStore.add(id, metadata, Files.readAllBytes(p));
+        } catch (IOException e) {
+            //log, or better, store embdocstore exception
+        }
+    }
+
+    private void recordException(Exception e, ParseContext context) {
+        ParseRecord record = context.get(ParseRecord.class);
+        if (record == null) {
+            return;
+        }
+        record.addException(e);
+    }
+
+    public Parser getDelegatingParser() {
+        return DELEGATING_PARSER;
+    }
+
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java
new file mode 100644
index 000000000..ca4c6633c
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+public class ParsingAndEmbeddedDocExtractorFactory
+        implements EmbeddedDocumentExtractorFactory {
+
+    private boolean writeFileNameToContent = true;
+
+    @Field
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+
+    @Override
+    public EmbeddedDocumentExtractor newInstance(Metadata metadata, ParseContext parseContext) {
+        ParsingEmbeddedDocumentExtractor ex =
+                new ParsingEmbeddedDocumentExtractor(parseContext);
+        ex.setWriteFileNameToContent(writeFileNameToContent);
+        return ex;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 3a8ec2bdd..c49f3743f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
@@ -38,6 +39,7 @@ public class FetchEmitTuple implements Serializable {
     private final ON_PARSE_EXCEPTION onParseException;
     private HandlerConfig handlerConfig;
 
+    private EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) {
         this(id, fetchKey, emitKey, new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
@@ -55,12 +57,20 @@ public class FetchEmitTuple implements Serializable {
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
                           HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException) {
+        this(id, fetchKey, emitKey, metadata, handlerConfig, onParseException,
+                EmbeddedDocumentBytesConfig.SKIP);
+    }
+
+    public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
+                          HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException,
+                          EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig) {
         this.id = id;
         this.fetchKey = fetchKey;
         this.emitKey = emitKey;
         this.metadata = metadata;
         this.handlerConfig = handlerConfig;
         this.onParseException = onParseException;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
     }
 
     public String getId() {
@@ -94,21 +104,40 @@ public class FetchEmitTuple implements Serializable {
         return handlerConfig == null ? HandlerConfig.DEFAULT_HANDLER_CONFIG : handlerConfig;
     }
 
+    public EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig() {
+        return embeddedDocumentBytesConfig;
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         FetchEmitTuple that = (FetchEmitTuple) o;
 
-        if (!Objects.equals(id, that.id)) return false;
-        if (!Objects.equals(fetchKey, that.fetchKey))
+        if (!Objects.equals(id, that.id)) {
+            return false;
+        }
+        if (!Objects.equals(fetchKey, that.fetchKey)) {
+            return false;
+        }
+        if (!Objects.equals(emitKey, that.emitKey)) {
+            return false;
+        }
+        if (!Objects.equals(metadata, that.metadata)) {
+            return false;
+        }
+        if (onParseException != that.onParseException) {
             return false;
-        if (!Objects.equals(emitKey, that.emitKey)) return false;
-        if (!Objects.equals(metadata, that.metadata))
+        }
+        if (!Objects.equals(handlerConfig, that.handlerConfig)) {
             return false;
-        if (onParseException != that.onParseException) return false;
-        return Objects.equals(handlerConfig, that.handlerConfig);
+        }
+        return Objects.equals(embeddedDocumentBytesConfig, that.embeddedDocumentBytesConfig);
     }
 
     @Override
@@ -119,13 +148,16 @@ public class FetchEmitTuple implements Serializable {
         result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
         result = 31 * result + (onParseException != null ? onParseException.hashCode() : 0);
         result = 31 * result + (handlerConfig != null ? handlerConfig.hashCode() : 0);
+        result = 31 * result +
+                (embeddedDocumentBytesConfig != null ? embeddedDocumentBytesConfig.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
         return "FetchEmitTuple{" + "id='" + id + '\'' + ", fetchKey=" + fetchKey + ", emitKey=" +
-            emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
-            ", handlerConfig=" + handlerConfig + '}';
+                emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
+                ", handlerConfig=" + handlerConfig + ", embeddedDocumentBytesConfig=" +
+                embeddedDocumentBytesConfig + '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index ed1e5bb5e..94ef58502 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes;
 
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -24,10 +25,12 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
@@ -40,8 +43,11 @@ import org.xml.sax.SAXException;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.BasicEmbeddedDocumentByteStore;
 import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.extractor.EmbeddedDocumentByteStore;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
@@ -52,11 +58,14 @@ import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentEmitterStore;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
@@ -269,7 +278,7 @@ public class PipesServer implements Runnable {
      * @return
      */
     private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataList) {
-        if (metadataList == null || metadataList.size() < 1) {
+        if (metadataIsEmpty(metadataList)) {
             return StringUtils.EMPTY;
         }
         String stack = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
@@ -277,11 +286,12 @@ public class PipesServer implements Runnable {
     }
 
 
-    private void emit(String taskId, EmitData emitData, String parseExceptionStack) {
+    private void emit(String taskId, EmitKey emitKey, MetadataListAndEmbeddedBytes parseData,
+                      String parseExceptionStack) {
         Emitter emitter = null;
 
         try {
-            emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+            emitter = emitterManager.getEmitter(emitKey.getEmitterName());
         } catch (IllegalArgumentException e) {
             String noEmitterMsg = getNoEmitterMsg(taskId);
             LOG.warn(noEmitterMsg);
@@ -289,7 +299,11 @@ public class PipesServer implements Runnable {
             return;
         }
         try {
-            emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList());
+            if (parseData.toBePackagedForStreamEmitter()) {
+                emitContentsAndBytes(emitter, emitKey, parseData);
+            } else {
+                emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList());
+            }
         } catch (IOException | TikaEmitterException e) {
             LOG.warn("emit exception", e);
             String msg = ExceptionUtils.getStackTrace(e);
@@ -306,6 +320,16 @@ public class PipesServer implements Runnable {
         }
     }
 
+    private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
+                                      MetadataListAndEmbeddedBytes parseData) {
+        if (! (emitter instanceof StreamEmitter)) {
+            throw new IllegalArgumentException("The emitter for embedded document byte store must" +
+                    " be a StreamEmitter. I see: " + emitter.getClass());
+        }
+        //TODO: implement this
+        throw new UnsupportedOperationException("this is not yet implemented");
+    }
+
     private void parseOne() {
         synchronized (lock) {
             parsing = true;
@@ -348,35 +372,53 @@ public class PipesServer implements Runnable {
         }
 
         start = System.currentTimeMillis();
-        List<Metadata> metadataList = parseIt(t, fetcher);
+        MetadataListAndEmbeddedBytes parseData = null;
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
-        }
+        try {
+            parseData = parseFromTuple(t, fetcher);
 
-        if (metadataIsEmpty(metadataList)) {
-            write(STATUS.EMPTY_OUTPUT);
-            return;
-        }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
+            }
+
+            if (metadataIsEmpty(parseData.getMetadataList())) {
+                write(STATUS.EMPTY_OUTPUT);
+                return;
+            }
 
-        emitIt(t, metadataList);
+            emitParseData(t, parseData);
+        } finally {
+            if (parseData.hasEmbeddedDocumentByteStore() &&
+                    parseData.getEmbeddedDocumentByteStore() instanceof Closeable) {
+                try {
+                    ((Closeable)parseData.getEmbeddedDocumentByteStore()).close();
+                } catch (IOException e) {
+                    LOG.warn("problem closing embedded document byte store", e);
+                }
+            }
+        }
     }
 
-    private void emitIt(FetchEmitTuple t, List<Metadata> metadataList) {
+    private void emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseData) {
         long start = System.currentTimeMillis();
-        String stack = getContainerStacktrace(t, metadataList);
+        String stack = getContainerStacktrace(t, parseData.getMetadataList());
         //we need to apply this after we pull out the stacktrace
-        filterMetadata(metadataList);
+        filterMetadata(parseData.getMetadataList());
         if (StringUtils.isBlank(stack) || t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
-            injectUserMetadata(t.getMetadata(), metadataList);
+            injectUserMetadata(t.getMetadata(), parseData.getMetadataList());
             EmitKey emitKey = t.getEmitKey();
             if (StringUtils.isBlank(emitKey.getEmitKey())) {
                 emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
                 t.setEmitKey(emitKey);
             }
-            EmitData emitData = new EmitData(t.getEmitKey(), metadataList, stack);
-            if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
-                emit(t.getId(), emitData, stack);
+            EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack);
+            if (parseData.toBePackagedForStreamEmitter()) {
+                emit(t.getId(), emitKey, parseData, stack);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
+                }
+            } else if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
+                emit(t.getId(), emitKey, parseData, stack);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
                 }
@@ -418,7 +460,7 @@ public class PipesServer implements Runnable {
         }
     }
 
-    protected List<Metadata> parseIt(FetchEmitTuple t, Fetcher fetcher) {
+    protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
         FetchKey fetchKey = t.getFetchKey();
         if (fetchKey.hasRange()) {
             if (! (fetcher instanceof RangeFetcher)) {
@@ -428,7 +470,7 @@ public class PipesServer implements Runnable {
             Metadata metadata = new Metadata();
             try (InputStream stream = ((RangeFetcher)fetcher).fetch(fetchKey.getFetchKey(),
                     fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
-                return parse(t, stream, metadata);
+                return parseWithStream(t, stream, metadata);
             } catch (SecurityException e) {
                 LOG.error("security exception " + t.getId(), e);
                 throw e;
@@ -439,7 +481,7 @@ public class PipesServer implements Runnable {
         } else {
             Metadata metadata = new Metadata();
             try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
-                return parse(t, stream, metadata);
+                return parseWithStream(t, stream, metadata);
             } catch (SecurityException e) {
                 LOG.error("security exception " + t.getId(), e);
                 throw e;
@@ -488,20 +530,46 @@ public class PipesServer implements Runnable {
         exit(1);
     }
 
-    private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream,
-                                 Metadata metadata) {
+    private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTuple fetchEmitTuple, InputStream stream,
+                                           Metadata metadata) throws TikaConfigException {
         HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
+        List<Metadata> metadataList;
+        //this adds the EmbeddedDocumentByteStore to the parsecontext
+        ParseContext parseContext = createParseContext(fetchEmitTuple);
+        
         if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
-            return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata);
+            metadataList = parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
         } else {
-            return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata);
+            metadataList = parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
         }
+
+        return new MetadataListAndEmbeddedBytes(metadataList,
+                    parseContext.get(EmbeddedDocumentByteStore.class));
+    }
+
+    private ParseContext createParseContext(FetchEmitTuple fetchEmitTuple)
+            throws TikaConfigException {
+        ParseContext parseContext = new ParseContext();
+        if (fetchEmitTuple.getEmbeddedDocumentBytesConfig() == EmbeddedDocumentBytesConfig.SKIP) {
+            return parseContext;
+        }
+
+        if (! StringUtils.isBlank(fetchEmitTuple.getEmbeddedDocumentBytesConfig().getEmitter())) {
+            parseContext.set(EmbeddedDocumentByteStore.class,
+                    new EmbeddedDocumentEmitterStore(fetchEmitTuple.getEmitKey(),
+                            fetchEmitTuple.getEmbeddedDocumentBytesConfig(),
+                    emitterManager));
+        } else {
+            parseContext.set(EmbeddedDocumentByteStore.class,
+                    new BasicEmbeddedDocumentByteStore(fetchEmitTuple.getEmbeddedDocumentBytesConfig()));
+
+        }
+        return parseContext;
     }
 
     private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
                                              HandlerConfig handlerConfig, InputStream stream,
-                                             Metadata metadata) {
-        ParseContext parseContext = new ParseContext();
+                                             Metadata metadata, ParseContext parseContext) {
 
         ContentHandlerFactory contentHandlerFactory =
                 new BasicContentHandlerFactory(handlerConfig.getType(),
@@ -552,8 +620,7 @@ public class PipesServer implements Runnable {
 
     private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
                                           HandlerConfig handlerConfig, InputStream stream,
-                                          Metadata metadata) {
-        ParseContext parseContext = new ParseContext();
+                                          Metadata metadata, ParseContext parseContext) {
         //Intentionally do not add the metadata filter here!
         //We need to let stacktraces percolate
         RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
@@ -590,7 +657,7 @@ public class PipesServer implements Runnable {
             if (tis == null) {
                 tis = TikaInputStream.get(stream, tmp, metadata);
             }
-            _preParse(t.getId(), tis, metadata, parseContext);
+            _preParse(t, tis, metadata, parseContext);
         } finally {
             IOUtils.closeQuietly(tmp);
         }
@@ -598,13 +665,13 @@ public class PipesServer implements Runnable {
         writeIntermediate(t.getEmitKey(), metadata);
     }
 
-    private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+    private void _preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata,
                            ParseContext parseContext) {
         if (digester != null) {
             try {
                 digester.digest(tis, metadata, parseContext);
             } catch (IOException e) {
-                LOG.warn("problem digesting: " + id, e);
+                LOG.warn("problem digesting: " + t.getId(), e);
             }
         }
         try {
@@ -612,7 +679,18 @@ public class PipesServer implements Runnable {
             metadata.set(Metadata.CONTENT_TYPE, mt.toString());
             metadata.set(TikaCoreProperties.CONTENT_TYPE_PARSER_OVERRIDE, mt.toString());
         } catch (IOException e) {
-            LOG.warn("problem detecting: " + id, e);
+            LOG.warn("problem detecting: " + t.getId(), e);
+        }
+
+        if (t.getEmbeddedDocumentBytesConfig() != null &&
+            t.getEmbeddedDocumentBytesConfig().isIncludeOriginal()) {
+            EmbeddedDocumentByteStore embeddedDocumentByteStore =
+                    parseContext.get(EmbeddedDocumentEmitterStore.class);
+            try {
+                embeddedDocumentByteStore.add(0, metadata, Files.readAllBytes(tis.getPath()));
+            } catch (IOException e) {
+                LOG.warn("problem reading source file into embedded document byte store", e);
+            }
         }
     }
 
@@ -734,4 +812,44 @@ public class PipesServer implements Runnable {
             exit(1);
         }
     }
+
+    private class MetadataListAndEmbeddedBytes {
+        final List<Metadata> metadataList;
+        final Optional<EmbeddedDocumentByteStore> embeddedDocumentByteStore;
+
+        public MetadataListAndEmbeddedBytes(List<Metadata> metadataList,
+                                            EmbeddedDocumentByteStore embeddedDocumentByteStore) {
+            this.metadataList = metadataList;
+            this.embeddedDocumentByteStore = Optional.ofNullable(embeddedDocumentByteStore);
+        }
+
+        public List<Metadata> getMetadataList() {
+            return metadataList;
+        }
+
+        public EmbeddedDocumentByteStore getEmbeddedDocumentByteStore() {
+            return embeddedDocumentByteStore.get();
+        }
+
+        /**
+         * This tests whether there's any type of embedded document store
+         * ...that, for example, may require closing at the end of the parse.
+         * @return
+         */
+        public boolean hasEmbeddedDocumentByteStore() {
+            return embeddedDocumentByteStore.isPresent();
+        }
+
+        /**
+         * If the intent is that the metadata and byte store be packaged in a zip
+         * or similar and emitted via a single stream emitter.
+         *
+         * This is basically a test that this is not an EmbeddedDocumentEmitterStore.
+         *
+         * @return
+         */
+        public boolean toBePackagedForStreamEmitter() {
+            return ! (embeddedDocumentByteStore.get() instanceof EmbeddedDocumentEmitterStore);
+        }
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
new file mode 100644
index 000000000..cdf3c77fe
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.extractor;
+
+public class EmbeddedDocumentBytesConfig {
+
+    public static EmbeddedDocumentBytesConfig SKIP = new EmbeddedDocumentBytesConfig(false);
+
+    public enum SUFFIX_STRATEGY {
+            NONE, EXISTING, DETECTED
+    }
+    private final boolean extractEmbeddedDocumentBytes;
+    //TODO -- add these at some point
+    /*
+        private Set<String> includeMimeTypes = new HashSet<>();
+        private Set<String> excludeMimeTypes = new HashSet<>();
+    */
+    private int zeroPadName = 0;
+
+    private SUFFIX_STRATEGY suffixStrategy = SUFFIX_STRATEGY.NONE;
+
+    private String embeddedIdPrefix = "-";
+
+    private String emitter;
+
+    private boolean includeOriginal = false;
+
+    public EmbeddedDocumentBytesConfig(boolean extractEmbeddedDocumentBytes) {
+        this.extractEmbeddedDocumentBytes = extractEmbeddedDocumentBytes;
+    }
+
+    public static EmbeddedDocumentBytesConfig getSKIP() {
+        return SKIP;
+    }
+
+    public boolean isExtractEmbeddedDocumentBytes() {
+        return extractEmbeddedDocumentBytes;
+    }
+
+    public int getZeroPadName() {
+        return zeroPadName;
+    }
+
+    public SUFFIX_STRATEGY getSuffixStrategy() {
+        return suffixStrategy;
+    }
+
+    public String getEmbeddedIdPrefix() {
+        return embeddedIdPrefix;
+    }
+
+    public String getEmitter() {
+        return emitter;
+    }
+
+    public boolean isIncludeOriginal() {
+        return includeOriginal;
+    }
+
+    public void setZeroPadNameLength(int zeroPadName) {
+        this.zeroPadName = zeroPadName;
+    }
+
+    public void setSuffixStrategy(SUFFIX_STRATEGY suffixStrategy) {
+        this.suffixStrategy = suffixStrategy;
+    }
+
+    public void setEmbeddedIdPrefix(String embeddedIdPrefix) {
+        this.embeddedIdPrefix = embeddedIdPrefix;
+    }
+
+    public void setEmitter(String emitter) {
+        this.emitter = emitter;
+    }
+
+    public void setIncludeOriginal(boolean includeOriginal) {
+        this.includeOriginal = includeOriginal;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java
new file mode 100644
index 000000000..ddcca6edf
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java
@@ -0,0 +1,63 @@
+package org.apache.tika.pipes.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.io.IOExceptionWithCause;
+import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.extractor.AbstractEmbeddedDocumentByteStore;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+public class EmbeddedDocumentEmitterStore extends AbstractEmbeddedDocumentByteStore {
+    private final EmitKey containerEmitKey;
+    private final EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
+    private final StreamEmitter emitter;
+
+    private static final Metadata METADATA = new Metadata();
+    public EmbeddedDocumentEmitterStore(EmitKey containerEmitKey,
+                                        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                                        EmitterManager emitterManager) throws TikaConfigException {
+        this.containerEmitKey = containerEmitKey;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
+        Emitter tmpEmitter =
+                emitterManager.getEmitter(embeddedDocumentBytesConfig.getEmitter());
+        if (! (tmpEmitter instanceof StreamEmitter)) {
+            throw new TikaConfigException("Emitter " +
+                    embeddedDocumentBytesConfig.getEmitter()
+                    + " must implement a StreamEmitter");
+        }
+        this.emitter = (StreamEmitter) tmpEmitter;
+    }
+
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        //intentionally do not call super.add, because we want the ids list to be empty
+        String emitKey = getFetchKey(containerEmitKey.getEmitKey(),
+                id, embeddedDocumentBytesConfig, metadata);
+
+        try {
+            emitter.emit(emitKey, new UnsynchronizedByteArrayInputStream(bytes), METADATA);
+        } catch (TikaEmitterException e) {
+            throw new IOExceptionWithCause(e);
+        }
+    }
+
+    @Override
+    public byte[] getDocument(int id) {
+        throw new UnsupportedOperationException("this is emit only.");
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (emitter instanceof Closeable) {
+            ((Closeable) emitter).close();
+        }
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
index 53c784796..92d8c5c11 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
@@ -69,7 +69,7 @@ public class PipesServerTest extends TikaTest {
                 new FetchKey("fs", "mock.xml"),
                 new EmitKey("", ""));
         Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
-        List<Metadata> metadataList = pipesServer.parseIt(fetchEmitTuple, fetcher);
+        List<Metadata> metadataList = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
         assertEquals("5f3b924303e960ce35d7f705e91d3018dd110a9c3cef0546a91fe013d6dad6fd",
                 metadataList.get(0).get("X-TIKA:digest:SHA-256"));
     }
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3fbd67c0c..e1bec421a 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -32,6 +32,7 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -54,6 +55,8 @@ public class JsonFetchEmitTuple {
     private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
     private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode";
 
+    private static final String EMBEDDED_DOCUMENT_BYTES_CONFIG = "embeddedDocumentBytesConfig";
+
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
         try (JsonParser jParser = new JsonFactory().setStreamReadConstraints(StreamReadConstraints.builder()
@@ -84,6 +87,8 @@ public class JsonFetchEmitTuple {
                 FetchEmitTuple.DEFAULT_ON_PARSE_EXCEPTION;
         HandlerConfig handlerConfig = HandlerConfig.DEFAULT_HANDLER_CONFIG;
         Metadata metadata = new Metadata();
+        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = EmbeddedDocumentBytesConfig.SKIP;
+
         while (token != JsonToken.END_OBJECT) {
             if (token != JsonToken.FIELD_NAME) {
                 throw new IOException("required field name, but see: " + token.name());
@@ -120,6 +125,8 @@ public class JsonFetchEmitTuple {
                 fetchRangeStart = getLong(jParser);
             } else if (FETCH_RANGE_END.equals(name)) {
                 fetchRangeEnd = getLong(jParser);
+            } else if (EMBEDDED_DOCUMENT_BYTES_CONFIG.equals(name)) {
+                embeddedDocumentBytesConfig = getEmbeddedDocumentBytesConfig(jParser);
             }
             token = jParser.nextToken();
         }
@@ -127,7 +134,39 @@ public class JsonFetchEmitTuple {
             id = fetchKey;
         }
         return new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey, fetchRangeStart, fetchRangeEnd),
-                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException);
+                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException,
+                embeddedDocumentBytesConfig);
+    }
+
+    private static EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig(JsonParser jParser) throws IOException {
+        JsonToken token = jParser.nextToken();
+        if (token != JsonToken.START_OBJECT) {
+            throw new IOException("required start object, but see: " + token.name());
+        }
+        String fieldName = jParser.nextFieldName();
+        EmbeddedDocumentBytesConfig config = new EmbeddedDocumentBytesConfig(true);
+        while (fieldName != null) {
+            switch (fieldName) {
+                //TODO: fill in more here!
+                case "extractEmbeddedDocumentBytes":
+                    boolean extract = jParser.nextBooleanValue();
+                    if (! extract) {
+                        return new EmbeddedDocumentBytesConfig(false);
+                    }
+                    break;
+                case "includeOriginal":
+                    config.setIncludeOriginal(jParser.nextBooleanValue());
+                    break;
+                case "emitter":
+                    config.setEmitter(jParser.nextTextValue());
+                    break;
+                default:
+                    throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
+                            "' in the context of an embeddedDocumentBytesConfig");
+            }
+            fieldName = jParser.nextFieldName();
+        }
+        return EmbeddedDocumentBytesConfig.SKIP;
     }
 
     private static HandlerConfig getHandlerConfig(JsonParser jParser) throws IOException {