You are viewing a plain-text version of this content; the link to its canonical version is not included in this rendering.
Posted to commits@tika.apache.org by ta...@apache.org on 2023/05/31 21:10:52 UTC
[tika] 01/01: TIKA-3941 -- WIP ... more remains
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3941
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 2fa4a2cd4004848841a85f9cae11f379fd097ac8
Author: tballison <ta...@apache.org>
AuthorDate: Wed May 31 17:10:41 2023 -0400
TIKA-3941 -- WIP ... more remains
---
.../org/apache/tika/parser/AutoDetectParser.java | 4 ++
.../java/org/apache/tika/pipes/PipesClient.java | 62 +++++++++++++++++++--
.../java/org/apache/tika/pipes/PipesResult.java | 28 +++++++---
.../java/org/apache/tika/pipes/PipesServer.java | 65 +++++++++++++++++++++-
.../org/apache/tika/pipes/async/AsyncConfig.java | 10 ++++
.../apache/tika/pipes/async/AsyncProcessor.java | 14 ++++-
.../tika/pipes/async/AsyncProcessorTest.java | 61 ++++++++++++++++----
.../org/apache/tika/pipes/async/MockReporter.java | 6 +-
8 files changed, 220 insertions(+), 30 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
index 45e972c20..d333c2e9a 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
@@ -157,6 +157,10 @@ public class AutoDetectParser extends CompositeParser {
this.autoDetectParserConfig = autoDetectParserConfig;
}
+ public AutoDetectParserConfig getAutoDetectParserConfig() {
+ return this.autoDetectParserConfig;
+ }
+
public void parse(InputStream stream, ContentHandler handler, Metadata metadata,
ParseContext context) throws IOException, SAXException, TikaException {
if (autoDetectParserConfig.getMetadataWriteFilterFactory() != null) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 7a4e6eecf..3db897f79 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -30,6 +30,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
@@ -45,7 +46,9 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.utils.ProcessUtils;
import org.apache.tika.utils.StringUtils;
@@ -145,6 +148,7 @@ public class PipesClient implements Closeable {
private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedException {
long start = System.currentTimeMillis();
+ final PipesResult[] intermediateResult = new PipesResult[1];
FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
@@ -167,6 +171,10 @@ public class PipesClient implements Closeable {
throw new InterruptedException("thread interrupt");
}
PipesResult result = readResults(t, start);
+ while (result.getStatus().equals(PipesResult.STATUS.INTERMEDIATE_RESULT)) {
+ intermediateResult[0] = result;
+ result = readResults(t, start);
+ }
if (LOG.isDebugEnabled()) {
long elapsed = System.currentTimeMillis() - readStart;
LOG.debug("finished reading result in {} ms", elapsed);
@@ -177,6 +185,9 @@ public class PipesClient implements Closeable {
pipesClientId,
System.currentTimeMillis() - readStart);
}
+ if (result.getStatus() == PipesResult.STATUS.OOM) {
+ return buildFatalResult(result, intermediateResult);
+ }
return result;
});
@@ -197,7 +208,7 @@ public class PipesClient implements Closeable {
if (!process.isAlive() && TIMEOUT_EXIT_CODE == process.exitValue()) {
LOG.warn("pipesClientId={} server timeout: {} in {} ms", pipesClientId, t.getId(),
elapsed);
- return PipesResult.TIMEOUT;
+ return buildFatalResult(PipesResult.TIMEOUT, intermediateResult);
}
process.waitFor(500, TimeUnit.MILLISECONDS);
if (process.isAlive()) {
@@ -207,18 +218,34 @@ public class PipesClient implements Closeable {
LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}", pipesClientId,
t.getId(), elapsed, process.exitValue());
}
- return PipesResult.UNSPECIFIED_CRASH;
+ return buildFatalResult(PipesResult.UNSPECIFIED_CRASH, intermediateResult);
} catch (TimeoutException e) {
long elapsed = System.currentTimeMillis() - start;
destroyForcibly();
LOG.warn("pipesClientId={} client timeout: {} in {} ms", pipesClientId, t.getId(),
elapsed);
- return PipesResult.TIMEOUT;
+ return buildFatalResult(PipesResult.TIMEOUT, intermediateResult);
} finally {
futureTask.cancel(true);
}
}
+ private PipesResult buildFatalResult(PipesResult result,
+ PipesResult[] intermediateResult) {
+
+ if (intermediateResult[0] == null) {
+ return result;
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("intermediate2 result: {}", intermediateResult[0].getEmitData());
+ }
+ intermediateResult[0].getEmitData().getMetadataList().get(0).set("PipesResult",
+ result.getStatus().toString());
+ return new PipesResult(result.getStatus(),
+ intermediateResult[0].getEmitData(), true);
+ }
+ }
+
private void pauseThenDestroy() throws InterruptedException {
//wait just a little bit to let process end to get exit value
//if there's a timeout on the server side
@@ -259,9 +286,11 @@ public class PipesClient implements Closeable {
try {
status = lookup(statusByte);
} catch (IllegalArgumentException e) {
- throw new IOException("problem reading response from server "
- + String.format(Locale.US, "%02x", statusByte),
- e);
+ String byteString = "-1";
+ if (statusByte > -1) {
+ byteString = String.format(Locale.US, "%02x", (byte)statusByte);
+ }
+ throw new IOException("problem reading response from server: " + byteString, e);
}
switch (status) {
@@ -292,6 +321,10 @@ public class PipesClient implements Closeable {
LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(),
millis);
return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
+ case INTERMEDIATE_RESULT:
+ LOG.debug("pipesClientId={} intermediate success: {} in {} ms", pipesClientId,
+ t.getId(), millis);
+ return deserializeIntermediateResult(t.getEmitKey());
case PARSE_SUCCESS:
//there may have been a parse exception, but the parse didn't crash
LOG.debug("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
@@ -349,6 +382,23 @@ public class PipesClient implements Closeable {
}
}
+ private PipesResult deserializeIntermediateResult(EmitKey emitKey) throws IOException {
+
+ int length = input.readInt();
+ byte[] bytes = new byte[length];
+ input.readFully(bytes);
+ try (ObjectInputStream objectInputStream = new ObjectInputStream(
+ new UnsynchronizedByteArrayInputStream(bytes))) {
+ Metadata metadata = (Metadata) objectInputStream.readObject();
+ EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata));
+ return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitData, true);
+ } catch (ClassNotFoundException e) {
+ LOG.error("class not found exception deserializing data", e);
+ //this should be catastrophic
+ throw new RuntimeException(e);
+ }
+ }
+
private void restart() throws IOException, InterruptedException, TimeoutException {
if (process != null) {
LOG.debug("process still alive; trying to destroy it");
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index ace4f3724..639bfc437 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -20,6 +20,8 @@ import org.apache.tika.pipes.emitter.EmitData;
public class PipesResult {
+ private boolean intermediate = false;
+
public enum STATUS {
CLIENT_UNAVAILABLE_WITHIN_MS,
FETCHER_INITIALIZATION_EXCEPTION,
@@ -32,7 +34,8 @@ public class PipesResult {
OOM, TIMEOUT, UNSPECIFIED_CRASH,
NO_EMITTER_FOUND,
EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
- INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND;
+ INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND,
+ INTERMEDIATE_RESULT;
}
public static final PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -48,18 +51,19 @@ public class PipesResult {
private final EmitData emitData;
private final String message;
- private PipesResult(STATUS status, EmitData emitData, String message) {
+ private PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) {
this.status = status;
this.emitData = emitData;
this.message = message;
+ this.intermediate = intermediate;
}
public PipesResult(STATUS status) {
- this(status, null, null);
+ this(status, null, null, false);
}
public PipesResult(STATUS status, String message) {
- this(status, null, message);
+ this(status, null, message, false);
}
/**
@@ -68,7 +72,11 @@ public class PipesResult {
* @param emitData
*/
public PipesResult(EmitData emitData) {
- this(STATUS.PARSE_SUCCESS, emitData, null);
+ this(STATUS.PARSE_SUCCESS, emitData, null, false);
+ }
+
+ public PipesResult(STATUS status, EmitData emitData, boolean intermediate) {
+ this(status, emitData, null, intermediate);
}
/**
@@ -79,7 +87,7 @@ public class PipesResult {
* @param message
*/
public PipesResult(EmitData emitData, String message) {
- this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message);
+ this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message, false);
}
public STATUS getStatus() {
@@ -94,9 +102,13 @@ public class PipesResult {
return message;
}
+ public boolean isIntermediate() {
+ return intermediate;
+ }
+
@Override
public String toString() {
- return "PipesResult{" + "status=" + status + ", emitData=" + emitData + ", message='" +
- message + '\'' + '}';
+ return "PipesResult{" + "intermediate=" + intermediate + ", status=" + status +
+ ", emitData=" + emitData + ", message='" + message + '\'' + '}';
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index c97a4e39e..374cf7bc1 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -24,6 +24,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
@@ -37,12 +38,17 @@ import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
+import org.apache.tika.detect.Detector;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.mime.MediaType;
import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.DigestingParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.RecursiveParserWrapper;
@@ -76,6 +82,9 @@ public class PipesServer implements Runnable {
//this has to be some number not close to 0-3
//it looks like the server crashes with exit value 3 on OOM, for example
public static final int TIMEOUT_EXIT_CODE = 17;
+ private DigestingParser.Digester digester;
+
+ private Detector detector;
public enum STATUS {
READY,
@@ -93,7 +102,8 @@ public class PipesServer implements Runnable {
EMIT_EXCEPTION,
OOM,
TIMEOUT,
- EMPTY_OUTPUT;
+ EMPTY_OUTPUT,
+ INTERMEDIATE_RESULT;
byte getByte() {
return (byte) (ordinal() + 1);
@@ -544,6 +554,7 @@ public class PipesServer implements Runnable {
handlerConfig.getMaxEmbeddedResources());
ParseContext parseContext = new ParseContext();
long start = System.currentTimeMillis();
+ preParse(fetchEmitTuple, stream, metadata, parseContext);
try {
rMetaParser.parse(stream, handler, metadata, parseContext);
} catch (SAXException e) {
@@ -563,6 +574,39 @@ public class PipesServer implements Runnable {
return handler.getMetadataList();
}
+ private void preParse(FetchEmitTuple t, InputStream stream, Metadata metadata,
+ ParseContext parseContext) {
+ try (TemporaryResources temporaryResources = new TemporaryResources()) {
+ TikaInputStream tis = TikaInputStream.cast(stream);
+ if (tis == null) {
+ tis = TikaInputStream.get(stream, temporaryResources, metadata);
+ }
+ _preParse(t.getId(), tis, metadata, parseContext);
+ } catch (IOException e) {
+ LOG.warn("something went wrong in pre-parse casting of the inputstream to a " +
+ "TikaInputStream", e);
+ return;
+ }
+ writeIntermediate(t.getEmitKey(), metadata);
+ }
+
+ private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+ ParseContext parseContext) {
+ if (digester != null) {
+ try (InputStream is = Files.newInputStream(tis.getPath())) {
+ digester.digest(is, metadata, parseContext);
+ } catch (IOException e) {
+ LOG.warn("problem digesting: " + id, e);
+ }
+ }
+ try {
+ MediaType mt = detector.detect(tis, metadata);
+ metadata.set(Metadata.CONTENT_TYPE, mt.toString());
+ } catch (IOException e) {
+ LOG.warn("problem detecting: " + id, e);
+ }
+ }
+
private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
for (String n : userMetadata.names()) {
//overwrite whatever was there
@@ -609,10 +653,29 @@ public class PipesServer implements Runnable {
this.fetcherManager = FetcherManager.load(tikaConfigPath);
this.emitterManager = EmitterManager.load(tikaConfigPath);
this.autoDetectParser = new AutoDetectParser(this.tikaConfig);
+ if (((AutoDetectParser)autoDetectParser).getAutoDetectParserConfig().getDigesterFactory() != null) {
+ this.digester = ((AutoDetectParser) autoDetectParser).
+ getAutoDetectParserConfig().getDigesterFactory().build();
+ }
+ this.detector = ((AutoDetectParser)this.autoDetectParser).getDetector();
this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
}
+ private void writeIntermediate(EmitKey emitKey, Metadata metadata) {
+ EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata));
+ try {
+ UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
+ try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+ objectOutputStream.writeObject(metadata);
+ }
+ write(STATUS.INTERMEDIATE_RESULT, bos.toByteArray());
+ } catch (IOException e) {
+ LOG.error("problem writing intermediate data (forking process shutdown?)", e);
+ exit(1);
+ }
+ }
+
private void write(EmitData emitData) {
try {
UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 51b52af66..bc55cca5d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -33,6 +33,8 @@ public class AsyncConfig extends PipesConfigBase {
private int queueSize = 10000;
private int numEmitters = 1;
+ private boolean emitIntermediateResults = false;
+
private PipesReporter pipesReporter = PipesReporter.NO_OP_REPORTER;
public static AsyncConfig load(Path p) throws IOException, TikaConfigException {
@@ -107,4 +109,12 @@ public class AsyncConfig extends PipesConfigBase {
public void setPipesReporter(PipesReporter pipesReporter) {
this.pipesReporter = pipesReporter;
}
+
+ public void setEmitIntermediateResults(boolean emitIntermediateResults) {
+ this.emitIntermediateResults = emitIntermediateResults;
+ }
+
+ public boolean isEmitIntermediateResults() {
+ return emitIntermediateResults;
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 17292c047..3f51ef6ab 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -302,8 +302,9 @@ public class AsyncProcessor implements Closeable {
System.currentTimeMillis() - start);
}
long offerStart = System.currentTimeMillis();
- if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
- result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
+
+ if (shouldEmit(result)) {
+ LOG.debug("adding result to emitter queue: " + result.getEmitData());
boolean offered = emitDataQueue.offer(result.getEmitData(),
MAX_OFFER_WAIT_MS,
TimeUnit.MILLISECONDS);
@@ -323,5 +324,14 @@ public class AsyncProcessor implements Closeable {
}
}
}
+
+ private boolean shouldEmit(PipesResult result) {
+
+ if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
+ result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
+ return true;
+ }
+ return result.isIntermediate() && asyncConfig.isEmitIntermediateResults();
+ }
}
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 4104866c7..6c7748740 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -27,12 +27,12 @@ import java.util.HashSet;
import java.util.Random;
import java.util.Set;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
@@ -60,8 +60,6 @@ public class AsyncProcessorTest {
private final int totalFiles = 100;
- private Path tikaConfigPath;
-
@TempDir
private Path inputDir;
@@ -74,9 +72,12 @@ public class AsyncProcessorTest {
private int crash = 0;
- @BeforeEach
- public void setUp() throws SQLException, IOException {
- tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml");
+ public Path setUp(boolean emitIntermediateResults) throws SQLException, IOException {
+ ok = 0;
+ oom = 0;
+ timeouts = 0;
+ crash = 0;
+ Path tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml");
String xml =
"<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" + " <emitters>" +
" <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
@@ -86,7 +87,12 @@ public class AsyncProcessorTest {
" <name>mock</name>\n" + " <basePath>" +
ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
"</basePath>\n" + " </fetcher>" + " </fetchers>" +
- "<async><tikaConfig>" +
+
+
+ "<async><pipesReporter class=\"org.apache.tika.pipes.async.MockReporter\"/>" +
+ "<emitIntermediateResults>" + emitIntermediateResults +
+ "</emitIntermediateResults>" +
+ "<tikaConfig>" +
ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
"</tikaConfig><forkedJvmArgs><arg>-Xmx512m</arg" +
"></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
@@ -101,16 +107,19 @@ public class AsyncProcessorTest {
Files.write(inputDir.resolve(i + ".xml"), OOM.getBytes(StandardCharsets.UTF_8));
oom++;
} else if (f < 0.10) {
- Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8));
- timeouts++;
- } else if (f < 0.15) {
Files.write(inputDir.resolve(i + ".xml"), SYSTEM_EXIT.getBytes(StandardCharsets.UTF_8));
crash++;
+ } else if (f < 0.13) {
+ Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8));
+ timeouts++;
} else {
Files.write(inputDir.resolve(i + ".xml"), OK.getBytes(StandardCharsets.UTF_8));
ok++;
}
}
+ MockEmitter.EMIT_DATA.clear();
+ MockReporter.RESULTS.clear();
+ return tikaConfigPath;
}
/*
@@ -128,9 +137,9 @@ public class AsyncProcessorTest {
@Test
public void testBasic() throws Exception {
- AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+ AsyncProcessor processor = new AsyncProcessor(setUp(false));
for (int i = 0; i < totalFiles; i++) {
- FetchEmitTuple t = new FetchEmitTuple("myId",
+ FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
new FetchKey("mock", i + ".xml"),
new EmitKey("mock", "emit-" + i), new Metadata());
processor.offer(t, 1000);
@@ -148,5 +157,33 @@ public class AsyncProcessorTest {
emitKeys.add(d.getEmitKey().getEmitKey());
}
assertEquals(ok, emitKeys.size());
+ assertEquals(100, MockReporter.RESULTS.size());
+ for (PipesResult r : MockReporter.RESULTS) {
+ assertEquals("application/mock+xml",
+ r.getEmitData().getMetadataList().get(0).get(Metadata.CONTENT_TYPE));
+ }
+ }
+
+ @Test
+ public void testEmitIntermediate() throws Exception {
+ AsyncProcessor processor = new AsyncProcessor(setUp(true));
+ for (int i = 0; i < totalFiles; i++) {
+ FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey("mock", i + ".xml"),
+ new EmitKey("mock", "emit-" + i), new Metadata());
+ processor.offer(t, 1000);
+ }
+ for (int i = 0; i < 10; i++) {
+ processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000);
+ }
+ //TODO clean this up
+ while (processor.checkActive()) {
+ Thread.sleep(100);
+ }
+ processor.close();
+ Set<String> emitKeys = new HashSet<>();
+ for (EmitData d : MockEmitter.EMIT_DATA) {
+ emitKeys.add(d.getEmitKey().getEmitKey());
+ }
+ assertEquals(totalFiles, emitKeys.size());
}
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index b8197bd82..6e8308c89 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -16,6 +16,8 @@
*/
package org.apache.tika.pipes.async;
+import java.util.concurrent.ArrayBlockingQueue;
+
import org.apache.tika.config.Field;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesReporter;
@@ -23,11 +25,13 @@ import org.apache.tika.pipes.PipesResult;
public class MockReporter extends PipesReporter {
+ static ArrayBlockingQueue<PipesResult> RESULTS = new ArrayBlockingQueue<>(10000);
+
private String endpoint;
@Override
public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
-
+ RESULTS.add(result);
}
@Override