You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2024/03/09 01:14:05 UTC
(tika) 01/01: TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-4207
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 0674ea4693d57cbc30f8a59417f469a48ce53c2f
Author: tallison <ta...@apache.org>
AuthorDate: Fri Mar 8 20:13:49 2024 -0500
TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D
---
.../java/org/apache/tika/cli/TikaCLIAsyncTest.java | 73 ++++++++
.../test/java/org/apache/tika/cli/TikaCLITest.java | 57 +------
.../AbstractEmbeddedDocumentByteStore.java | 63 +++++++
.../extractor/BasicEmbeddedDocumentByteStore.java | 46 +++++
.../tika/extractor/EmbeddedDocumentByteStore.java | 32 ++++
.../extractor/ParsingAndEmbeddedDocExtractor.java | 162 ++++++++++++++++++
.../ParsingAndEmbeddedDocExtractorFactory.java | 40 +++++
.../java/org/apache/tika/pipes/FetchEmitTuple.java | 52 ++++--
.../java/org/apache/tika/pipes/PipesServer.java | 188 +++++++++++++++++----
.../extractor/EmbeddedDocumentBytesConfig.java | 93 ++++++++++
.../extractor/EmbeddedDocumentEmitterStore.java | 63 +++++++
.../org/apache/tika/pipes/PipesServerTest.java | 2 +-
.../metadata/serialization/JsonFetchEmitTuple.java | 41 ++++-
13 files changed, 811 insertions(+), 101 deletions(-)
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
new file mode 100644
index 000000000..1f6c8fc2c
--- /dev/null
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java
@@ -0,0 +1,73 @@
+package org.apache.tika.cli;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Runs TikaCLI in async mode ({@code -a}) against the test-data directory with a
+ * file system fetcher, emitter and pipes iterator, and checks the emitted json files.
+ */
+//NOTE(review): extending TikaCLITest presumably also re-runs every inherited test
+//under this class -- confirm that is intended, or move the shared helpers elsewhere.
+public class TikaCLIAsyncTest extends TikaCLITest {
+
+    private static Path ASYNC_CONFIG;
+    @TempDir
+    private static Path ASYNC_OUTPUT_DIR;
+
+    /**
+     * Writes the async config file into the temp output directory. The config
+     * points the fetcher and the pipes iterator at the test data and the
+     * emitter at the temp output directory.
+     */
+    @BeforeAll
+    public static void setUpClass() throws Exception {
+        ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
+        String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
+                "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
+                "</async>" + "<fetchers>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
+                "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
+                "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
+                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
+                "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
+                "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
+                "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
+                "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
+                "</pipesIterator>" + "</properties>";
+        Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
+    }
+
+    @Test
+    public void testAsync() throws Exception {
+        //stdout is not inspected; only the emitted files are checked
+        getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
+
+        //listFiles() returns null if the directory can't be listed; fail with a
+        //clear message instead of an NPE in the loop
+        File[] emitted = ASYNC_OUTPUT_DIR.toFile().listFiles();
+        assertTrue(emitted != null, "couldn't list files in " + ASYNC_OUTPUT_DIR);
+
+        int json = 0;
+        for (File f : emitted) {
+            if (f.getName().endsWith(".json")) {
+                //check one file for pretty print
+                if (f.getName().equals("coffee.xls.json")) {
+                    checkForPrettyPrint(f);
+                }
+                json++;
+            }
+        }
+        assertEquals(17, json);
+    }
+
+    /**
+     * Checks that the listed keys all appear in the json file, each one after
+     * the previous one, starting from "Content-Length".
+     */
+    private void checkForPrettyPrint(File f) throws IOException {
+        String json = FileUtils.readFileToString(f, UTF_8);
+        int previous = json.indexOf("Content-Length");
+        assertTrue(previous > -1, "should have found Content-Length");
+        for (String k : new String[]{"Content-Type", "dc:creator",
+                "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
+            int i = json.indexOf(k);
+            assertTrue(i > -1, "should have found " + k);
+            //the key has to come after the previous one
+            assertTrue(i > previous,
+                    "bad order: " + k + " at " + i + " not greater than " + previous);
+            previous = i;
+        }
+    }
+}
diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
index ebd1d90b9..c160db396 100644
--- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
+++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java
@@ -45,11 +45,8 @@ import org.apache.tika.utils.ProcessUtils;
*/
public class TikaCLITest {
- private static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
+ static final File TEST_DATA_FILE = new File("src/test/resources/test-data");
- private static Path ASYNC_CONFIG;
- @TempDir
- private static Path ASYNC_OUTPUT_DIR;
@TempDir
private Path extractDir;
@@ -61,24 +58,7 @@ public class TikaCLITest {
private PrintStream stderr = null;
private String resourcePrefix;
- @BeforeAll
- public static void setUpClass() throws Exception {
- ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml");
- String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" +
- "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" +
- "</async>" + "<fetchers>" +
- "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" +
- "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() +
- "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" +
- "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
- "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() +
- "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" +
- "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" +
- "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" +
- "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" +
- "</pipesIterator>" + "</properties>";
- Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8));
- }
+
protected static void assertExtracted(Path p, String allFiles) throws IOException {
@@ -582,42 +562,11 @@ public class TikaCLITest {
assertTrue(content.contains("application/vnd.oasis.opendocument.text-web"));
}
- @Test
- public void testAsync() throws Exception {
- String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath());
-
- int json = 0;
- for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) {
- if (f.getName().endsWith(".json")) {
- //check one file for pretty print
- if (f.getName().equals("coffee.xls.json")) {
- checkForPrettyPrint(f);
- }
- json++;
- }
- }
- assertEquals(17, json);
- }
-
- private void checkForPrettyPrint(File f) throws IOException {
- String json = FileUtils.readFileToString(f, UTF_8);
- int previous = json.indexOf("Content-Length");
- assertTrue(previous > -1);
- for (String k : new String[]{"Content-Type", "dc:creator",
- "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) {
- int i = json.indexOf(k);
- assertTrue( i > -1, "should have found " + k);
- assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous);
- previous = i;
- }
- }
-
-
/**
* reset outContent and errContent if they are not empty
* run given params in TikaCLI and return outContent String with UTF-8
*/
- private String getParamOutContent(String... params) throws Exception {
+ String getParamOutContent(String... params) throws Exception {
resetContent();
TikaCLI.main(params);
return outContent.toString("UTF-8");
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..c435a3e6e
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.io.FilenameUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Base class for {@link EmbeddedDocumentByteStore} implementations: it keeps
+ * track of the ids that were added and builds the key for an embedded
+ * document from the key of its container.
+ */
+public abstract class AbstractEmbeddedDocumentByteStore implements EmbeddedDocumentByteStore {
+
+    //ids in the order in which they were passed to add()
+    final List<Integer> ids = new ArrayList<>();
+
+    /**
+     * Builds the key for an embedded document as
+     * {@code containerFetchKey + embeddedIdPrefix + id [+ suffix]}.
+     *
+     * @param containerFetchKey key of the container document
+     * @param embeddedId id of the embedded document
+     * @param embeddedDocumentBytesConfig supplies the id prefix, the zero-pad width
+     *         (applied only when greater than zero) and the suffix strategy
+     * @param metadata metadata of the embedded document; its resource name is the
+     *         source of the suffix when the suffix strategy is {@code EXISTING}
+     * @return the key for the embedded document
+     */
+    public String getFetchKey(String containerFetchKey, int embeddedId,
+                              EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                              Metadata metadata) {
+        int zeroPad = embeddedDocumentBytesConfig.getZeroPadName();
+        String embeddedIdString = Integer.toString(embeddedId);
+        if (zeroPad > 0) {
+            embeddedIdString = StringUtils.leftPad(embeddedIdString, zeroPad, "0");
+        }
+
+        StringBuilder fetchKey = new StringBuilder(containerFetchKey)
+                .append(embeddedDocumentBytesConfig.getEmbeddedIdPrefix())
+                .append(embeddedIdString);
+
+        //constant-first comparison: an unset strategy means "no suffix" instead of an NPE
+        if (EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.EXISTING.equals(
+                embeddedDocumentBytesConfig.getSuffixStrategy())) {
+            String fName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            fetchKey.append(FilenameUtils.getSuffixFromPath(fName));
+        }
+        return fetchKey.toString();
+    }
+
+    /**
+     * Records the id. The metadata and bytes are ignored here; subclasses are
+     * responsible for storing them.
+     */
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        ids.add(id);
+    }
+
+    /**
+     * @return the live list of ids, in the order in which they were added
+     */
+    @Override
+    public List<Integer> getIds() {
+        return ids;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..b41285eb0
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
+
+/**
+ * {@link EmbeddedDocumentByteStore} that keeps the bytes of every embedded
+ * document in memory, keyed by id.
+ */
+public class BasicEmbeddedDocumentByteStore extends AbstractEmbeddedDocumentByteStore {
+    //not read yet in this class
+    private final EmbeddedDocumentBytesConfig config;
+
+    //this won't scale, but let's start fully in memory for now;
+    final Map<Integer, byte[]> docBytes = new HashMap<>();
+
+    public BasicEmbeddedDocumentByteStore(EmbeddedDocumentBytesConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Records the id and keeps a reference to {@code bytes} (no copy is made).
+     * Adding an id again replaces its bytes; the id is then listed once more
+     * by {@link #getIds()}.
+     */
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        super.add(id, metadata, bytes);
+        docBytes.put(id, bytes);
+    }
+
+    /**
+     * @return the bytes stored for {@code id}, or {@code null} if there are none
+     */
+    @Override
+    public byte[] getDocument(int id) {
+        return docBytes.get(id);
+    }
+
+    @Override
+    public void close() throws IOException {
+        //delete tmp dir or whatever here
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java
new file mode 100644
index 000000000..ad1bb81f3
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * Store for the raw bytes of embedded documents, addressed by an int id.
+ */
+public interface EmbeddedDocumentByteStore extends Closeable {
+ /**
+ * Adds the bytes of one embedded document.
+ *
+ * @param id id of the embedded document
+ * @param metadata metadata of the embedded document
+ * @param bytes raw bytes of the embedded document
+ * @throws IOException if the bytes can't be stored
+ */
+ //we need metadata for the emitter store...can we get away without it?
+ void add(int id, Metadata metadata, byte[] bytes) throws IOException;
+
+ /**
+ * @param id id of the embedded document
+ * @return the bytes for that id
+ */
+ byte[] getDocument(int id);
+
+ /**
+ * @return the ids held by this store
+ */
+ List<Integer> getIds();
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java
new file mode 100644
index 000000000..d88ec94c4
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import static org.apache.tika.sax.XHTMLContentHandler.XHTML;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.input.CloseShieldInputStream;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.AttributesImpl;
+
+import org.apache.tika.exception.CorruptedFileException;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.DelegatingParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.ParseRecord;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.sax.BodyContentHandler;
+import org.apache.tika.sax.EmbeddedContentHandler;
+
+/**
+ * Helper class for parsers of package archives or other compound document
+ * formats that support embedded or attached component documents.
+ *
+ * This is intended to both parse the embedded documents and extract
+ * the raw bytes from the embedded attachments when possible.
+ *
+ * See also {@link ParsingEmbeddedDocumentExtractor} and {@link ParserContainerExtractor}.
+ *
+ * @since 3.0.0
+ */
+public class ParsingAndEmbeddedDocExtractor implements EmbeddedDocumentExtractor {
+
+    private static final File ABSTRACT_PATH = new File("");
+
+    private static final Parser DELEGATING_PARSER = new DelegatingParser();
+
+    private boolean writeFileNameToContent = true;
+
+    private final ParseContext context;
+
+    /**
+     * @param context parse context that is consulted for the
+     *         {@link DocumentSelector}, {@link FilenameFilter},
+     *         {@link EmbeddedDocumentByteStore} and {@link ParseRecord}, and
+     *         that is handed to the delegating parser
+     */
+    public ParsingAndEmbeddedDocExtractor(ParseContext context) {
+        this.context = context;
+    }
+
+    /**
+     * A {@link DocumentSelector} in the context, if there is one, decides on
+     * its own. Otherwise a {@link FilenameFilter} in the context is asked,
+     * provided the metadata has a resource name. Otherwise the answer is
+     * {@code true}.
+     */
+    public boolean shouldParseEmbedded(Metadata metadata) {
+        DocumentSelector selector = context.get(DocumentSelector.class);
+        if (selector != null) {
+            return selector.select(metadata);
+        }
+
+        FilenameFilter filter = context.get(FilenameFilter.class);
+        if (filter != null) {
+            String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            if (name != null) {
+                return filter.accept(ABSTRACT_PATH, name);
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Hands the bytes of the embedded document to the
+     * {@link EmbeddedDocumentByteStore} and then parses the document with the
+     * delegating parser.
+     * <p>
+     * With {@code outputHtml}, the output is wrapped in a
+     * {@code div class="package-entry"} and, if enabled, a non-empty resource
+     * name is written as an {@code h1}.
+     * <p>
+     * {@link EncryptedDocumentException}s and other {@link TikaException}s are
+     * recorded in the {@link ParseRecord}; a {@link CorruptedFileException}
+     * is rethrown as an {@link IOException}.
+     */
+    public void parseEmbedded(
+            InputStream stream, ContentHandler handler, Metadata metadata, boolean outputHtml)
+            throws SAXException, IOException {
+        if (outputHtml) {
+            AttributesImpl attributes = new AttributesImpl();
+            attributes.addAttribute("", "class", "class", "CDATA", "package-entry");
+            handler.startElement(XHTML, "div", "div", attributes);
+        }
+
+        String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+        if (writeFileNameToContent && name != null && name.length() > 0 && outputHtml) {
+            handler.startElement(XHTML, "h1", "h1", new AttributesImpl());
+            char[] chars = name.toCharArray();
+            handler.characters(chars, 0, chars.length);
+            handler.endElement(XHTML, "h1", "h1");
+        }
+
+        // Use the delegate parser to parse this entry
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            final TikaInputStream newStream =
+                    TikaInputStream.get(CloseShieldInputStream.wrap(stream), tmp, metadata);
+            if (stream instanceof TikaInputStream) {
+                final Object container = ((TikaInputStream) stream).getOpenContainer();
+                if (container != null) {
+                    newStream.setOpenContainer(container);
+                }
+            }
+            Path p = newStream.getPath();
+            storeEmbeddedBytes(p, metadata);
+
+            DELEGATING_PARSER.parse(newStream, new EmbeddedContentHandler(new BodyContentHandler(handler)),
+                    metadata, context);
+        } catch (EncryptedDocumentException ede) {
+            recordException(ede, context);
+        } catch (CorruptedFileException e) {
+            //necessary to stop the parse to avoid infinite loops
+            //on corrupt sqlite3 files
+            throw new IOException(e);
+        } catch (TikaException e) {
+            recordException(e, context);
+        }
+
+        if (outputHtml) {
+            handler.endElement(XHTML, "div", "div");
+        }
+    }
+
+    /**
+     * Copies the bytes at {@code p} into the {@link EmbeddedDocumentByteStore}
+     * from the parse context, under the embedded id found in {@code metadata}.
+     * <p>
+     * This is best effort so that a problem with the store does not abort the
+     * parse of the embedded document: without a store nothing happens; a
+     * missing embedded id or a failure to read or store the bytes is recorded
+     * via {@link #recordException(Exception, ParseContext)}.
+     */
+    private void storeEmbeddedBytes(Path p, Metadata metadata) {
+        EmbeddedDocumentByteStore embeddedDocumentByteStore =
+                context.get(EmbeddedDocumentByteStore.class);
+        if (embeddedDocumentByteStore == null) {
+            //nothing to store the bytes in
+            return;
+        }
+        //boxed on purpose: the id may not have been set on the metadata
+        Integer id = metadata.getInt(TikaCoreProperties.EMBEDDED_ID);
+        if (id == null) {
+            recordException(
+                    new TikaException("embedded id is missing; can't store embedded bytes"),
+                    context);
+            return;
+        }
+        try {
+            embeddedDocumentByteStore.add(id, metadata, Files.readAllBytes(p));
+        } catch (IOException e) {
+            recordException(e, context);
+        }
+    }
+
+    /**
+     * Adds the exception to the {@link ParseRecord} in the context; does
+     * nothing if the context has no parse record.
+     */
+    private void recordException(Exception e, ParseContext context) {
+        ParseRecord record = context.get(ParseRecord.class);
+        if (record == null) {
+            return;
+        }
+        record.addException(e);
+    }
+
+    /**
+     * @return the shared delegating parser used for the embedded documents
+     */
+    public Parser getDelegatingParser() {
+        return DELEGATING_PARSER;
+    }
+
+    /**
+     * @param writeFileNameToContent whether to write the resource name of the
+     *         embedded document as an {@code h1} (only applies to html output)
+     */
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java
new file mode 100644
index 000000000..ca4c6633c
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.extractor;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+/**
+ * Factory for {@link ParsingAndEmbeddedDocExtractor}.
+ */
+public class ParsingAndEmbeddedDocExtractorFactory
+        implements EmbeddedDocumentExtractorFactory {
+
+    private boolean writeFileNameToContent = true;
+
+    /**
+     * @param writeFileNameToContent passed on to every extractor created by
+     *         this factory; defaults to {@code true}
+     */
+    @Field
+    public void setWriteFileNameToContent(boolean writeFileNameToContent) {
+        this.writeFileNameToContent = writeFileNameToContent;
+    }
+
+    /**
+     * @return a new {@link ParsingAndEmbeddedDocExtractor} for the parse
+     *         context; the metadata is not used
+     */
+    @Override
+    public EmbeddedDocumentExtractor newInstance(Metadata metadata, ParseContext parseContext) {
+        //this factory has to hand out the bytes-storing extractor,
+        //not the plain ParsingEmbeddedDocumentExtractor
+        ParsingAndEmbeddedDocExtractor ex =
+                new ParsingAndEmbeddedDocExtractor(parseContext);
+        ex.setWriteFileNameToContent(writeFileNameToContent);
+        return ex;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 3a8ec2bdd..c49f3743f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
import java.util.Objects;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
@@ -38,6 +39,7 @@ public class FetchEmitTuple implements Serializable {
private final ON_PARSE_EXCEPTION onParseException;
private HandlerConfig handlerConfig;
+ private EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) {
this(id, fetchKey, emitKey, new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
@@ -55,12 +57,20 @@ public class FetchEmitTuple implements Serializable {
public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException) {
+ this(id, fetchKey, emitKey, metadata, handlerConfig, onParseException,
+ EmbeddedDocumentBytesConfig.SKIP);
+ }
+
+ public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
+ HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException,
+ EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig) {
this.id = id;
this.fetchKey = fetchKey;
this.emitKey = emitKey;
this.metadata = metadata;
this.handlerConfig = handlerConfig;
this.onParseException = onParseException;
+ this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
}
public String getId() {
@@ -94,21 +104,40 @@ public class FetchEmitTuple implements Serializable {
return handlerConfig == null ? HandlerConfig.DEFAULT_HANDLER_CONFIG : handlerConfig;
}
+ /**
+ * @return the embedded document bytes config as given to the constructor;
+ * the constructor that takes no such config passes
+ * {@link EmbeddedDocumentBytesConfig#SKIP}. No null fallback is applied here.
+ */
+ public EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig() {
+ return embeddedDocumentBytesConfig;
+ }
+
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
FetchEmitTuple that = (FetchEmitTuple) o;
- if (!Objects.equals(id, that.id)) return false;
- if (!Objects.equals(fetchKey, that.fetchKey))
+ if (!Objects.equals(id, that.id)) {
+ return false;
+ }
+ if (!Objects.equals(fetchKey, that.fetchKey)) {
+ return false;
+ }
+ if (!Objects.equals(emitKey, that.emitKey)) {
+ return false;
+ }
+ if (!Objects.equals(metadata, that.metadata)) {
+ return false;
+ }
+ if (onParseException != that.onParseException) {
return false;
- if (!Objects.equals(emitKey, that.emitKey)) return false;
- if (!Objects.equals(metadata, that.metadata))
+ }
+ if (!Objects.equals(handlerConfig, that.handlerConfig)) {
return false;
- if (onParseException != that.onParseException) return false;
- return Objects.equals(handlerConfig, that.handlerConfig);
+ }
+ return Objects.equals(embeddedDocumentBytesConfig, that.embeddedDocumentBytesConfig);
}
@Override
@@ -119,13 +148,16 @@ public class FetchEmitTuple implements Serializable {
result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
result = 31 * result + (onParseException != null ? onParseException.hashCode() : 0);
result = 31 * result + (handlerConfig != null ? handlerConfig.hashCode() : 0);
+ result = 31 * result +
+ (embeddedDocumentBytesConfig != null ? embeddedDocumentBytesConfig.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "FetchEmitTuple{" + "id='" + id + '\'' + ", fetchKey=" + fetchKey + ", emitKey=" +
- emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
- ", handlerConfig=" + handlerConfig + '}';
+ emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException +
+ ", handlerConfig=" + handlerConfig + ", embeddedDocumentBytesConfig=" +
+ embeddedDocumentBytesConfig + '}';
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index ed1e5bb5e..94ef58502 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.pipes;
+import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -24,10 +25,12 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
@@ -40,8 +43,11 @@ import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.Detector;
import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.BasicEmbeddedDocumentByteStore;
import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.extractor.EmbeddedDocumentByteStore;
import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
@@ -52,11 +58,14 @@ import org.apache.tika.parser.DigestingParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentEmitterStore;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
@@ -269,7 +278,7 @@ public class PipesServer implements Runnable {
* @return
*/
private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataList) {
- if (metadataList == null || metadataList.size() < 1) {
+ if (metadataIsEmpty(metadataList)) {
return StringUtils.EMPTY;
}
String stack = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
@@ -277,11 +286,12 @@ public class PipesServer implements Runnable {
}
- private void emit(String taskId, EmitData emitData, String parseExceptionStack) {
+ private void emit(String taskId, EmitKey emitKey, MetadataListAndEmbeddedBytes parseData,
+ String parseExceptionStack) {
Emitter emitter = null;
try {
- emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+ emitter = emitterManager.getEmitter(emitKey.getEmitterName());
} catch (IllegalArgumentException e) {
String noEmitterMsg = getNoEmitterMsg(taskId);
LOG.warn(noEmitterMsg);
@@ -289,7 +299,11 @@ public class PipesServer implements Runnable {
return;
}
try {
- emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList());
+ if (parseData.toBePackagedForStreamEmitter()) {
+ emitContentsAndBytes(emitter, emitKey, parseData);
+ } else {
+ emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList());
+ }
} catch (IOException | TikaEmitterException e) {
LOG.warn("emit exception", e);
String msg = ExceptionUtils.getStackTrace(e);
@@ -306,6 +320,16 @@ public class PipesServer implements Runnable {
}
}
+    /**
+     * Placeholder for emitting the parse output together with the embedded
+     * document bytes. Not implemented yet: once the emitter type has been
+     * checked, this always throws.
+     *
+     * @throws IllegalArgumentException if the emitter is not a {@link StreamEmitter}
+     * @throws UnsupportedOperationException if it is
+     */
+    private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
+                                      MetadataListAndEmbeddedBytes parseData) {
+        boolean canStream = emitter instanceof StreamEmitter;
+        if (canStream) {
+            //TODO: implement this
+            throw new UnsupportedOperationException("this is not yet implemented");
+        }
+        String wrongEmitterMsg =
+                "The emitter for embedded document byte store must be a StreamEmitter. I see: " +
+                        emitter.getClass();
+        throw new IllegalArgumentException(wrongEmitterMsg);
+    }
+
private void parseOne() {
synchronized (lock) {
parsing = true;
@@ -348,35 +372,53 @@ public class PipesServer implements Runnable {
}
start = System.currentTimeMillis();
- List<Metadata> metadataList = parseIt(t, fetcher);
+ MetadataListAndEmbeddedBytes parseData = null;
- if (LOG.isTraceEnabled()) {
- LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
- }
+ try {
+ parseData = parseFromTuple(t, fetcher);
- if (metadataIsEmpty(metadataList)) {
- write(STATUS.EMPTY_OUTPUT);
- return;
- }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start);
+ }
+
+ if (metadataIsEmpty(parseData.getMetadataList())) {
+ write(STATUS.EMPTY_OUTPUT);
+ return;
+ }
- emitIt(t, metadataList);
+ emitParseData(t, parseData);
+        } finally {
+            //parseData is still null if parseFromTuple threw; without the null
+            //check an NPE from here would replace the original exception
+            if (parseData != null && parseData.hasEmbeddedDocumentByteStore() &&
+                    parseData.getEmbeddedDocumentByteStore() instanceof Closeable) {
+                try {
+                    ((Closeable) parseData.getEmbeddedDocumentByteStore()).close();
+                } catch (IOException e) {
+                    LOG.warn("problem closing embedded document byte store", e);
+                }
+            }
+        }
}
- private void emitIt(FetchEmitTuple t, List<Metadata> metadataList) {
+ private void emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseData) {
long start = System.currentTimeMillis();
- String stack = getContainerStacktrace(t, metadataList);
+ String stack = getContainerStacktrace(t, parseData.getMetadataList());
//we need to apply this after we pull out the stacktrace
- filterMetadata(metadataList);
+ filterMetadata(parseData.getMetadataList());
if (StringUtils.isBlank(stack) || t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
- injectUserMetadata(t.getMetadata(), metadataList);
+ injectUserMetadata(t.getMetadata(), parseData.getMetadataList());
EmitKey emitKey = t.getEmitKey();
if (StringUtils.isBlank(emitKey.getEmitKey())) {
emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
t.setEmitKey(emitKey);
}
- EmitData emitData = new EmitData(t.getEmitKey(), metadataList, stack);
- if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
- emit(t.getId(), emitData, stack);
+ EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack);
+ if (parseData.toBePackagedForStreamEmitter()) {
+ emit(t.getId(), emitKey, parseData, stack);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
+ }
+ } else if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
+ emit(t.getId(), emitKey, parseData, stack);
if (LOG.isTraceEnabled()) {
LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start);
}
@@ -418,7 +460,7 @@ public class PipesServer implements Runnable {
}
}
- protected List<Metadata> parseIt(FetchEmitTuple t, Fetcher fetcher) {
+ protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) {
FetchKey fetchKey = t.getFetchKey();
if (fetchKey.hasRange()) {
if (! (fetcher instanceof RangeFetcher)) {
@@ -428,7 +470,7 @@ public class PipesServer implements Runnable {
Metadata metadata = new Metadata();
try (InputStream stream = ((RangeFetcher)fetcher).fetch(fetchKey.getFetchKey(),
fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) {
- return parse(t, stream, metadata);
+ return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
throw e;
@@ -439,7 +481,7 @@ public class PipesServer implements Runnable {
} else {
Metadata metadata = new Metadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
- return parse(t, stream, metadata);
+ return parseWithStream(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
throw e;
@@ -488,20 +530,46 @@ public class PipesServer implements Runnable {
exit(1);
}
- private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream,
- Metadata metadata) {
+ private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTuple fetchEmitTuple, InputStream stream,
+ Metadata metadata) throws TikaConfigException {
HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
+ List<Metadata> metadataList;
+ //this adds the EmbeddedDocumentByteStore to the parsecontext
+ ParseContext parseContext = createParseContext(fetchEmitTuple);
+
if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
- return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata);
+ metadataList = parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
} else {
- return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata);
+ metadataList = parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata, parseContext);
}
+
+ return new MetadataListAndEmbeddedBytes(metadataList,
+ parseContext.get(EmbeddedDocumentByteStore.class));
+ }
+
+    /**
+     * Builds the {@link ParseContext} for a single fetch/emit tuple and, when embedded
+     * document bytes are requested, registers an {@link EmbeddedDocumentByteStore} in it.
+     * <p>
+     * No store is registered when the tuple has no bytes config, when the config is
+     * {@link EmbeddedDocumentBytesConfig#SKIP}, or when the config reports that bytes
+     * should not be extracted. Otherwise a non-blank emitter name selects an
+     * {@link EmbeddedDocumentEmitterStore} and a blank one selects a
+     * {@link BasicEmbeddedDocumentByteStore}.
+     *
+     * @param fetchEmitTuple tuple whose embedded document bytes config drives the choice
+     * @return a new parse context, with or without a byte store
+     * @throws TikaConfigException if the emitter-backed store cannot be created
+     */
+    private ParseContext createParseContext(FetchEmitTuple fetchEmitTuple)
+            throws TikaConfigException {
+        ParseContext parseContext = new ParseContext();
+        EmbeddedDocumentBytesConfig bytesConfig = fetchEmitTuple.getEmbeddedDocumentBytesConfig();
+        //a config created with extractEmbeddedDocumentBytes=false is not necessarily the
+        //SKIP instance, so the identity check alone is not enough
+        if (bytesConfig == null || bytesConfig == EmbeddedDocumentBytesConfig.SKIP ||
+                !bytesConfig.isExtractEmbeddedDocumentBytes()) {
+            return parseContext;
+        }
+
+        if (StringUtils.isBlank(bytesConfig.getEmitter())) {
+            parseContext.set(EmbeddedDocumentByteStore.class,
+                    new BasicEmbeddedDocumentByteStore(bytesConfig));
+        } else {
+            parseContext.set(EmbeddedDocumentByteStore.class,
+                    new EmbeddedDocumentEmitterStore(fetchEmitTuple.getEmitKey(), bytesConfig,
+                            emitterManager));
+        }
+        return parseContext;
     }
private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
HandlerConfig handlerConfig, InputStream stream,
- Metadata metadata) {
- ParseContext parseContext = new ParseContext();
+ Metadata metadata, ParseContext parseContext) {
ContentHandlerFactory contentHandlerFactory =
new BasicContentHandlerFactory(handlerConfig.getType(),
@@ -552,8 +620,7 @@ public class PipesServer implements Runnable {
private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
HandlerConfig handlerConfig, InputStream stream,
- Metadata metadata) {
- ParseContext parseContext = new ParseContext();
+ Metadata metadata, ParseContext parseContext) {
//Intentionally do not add the metadata filter here!
//We need to let stacktraces percolate
RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
@@ -590,7 +657,7 @@ public class PipesServer implements Runnable {
if (tis == null) {
tis = TikaInputStream.get(stream, tmp, metadata);
}
- _preParse(t.getId(), tis, metadata, parseContext);
+ _preParse(t, tis, metadata, parseContext);
} finally {
IOUtils.closeQuietly(tmp);
}
@@ -598,13 +665,13 @@ public class PipesServer implements Runnable {
writeIntermediate(t.getEmitKey(), metadata);
}
- private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+ private void _preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata,
ParseContext parseContext) {
if (digester != null) {
try {
digester.digest(tis, metadata, parseContext);
} catch (IOException e) {
- LOG.warn("problem digesting: " + id, e);
+ LOG.warn("problem digesting: " + t.getId(), e);
}
}
try {
@@ -612,7 +679,18 @@ public class PipesServer implements Runnable {
metadata.set(Metadata.CONTENT_TYPE, mt.toString());
metadata.set(TikaCoreProperties.CONTENT_TYPE_PARSER_OVERRIDE, mt.toString());
} catch (IOException e) {
- LOG.warn("problem detecting: " + id, e);
+ LOG.warn("problem detecting: " + t.getId(), e);
+ }
+
+ if (t.getEmbeddedDocumentBytesConfig() != null &&
+ t.getEmbeddedDocumentBytesConfig().isIncludeOriginal()) {
+ EmbeddedDocumentByteStore embeddedDocumentByteStore =
+ parseContext.get(EmbeddedDocumentEmitterStore.class);
+ try {
+ embeddedDocumentByteStore.add(0, metadata, Files.readAllBytes(tis.getPath()));
+ } catch (IOException e) {
+ LOG.warn("problem reading source file into embedded document byte store", e);
+ }
}
}
@@ -734,4 +812,44 @@ public class PipesServer implements Runnable {
exit(1);
}
}
+
+    /**
+     * Pairs the metadata list produced by a parse with the
+     * {@link EmbeddedDocumentByteStore}, if any, that was registered for that parse.
+     */
+    private static class MetadataListAndEmbeddedBytes {
+        final List<Metadata> metadataList;
+        final Optional<EmbeddedDocumentByteStore> embeddedDocumentByteStore;
+
+        /**
+         * @param metadataList metadata extracted by the parse
+         * @param embeddedDocumentByteStore store used during the parse; may be {@code null}
+         *                                  when no embedded bytes were requested
+         */
+        public MetadataListAndEmbeddedBytes(List<Metadata> metadataList,
+                                            EmbeddedDocumentByteStore embeddedDocumentByteStore) {
+            this.metadataList = metadataList;
+            this.embeddedDocumentByteStore = Optional.ofNullable(embeddedDocumentByteStore);
+        }
+
+        /**
+         * @return the metadata list handed to the constructor (not copied)
+         */
+        public List<Metadata> getMetadataList() {
+            return metadataList;
+        }
+
+        /**
+         * Call {@link #hasEmbeddedDocumentByteStore()} first.
+         *
+         * @return the byte store
+         * @throws java.util.NoSuchElementException if there is no byte store
+         */
+        public EmbeddedDocumentByteStore getEmbeddedDocumentByteStore() {
+            return embeddedDocumentByteStore.get();
+        }
+
+        /**
+         * This tests whether there's any type of embedded document store
+         * ...that, for example, may require closing at the end of the parse.
+         *
+         * @return {@code true} if a byte store was supplied at construction
+         */
+        public boolean hasEmbeddedDocumentByteStore() {
+            return embeddedDocumentByteStore.isPresent();
+        }
+
+        /**
+         * If the intent is that the metadata and byte store be packaged in a zip
+         * or similar and emitted via a single stream emitter.
+         *
+         * This is basically a test that there is a byte store and that it is not an
+         * EmbeddedDocumentEmitterStore.
+         *
+         * @return {@code true} only if a byte store is present and it is not an
+         *         {@link EmbeddedDocumentEmitterStore}; {@code false} when there is no
+         *         byte store at all, because then there is nothing to package
+         */
+        public boolean toBePackagedForStreamEmitter() {
+            return embeddedDocumentByteStore.isPresent() &&
+                    !(embeddedDocumentByteStore.get() instanceof EmbeddedDocumentEmitterStore);
+        }
+    }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
new file mode 100644
index 000000000..cdf3c77fe
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.extractor;
+
+/**
+ * Settings that control whether, and how, the raw bytes of embedded documents are
+ * captured while a file is parsed.
+ * <p>
+ * Instances are mutable and not synchronized. NOTE(review): {@link #SKIP} is a shared
+ * instance that exposes the same setters, so callers should treat it as read-only.
+ */
+public class EmbeddedDocumentBytesConfig {
+
+    /**
+     * Shared "do not extract embedded bytes" instance. Some callers compare against it
+     * by identity, so the reference must never be reassigned.
+     */
+    public static final EmbeddedDocumentBytesConfig SKIP = new EmbeddedDocumentBytesConfig(false);
+
+    /**
+     * How a suffix is chosen for the name of an extracted embedded file.
+     * NOTE(review): presumably none, the suffix already on the embedded file's name, or one
+     * derived from the detected type -- confirm against the code that consumes this.
+     */
+    public enum SUFFIX_STRATEGY {
+        NONE, EXISTING, DETECTED
+    }
+
+    private final boolean extractEmbeddedDocumentBytes;
+    //TODO -- add these at some point
+    /*
+        private Set<String> includeMimeTypes = new HashSet<>();
+        private Set<String> excludeMimeTypes = new HashSet<>();
+    */
+    //defaults to 0; presumably the zero-padded width of the embedded id in names -- TODO confirm
+    private int zeroPadName = 0;
+
+    private SUFFIX_STRATEGY suffixStrategy = SUFFIX_STRATEGY.NONE;
+
+    //defaults to "-"
+    private String embeddedIdPrefix = "-";
+
+    //name of the emitter for embedded bytes; null/blank means none was configured
+    private String emitter;
+
+    private boolean includeOriginal = false;
+
+    /**
+     * @param extractEmbeddedDocumentBytes whether embedded document bytes should be extracted
+     */
+    public EmbeddedDocumentBytesConfig(boolean extractEmbeddedDocumentBytes) {
+        this.extractEmbeddedDocumentBytes = extractEmbeddedDocumentBytes;
+    }
+
+    /**
+     * @return the shared {@link #SKIP} instance
+     */
+    public static EmbeddedDocumentBytesConfig getSKIP() {
+        return SKIP;
+    }
+
+    /**
+     * @return the flag passed to the constructor
+     */
+    public boolean isExtractEmbeddedDocumentBytes() {
+        return extractEmbeddedDocumentBytes;
+    }
+
+    /**
+     * @return the value set via {@link #setZeroPadNameLength(int)}; 0 by default
+     */
+    public int getZeroPadName() {
+        return zeroPadName;
+    }
+
+    /**
+     * @return the suffix strategy; {@link SUFFIX_STRATEGY#NONE} by default
+     */
+    public SUFFIX_STRATEGY getSuffixStrategy() {
+        return suffixStrategy;
+    }
+
+    /**
+     * @return the embedded id prefix; {@code "-"} by default
+     */
+    public String getEmbeddedIdPrefix() {
+        return embeddedIdPrefix;
+    }
+
+    /**
+     * @return the emitter name, or {@code null} if none was set
+     */
+    public String getEmitter() {
+        return emitter;
+    }
+
+    /**
+     * @return whether the original (container) file's bytes should be stored as well;
+     *         {@code false} by default
+     */
+    public boolean isIncludeOriginal() {
+        return includeOriginal;
+    }
+
+    /**
+     * Sets the value returned by {@link #getZeroPadName()}.
+     *
+     * @param zeroPadName the new value
+     */
+    public void setZeroPadNameLength(int zeroPadName) {
+        this.zeroPadName = zeroPadName;
+    }
+
+    public void setSuffixStrategy(SUFFIX_STRATEGY suffixStrategy) {
+        this.suffixStrategy = suffixStrategy;
+    }
+
+    public void setEmbeddedIdPrefix(String embeddedIdPrefix) {
+        this.embeddedIdPrefix = embeddedIdPrefix;
+    }
+
+    public void setEmitter(String emitter) {
+        this.emitter = emitter;
+    }
+
+    public void setIncludeOriginal(boolean includeOriginal) {
+        this.includeOriginal = includeOriginal;
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java
new file mode 100644
index 000000000..ddcca6edf
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java
@@ -0,0 +1,63 @@
+package org.apache.tika.pipes.extractor;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.io.IOExceptionWithCause;
+import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.extractor.AbstractEmbeddedDocumentByteStore;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+/**
+ * An embedded document byte store that retains nothing itself: each call to
+ * {@link #add(int, Metadata, byte[])} immediately writes the bytes out through the
+ * configured {@link StreamEmitter}, and {@link #getDocument(int)} is unsupported.
+ */
+public class EmbeddedDocumentEmitterStore extends AbstractEmbeddedDocumentByteStore {
+    //empty metadata object handed to the emitter with every embedded file
+    private static final Metadata METADATA = new Metadata();
+
+    private final EmitKey containerEmitKey;
+    private final EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig;
+    private final StreamEmitter emitter;
+
+    /**
+     * @param containerEmitKey emit key of the container document; embedded keys are derived from it
+     * @param embeddedDocumentBytesConfig config that names the emitter to use
+     * @param emitterManager manager used to look up the emitter by name
+     * @throws TikaConfigException if the named emitter is not a {@link StreamEmitter}
+     */
+    public EmbeddedDocumentEmitterStore(EmitKey containerEmitKey,
+                                        EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig,
+                                        EmitterManager emitterManager) throws TikaConfigException {
+        this.containerEmitKey = containerEmitKey;
+        this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
+        Emitter tmpEmitter =
+                emitterManager.getEmitter(embeddedDocumentBytesConfig.getEmitter());
+        if (! (tmpEmitter instanceof StreamEmitter)) {
+            throw new TikaConfigException("Emitter " +
+                    embeddedDocumentBytesConfig.getEmitter()
+                    + " must implement a StreamEmitter");
+        }
+        this.emitter = (StreamEmitter) tmpEmitter;
+    }
+
+    /**
+     * Emits the bytes of one embedded document straight away.
+     *
+     * @param id id of the embedded document
+     * @param metadata metadata of the embedded document, used when building the emit key
+     * @param bytes raw bytes to emit
+     * @throws IOException if the emitter fails; the {@link TikaEmitterException} is kept as the cause
+     */
+    @Override
+    public void add(int id, Metadata metadata, byte[] bytes) throws IOException {
+        //intentionally do not call super.add, because we want the ids list to be empty
+        //getFetchKey is not defined in this class; presumably inherited from the parent
+        String emitKey = getFetchKey(containerEmitKey.getEmitKey(),
+                id, embeddedDocumentBytesConfig, metadata);
+
+        try {
+            emitter.emit(emitKey, new UnsynchronizedByteArrayInputStream(bytes), METADATA);
+        } catch (TikaEmitterException e) {
+            //IOExceptionWithCause is deprecated; IOException carries the cause directly
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Always throws: this store only emits and never keeps the bytes.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public byte[] getDocument(int id) {
+        throw new UnsupportedOperationException("this is emit only.");
+    }
+
+    /**
+     * Closes the underlying emitter if it is {@link Closeable}.
+     * NOTE(review): the emitter comes from the {@link EmitterManager}; confirm that it is
+     * not shared with other users before closing it here.
+     */
+    @Override
+    public void close() throws IOException {
+        if (emitter instanceof Closeable) {
+            ((Closeable) emitter).close();
+        }
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
index 53c784796..92d8c5c11 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java
@@ -69,7 +69,7 @@ public class PipesServerTest extends TikaTest {
new FetchKey("fs", "mock.xml"),
new EmitKey("", ""));
Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher();
- List<Metadata> metadataList = pipesServer.parseIt(fetchEmitTuple, fetcher);
+ List<Metadata> metadataList = pipesServer.parseFromTuple(fetchEmitTuple, fetcher);
assertEquals("5f3b924303e960ce35d7f705e91d3018dd110a9c3cef0546a91fe013d6dad6fd",
metadataList.get(0).get("X-TIKA:digest:SHA-256"));
}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3fbd67c0c..e1bec421a 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -32,6 +32,7 @@ import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -54,6 +55,8 @@ public class JsonFetchEmitTuple {
private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode";
+ private static final String EMBEDDED_DOCUMENT_BYTES_CONFIG = "embeddedDocumentBytesConfig";
+
public static FetchEmitTuple fromJson(Reader reader) throws IOException {
try (JsonParser jParser = new JsonFactory().setStreamReadConstraints(StreamReadConstraints.builder()
@@ -84,6 +87,8 @@ public class JsonFetchEmitTuple {
FetchEmitTuple.DEFAULT_ON_PARSE_EXCEPTION;
HandlerConfig handlerConfig = HandlerConfig.DEFAULT_HANDLER_CONFIG;
Metadata metadata = new Metadata();
+ EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = EmbeddedDocumentBytesConfig.SKIP;
+
while (token != JsonToken.END_OBJECT) {
if (token != JsonToken.FIELD_NAME) {
throw new IOException("required field name, but see: " + token.name());
@@ -120,6 +125,8 @@ public class JsonFetchEmitTuple {
fetchRangeStart = getLong(jParser);
} else if (FETCH_RANGE_END.equals(name)) {
fetchRangeEnd = getLong(jParser);
+ } else if (EMBEDDED_DOCUMENT_BYTES_CONFIG.equals(name)) {
+ embeddedDocumentBytesConfig = getEmbeddedDocumentBytesConfig(jParser);
}
token = jParser.nextToken();
}
@@ -127,7 +134,39 @@ public class JsonFetchEmitTuple {
id = fetchKey;
}
return new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey, fetchRangeStart, fetchRangeEnd),
- new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException);
+ new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException,
+ embeddedDocumentBytesConfig);
+ }
+
+    /**
+     * Advances the parser by one token and returns that token as a boolean.
+     *
+     * @param jParser parser positioned on the field name
+     * @param fieldName name of the field, used only in the error message
+     * @return the boolean value
+     * @throws IOException if the next token is not a JSON boolean
+     */
+    private static boolean nextRequiredBoolean(JsonParser jParser, String fieldName)
+            throws IOException {
+        Boolean value = jParser.nextBooleanValue();
+        if (value == null) {
+            throw new IOException("required a boolean value for '" + fieldName + "'");
+        }
+        return value;
+    }
+
+    /**
+     * Reads the nested {@code embeddedDocumentBytesConfig} JSON object.
+     * <p>
+     * The whole object is always consumed, so the parser is left on the token that ended
+     * the field loop (normally the object's END_OBJECT) and the caller can carry on with
+     * the enclosing object.
+     *
+     * @param jParser parser whose next token must be the START_OBJECT of the config
+     * @return the populated config, or {@link EmbeddedDocumentBytesConfig#SKIP} if the
+     *         object sets {@code extractEmbeddedDocumentBytes} to {@code false}
+     * @throws IOException if the object does not start with START_OBJECT or a boolean
+     *         field does not hold a boolean
+     * @throws IllegalArgumentException if the object contains an unknown field
+     */
+    private static EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig(JsonParser jParser) throws IOException {
+        JsonToken token = jParser.nextToken();
+        if (token != JsonToken.START_OBJECT) {
+            throw new IOException("required start object, but see: " +
+                    (token == null ? "end of input" : token.name()));
+        }
+        EmbeddedDocumentBytesConfig config = new EmbeddedDocumentBytesConfig(true);
+        boolean extract = true;
+        String fieldName = jParser.nextFieldName();
+        while (fieldName != null) {
+            switch (fieldName) {
+                //TODO: fill in more here!
+                case "extractEmbeddedDocumentBytes":
+                    //remember the flag, but keep reading so that the parser is not left
+                    //in the middle of this object
+                    extract = nextRequiredBoolean(jParser, fieldName);
+                    break;
+                case "includeOriginal":
+                    config.setIncludeOriginal(nextRequiredBoolean(jParser, fieldName));
+                    break;
+                case "emitter":
+                    config.setEmitter(jParser.nextTextValue());
+                    break;
+                default:
+                    throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
+                            "' in the context of an embeddedDocumentBytesConfig");
+            }
+            fieldName = jParser.nextFieldName();
+        }
+        return extract ? config : EmbeddedDocumentBytesConfig.SKIP;
     }
private static HandlerConfig getHandlerConfig(JsonParser jParser) throws IOException {