You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in this rendering.
Posted to commits@tika.apache.org by ta...@apache.org on 2023/05/31 21:10:51 UTC

[tika] branch TIKA-3941 created (now 2fa4a2cd4)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-3941
in repository https://gitbox.apache.org/repos/asf/tika.git


      at 2fa4a2cd4 TIKA-3941 -- WIP ... more remains

This branch includes the following new commits:

     new 2fa4a2cd4 TIKA-3941 -- WIP ... more remains

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[tika] 01/01: TIKA-3941 -- WIP ... more remains

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3941
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 2fa4a2cd4004848841a85f9cae11f379fd097ac8
Author: tballison <ta...@apache.org>
AuthorDate: Wed May 31 17:10:41 2023 -0400

    TIKA-3941 -- WIP ... more remains
---
 .../org/apache/tika/parser/AutoDetectParser.java   |  4 ++
 .../java/org/apache/tika/pipes/PipesClient.java    | 62 +++++++++++++++++++--
 .../java/org/apache/tika/pipes/PipesResult.java    | 28 +++++++---
 .../java/org/apache/tika/pipes/PipesServer.java    | 65 +++++++++++++++++++++-
 .../org/apache/tika/pipes/async/AsyncConfig.java   | 10 ++++
 .../apache/tika/pipes/async/AsyncProcessor.java    | 14 ++++-
 .../tika/pipes/async/AsyncProcessorTest.java       | 61 ++++++++++++++++----
 .../org/apache/tika/pipes/async/MockReporter.java  |  6 +-
 8 files changed, 220 insertions(+), 30 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
index 45e972c20..d333c2e9a 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
@@ -157,6 +157,10 @@ public class AutoDetectParser extends CompositeParser {
         this.autoDetectParserConfig = autoDetectParserConfig;
     }
 
+    public AutoDetectParserConfig getAutoDetectParserConfig() {
+        return this.autoDetectParserConfig;
+    }
+
     public void parse(InputStream stream, ContentHandler handler, Metadata metadata,
                       ParseContext context) throws IOException, SAXException, TikaException {
         if (autoDetectParserConfig.getMetadataWriteFilterFactory() != null) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 7a4e6eecf..3db897f79 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -30,6 +30,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.ExecutionException;
@@ -45,7 +46,9 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.utils.ProcessUtils;
 import org.apache.tika.utils.StringUtils;
 
@@ -145,6 +148,7 @@ public class PipesClient implements Closeable {
 
     private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedException {
         long start = System.currentTimeMillis();
+        final PipesResult[] intermediateResult = new PipesResult[1];
         FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
 
             UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
@@ -167,6 +171,10 @@ public class PipesClient implements Closeable {
                 throw new InterruptedException("thread interrupt");
             }
             PipesResult result = readResults(t, start);
+            while (result.getStatus().equals(PipesResult.STATUS.INTERMEDIATE_RESULT)) {
+                intermediateResult[0] = result;
+                result = readResults(t, start);
+            }
             if (LOG.isDebugEnabled()) {
                 long elapsed = System.currentTimeMillis() - readStart;
                 LOG.debug("finished reading result in {} ms", elapsed);
@@ -177,6 +185,9 @@ public class PipesClient implements Closeable {
                         pipesClientId,
                         System.currentTimeMillis() - readStart);
             }
+            if (result.getStatus() == PipesResult.STATUS.OOM) {
+                return buildFatalResult(result, intermediateResult);
+            }
             return result;
         });
 
@@ -197,7 +208,7 @@ public class PipesClient implements Closeable {
             if (!process.isAlive() && TIMEOUT_EXIT_CODE == process.exitValue()) {
                 LOG.warn("pipesClientId={} server timeout: {} in {} ms", pipesClientId, t.getId(),
                         elapsed);
-                return PipesResult.TIMEOUT;
+                return buildFatalResult(PipesResult.TIMEOUT, intermediateResult);
             }
             process.waitFor(500, TimeUnit.MILLISECONDS);
             if (process.isAlive()) {
@@ -207,18 +218,34 @@ public class PipesClient implements Closeable {
                 LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}", pipesClientId,
                         t.getId(), elapsed, process.exitValue());
             }
-            return PipesResult.UNSPECIFIED_CRASH;
+            return buildFatalResult(PipesResult.UNSPECIFIED_CRASH, intermediateResult);
         } catch (TimeoutException e) {
             long elapsed = System.currentTimeMillis() - start;
             destroyForcibly();
             LOG.warn("pipesClientId={} client timeout: {} in {} ms", pipesClientId, t.getId(),
                     elapsed);
-            return PipesResult.TIMEOUT;
+            return buildFatalResult(PipesResult.TIMEOUT, intermediateResult);
         } finally {
             futureTask.cancel(true);
         }
     }
 
