You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/16 18:15:16 UTC

[tika] branch main updated: TIKA-3482 -- improve handling of fetch exceptions, add basic logging to tika-app -a

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new dd5f49f  TIKA-3482 -- improve handling of fetch exceptions, add basic logging to tika-app -a
     new 7224d63  Merge remote-tracking branch 'origin/main' into main
dd5f49f is described below

commit dd5f49fc5ac751a8aa67e29e4c4c6963ca8ea65e
Author: tallison <ta...@apache.org>
AuthorDate: Fri Jul 16 14:14:38 2021 -0400

    TIKA-3482 -- improve handling of fetch exceptions, add basic logging to tika-app -a
---
 .../src/main/java/org/apache/tika/cli/TikaCLI.java |   4 +
 .../java/org/apache/tika/pipes/PipesClient.java    |  67 +++++--
 .../java/org/apache/tika/pipes/PipesResult.java    |   8 +-
 .../java/org/apache/tika/pipes/PipesServer.java    | 211 ++++++++++++---------
 .../apache/tika/pipes/async/AsyncProcessor.java    |  29 ++-
 .../pipesiterator/FileSystemPipesIteratorTest.java |  15 +-
 .../resources/test-documents/subdir/example.xml    |  47 +++++
 7 files changed, 255 insertions(+), 126 deletions(-)

diff --git a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
index e740e89..84e9700 100644
--- a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
+++ b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
@@ -286,6 +286,7 @@ public class TikaCLI {
             }
         }
         PipesIterator pipesIterator = PipesIterator.build(Paths.get(tikaConfigPath));
+        long start = System.currentTimeMillis();
         try (AsyncProcessor processor = new AsyncProcessor(Paths.get(tikaConfigPath))) {
             for (FetchEmitTuple t : pipesIterator) {
                 processor.offer(t, 2000);
@@ -298,6 +299,9 @@ public class TikaCLI {
                     break;
                 }
             }
+            long elapsed = System.currentTimeMillis() - start;
+            LOG.info("Successfully finished processing {} files in {} ms",
+                    processor.getTotalProcessed(), elapsed);
         }
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 995e33a..cd070c7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -16,6 +16,12 @@
  */
 package org.apache.tika.pipes;
 
