You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2023/05/31 21:10:52 UTC

[tika] 01/01: TIKA-3941 -- WIP ... more remains

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3941
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 2fa4a2cd4004848841a85f9cae11f379fd097ac8
Author: tballison <ta...@apache.org>
AuthorDate: Wed May 31 17:10:41 2023 -0400

    TIKA-3941 -- WIP ... more remains
---
 .../org/apache/tika/parser/AutoDetectParser.java   |  4 ++
 .../java/org/apache/tika/pipes/PipesClient.java    | 62 +++++++++++++++++++--
 .../java/org/apache/tika/pipes/PipesResult.java    | 28 +++++++---
 .../java/org/apache/tika/pipes/PipesServer.java    | 65 +++++++++++++++++++++-
 .../org/apache/tika/pipes/async/AsyncConfig.java   | 10 ++++
 .../apache/tika/pipes/async/AsyncProcessor.java    | 14 ++++-
 .../tika/pipes/async/AsyncProcessorTest.java       | 61 ++++++++++++++++----
 .../org/apache/tika/pipes/async/MockReporter.java  |  6 +-
 8 files changed, 220 insertions(+), 30 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
index 45e972c20..d333c2e9a 100644
--- a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
+++ b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java
@@ -157,6 +157,10 @@ public class AutoDetectParser extends CompositeParser {
         this.autoDetectParserConfig = autoDetectParserConfig;
     }
 
+    public AutoDetectParserConfig getAutoDetectParserConfig() {
+        return this.autoDetectParserConfig; //the same config parse() consults
+    }
+
     public void parse(InputStream stream, ContentHandler handler, Metadata metadata,
                       ParseContext context) throws IOException, SAXException, TikaException {
         if (autoDetectParserConfig.getMetadataWriteFilterFactory() != null) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 7a4e6eecf..3db897f79 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -30,6 +30,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.ExecutionException;
@@ -45,7 +46,9 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.utils.ProcessUtils;
 import org.apache.tika.utils.StringUtils;
 
@@ -145,6 +148,7 @@ public class PipesClient implements Closeable {
 
     private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedException {
         long start = System.currentTimeMillis();
+        final PipesResult[] intermediateResult = new PipesResult[1]; //guarded by its own monitor
         FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
 
             UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
@@ -167,6 +171,13 @@ public class PipesClient implements Closeable {
                 throw new InterruptedException("thread interrupt");
             }
             PipesResult result = readResults(t, start);
+            while (result.getStatus() == PipesResult.STATUS.INTERMEDIATE_RESULT) {
+                //publish under a lock: buildFatalResult may read it from another thread
+                synchronized (intermediateResult) {
+                    intermediateResult[0] = result;
+                }
+                result = readResults(t, start);
+            }
             if (LOG.isDebugEnabled()) {
                 long elapsed = System.currentTimeMillis() - readStart;
                 LOG.debug("finished reading result in {} ms", elapsed);
@@ -177,6 +188,9 @@ public class PipesClient implements Closeable {
                         pipesClientId,
                         System.currentTimeMillis() - readStart);
             }
+            if (result.getStatus() == PipesResult.STATUS.OOM) {
+                return buildFatalResult(result, intermediateResult);
+            }
             return result;
         });
 
@@ -197,7 +211,7 @@ public class PipesClient implements Closeable {
             if (!process.isAlive() && TIMEOUT_EXIT_CODE == process.exitValue()) {
                 LOG.warn("pipesClientId={} server timeout: {} in {} ms", pipesClientId, t.getId(),
                         elapsed);
-                return PipesResult.TIMEOUT;
+                return buildFatalResult(PipesResult.TIMEOUT, intermediateResult);
             }
             process.waitFor(500, TimeUnit.MILLISECONDS);
             if (process.isAlive()) {
@@ -207,18 +221,40 @@ public class PipesClient implements Closeable {
                 LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}", pipesClientId,
                         t.getId(), elapsed, process.exitValue());
             }
-            return PipesResult.UNSPECIFIED_CRASH;
+            return buildFatalResult(PipesResult.UNSPECIFIED_CRASH, intermediateResult);
         } catch (TimeoutException e) {
             long elapsed = System.currentTimeMillis() - start;
             destroyForcibly();
             LOG.warn("pipesClientId={} client timeout: {} in {} ms", pipesClientId, t.getId(),
                     elapsed);
-            return PipesResult.TIMEOUT;
+            return buildFatalResult(PipesResult.TIMEOUT, intermediateResult);
         } finally {
             futureTask.cancel(true);
         }
     }
 