+    private PipesResult buildFatalResult(PipesResult result,
+                                         PipesResult[] intermediateResult) {
+
+        if (intermediateResult[0] == null) {
+            return result;
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("intermediate2 result: {}", intermediateResult[0].getEmitData());
+            }
+            intermediateResult[0].getEmitData().getMetadataList().get(0).set("PipesResult",
+                    result.getStatus().toString());
+            return new PipesResult(result.getStatus(),
+                    intermediateResult[0].getEmitData(), true);
+        }
+    }
+
     private void pauseThenDestroy() throws InterruptedException {
         //wait just a little bit to let process end to get exit value
         //if there's a timeout on the server side
@@ -259,9 +286,11 @@ public class PipesClient implements Closeable {
         try {
             status = lookup(statusByte);
         } catch (IllegalArgumentException e) {
-            throw new IOException("problem reading response from server "
-                    + String.format(Locale.US, "%02x", statusByte),
-                    e);
+            String byteString = "-1";
+            if (statusByte > -1) {
+                byteString = String.format(Locale.US, "%02x", (byte)statusByte);
+            }
+            throw new IOException("problem reading response from server: " + byteString, e);
         }
 
         switch (status) {
@@ -292,6 +321,10 @@ public class PipesClient implements Closeable {
                 LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(),
                         millis);
                 return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
+            case INTERMEDIATE_RESULT:
+                LOG.debug("pipesClientId={} intermediate success: {} in {} ms", pipesClientId,
+                        t.getId(), millis);
+                return deserializeIntermediateResult(t.getEmitKey());
             case PARSE_SUCCESS:
                 //there may have been a parse exception, but the parse didn't crash
                 LOG.debug("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
@@ -349,6 +382,23 @@ public class PipesClient implements Closeable {
         }
     }
 
+    private PipesResult deserializeIntermediateResult(EmitKey emitKey) throws IOException {
+
+        int length = input.readInt();
+        byte[] bytes = new byte[length];
+        input.readFully(bytes);
+        try (ObjectInputStream objectInputStream = new ObjectInputStream(
+                new UnsynchronizedByteArrayInputStream(bytes))) {
+            Metadata metadata = (Metadata) objectInputStream.readObject();
+            EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata));
+            return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitData, true);
+        } catch (ClassNotFoundException e) {
+            LOG.error("class not found exception deserializing data", e);
+            //this should be catastrophic
+            throw new RuntimeException(e);
+        }
+    }
+
     private void restart() throws IOException, InterruptedException, TimeoutException {
         if (process != null) {
             LOG.debug("process still alive; trying to destroy it");
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index ace4f3724..639bfc437 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -20,6 +20,8 @@ import org.apache.tika.pipes.emitter.EmitData;
 
 public class PipesResult {
 
+    private boolean intermediate = false;
+
     public enum STATUS {
         CLIENT_UNAVAILABLE_WITHIN_MS,
         FETCHER_INITIALIZATION_EXCEPTION,
@@ -32,7 +34,8 @@ public class PipesResult {
         OOM, TIMEOUT, UNSPECIFIED_CRASH,
         NO_EMITTER_FOUND,
         EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
-        INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND;
+        INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND,
+        INTERMEDIATE_RESULT;
     }
 
     public static final PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -48,18 +51,19 @@ public class PipesResult {
     private final EmitData emitData;
     private final String message;
 
-    private PipesResult(STATUS status, EmitData emitData, String message) {
+    private PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) {
         this.status = status;
         this.emitData = emitData;
         this.message = message;
+        this.intermediate = intermediate;
     }
 
     public PipesResult(STATUS status) {
-        this(status, null, null);
+        this(status, null, null, false);
     }
 
     public PipesResult(STATUS status, String message) {
-        this(status, null, message);
+        this(status, null, message, false);
     }
 
     /**
@@ -68,7 +72,11 @@ public class PipesResult {
      * @param emitData
      */
     public PipesResult(EmitData emitData) {
-        this(STATUS.PARSE_SUCCESS, emitData, null);
+        this(STATUS.PARSE_SUCCESS, emitData, null, false);
+    }
+
+    public PipesResult(STATUS status, EmitData emitData, boolean intermediate) {
+        this(status, emitData, null, intermediate);
     }
 
     /**
@@ -79,7 +87,7 @@ public class PipesResult {
      * @param message
      */
     public PipesResult(EmitData emitData, String message) {
-        this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message);
+        this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message, false);
     }
 
     public STATUS getStatus() {
@@ -94,9 +102,13 @@ public class PipesResult {
         return message;
     }
 
+    public boolean isIntermediate() {
+        return intermediate;
+    }
+
     @Override
     public String toString() {
-        return "PipesResult{" + "status=" + status + ", emitData=" + emitData + ", message='" +
-                message + '\'' + '}';
+        return "PipesResult{" + "intermediate=" + intermediate + ", status=" + status +
+                ", emitData=" + emitData + ", message='" + message + '\'' + '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index c97a4e39e..374cf7bc1 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -24,6 +24,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -37,12 +38,17 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 
 import org.apache.tika.config.TikaConfig;
+import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
@@ -76,6 +82,9 @@ public class PipesServer implements Runnable {
     //this has to be some number not close to 0-3
     //it looks like the server crashes with exit value 3 on OOM, for example
     public static final int TIMEOUT_EXIT_CODE = 17;
+    private DigestingParser.Digester digester;
+
+    private Detector detector;
 
     public enum STATUS {
         READY,
@@ -93,7 +102,8 @@ public class PipesServer implements Runnable {
         EMIT_EXCEPTION,
         OOM,
         TIMEOUT,
-        EMPTY_OUTPUT;
+        EMPTY_OUTPUT,
+        INTERMEDIATE_RESULT;
 
         byte getByte() {
             return (byte) (ordinal() + 1);
@@ -544,6 +554,7 @@ public class PipesServer implements Runnable {
                 handlerConfig.getMaxEmbeddedResources());
         ParseContext parseContext = new ParseContext();
         long start = System.currentTimeMillis();
+        preParse(fetchEmitTuple, stream, metadata, parseContext);
         try {
             rMetaParser.parse(stream, handler, metadata, parseContext);
         } catch (SAXException e) {
@@ -563,6 +574,39 @@ public class PipesServer implements Runnable {
         return handler.getMetadataList();
     }
 
+    private void preParse(FetchEmitTuple t, InputStream stream, Metadata metadata,
+                          ParseContext parseContext) {
+        try (TemporaryResources temporaryResources = new TemporaryResources()) {
+            TikaInputStream tis = TikaInputStream.cast(stream);
+            if (tis == null) {
+                tis = TikaInputStream.get(stream, temporaryResources, metadata);
+            }
+            _preParse(t.getId(), tis, metadata, parseContext);
+        } catch (IOException e) {
+            LOG.warn("something went wrong in pre-parse casting of the inputstream to a " +
+                    "TikaInputStream", e);
+            return;
+        }
+        writeIntermediate(t.getEmitKey(), metadata);
+    }
+
+    private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+                           ParseContext parseContext) {
+        if (digester != null) {
+            try (InputStream is = Files.newInputStream(tis.getPath())) {
+                digester.digest(is, metadata, parseContext);
+            } catch (IOException e) {
+                LOG.warn("problem digesting: " + id, e);
+            }
+        }
+        try {
+            MediaType mt = detector.detect(tis, metadata);
+            metadata.set(Metadata.CONTENT_TYPE, mt.toString());
+        } catch (IOException e) {
+            LOG.warn("problem detecting: " + id, e);
+        }
+    }
+
     private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
         for (String n : userMetadata.names()) {
             //overwrite whatever was there
@@ -609,10 +653,29 @@ public class PipesServer implements Runnable {
         this.fetcherManager = FetcherManager.load(tikaConfigPath);
         this.emitterManager = EmitterManager.load(tikaConfigPath);
         this.autoDetectParser = new AutoDetectParser(this.tikaConfig);
+        if (((AutoDetectParser)autoDetectParser).getAutoDetectParserConfig().getDigesterFactory() != null) {
+            this.digester = ((AutoDetectParser) autoDetectParser).
+                    getAutoDetectParserConfig().getDigesterFactory().build();
+        }
+        this.detector = ((AutoDetectParser)this.autoDetectParser).getDetector();
         this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
     }
 
 
+    private void writeIntermediate(EmitKey emitKey, Metadata metadata) {
+        EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata));
+        try {
+            UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
+            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(metadata);
+            }
+            write(STATUS.INTERMEDIATE_RESULT, bos.toByteArray());
+        } catch (IOException e) {
+            LOG.error("problem writing intermediate data (forking process shutdown?)", e);
+            exit(1);
+        }
+    }
+
     private void write(EmitData emitData) {
         try {
             UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 51b52af66..bc55cca5d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -33,6 +33,8 @@ public class AsyncConfig extends PipesConfigBase {
     private int queueSize = 10000;
     private int numEmitters = 1;
 
+    private boolean emitIntermediateResults = false;
+
     private PipesReporter pipesReporter = PipesReporter.NO_OP_REPORTER;
 
     public static AsyncConfig load(Path p) throws IOException, TikaConfigException {
@@ -107,4 +109,12 @@ public class AsyncConfig extends PipesConfigBase {
     public void setPipesReporter(PipesReporter pipesReporter) {
         this.pipesReporter = pipesReporter;
     }
+
+    public void setEmitIntermediateResults(boolean emitIntermediateResults) {
+        this.emitIntermediateResults = emitIntermediateResults;
+    }
+
+    public boolean isEmitIntermediateResults() {
+        return emitIntermediateResults;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 17292c047..3f51ef6ab 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -302,8 +302,9 @@ public class AsyncProcessor implements Closeable {
                                     System.currentTimeMillis() - start);
                         }
                         long offerStart = System.currentTimeMillis();
-                        if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
-                                result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
+
+                        if (shouldEmit(result)) {
+                            LOG.debug("adding result to emitter queue: " + result.getEmitData());
                             boolean offered = emitDataQueue.offer(result.getEmitData(),
                                     MAX_OFFER_WAIT_MS,
                                     TimeUnit.MILLISECONDS);
@@ -323,5 +324,14 @@ public class AsyncProcessor implements Closeable {
                 }
             }
         }
+
+        private boolean shouldEmit(PipesResult result) {
+
+            if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
+                    result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
+                return true;
+            }
+            return result.isIntermediate() && asyncConfig.isEmitIntermediateResults();
+        }
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 4104866c7..6c7748740 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -27,12 +27,12 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
@@ -60,8 +60,6 @@ public class AsyncProcessorTest {
 
     private final int totalFiles = 100;
 
-    private Path tikaConfigPath;
-
     @TempDir
     private Path inputDir;
 
@@ -74,9 +72,12 @@ public class AsyncProcessorTest {
     private int crash = 0;
 
 
-    @BeforeEach
-    public void setUp() throws SQLException, IOException {
-        tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml");
+    public Path setUp(boolean emitIntermediateResults) throws SQLException, IOException {
+        ok = 0;
+        oom = 0;
+        timeouts = 0;
+        crash = 0;
+        Path tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml");
         String xml =
                 "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" + "  <emitters>" +
                 "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
@@ -86,7 +87,12 @@ public class AsyncProcessorTest {
                 "      <name>mock</name>\n" + "      <basePath>" +
                 ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
                 "</basePath>\n" + "    </fetcher>" + "  </fetchers>" +
-                        "<async><tikaConfig>" +
+
+
+                "<async><pipesReporter class=\"org.apache.tika.pipes.async.MockReporter\"/>" +
+                        "<emitIntermediateResults>" + emitIntermediateResults +
+                        "</emitIntermediateResults>" +
+                        "<tikaConfig>" +
                         ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
                         "</tikaConfig><forkedJvmArgs><arg>-Xmx512m</arg" +
                         "></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
@@ -101,16 +107,19 @@ public class AsyncProcessorTest {
                 Files.write(inputDir.resolve(i + ".xml"), OOM.getBytes(StandardCharsets.UTF_8));
                 oom++;
             } else if (f < 0.10) {
-                Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8));
-                timeouts++;
-            } else if (f < 0.15) {
                 Files.write(inputDir.resolve(i + ".xml"), SYSTEM_EXIT.getBytes(StandardCharsets.UTF_8));
                 crash++;
+            } else if (f < 0.13) {
+                Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8));
+                timeouts++;
             } else {
                 Files.write(inputDir.resolve(i + ".xml"), OK.getBytes(StandardCharsets.UTF_8));
                 ok++;
             }
         }
+        MockEmitter.EMIT_DATA.clear();
+        MockReporter.RESULTS.clear();
+        return tikaConfigPath;
     }
 
 /*
@@ -128,9 +137,9 @@ public class AsyncProcessorTest {
 
     @Test
     public void testBasic() throws Exception {
-        AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+        AsyncProcessor processor = new AsyncProcessor(setUp(false));
         for (int i = 0; i < totalFiles; i++) {
-            FetchEmitTuple t = new FetchEmitTuple("myId",
+            FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
                     new FetchKey("mock", i + ".xml"),
                     new EmitKey("mock", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
@@ -148,5 +157,33 @@ public class AsyncProcessorTest {
             emitKeys.add(d.getEmitKey().getEmitKey());
         }
         assertEquals(ok, emitKeys.size());
+        assertEquals(100, MockReporter.RESULTS.size());
+        for (PipesResult r : MockReporter.RESULTS) {
+            assertEquals("application/mock+xml",
+                    r.getEmitData().getMetadataList().get(0).get(Metadata.CONTENT_TYPE));
+        }
+    }
+
+    @Test
+    public void testEmitIntermediate() throws Exception {
+        AsyncProcessor processor = new AsyncProcessor(setUp(true));
+        for (int i = 0; i < totalFiles; i++) {
+            FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey("mock", i + ".xml"),
+                    new EmitKey("mock", "emit-" + i), new Metadata());
+            processor.offer(t, 1000);
+        }
+        for (int i = 0; i < 10; i++) {
+            processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000);
+        }
+        //TODO clean this up
+        while (processor.checkActive()) {
+            Thread.sleep(100);
+        }
+        processor.close();
+        Set<String> emitKeys = new HashSet<>();
+        for (EmitData d : MockEmitter.EMIT_DATA) {
+            emitKeys.add(d.getEmitKey().getEmitKey());
+        }
+        assertEquals(totalFiles, emitKeys.size());
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index b8197bd82..6e8308c89 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.tika.pipes.async;
 
+import java.util.concurrent.ArrayBlockingQueue;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.PipesReporter;
@@ -23,11 +25,13 @@ import org.apache.tika.pipes.PipesResult;
 
 public class MockReporter extends PipesReporter {
 
+    static ArrayBlockingQueue<PipesResult> RESULTS = new ArrayBlockingQueue<>(10000);
+
     private String endpoint;
 
     @Override
     public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
-
+        RESULTS.add(result);
     }
 
     @Override