You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2024/03/09 01:14:05 UTC

(tika) 01/01: TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4207
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 0674ea4693d57cbc30f8a59417f469a48ce53c2f
Author: tallison <ta...@apache.org>
AuthorDate: Fri Mar 8 20:13:49 2024 -0500

    TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D
---
 .../java/org/apache/tika/cli/TikaCLIAsyncTest.java |  73 ++++++++
 .../test/java/org/apache/tika/cli/TikaCLITest.java |  57 +------
 .../AbstractEmbeddedDocumentByteStore.java         |  63 +++++++
 .../extractor/BasicEmbeddedDocumentByteStore.java  |  46 +++++
 .../tika/extractor/EmbeddedDocumentByteStore.java  |  32 ++++
 .../extractor/ParsingAndEmbeddedDocExtractor.java  | 162 ++++++++++++++++++
 .../ParsingAndEmbeddedDocExtractorFactory.java     |  40 +++++
 .../java/org/apache/tika/pipes/FetchEmitTuple.java |  52 ++++--
 .../java/org/apache/tika/pipes/PipesServer.java    | 188 +++++++++++++++++----
 .../extractor/EmbeddedDocumentBytesConfig.java     |  93 ++++++++++
 .../extractor/EmbeddedDocumentEmitterStore.java    |  63 +++++++
 .../org/apache/tika/pipes/PipesServerTest.java     |   2 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |  41 ++++-
 13 files changed, 811 insertions(+), 101 deletions(-)

diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
new file mode 100644
index 000000000..1f6c8fc2c
--- /dev/null
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
@@ -0,0 +1,73 @@
+package org.apache.tika.cli;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TikaCLIAsyncTest extends TikaCLITest {
+
+    private static Path ASYNC_CONFIG;
+    @TempDir
+    private static Path ASYNC_OUTPUT_DIR;
+
+    @BeforeAll
+    public static void setUpClass() throws Exception {
+        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
+        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
+                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
+                "</async>" + "<fetchers>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
+                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
+                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
+                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
+                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
+                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
+                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
+                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
+                "</pipesIterator>" + "</properties>";
+        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
+    }
+
+    @Test
+    public void testAsync() throws Exception {
+        String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
+
+        int json = 0;
+        for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
+            if (f.getName().endsWith(".json")) {
+                //check one file for pretty print
+                if (f.getName().equals("coffee.xls.json")) {
+                    checkForPrettyPrint(f);
+                }
+                json++;
+            }
+        }
+        assertEquals(17, json);
+    }
+
+    private void checkForPrettyPrint(File f) throws IOException {
+        String json = FileUtils.readFileToString(f, UTF_8);
+        int previous = json.indexOf("Content-Length");
+        assertTrue(previous > -1);
+        for (String k : new String[]{"Content-Type", "dc:creator",
+                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
+            int i = json.indexOf(k);
+            assertTrue( i > -1, "should have found " + k);
+            assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous);
+            previous = i;
+        }
+    }
+
+
+}
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
index ebd1d90b9..c160db396 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
@@ -45,11 +45,8 @@ import org.apache.tika.utils.ProcessUtils;
  */
 public class TikaCLITest {
 
-    private static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
+    static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
 
-    private static Path ASYNC_CONFIG;
-    @TempDir
-    private static Path ASYNC_OUTPUT_DIR;
 
     @TempDir
     private Path extractDir;
@@ -61,24 +58,7 @@ public class TikaCLITest {
     private PrintStream stderr = null;
     private String resourcePrefix;
 
-    @BeforeAll
-    public static void setUpClass() throws Exception {
-        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
-        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
-                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
-                "</async>" + "<fetchers>" +
-                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
-                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
-                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
-                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
-                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
-                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
-                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
-                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
-                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
-                "</pipesIterator>" + "</properties>";
-        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
-    }
+
 
     protected static void assertExtracted(Path p, String allFiles) throws IOException {
 
@@ -582,42 +562,11 @@ public class TikaCLITest {
         assertTrue(content.contains("application/vnd.oasis.opendocument.text-web"));
     }
 
-    @Test
-    public void testAsync() throws Exception {
-        String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
-
-        int json = 0;
-        for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
-            if (f.getName().endsWith(".json")) {
-                //check one file for pretty print
-                if (f.getName().equals("coffee.xls.json")) {
-                    checkForPrettyPrint(f);
-                }
-                json++;
-            }
-        }
-        assertEquals(17, json);
-    }
-
-    private void checkForPrettyPrint(File f) throws IOException {
-        String json = FileUtils.readFileToString(f, UTF_8);
-        int previous = json.indexOf("Content-Length");
-        assertTrue(previous > -1);
-        for (String k : new String[]{"Content-Type", "dc:creator",
-                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
-            int i = json.indexOf(k);
-            assertTrue( i > -1, "should have found " + k);
-            assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous);
-            previous = i;
-        }
-    }
-
-
     /**
      * reset outContent and errContent if they are not empty
      * run given params in TikaCLI and return outContent String with UTF-8
      */
-    private String getParamOutContent(String... params) throws Exception {
+    String getParamOutContent(String... params) throws Exception {
         resetContent();
         TikaCLI.main(params);
         return outContent.toString("UTF-8");
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..c435a3e6e
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.io.FilenameUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.utils.StringUtils;
+
+public abstract class AbstractEmbeddedDocumentByteStore implements EmbeddedDocumentByteStore {
+
+    List<Integer> ids = new ArrayList<>();
+
+    public String getFetchKey(String containerFetchKey, int embeddedId,
+                              EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                              Metadata metadata) {
+        String embeddedIdString = embeddedDocumentBytesConfig.getZeroPadName() > 0 ?
+                StringUtils.leftPad(Integer.toString(embeddedId),
+                        embeddedDocumentBytesConfig.getZeroPadName(), "0") :
+                Integer.toString(embeddedId);
+
+        StringBuilder fetchKey = new StringBuilder(containerFetchKey)
+                .append(embeddedDocumentBytesConfig.getEmbeddedIdPrefix())
+                .append(embeddedIdString);
+
+        if (embeddedDocumentBytesConfig.getSuffixStrategy().equals(
+                EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.EXISTING)) {
+            String fName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            String suffix = FilenameUtils.getSuffixFromPath(fName);
+            fetchKey.append(suffix);
+        }
+        return fetchKey.toString();
+    }
+
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+           ids.add(id);
+    }
+
+    @Override
+    public List<Integer> getIds() {
+        return ids;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..b41285eb0
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+
+public class BasicEmbeddedDocumentByteStore extends AbstractEmbeddedDocumentByteStore {
+    private final EmbeddedDocumentBytesConfig config;
+    public BasicEmbeddedDocumentByteStore(EmbeddedDocumentBytesConfig config) {
+        this.config = config;
+    }
+    //this won't scale, but let's start fully in memory for now;
+    Map<Integer, byte[]> docBytes = new HashMap<>();
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        super.add(id, metadata, bytes);
+        docBytes.put(id, bytes);
+    }
+
+    public byte[] getDocument(int id) {
+        return docBytes.get(id);
+    }
+
+    @Override
+    public void close() throws IOException {
+        //delete tmp dir or whatever here
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..ad1bb81f3
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tika.metadata.Metadata;
+
/**
 * Store for the raw bytes of embedded documents extracted during a parse.
 * Implementations track which ids have been added and can return the bytes
 * for a given id. Closing the store releases whatever resources the
 * implementation holds.
 */
public interface EmbeddedDocumentByteStore extends Closeable {
    //we need metadata for the emitter store...can we get away without it?
    void add(int id, Metadata metadata, byte[] bytes) throws IOException;

    /** @return the stored bytes for this embedded id (implementation-defined when absent) */
    byte[] getDocument(int id);

    /** @return ids of all embedded documents added to this store */
    List<Integer> getIds();
}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java
new file mode 100644
index 000000000..d88ec94c4
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import static org.apache.tika.sax.XHTMLContentHandler.XHTML;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.input.CloseShieldInputStream;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.AttributesImpl;
+
+import org.apache.tika.exception.CorruptedFileException;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.DelegatingParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.ParseRecord;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.sax.BodyContentHandler;
+import org.apache.tika.sax.EmbeddedContentHandler;
+
+/**
+ * Helper class for parsers of package archives or other compound document
+ * formats that support embedded or attached component documents.
+ *
+ * This is intended to both parse the embedded documents and extract
+ * the raw bytes from the embedded attachments when possible.
+ *
+ * See also {@link ParsingEmbeddedDocumentExtractor} and {@link ParserContainerExtractor}.
+ *
+ * @since 3.0.0
+ */
+public class ParsingAndEmbeddedDocExtractor implements EmbeddedDocumentExtractor {
+
+    private static final File ABSTRACT_PATH = new File("");
+
+    private static final Parser DELEGATING_PARSER = new DelegatingParser();
+
+    private boolean writeFileNameToContent = true;
+
+    private final ParseContext context;
+
+    public ParsingAndEmbeddedDocExtractor(ParseContext context) {
+        this.context = context;
+    }
+
+    public boolean shouldParseEmbedded(Metadata metadata) {
+        DocumentSelector selector = context.get(DocumentSelector.class);
+        if (selector != null) {
+            return selector.select(metadata);
+        }
+
+        FilenameFilter filter = context.get(FilenameFilter.class);
+        if (filter != null) {
+            String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            if (name != null) {
+                return filter.accept(ABSTRACT_PATH, name);
+            }
+        }
+
+        return true;
+    }
+
+    public void parseEmbedded(
+            InputStream stream, ContentHandler handler, Metadata metadata, boolean outputHtml)
+            throws SAXException, IOException {
+        if (outputHtml) {
+            AttributesImpl attributes = new AttributesImpl();
+            attributes.addAttribute("", "class", "class", "CDATA", "package-entry");
+            handler.startElement(XHTML, "div", "div", attributes);
+        }
+
+        String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+        if (writeFileNameToContent && name != null && name.length() > 0 && outputHtml) {
+            handler.startElement(XHTML, "h1", "h1", new AttributesImpl());
+            char[] chars = name.toCharArray();
+            handler.characters(chars, 0, chars.length);
+            handler.endElement(XHTML, "h1", "h1");
+        }
+
+        // Use the delegate parser to parse this entry
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            final TikaInputStream newStream =
+                    TikaInputStream.get(CloseShieldInputStream.wrap(stream), tmp, metadata);
+            if (stream instanceof TikaInputStream) {
+                final Object container = ((TikaInputStream) stream).getOpenContainer();
+                if (container != null) {
+                    newStream.setOpenContainer(container);
+                }
+            }
+            Path p = newStream.getPath();
+            storeEmbeddedBytes(p, metadata);
+
+            DELEGATING_PARSER.parse(newStream, new EmbeddedContentHandler(new BodyContentHandler(handler)),
+                    metadata, context);
+        } catch (EncryptedDocumentException ede) {
+            recordException(ede, context);
+        } catch (CorruptedFileException e) {
+            //necessary to stop the parse to avoid infinite loops
+            //on corrupt sqlite3 files
+            throw new IOException(e);
+        } catch (TikaException e) {
+            recordException(e, context);
+        }
+
+        if (outputHtml) {
+            handler.endElement(XHTML, "div", "div");
+        }
+    }
+
+    private void storeEmbeddedBytes(Path p, Metadata metadata) {
+        EmbeddedDocumentByteStore embeddedDocumentByteStore =
+                context.get(EmbeddedDocumentByteStore.class);
+        int id = metadata.getInt(TikaCoreProperties.EMBEDDED_ID);
+        try {
+            embeddedDocumentByteStore.add(id, metadata, Files.readAllBytes(p));
+        } catch (IOException e) {
+            //log, or better, store embdocstore exception
+        }
+    }
+
+    private void recordException(Exception e, ParseContext context) {
+        ParseRecord record = context.get(ParseRecord.class);
+        if (record == null) {
+            return;
+        }
+        record.addException(e);
+    }
+
+    public Parser getDelegatingParser() {
+        return DELEGATING_PARSER;
+    }
+
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java
new file mode 100644
index 000000000..ca4c6633c
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+public class ParsingAndEmbeddedDocExtractorFactory
+        implements EmbeddedDocumentExtractorFactory {
+
+    private boolean writeFileNameToContent = true;
+
+    @Field
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+
+    @Override
+    public EmbeddedDocumentExtractor newInstance(Metadata metadata, ParseContext parseContext) {
+        ParsingEmbeddedDocumentExtractor ex =
+                new ParsingEmbeddedDocumentExtractor(parseContext);
+        ex.setWriteFileNameToContent(writeFileNameToContent);
+        return ex;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 3a8ec2bdd..c49f3743f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
@@ -38,6 +39,7 @@ public class FetchEmitTuple implements Serializable {
     private final ON_PARSE_EXCEPTION onParseException;
     private HandlerConfig handlerConfig;
 
+    private EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) {
         this(id, fetchKey, emitKey, new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
@@ -55,12 +57,20 @@ public class FetchEmitTuple implements Serializable {
 
     public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
                           HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException) {
+        this(id, fetchKey, emitKey, metadata, handlerConfig, onParseException,
+                EmbeddedDocumentBytesConfig.SKIP);
+    }
+
+    public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
+                          HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException,
+                          EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig) {
         this.id = id;
         this.fetchKey = fetchKey;
         this.emitKey = emitKey;
         this.metadata = metadata;
         this.handlerConfig = handlerConfig;
         this.onParseException = onParseException;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
     }
 
     public String getId() {
@@ -94,21 +104,40 @@ public class FetchEmitTuple implements Serializable {
         return handlerConfig == null ? HandlerConfig.DEFAULT_HANDLER_CONFIG : handlerConfig;
     }
 
+    public EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig() {
+        return embeddedDocumentBytesConfig;
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         FetchEmitTuple that = (FetchEmitTuple) o;
 
-        if (!Objects.equals(id, that.id)) return false;
-        if (!Objects.equals(fetchKey, that.fetchKey))
+        if (!Objects.equals(id, that.id)) {
+            return false;
+        }
+        if (!Objects.equals(fetchKey, that.fetchKey)) {
+            return false;
+        }
+        if (!Objects.equals(emitKey, that.emitKey)) {
+            return false;
+        }
+        if (!Objects.equals(metadata, that.metadata)) {
+            return false;
+        }
+        if (onParseException != that.onParseException) {
             return false;
-        if (!Objects.equals(emitKey, that.emitKey)) return false;
-        if (!Objects.equals(metadata, that.metadata))
+        }
+        if (!Objects.equals(handlerConfig, that.handlerConfig)) {
             return false;
-        if (onParseException != that.onParseException) return false;
-        return Objects.equals(handlerConfig, that.handlerConfig);
+        }
+        return Objects.equals(embeddedDocumentBytesConfig, that.embeddedDocumentBytesConfig);
     }
 
     @Override
@@ -119,13 +148,16 @@ public class FetchEmitTuple implements Serializable {
         result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
         result = 31 * result + (onParseException != null ? onParseException.hashCode() : 0);
         result = 31 * result + (handlerConfig != null ? handlerConfig.hashCode() : 0);
+        result = 31 * result +
+                (embeddedDocumentBytesConfig != null ? embeddedDocumentBytesConfig.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
         return "FetchEmitTuple{" + "id='" + id + '\'' + ", fetchKey=" + fetchKey + ", emitKey=" +
-            emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
-            ", handlerConfig=" + handlerConfig + '}';
+                emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
+                ", handlerConfig=" + handlerConfig + ", embeddedDocumentBytesConfig=" +
+                embeddedDocumentBytesConfig + '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index ed1e5bb5e..94ef58502 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes;
 
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -24,10 +25,12 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
@@ -40,8 +43,11 @@ import org.xml.sax.SAXException;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.BasicEmbeddedDocumentByteStore;
 import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.extractor.EmbeddedDocumentByteStore;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
@@ -52,11 +58,14 @@ import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentEmitterStore;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
@@ -269,7 +278,7 @@ public class PipesServer implements Runnable {
      * @return
      */
     private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataList) {
-        if (metadataList == null || metadataList.size() < 1) {
+        if (metadataIsEmpty(metadataList)) {
             return StringUtils.EMPTY;
         }
         String stack = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
@@ -277,11 +286,12 @@ public class PipesServer implements Runnable {
     }
 
 
-    private void emit(String taskId, EmitData emitData, String parseExceptionStack) {
+    private void emit(String taskId, EmitKey emitKey, MetadataListAndEmbeddedBytes parseData,
+                      String parseExceptionStack) {
         Emitter emitter = null;
 
         try {
-            emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+            emitter = emitterManager.getEmitter(emitKey.getEmitterName());
         } catch (IllegalArgumentException e) {
             String noEmitterMsg = getNoEmitterMsg(taskId);
             LOG.warn(noEmitterMsg);
@@ -289,7 +299,11 @@ public class PipesServer implements Runnable {
             return;
         }
         try {
-            emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList());
+            if (parseData.toBePackagedForStreamEmitter()) {
+                emitContentsAndBytes(emitter, emitKey, parseData);
+            } else {
+                emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList());
+            }
         } catch (IOException | TikaEmitterException e) {
             LOG.warn("emit exception", e);
             String msg = ExceptionUtils.getStackTrace(e);
@@ -306,6 +320,16 @@ public class PipesServer implements Runnable {
         }
     }
 
+    private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
+                                      MetadataListAndEmbeddedBytes parseData) {
+        if (! (emitter instanceof StreamEmitter)) {
+            throw new IllegalArgumentException("The emitter for embedded document byte store must" +
+                    " be a StreamEmitter. I see: " + emitter.getClass());
+        }
+        //TODO: implement this
+        throw new UnsupportedOperationException("this is not yet implemented");
+    }
+
     private void parseOne() {
         synchronized (lock) {
             parsing = true;
@@ -348,35 +372,53 @@ public class PipesServer implements Runnable {
         }
 
         start = System.currentTimeMillis();
-        List<Metadata> metadataList = parseIt(t, fetcher);
+        MetadataListAndEmbeddedBytes parseData = null;
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
-        }
+        try {
+            parseData = parseFromTuple(t, fetcher);
 
-        if (metadataIsEmpty(metadataList)) {
-            write(STATUS.EMPTY_OUTPUT);
-            return;
-        }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
+            }
+
+            if (metadataIsEmpty(parseData.getMetadataList())) {
+                write(STATUS.EMPTY_OUTPUT);
+                return;
+            }
 
-        emitIt(t, metadataList);
+            emitParseData(t, parseData);
+        } finally {
+            if (parseData != null && parseData.hasEmbeddedDocumentByteStore() &&
+                    parseData.getEmbeddedDocumentByteStore() instanceof Closeable) {
+                try {
+                    ((Closeable)parseData.getEmbeddedDocumentByteStore()).close();
+                } catch (IOException e) {
+                    LOG.warn("problem closing embedded document byte store", e);
+                }
+            }
+        }
     }
 
-    private void emitIt(FetchEmitTuple t, List<Metadata> metadataList) {
+    private void emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseData) {
         long start = System.currentTimeMillis();
-        String stack = getContainerStacktrace(t, metadataList);
+        String stack = getContainerStacktrace(t, parseData.getMetadataList());
         //we need to apply this after we pull out the stacktrace
-        filterMetadata(metadataList);
+        filterMetadata(parseData.getMetadataList());
         if (StringUtils.isBlank(stack) || t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
-            injectUserMetadata(t.getMetadata(), metadataList);
+            injectUserMetadata(t.getMetadata(), parseData.getMetadataList());
             EmitKey emitKey = t.getEmitKey();
             if (StringUtils.isBlank(emitKey.getEmitKey())) {
                 emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
                 t.setEmitKey(emitKey);
             }
-            EmitData emitData = new EmitData(t.getEmitKey(), metadataList, stack);
-            if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
-                emit(t.getId(), emitData, stack);
+            EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack);
+            if (parseData.toBePackagedForStreamEmitter()) {
+                emit(t.getId(), emitKey, parseData, stack);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
+                }
+            } else if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
+                emit(t.getId(), emitKey, parseData, stack);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
                 }
@@ -418,7 +460,7 @@ public class PipesServer implements Runnable {
         }
     }
 
-    protected List<Metadata> parseIt(FetchEmitTuple t, Fetcher fetcher) {
+    protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
         FetchKey fetchKey = t.getFetchKey();
         if (fetchKey.hasRange()) {
             if (! (fetcher instanceof RangeFetcher)) {
@@ -428,7 +470,7 @@ public class PipesServer implements Runnable {
             Metadata metadata = new Metadata();
             try (InputStream stream = ((RangeFetcher)fetcher).fetch(fetchKey.getFetchKey(),
                     fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
-                return parse(t, stream, metadata);
+                return parseWithStream(t, stream, metadata);
             } catch (SecurityException e) {
                 LOG.error("security exception " + t.getId(), e);
                 throw e;
@@ -439,7 +481,7 @@ public class PipesServer implements Runnable {
         } else {
             Metadata metadata = new Metadata();
             try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
-                return parse(t, stream, metadata);
+                return parseWithStream(t, stream, metadata);
             } catch (SecurityException e) {
                 LOG.error("security exception " + t.getId(), e);
                 throw e;
@@ -488,20 +530,46 @@ public class PipesServer implements Runnable {
         exit(1);
     }
 
-    private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream,
-                                 Metadata metadata) {
+    private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTuple fetchEmitTuple, InputStream stream,
+                                           Metadata metadata) throws TikaConfigException {
         HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
+        List<Metadata> metadataList;
+        //this adds the EmbeddedDocumentByteStore to the parsecontext
+        ParseContext parseContext = createParseContext(fetchEmitTuple);
+        
         if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
-            return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata);
+            metadataList = parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
         } else {
-            return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata);
+            metadataList = parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
         }
+
+        return new MetadataListAndEmbeddedBytes(metadataList,
+                    parseContext.get(EmbeddedDocumentByteStore.class));
+    }
+
+    private ParseContext createParseContext(FetchEmitTuple fetchEmitTuple)
+            throws TikaConfigException {
+        ParseContext parseContext = new ParseContext();
+        if (fetchEmitTuple.getEmbeddedDocumentBytesConfig() == EmbeddedDocumentBytesConfig.SKIP) {
+            return parseContext;
+        }
+
+        if (! StringUtils.isBlank(fetchEmitTuple.getEmbeddedDocumentBytesConfig().getEmitter())) {
+            parseContext.set(EmbeddedDocumentByteStore.class,
+                    new EmbeddedDocumentEmitterStore(fetchEmitTuple.getEmitKey(),
+                            fetchEmitTuple.getEmbeddedDocumentBytesConfig(),
+                    emitterManager));
+        } else {
+            parseContext.set(EmbeddedDocumentByteStore.class,
+                    new BasicEmbeddedDocumentByteStore(fetchEmitTuple.getEmbeddedDocumentBytesConfig()));
+
+        }
+        return parseContext;
     }
 
     private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
                                              HandlerConfig handlerConfig, InputStream stream,
-                                             Metadata metadata) {
-        ParseContext parseContext = new ParseContext();
+                                             Metadata metadata, ParseContext parseContext) {
 
         ContentHandlerFactory contentHandlerFactory =
                 new BasicContentHandlerFactory(handlerConfig.getType(),
@@ -552,8 +620,7 @@ public class PipesServer implements Runnable {
 
     private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
                                           HandlerConfig handlerConfig, InputStream stream,
-                                          Metadata metadata) {
-        ParseContext parseContext = new ParseContext();
+                                          Metadata metadata, ParseContext parseContext) {
         //Intentionally do not add the metadata filter here!
         //We need to let stacktraces percolate
         RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
@@ -590,7 +657,7 @@ public class PipesServer implements Runnable {
             if (tis == null) {
                 tis = TikaInputStream.get(stream, tmp, metadata);
             }
-            _preParse(t.getId(), tis, metadata, parseContext);
+            _preParse(t, tis, metadata, parseContext);
         } finally {
             IOUtils.closeQuietly(tmp);
         }
@@ -598,13 +665,13 @@ public class PipesServer implements Runnable {
         writeIntermediate(t.getEmitKey(), metadata);
     }
 
-    private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+    private void _preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata,
                            ParseContext parseContext) {
         if (digester != null) {
             try {
                 digester.digest(tis, metadata, parseContext);
             } catch (IOException e) {
-                LOG.warn("problem digesting: " + id, e);
+                LOG.warn("problem digesting: " + t.getId(), e);
             }
         }
         try {
@@ -612,7 +679,18 @@ public class PipesServer implements Runnable {
             metadata.set(Metadata.CONTENT_TYPE, mt.toString());
             metadata.set(TikaCoreProperties.CONTENT_TYPE_PARSER_OVERRIDE, mt.toString());
         } catch (IOException e) {
-            LOG.warn("problem detecting: " + id, e);
+            LOG.warn("problem detecting: " + t.getId(), e);
+        }
+
+        if (t.getEmbeddedDocumentBytesConfig() != null &&
+            t.getEmbeddedDocumentBytesConfig().isIncludeOriginal()) {
+            EmbeddedDocumentByteStore embeddedDocumentByteStore =
+                    parseContext.get(EmbeddedDocumentByteStore.class);
+            try {
+                embeddedDocumentByteStore.add(0, metadata, Files.readAllBytes(tis.getPath()));
+            } catch (IOException e) {
+                LOG.warn("problem reading source file into embedded document byte store", e);
+            }
         }
     }
 
@@ -734,4 +812,44 @@ public class PipesServer implements Runnable {
             exit(1);
         }
     }
+
+    static class MetadataListAndEmbeddedBytes {
+        final List<Metadata> metadataList;
+        final Optional<EmbeddedDocumentByteStore> embeddedDocumentByteStore;
+
+        public MetadataListAndEmbeddedBytes(List<Metadata> metadataList,
+                                            EmbeddedDocumentByteStore embeddedDocumentByteStore) {
+            this.metadataList = metadataList;
+            this.embeddedDocumentByteStore = Optional.ofNullable(embeddedDocumentByteStore);
+        }
+
+        public List<Metadata> getMetadataList() {
+            return metadataList;
+        }
+
+        public EmbeddedDocumentByteStore getEmbeddedDocumentByteStore() {
+            return embeddedDocumentByteStore.get();
+        }
+
+        /**
+         * This tests whether there's any type of embedded document store
+         * ...that, for example, may require closing at the end of the parse.
+         * @return
+         */
+        public boolean hasEmbeddedDocumentByteStore() {
+            return embeddedDocumentByteStore.isPresent();
+        }
+
+        /**
+         * If the intent is that the metadata and byte store be packaged in a zip
+         * or similar and emitted via a single stream emitter.
+         *
+         * This is basically a test that this is not an EmbeddedDocumentEmitterStore.
+         *
+         * @return
+         */
+        public boolean toBePackagedForStreamEmitter() {
+            return embeddedDocumentByteStore.isPresent() && !(embeddedDocumentByteStore.get() instanceof EmbeddedDocumentEmitterStore);
+        }
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
new file mode 100644
index 000000000..cdf3c77fe
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.extractor;
+
+public class EmbeddedDocumentBytesConfig {
+
+    public static final EmbeddedDocumentBytesConfig SKIP = new EmbeddedDocumentBytesConfig(false);
+
+    public enum SUFFIX_STRATEGY {
+            NONE, EXISTING, DETECTED
+    }
+    private final boolean extractEmbeddedDocumentBytes;
+    //TODO -- add these at some point
+    /*
+        private Set<String> includeMimeTypes = new HashSet<>();
+        private Set<String> excludeMimeTypes = new HashSet<>();
+    */
+    private int zeroPadName = 0;
+
+    private SUFFIX_STRATEGY suffixStrategy = SUFFIX_STRATEGY.NONE;
+
+    private String embeddedIdPrefix = "-";
+
+    private String emitter;
+
+    private boolean includeOriginal = false;
+
+    public EmbeddedDocumentBytesConfig(boolean extractEmbeddedDocumentBytes) {
+        this.extractEmbeddedDocumentBytes = extractEmbeddedDocumentBytes;
+    }
+
+    public static EmbeddedDocumentBytesConfig getSKIP() {
+        return SKIP;
+    }
+
+    public boolean isExtractEmbeddedDocumentBytes() {
+        return extractEmbeddedDocumentBytes;
+    }
+
+    public int getZeroPadName() {
+        return zeroPadName;
+    }
+
+    public SUFFIX_STRATEGY getSuffixStrategy() {
+        return suffixStrategy;
+    }
+
+    public String getEmbeddedIdPrefix() {
+        return embeddedIdPrefix;
+    }
+
+    public String getEmitter() {
+        return emitter;
+    }
+
+    public boolean isIncludeOriginal() {
+        return includeOriginal;
+    }
+
+    public void setZeroPadNameLength(int zeroPadName) {
+        this.zeroPadName = zeroPadName;
+    }
+
+    public void setSuffixStrategy(SUFFIX_STRATEGY suffixStrategy) {
+        this.suffixStrategy = suffixStrategy;
+    }
+
+    public void setEmbeddedIdPrefix(String embeddedIdPrefix) {
+        this.embeddedIdPrefix = embeddedIdPrefix;
+    }
+
+    public void setEmitter(String emitter) {
+        this.emitter = emitter;
+    }
+
+    public void setIncludeOriginal(boolean includeOriginal) {
+        this.includeOriginal = includeOriginal;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java
new file mode 100644
index 000000000..ddcca6edf
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java
@@ -0,0 +1,63 @@
+package org.apache.tika.pipes.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.io.IOExceptionWithCause;
+import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.extractor.AbstractEmbeddedDocumentByteStore;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+public class EmbeddedDocumentEmitterStore extends AbstractEmbeddedDocumentByteStore {
+    private final EmitKey containerEmitKey;
+    private final EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
+    private final StreamEmitter emitter;
+
+    private static final Metadata METADATA = new Metadata();
+    public EmbeddedDocumentEmitterStore(EmitKey containerEmitKey,
+                                        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                                        EmitterManager emitterManager) throws TikaConfigException {
+        this.containerEmitKey = containerEmitKey;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
+        Emitter tmpEmitter =
+                emitterManager.getEmitter(embeddedDocumentBytesConfig.getEmitter());
+        if (! (tmpEmitter instanceof StreamEmitter)) {
+            throw new TikaConfigException("Emitter " +
+                    embeddedDocumentBytesConfig.getEmitter()
+                    + " must implement a StreamEmitter");
+        }
+        this.emitter = (StreamEmitter) tmpEmitter;
+    }
+
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        //intentionally do not call super.add, because we want the ids list to be empty
+        String emitKey = getFetchKey(containerEmitKey.getEmitKey(),
+                id, embeddedDocumentBytesConfig, metadata);
+
+        try {
+            emitter.emit(emitKey, new UnsynchronizedByteArrayInputStream(bytes), METADATA);
+        } catch (TikaEmitterException e) {
+            throw new IOExceptionWithCause(e);
+        }
+    }
+
+    @Override
+    public byte[] getDocument(int id) {
+        throw new UnsupportedOperationException("this is emit only.");
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (emitter instanceof Closeable) {
+            ((Closeable) emitter).close();
+        }
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
index 53c784796..92d8c5c11 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
@@ -69,7 +69,7 @@ public class PipesServerTest extends TikaTest {
                 new FetchKey("fs", "mock.xml"),
                 new EmitKey("", ""));
         Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
-        List<Metadata> metadataList = pipesServer.parseIt(fetchEmitTuple, fetcher);
+        List<Metadata> metadataList = pipesServer.parseFromTuple(fetchEmitTuple, fetcher).getMetadataList();
         assertEquals("5f3b924303e960ce35d7f705e91d3018dd110a9c3cef0546a91fe013d6dad6fd",
                 metadataList.get(0).get("X-TIKA:digest:SHA-256"));
     }
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3fbd67c0c..e1bec421a 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -32,6 +32,7 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -54,6 +55,8 @@ public class JsonFetchEmitTuple {
     private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
     private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode";
 
+    private static final String EMBEDDED_DOCUMENT_BYTES_CONFIG = "embeddedDocumentBytesConfig";
+
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
         try (JsonParser jParser = new JsonFactory().setStreamReadConstraints(StreamReadConstraints.builder()
@@ -84,6 +87,8 @@ public class JsonFetchEmitTuple {
                 FetchEmitTuple.DEFAULT_ON_PARSE_EXCEPTION;
         HandlerConfig handlerConfig = HandlerConfig.DEFAULT_HANDLER_CONFIG;
         Metadata metadata = new Metadata();
+        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = EmbeddedDocumentBytesConfig.SKIP;
+
         while (token != JsonToken.END_OBJECT) {
             if (token != JsonToken.FIELD_NAME) {
                 throw new IOException("required field name, but see: " + token.name());
@@ -120,6 +125,8 @@ public class JsonFetchEmitTuple {
                 fetchRangeStart = getLong(jParser);
             } else if (FETCH_RANGE_END.equals(name)) {
                 fetchRangeEnd = getLong(jParser);
+            } else if (EMBEDDED_DOCUMENT_BYTES_CONFIG.equals(name)) {
+                embeddedDocumentBytesConfig = getEmbeddedDocumentBytesConfig(jParser);
             }
             token = jParser.nextToken();
         }
@@ -127,7 +134,39 @@ public class JsonFetchEmitTuple {
             id = fetchKey;
         }
         return new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey, fetchRangeStart, fetchRangeEnd),
-                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException);
+                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException,
+                embeddedDocumentBytesConfig);
+    }
+
+    private static EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig(JsonParser jParser) throws IOException {
+        JsonToken token = jParser.nextToken();
+        if (token != JsonToken.START_OBJECT) {
+            throw new IOException("required start object, but see: " + token.name());
+        }
+        String fieldName = jParser.nextFieldName();
+        EmbeddedDocumentBytesConfig config = new EmbeddedDocumentBytesConfig(true);
+        while (fieldName != null) {
+            switch (fieldName) {
+                //TODO: fill in more here!
+                case "extractEmbeddedDocumentBytes":
+                    boolean extract = jParser.nextBooleanValue();
+                    if (! extract) {
+                        return new EmbeddedDocumentBytesConfig(false);
+                    }
+                    break;
+                case "includeOriginal":
+                    config.setIncludeOriginal(jParser.nextBooleanValue());
+                    break;
+                case "emitter":
+                    config.setEmitter(jParser.nextTextValue());
+                    break;
+                default:
+                    throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
+                            "' in the context of an embeddedDocumentBytesConfig");
+            }
+            fieldName = jParser.nextFieldName();
+        }
+        return config;
     }
 
     private static HandlerConfig getHandlerConfig(JsonParser jParser) throws IOException {