+import static org.apache.tika.pipes.PipesServer.STATUS.CALL;
+import static org.apache.tika.pipes.PipesServer.STATUS.PING;
+import static org.apache.tika.pipes.PipesServer.STATUS.READY;
+import static org.apache.tika.pipes.PipesServer.STATUS.lookup;
+import static org.apache.tika.pipes.PipesServer.TIMEOUT_EXIT_CODE;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
@@ -71,10 +77,10 @@ public class PipesClient implements Closeable {
             return false;
         }
         try {
-            output.write(PipesServer.PING);
+            output.write(PING.getByte());
             output.flush();
             int ping = input.read();
-            if (ping == PipesServer.PING) {
+            if (ping == PING.getByte()) {
                 return true;
             }
         } catch (IOException e) {
@@ -111,7 +117,7 @@ public class PipesClient implements Closeable {
                 objectOutputStream.writeObject(t);
             }
             byte[] bytes = bos.toByteArray();
-            output.write(PipesServer.CALL);
+            output.write(CALL.getByte());
             output.writeInt(bytes.length);
             output.write(bytes);
             output.flush();
@@ -128,7 +134,7 @@ public class PipesClient implements Closeable {
         } catch (ExecutionException e) {
             long elapsed = System.currentTimeMillis() - start;
             destroyWithPause();
-            if (!process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE == process.exitValue()) {
+            if (!process.isAlive() && TIMEOUT_EXIT_CODE == process.exitValue()) {
                 LOG.warn("server timeout: {} in {} ms", t.getId(), elapsed);
                 return PipesResult.TIMEOUT;
             }
@@ -167,34 +173,57 @@ public class PipesClient implements Closeable {
     }
 
     private PipesResult readResults(FetchEmitTuple t, long start) throws IOException {
-        int status = input.read();
+        int statusByte = input.read();
         long millis = System.currentTimeMillis() - start;
+        PipesServer.STATUS status = null;
+        try {
+            status = lookup(statusByte);
+        } catch (IllegalArgumentException e) {
+            throw new IOException("problem reading response from server " + statusByte, e);
+        }
         switch (status) {
-            case PipesServer.OOM:
+            case OOM:
                 LOG.warn("oom: {} in {} ms", t.getId(), millis);
                 return PipesResult.OOM;
-            case PipesServer.TIMEOUT:
+            case TIMEOUT:
                 LOG.warn("server response timeout: {} in {} ms", t.getId(), millis);
                 return PipesResult.TIMEOUT;
-            case PipesServer.EMIT_EXCEPTION:
+            case EMIT_EXCEPTION:
                 LOG.warn("emit exception: {} in {} ms", t.getId(), millis);
                 return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
-            case PipesServer.NO_EMITTER_FOUND:
-                LOG.warn("no emitter found: " + t.getId());
-                return PipesResult.NO_EMITTER_FOUND;
-            case PipesServer.PARSE_SUCCESS:
-            case PipesServer.PARSE_EXCEPTION_EMIT:
+            case EMITTER_NOT_FOUND:
+                LOG.warn("emitter not found: {} in {} ms", t.getId(), millis);
+                return readMessage(PipesResult.STATUS.NO_EMITTER_FOUND);
+            case FETCHER_NOT_FOUND:
+                LOG.warn("fetcher not found: {} in {} ms", t.getId(), millis);
+                return readMessage(PipesResult.STATUS.NO_FETCHER_FOUND);
+            case FETCHER_INITIALIZATION_EXCEPTION:
+                LOG.warn("fetcher initialization exception: {} in {} ms", t.getId(), millis);
+                return readMessage(PipesResult.STATUS.FETCHER_INITIALIZATION_EXCEPTION);
+            case FETCH_EXCEPTION:
+                LOG.warn("fetch exception: {} in {} ms", t.getId(), millis);
+                return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
+            case PARSE_SUCCESS:
+            case PARSE_EXCEPTION_EMIT:
                 LOG.info("parse success: {} in {} ms", t.getId(), millis);
                 return deserializeEmitData();
-            case PipesServer.PARSE_EXCEPTION_NO_EMIT:
+            case PARSE_EXCEPTION_NO_EMIT:
                 return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
-            case PipesServer.EMIT_SUCCESS:
+            case EMIT_SUCCESS:
                 LOG.info("emit success: {} in {} ms", t.getId(), millis);
                 return PipesResult.EMIT_SUCCESS;
-            case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
+            case EMIT_SUCCESS_PARSE_EXCEPTION:
                 return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
+            case EMPTY_OUTPUT:
+                return PipesResult.EMPTY_OUTPUT;
+            //the statuses below are grouped: none is a valid response to a call
+            case READY:
+            case CALL:
+            case PING:
+            case FAILED_TO_START:
+                throw new IOException("Not expecting this status: " + status);
             default :
-                throw new IOException("problem reading response from server " + status);
+                throw new IOException("Need to handle processing for: " + status);
         }
 
     }
@@ -239,7 +268,7 @@ public class PipesClient implements Closeable {
             int b = input.read();
             int read = 1;
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            while (read < MAX_BYTES_BEFORE_READY && b != PipesServer.READY) {
+            while (read < MAX_BYTES_BEFORE_READY && b != READY.getByte()) {
                 if (b == -1) {
                     throw new RuntimeException("Couldn't start server: " +
                             "read EOF before 'ready' byte.\n" +
@@ -313,7 +342,7 @@ public class PipesClient implements Closeable {
         if (! hasHeadless) {
             commandLine.add("-Djava.awt.headless=true");
         }
-        if (! hasExitOnOOM) {
+        if (hasExitOnOOM) {
             LOG.warn("I notice that you have an exit/crash on OOM. If you run heavy external processes " +
                     "like tesseract, this setting may result in orphaned processes which could be disastrous" +
                     " for performance.");
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index 8023d75..39c32e3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -22,12 +22,15 @@ public class PipesResult {
 
     public enum STATUS {
         CLIENT_UNAVAILABLE_WITHIN_MS,
+        FETCHER_INITIALIZATION_EXCEPTION,
+        FETCH_EXCEPTION,
+        EMPTY_OUTPUT,
         PARSE_EXCEPTION_NO_EMIT,
         PARSE_EXCEPTION_EMIT, PARSE_SUCCESS,
         OOM, TIMEOUT, UNSPECIFIED_CRASH,
         NO_EMITTER_FOUND,
         EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
-        INTERRUPTED_EXCEPTION
+        INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND;
     }
 
     public static PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -36,8 +39,9 @@ public class PipesResult {
     public static PipesResult OOM = new PipesResult(STATUS.OOM);
     public static PipesResult UNSPECIFIED_CRASH = new PipesResult(STATUS.UNSPECIFIED_CRASH);
     public static PipesResult EMIT_SUCCESS = new PipesResult(STATUS.EMIT_SUCCESS);
-    public static PipesResult NO_EMITTER_FOUND = new PipesResult(STATUS.NO_EMITTER_FOUND);
     public static PipesResult INTERRUPTED_EXCEPTION = new PipesResult(STATUS.INTERRUPTED_EXCEPTION);
+    public static PipesResult EMPTY_OUTPUT =
+            new PipesResult(STATUS.EMPTY_OUTPUT);
     private final STATUS status;
     private final EmitData emitData;
     private final String message;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 7e0ab03..0e117dd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -48,7 +48,6 @@ import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -72,40 +71,43 @@ public class PipesServer implements Runnable {
     //it looks like the server crashes with exit value 3 on OOM, for example
     public static final int TIMEOUT_EXIT_CODE = 17;
 
-    public static final byte READY = 1;
-
-    public static final byte CALL = 2;
-
-    public static final byte PING = 3;
-
-    public static final byte FAILED_TO_START = 4;
-
-    public static final byte PARSE_SUCCESS = 5;
-
-    /**
-     * This will return the parse exception stack trace
-     */
-    public static final byte PARSE_EXCEPTION_NO_EMIT = 6;
-
-    /**
-     * This will return the metadata list
-     */
-    public static final byte PARSE_EXCEPTION_EMIT = 7;
-
-    public static final byte EMIT_SUCCESS = 8;
-
-    public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 9;
-
-    public static final byte EMIT_EXCEPTION = 10;
-
-    public static final byte NO_EMITTER_FOUND = 11;
-
-    public static final byte OOM = 12;
-
-    public static final byte TIMEOUT = 13;
+    public enum STATUS {
+        READY,
+        CALL,
+        PING,
+        FAILED_TO_START,
+        FETCHER_NOT_FOUND,
+        EMITTER_NOT_FOUND,
+        FETCHER_INITIALIZATION_EXCEPTION,
+        FETCH_EXCEPTION,
+        PARSE_SUCCESS,
+        PARSE_EXCEPTION_NO_EMIT,
+        PARSE_EXCEPTION_EMIT,
+        EMIT_SUCCESS,
+        EMIT_SUCCESS_PARSE_EXCEPTION,
+        EMIT_EXCEPTION,
+        OOM,
+        TIMEOUT,
+        EMPTY_OUTPUT;
+
+        byte getByte() {
+            return (byte) (ordinal() + 1);
+        }
 
-    public static final byte EMPTY_OUTPUT = 14;
+        public static STATUS lookup(int val) {
+            int i = val - 1;
+            if (i < 0) {
+                throw new IllegalArgumentException("byte must be > 0");
+            }
+            STATUS[] statuses = STATUS.values();
 
+            if (i >= statuses.length) {
+                throw new IllegalArgumentException("byte with index " +
+                        i + " must be < " + statuses.length);
+            }
+            return statuses[i];
+        }
+    }
 
     private final Object[] lock = new Object[0];
     private final Path tikaConfigPath;
@@ -186,7 +188,7 @@ public class PipesServer implements Runnable {
         } catch (Throwable t) {
             LOG.error("couldn't initialize parser", t);
             try {
-                output.writeByte(FAILED_TO_START);
+                output.writeByte(STATUS.FAILED_TO_START.getByte());
                 output.flush();
             } catch (IOException e) {
                 LOG.warn("couldn't notify of failure to start", e);
@@ -195,16 +197,14 @@ public class PipesServer implements Runnable {
         }
         //main loop
         try {
-            output.write(READY);
-            output.flush();
+            write(STATUS.READY);
             while (true) {
                 int request = input.read();
                 if (request == -1) {
                     exit(1);
-                } else if (request == PING) {
-                    output.writeByte(PING);
-                    output.flush();
-                } else if (request == CALL) {
+                } else if (request == STATUS.PING.getByte()) {
+                    write(STATUS.PING);
+                } else if (request == STATUS.CALL.getByte()) {
                     parseOne();
                 } else {
                     throw new IllegalStateException("Unexpected request");
@@ -238,36 +238,46 @@ public class PipesServer implements Runnable {
     }
 
 
-    private void emit(EmitData emitData, String parseExceptionStack) {
-        Emitter emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
-        if (emitter == null) {
-            write(NO_EMITTER_FOUND, new byte[0]);
+    private void emit(String taskId, EmitData emitData, String parseExceptionStack) {
+        Emitter emitter = null;
+
+        try {
+            emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+        } catch (IllegalArgumentException e) {
+            String noEmitterMsg = getNoEmitterMsg(emitData.getEmitKey().getEmitterName());
+            LOG.warn("task '{}': {}", taskId, noEmitterMsg);
+            write(STATUS.EMITTER_NOT_FOUND, noEmitterMsg);
             return;
         }
         try {
             emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList());
         } catch (IOException | TikaEmitterException e) {
+            LOG.warn("emit exception", e);
             String msg = ExceptionUtils.getStackTrace(e);
             byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
             //for now, we're hiding the parse exception if there was also an emit exception
-            write(EMIT_EXCEPTION, bytes);
+            write(STATUS.EMIT_EXCEPTION, bytes);
             return;
         }
         if (StringUtils.isBlank(parseExceptionStack)) {
-            write(EMIT_SUCCESS);
+            write(STATUS.EMIT_SUCCESS);
         } else {
-            write(EMIT_SUCCESS_PARSE_EXCEPTION, parseExceptionStack.getBytes(StandardCharsets.UTF_8));
+            write(STATUS.EMIT_SUCCESS_PARSE_EXCEPTION,
+                    parseExceptionStack.getBytes(StandardCharsets.UTF_8));
         }
     }
 
-
-    private void parseOne() throws FetchException {
+    private void parseOne() {
         synchronized (lock) {
             parsing = true;
             since = System.currentTimeMillis();
         }
+        FetchEmitTuple t = null;
         try {
-            actuallyParse();
+            t = readFetchEmitTuple();
+            actuallyParse(t);
+        } catch (OutOfMemoryError e) {
+            handleOOM(t == null ? "unknown" : t.getId(), e); //null if OOM hit while reading t
         } finally {
             synchronized (lock) {
                 parsing = false;
@@ -276,25 +286,37 @@ public class PipesServer implements Runnable {
         }
     }
 
-    public void actuallyParse() throws FetchException {
-        FetchEmitTuple t = readFetchEmitTuple();
+    private void actuallyParse(FetchEmitTuple t) {
         List<Metadata> metadataList = null;
 
-        Fetcher fetcher = getFetcher(t.getFetchKey().getFetcherName());
+        Fetcher fetcher = null;
+        try {
+            fetcher = fetcherManager.getFetcher(t.getFetchKey().getFetcherName());
+        } catch (IllegalArgumentException e) {
+            String noFetcherMsg = getNoFetcherMsg(t.getFetchKey().getFetcherName());
+            LOG.warn(noFetcherMsg);
+            write(STATUS.FETCHER_NOT_FOUND, noFetcherMsg);
+            return;
+        } catch (IOException | TikaException e) {
+            LOG.warn("Couldn't initialize fetcher for fetch id '" +
+                    t.getId() + "'", e);
+            write(STATUS.FETCHER_INITIALIZATION_EXCEPTION,
+                    ExceptionUtils.getStackTrace(e));
+            return;
+        }
 
         Metadata metadata = new Metadata();
         try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
             metadataList = parseMetadata(t, stream, metadata);
         } catch (SecurityException e) {
+            LOG.error("security exception " + t.getId(), e);
             throw e;
         } catch (TikaException | IOException e) {
-            LOG.warn("fetch exception", e);
-            throw new FetchException(e);
-        } catch (OutOfMemoryError e) {
-            handleOOM(e);
+            LOG.warn("fetch exception " + t.getId(), e);
+            write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
         }
         if (metadataIsEmpty(metadataList)) {
-            write(EMPTY_OUTPUT);
+            write(STATUS.EMPTY_OUTPUT);
             return;
         }
 
@@ -308,33 +330,51 @@ public class PipesServer implements Runnable {
             }
             EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
             if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
-                emit(emitData, stack);
+                emit(t.getId(), emitData, stack);
             } else {
                 write(emitData, stack);
             }
         } else {
-            write(PARSE_EXCEPTION_NO_EMIT, stack.getBytes(StandardCharsets.UTF_8));
+            write(STATUS.PARSE_EXCEPTION_NO_EMIT,
+                    stack.getBytes(StandardCharsets.UTF_8));
         }
 
     }
 
-    private Fetcher getFetcher(String fetcherName) throws FetchException {
-        try {
-            return fetcherManager.getFetcher(fetcherName);
-        } catch (TikaException | IOException e) {
-            LOG.error("can't load fetcher", e);
-            throw new FetchException(e);
+    private String getNoFetcherMsg(String fetcherName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Fetcher '").append(fetcherName).append("'");
+        sb.append(" not found.");
+        sb.append("\nThe configured FetcherManager supports:");
+        int i = 0;
+        for (String f : fetcherManager.getSupported()) {
+            if (i++ > 0) {
+                sb.append(", ");
+            }
+            sb.append(f);
         }
+        return sb.toString();
     }
 
-    private void handleOOM(OutOfMemoryError oom) {
-        try {
-            output.writeByte(OOM);
-            output.flush();
-        } catch (IOException e) {
-            //swallow at this point
+    private String getNoEmitterMsg(String emitterName) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Emitter '").append(emitterName).append("'");
+        sb.append(" not found.");
+        sb.append("\nThe configured emitterManager supports:");
+        int i = 0;
+        for (String e : emitterManager.getSupported()) {
+            if (i++ > 0) {
+                sb.append(", ");
+            }
+            sb.append(e);
         }
-        LOG.error("oom", oom);
+        return sb.toString();
+    }
+
+
+    private void handleOOM(String taskId, OutOfMemoryError oom) {
+        write(STATUS.OOM);
+        LOG.error("oom: " + taskId, oom);
         exit(1);
     }
 
@@ -347,7 +387,6 @@ public class PipesServer implements Runnable {
                 handlerConfig.getMaxEmbeddedResources(),
                 tikaConfig.getMetadataFilter());
         ParseContext parseContext = new ParseContext();
-        FetchKey fetchKey = fetchEmitTuple.getFetchKey();
         try {
             parser.parse(stream, handler, metadata, parseContext);
         } catch (SAXException e) {
@@ -359,10 +398,6 @@ public class PipesServer implements Runnable {
             throw e;
         } catch (Exception e) {
             LOG.warn("exception: " + fetchEmitTuple.getId(), e);
-        } catch (OutOfMemoryError e) {
-            //TODO, maybe return file type gathered so far and then crash?
-            //LOG.error("oom: " + fetchKey.getFetchKey());
-            throw e;
         }
         return handler.getMetadataList();
     }
@@ -377,9 +412,8 @@ public class PipesServer implements Runnable {
         }
     }
 
-
     private void exit(int exitCode) {
-        LOG.warn("exiting: {}", exitCode);
+        LOG.error("exiting: {}", exitCode);
         System.exit(exitCode);
     }
 
@@ -421,22 +455,28 @@ public class PipesServer implements Runnable {
     }
 
     private void write(EmitData emitData, String stack) {
+        //TODO -- what do we do with the stack?
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
                 objectOutputStream.writeObject(emitData);
             }
-            write(PARSE_SUCCESS, bos.toByteArray());
+            write(STATUS.PARSE_SUCCESS, bos.toByteArray());
         } catch (IOException e) {
             LOG.error("problem writing emit data (forking process shutdown?)", e);
             exit(1);
         }
     }
 
-    private void write(byte status, byte[] bytes) {
+    private void write(STATUS status, String msg) {
+        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+        write(status, bytes);
+    }
+
+    private void write(STATUS status, byte[] bytes) {
         try {
             int len = bytes.length;
-            output.write(status);
+            output.write(status.getByte());
             output.writeInt(len);
             output.write(bytes);
             output.flush();
@@ -446,14 +486,13 @@ public class PipesServer implements Runnable {
         }
     }
 
-    private void write(byte status) {
+    private void write(STATUS status) {
         try {
-            output.write(status);
+            output.write(status.getByte());
             output.flush();
         } catch (IOException e) {
             LOG.error("problem writing data (forking process shutdown?)", e);
             exit(1);
         }
     }
-
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 0bdb86f..926f8eb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -28,8 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
-import org.xml.sax.SAXException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.FetchEmitTuple;
@@ -48,6 +47,9 @@ import org.apache.tika.pipes.pipesiterator.PipesIterator;
 public class AsyncProcessor implements Closeable {
 
     static final int PARSER_FUTURE_CODE = 1;
+
+    static final AtomicLong TOTAL_PROCESSED = new AtomicLong(0);
+
     private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
     private final ArrayBlockingQueue<EmitData> emitData;
     private final ExecutorCompletionService<Integer> executorCompletionService;
@@ -58,7 +60,7 @@ public class AsyncProcessor implements Closeable {
     private int finished = 0;
     boolean isShuttingDown = false;
 
-    public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
+    public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException {
         this.asyncConfig = AsyncConfig.load(tikaConfigPath);
         this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
         this.emitData = new ArrayBlockingQueue<>(100);
@@ -162,6 +164,10 @@ public class AsyncProcessor implements Closeable {
         executorService.shutdownNow();
     }
 
+    public long getTotalProcessed() {
+        return TOTAL_PROCESSED.get();
+    }
+
     private class FetchEmitWorker implements Callable<Integer> {
 
         private final AsyncConfig asyncConfig;
@@ -175,6 +181,7 @@ public class AsyncProcessor implements Closeable {
             this.fetchEmitTuples = fetchEmitTuples;
             this.emitDataQueue = emitDataQueue;
         }
+
         @Override
         public Integer call() throws Exception {
 
@@ -192,19 +199,11 @@ public class AsyncProcessor implements Closeable {
                         } catch (IOException e) {
                             result = PipesResult.UNSPECIFIED_CRASH;
                         }
-                        switch (result.getStatus()) {
-                            case PARSE_SUCCESS:
-                                //TODO -- add timeout, this currently hangs forever
-                                emitDataQueue.offer(result.getEmitData());
-                                break;
-                            case EMIT_SUCCESS:
-                                break;
-                            case EMIT_EXCEPTION:
-                            case UNSPECIFIED_CRASH:
-                            case OOM:
-                            case TIMEOUT:
-                                break;
+                        if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS) {
+                            //TODO -- add timeout, this currently hangs forever
+                            emitDataQueue.offer(result.getEmitData());
                         }
+                        TOTAL_PROCESSED.incrementAndGet();
                     }
                     checkActive();
                 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
index e8e3271..d0ee652 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
@@ -16,9 +16,10 @@
  */
 package org.apache.tika.pipes.pipesiterator;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -48,8 +49,9 @@ public class FileSystemPipesIteratorTest {
 
     @Test(timeout = 30000)
     public void testBasic() throws Exception {
-        Path root = Paths.get(".");
-
+        URL url =
+                FileSystemPipesIteratorTest.class.getResource("/test-documents");
+        Path root = Paths.get(url.toURI());
         List<Path> files = listFiles(root);
         Set<String> truthSet = new HashSet<>();
         for (Path p : files) {
@@ -67,6 +69,11 @@ public class FileSystemPipesIteratorTest {
             iteratorSet.add(p.getFetchKey().getFetchKey());
         }
 
-        assertEquals(truthSet, iteratorSet);
+        for (String t : truthSet) {
+            assertTrue("missing in iterator set " + t, iteratorSet.contains(t));
+        }
+        for (String i : iteratorSet) {
+            assertTrue("missing in truth set " + i, truthSet.contains(i));
+        }
     }
 }
diff --git a/tika-core/src/test/resources/test-documents/subdir/example.xml b/tika-core/src/test/resources/test-documents/subdir/example.xml
new file mode 100644
index 0000000..1004305
--- /dev/null
+++ b/tika-core/src/test/resources/test-documents/subdir/example.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<mock>
+    <!-- this file offers all of the options as documentation
+    Parsing should stop at an IOException, of course
+    -->
+
+    <!-- action can be "add" or "set" -->
+    <metadata action="add" name="dc:creator">Nikolai Lobachevsky</metadata>
+    <!-- element is the name of the sax event to write, p=paragraph
+        if the element is not specified, the default is <p> -->
+    <write element="p">hello world! the quick brown fox jumped over the lazy dog</write>
+    <!-- write something to System.out -->
+    <print_out>writing to System.out</print_out>
+    <!-- write something to System.err -->
+    <print_err>writing to System.err</print_err>
+    <!-- hang
+        millis: how many milliseconds to pause.  The actual hang time will probably
+            be a bit longer than the value specified.        heavy: whether or not the hang should do something computationally expensive.
+            If the value is false, this just does a Thread.sleep(millis).
+            This attribute is optional, with default of heavy=false.
+        pulse_millis: (required if "heavy" is true), how often to check to see
+            whether the thread was interrupted or that the total hang time exceeded the millis
+        interruptible: whether or not the parser will check to see if its thread
+            has been interrupted; this attribute is optional with default of true
+    -->
+
+</mock>
\ No newline at end of file