+    /**
+     * If an intermediate result was received before the fatal {@code result}, returns a
+     * result with the fatal status that carries the intermediate emit data, after
+     * recording the fatal status under the "PipesResult" metadata key.
+     * Otherwise returns {@code result} unchanged.
+     */
+    private PipesResult buildFatalResult(PipesResult result,
+                                         PipesResult[] intermediateResult) {
+        PipesResult intermediate;
+        //the array is written on the FutureTask's thread; read it under the same lock
+        synchronized (intermediateResult) {
+            intermediate = intermediateResult[0];
+        }
+        if (intermediate == null) {
+            return result;
+        }
+        EmitData emitData = intermediate.getEmitData();
+        LOG.trace("intermediate result: {}", emitData);
+        emitData.getMetadataList().get(0).set("PipesResult", result.getStatus().toString());
+        return new PipesResult(result.getStatus(), emitData, true);
+    }
+
     private void pauseThenDestroy() throws InterruptedException {
         //wait just a little bit to let process end to get exit value
         //if there's a timeout on the server side
@@ -259,9 +295,11 @@ public class PipesClient implements Closeable {
         try {
             status = lookup(statusByte);
         } catch (IllegalArgumentException e) {
-            throw new IOException("problem reading response from server "
-                    + String.format(Locale.US, "%02x", statusByte),
-                    e);
+            String byteString = "-1";
+            if (statusByte > -1) {
+                byteString = String.format(Locale.US, "%02x", (byte)statusByte);
+            }
+            throw new IOException("problem reading response from server: " + byteString, e);
         }
 
         switch (status) {
@@ -292,6 +330,10 @@ public class PipesClient implements Closeable {
                 LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(),
                         millis);
                 return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
+            case INTERMEDIATE_RESULT:
+                LOG.debug("pipesClientId={} intermediate success: {} in {} ms", pipesClientId,
+                        t.getId(), millis);
+                return deserializeIntermediateResult(t.getEmitKey());
             case PARSE_SUCCESS:
                 //there may have been a parse exception, but the parse didn't crash
                 LOG.debug("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
@@ -349,6 +391,23 @@ public class PipesClient implements Closeable {
         }
     }
 
+    private PipesResult deserializeIntermediateResult(EmitKey emitKey) throws IOException {
+        //expects an int length followed by that many bytes of a Java-serialized Metadata
+        int length = input.readInt();
+        byte[] bytes = new byte[length];
+        input.readFully(bytes);
+        try (ObjectInputStream objectInputStream = new ObjectInputStream(
+                new UnsynchronizedByteArrayInputStream(bytes))) {
+            Metadata metadata = (Metadata) objectInputStream.readObject();
+            EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata));
+            return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitData, true);
+        } catch (ClassNotFoundException e) {
+            LOG.error("class not found exception deserializing data", e);
+            //this should be catastrophic
+            throw new RuntimeException(e);
+        }
+    }
+
     private void restart() throws IOException, InterruptedException, TimeoutException {
         if (process != null) {
             LOG.debug("process still alive; trying to destroy it");
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index ace4f3724..639bfc437 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -20,6 +20,8 @@ import org.apache.tika.pipes.emitter.EmitData;
 
 public class PipesResult {
 
+    private final boolean intermediate; //true if emitData came from an intermediate result
+
     public enum STATUS {
         CLIENT_UNAVAILABLE_WITHIN_MS,
         FETCHER_INITIALIZATION_EXCEPTION,
@@ -32,7 +34,8 @@ public class PipesResult {
         OOM, TIMEOUT, UNSPECIFIED_CRASH,
         NO_EMITTER_FOUND,
         EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
-        INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND;
+        INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND,
+        INTERMEDIATE_RESULT; //pre-parse metadata sent before the final result
     }
 
     public static final PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -48,18 +51,19 @@ public class PipesResult {
     private final EmitData emitData;
     private final String message;
 
-    private PipesResult(STATUS status, EmitData emitData, String message) {
+    private PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) {
         this.status = status;
         this.emitData = emitData;
         this.message = message;
+        this.intermediate = intermediate;
     }
 
     public PipesResult(STATUS status) {
-        this(status, null, null);
+        this(status, null, null, false);
     }
 
     public PipesResult(STATUS status, String message) {
-        this(status, null, message);
+        this(status, null, message, false);
     }
 
     /**
@@ -68,7 +72,11 @@ public class PipesResult {
      * @param emitData
      */
     public PipesResult(EmitData emitData) {
-        this(STATUS.PARSE_SUCCESS, emitData, null);
+        this(STATUS.PARSE_SUCCESS, emitData, null, false);
+    }
+
+    public PipesResult(STATUS status, EmitData emitData, boolean intermediate) {
+        this(status, emitData, null, intermediate); //no message
     }
 
     /**
@@ -79,7 +87,7 @@ public class PipesResult {
      * @param message
      */
     public PipesResult(EmitData emitData, String message) {
-        this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message);
+        this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message, false);
     }
 
     public STATUS getStatus() {
@@ -94,9 +102,13 @@ public class PipesResult {
         return message;
     }
 
+    public boolean isIntermediate() {
+        return intermediate;
+    }
+
     @Override
     public String toString() {
-        return "PipesResult{" + "status=" + status + ", emitData=" + emitData + ", message='" +
-                message + '\'' + '}';
+        return "PipesResult{" + "intermediate=" + intermediate + ", status=" + status +
+                ", emitData=" + emitData + ", message='" + message + '\'' + '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index c97a4e39e..374cf7bc1 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -24,6 +24,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -37,12 +38,16 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 
 import org.apache.tika.config.TikaConfig;
+import org.apache.tika.detect.Detector;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
@@ -76,6 +81,9 @@ public class PipesServer implements Runnable {
     //this has to be some number not close to 0-3
     //it looks like the server crashes with exit value 3 on OOM, for example
     public static final int TIMEOUT_EXIT_CODE = 17;
+    private DigestingParser.Digester digester;
+
+    private Detector detector;
 
     public enum STATUS {
         READY,
@@ -93,7 +101,8 @@ public class PipesServer implements Runnable {
         EMIT_EXCEPTION,
         OOM,
         TIMEOUT,
-        EMPTY_OUTPUT;
+        EMPTY_OUTPUT,
+        INTERMEDIATE_RESULT;
 
         byte getByte() {
             return (byte) (ordinal() + 1);
@@ -544,6 +553,7 @@ public class PipesServer implements Runnable {
                 handlerConfig.getMaxEmbeddedResources());
         ParseContext parseContext = new ParseContext();
         long start = System.currentTimeMillis();
+        preParse(fetchEmitTuple, stream, metadata, parseContext);
         try {
             rMetaParser.parse(stream, handler, metadata, parseContext);
         } catch (SAXException e) {
@@ -563,6 +573,46 @@ public class PipesServer implements Runnable {
         return handler.getMetadataList();
     }
 
+    /**
+     * Runs the pre-parse steps (digest if configured, then detection) and sends the
+     * resulting metadata to the client as an intermediate result, so the client still
+     * has it if the full parse later crashes, runs out of memory or times out.
+     * Only streams that are already TikaInputStreams are pre-parsed.
+     */
+    private void preParse(FetchEmitTuple t, InputStream stream, Metadata metadata,
+                          ParseContext parseContext) {
+        TikaInputStream tis = TikaInputStream.cast(stream);
+        if (tis == null) {
+            //the caller parses 'stream' itself, not a wrapper; wrapping it here and reading
+            //from the wrapper would consume bytes that the full parse still needs
+            LOG.debug("skipping pre-parse for {}: stream is not a TikaInputStream", t.getId());
+            return;
+        }
+        _preParse(t.getId(), tis, metadata, parseContext);
+        writeIntermediate(t.getEmitKey(), metadata);
+    }
+
+    /**
+     * Digests {@code tis} (if a digester is configured) and detects its media type,
+     * writing the results into {@code metadata}. IOExceptions are logged, not thrown.
+     */
+    private void _preParse(String id, TikaInputStream tis, Metadata metadata,
+                           ParseContext parseContext) {
+        if (digester != null) {
+            try (InputStream is = Files.newInputStream(tis.getPath())) {
+                digester.digest(is, metadata, parseContext);
+            } catch (IOException e) {
+                LOG.warn("problem digesting: {}", id, e);
+            }
+        }
+        try {
+            MediaType mt = detector.detect(tis, metadata);
+            metadata.set(Metadata.CONTENT_TYPE, mt.toString());
+        } catch (IOException e) {
+            LOG.warn("problem detecting: {}", id, e);
+        }
+    }
+
     private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
         for (String n : userMetadata.names()) {
             //overwrite whatever was there
@@ -609,10 +659,33 @@ public class PipesServer implements Runnable {
         this.fetcherManager = FetcherManager.load(tikaConfigPath);
         this.emitterManager = EmitterManager.load(tikaConfigPath);
         this.autoDetectParser = new AutoDetectParser(this.tikaConfig);
+        AutoDetectParser adParser = (AutoDetectParser) this.autoDetectParser;
+        if (adParser.getAutoDetectParserConfig().getDigesterFactory() != null) {
+            this.digester = adParser.getAutoDetectParserConfig().getDigesterFactory().build();
+        }
+        this.detector = adParser.getDetector();
         this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
     }
 
 
+    /**
+     * Java-serializes {@code metadata} and writes it to the client with status
+     * INTERMEDIATE_RESULT; exits the server if the write fails.
+     */
+    private void writeIntermediate(EmitKey emitKey, Metadata metadata) {
+        //only the metadata is sent; the client rebuilds the EmitData from its own EmitKey
+        try {
+            UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
+            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(metadata);
+            }
+            write(STATUS.INTERMEDIATE_RESULT, bos.toByteArray());
+        } catch (IOException e) {
+            LOG.error("problem writing intermediate data (forking process shutdown?)", e);
+            exit(1);
+        }
+    }
+
     private void write(EmitData emitData) {
         try {
             UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream();
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 51b52af66..bc55cca5d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -33,6 +33,8 @@ public class AsyncConfig extends PipesConfigBase {
     private int queueSize = 10000;
     private int numEmitters = 1;
 
+    private boolean emitIntermediateResults = false; //see AsyncProcessor shouldEmit
+
     private PipesReporter pipesReporter = PipesReporter.NO_OP_REPORTER;
 
     public static AsyncConfig load(Path p) throws IOException, TikaConfigException {
@@ -107,4 +109,12 @@ public class AsyncConfig extends PipesConfigBase {
     public void setPipesReporter(PipesReporter pipesReporter) {
         this.pipesReporter = pipesReporter;
     }
+
+    public void setEmitIntermediateResults(boolean emitIntermediateResults) {
+        this.emitIntermediateResults = emitIntermediateResults;
+    }
+
+    public boolean isEmitIntermediateResults() { //also emit failed results' intermediate data
+        return emitIntermediateResults;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 17292c047..3f51ef6ab 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -302,8 +302,9 @@ public class AsyncProcessor implements Closeable {
                                     System.currentTimeMillis() - start);
                         }
                         long offerStart = System.currentTimeMillis();
-                        if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
-                                result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
+
+                        if (shouldEmit(result)) {
+                            LOG.debug("adding result to emitter queue: {}", result.getEmitData());
                             boolean offered = emitDataQueue.offer(result.getEmitData(),
                                     MAX_OFFER_WAIT_MS,
                                     TimeUnit.MILLISECONDS);
@@ -323,5 +324,14 @@ public class AsyncProcessor implements Closeable {
                 }
             }
         }
+
+        private boolean shouldEmit(PipesResult result) {
+            //successful parses always; results with intermediate data only if configured
+            if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||
+                    result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) {
+                return true;
+            }
+            return result.isIntermediate() && asyncConfig.isEmitIntermediateResults();
+        }
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 4104866c7..6c7748740 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -27,12 +27,12 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
@@ -60,8 +60,6 @@ public class AsyncProcessorTest {
 
     private final int totalFiles = 100;
 
-    private Path tikaConfigPath;
-
     @TempDir
     private Path inputDir;
 
@@ -74,9 +72,12 @@ public class AsyncProcessorTest {
     private int crash = 0;
 
 
-    @BeforeEach
-    public void setUp() throws SQLException, IOException {
-        tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml");
+    public Path setUp(boolean emitIntermediateResults) throws SQLException, IOException {
+        ok = 0; //each test calls setUp itself, so reset counts from any earlier test
+        oom = 0;
+        timeouts = 0;
+        crash = 0;
+        Path tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml");
         String xml =
                 "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" + "  <emitters>" +
                 "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
@@ -86,7 +87,12 @@ public class AsyncProcessorTest {
                 "      <name>mock</name>\n" + "      <basePath>" +
                 ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) +
                 "</basePath>\n" + "    </fetcher>" + "  </fetchers>" +
-                        "<async><tikaConfig>" +
+
+
+                "<async><pipesReporter class=\"org.apache.tika.pipes.async.MockReporter\"/>" +
+                        "<emitIntermediateResults>" + emitIntermediateResults +
+                        "</emitIntermediateResults>" +
+                        "<tikaConfig>" +
                         ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) +
                         "</tikaConfig><forkedJvmArgs><arg>-Xmx512m</arg" +
                         "></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" +
@@ -101,16 +107,19 @@ public class AsyncProcessorTest {
                 Files.write(inputDir.resolve(i + ".xml"), OOM.getBytes(StandardCharsets.UTF_8));
                 oom++;
             } else if (f < 0.10) {
-                Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8));
-                timeouts++;
-            } else if (f < 0.15) {
                 Files.write(inputDir.resolve(i + ".xml"), SYSTEM_EXIT.getBytes(StandardCharsets.UTF_8));
                 crash++;
+            } else if (f < 0.13) {
+                Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8));
+                timeouts++;
             } else {
                 Files.write(inputDir.resolve(i + ".xml"), OK.getBytes(StandardCharsets.UTF_8));
                 ok++;
             }
         }
+        MockEmitter.EMIT_DATA.clear(); //static across tests; start each test empty
+        MockReporter.RESULTS.clear();
+        return tikaConfigPath;
     }
 
 /*
@@ -128,9 +137,9 @@ public class AsyncProcessorTest {
 
     @Test
     public void testBasic() throws Exception {
-        AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+        AsyncProcessor processor = new AsyncProcessor(setUp(false));
         for (int i = 0; i < totalFiles; i++) {
-            FetchEmitTuple t = new FetchEmitTuple("myId",
+            FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
                     new FetchKey("mock", i + ".xml"),
                     new EmitKey("mock", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
@@ -148,5 +157,33 @@ public class AsyncProcessorTest {
             emitKeys.add(d.getEmitKey().getEmitKey());
         }
         assertEquals(ok, emitKeys.size());
+        assertEquals(totalFiles, MockReporter.RESULTS.size());
+        for (PipesResult r : MockReporter.RESULTS) { //failed files carry pre-parse metadata
+            assertEquals("application/mock+xml",
+                    r.getEmitData().getMetadataList().get(0).get(Metadata.CONTENT_TYPE));
+        }
+    }
+
+    @Test
+    public void testEmitIntermediate() throws Exception {
+        AsyncProcessor processor = new AsyncProcessor(setUp(true));
+        for (int i = 0; i < totalFiles; i++) {
+            FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey("mock", i + ".xml"),
+                    new EmitKey("mock", "emit-" + i), new Metadata());
+            processor.offer(t, 1000);
+        }
+        for (int i = 0; i < 10; i++) {
+            processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000);
+        }
+        //TODO clean this up
+        while (processor.checkActive()) {
+            Thread.sleep(100);
+        }
+        processor.close();
+        Set<String> emitKeys = new HashSet<>();
+        for (EmitData d : MockEmitter.EMIT_DATA) {
+            emitKeys.add(d.getEmitKey().getEmitKey());
+        }
+        assertEquals(totalFiles, emitKeys.size()); //incl. failures, via intermediate data
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index b8197bd82..6e8308c89 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.tika.pipes.async;
 
+import java.util.concurrent.ArrayBlockingQueue;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.PipesReporter;
@@ -23,11 +25,13 @@ import org.apache.tika.pipes.PipesResult;
 
 public class MockReporter extends PipesReporter {
 
+    static final ArrayBlockingQueue<PipesResult> RESULTS = new ArrayBlockingQueue<>(10000);
+
     private String endpoint;
 
     @Override
     public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
-
+        RESULTS.add(result); //read and cleared by AsyncProcessorTest
     }
 
     @Override