You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/16 18:15:16 UTC
[tika] branch main updated: TIKA-3482 -- improve handling of fetch
exceptions, add basic logging to tika-app -a
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new dd5f49f TIKA-3482 -- improve handling of fetch exceptions, add basic logging to tika-app -a
new 7224d63 Merge remote-tracking branch 'origin/main' into main
dd5f49f is described below
commit dd5f49fc5ac751a8aa67e29e4c4c6963ca8ea65e
Author: tallison <ta...@apache.org>
AuthorDate: Fri Jul 16 14:14:38 2021 -0400
TIKA-3482 -- improve handling of fetch exceptions, add basic logging to tika-app -a
---
.../src/main/java/org/apache/tika/cli/TikaCLI.java | 4 +
.../java/org/apache/tika/pipes/PipesClient.java | 67 +++++--
.../java/org/apache/tika/pipes/PipesResult.java | 8 +-
.../java/org/apache/tika/pipes/PipesServer.java | 211 ++++++++++++---------
.../apache/tika/pipes/async/AsyncProcessor.java | 29 ++-
.../pipesiterator/FileSystemPipesIteratorTest.java | 15 +-
.../resources/test-documents/subdir/example.xml | 47 +++++
7 files changed, 255 insertions(+), 126 deletions(-)
diff --git a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
index e740e89..84e9700 100644
--- a/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
+++ b/tika-app/src/main/java/org/apache/tika/cli/TikaCLI.java
@@ -286,6 +286,7 @@ public class TikaCLI {
}
}
PipesIterator pipesIterator = PipesIterator.build(Paths.get(tikaConfigPath));
+ long start = System.currentTimeMillis();
try (AsyncProcessor processor = new AsyncProcessor(Paths.get(tikaConfigPath))) {
for (FetchEmitTuple t : pipesIterator) {
processor.offer(t, 2000);
@@ -298,6 +299,9 @@ public class TikaCLI {
break;
}
}
+ long elapsed = System.currentTimeMillis() - start;
+ LOG.info("Successfully finished processing {} files in {} ms",
+ processor.getTotalProcessed(), elapsed);
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 995e33a..cd070c7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -16,6 +16,12 @@
*/
package org.apache.tika.pipes;
+import static org.apache.tika.pipes.PipesServer.STATUS.CALL;
+import static org.apache.tika.pipes.PipesServer.STATUS.PING;
+import static org.apache.tika.pipes.PipesServer.STATUS.READY;
+import static org.apache.tika.pipes.PipesServer.STATUS.lookup;
+import static org.apache.tika.pipes.PipesServer.TIMEOUT_EXIT_CODE;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
@@ -71,10 +77,10 @@ public class PipesClient implements Closeable {
return false;
}
try {
- output.write(PipesServer.PING);
+ output.write(PING.getByte());
output.flush();
int ping = input.read();
- if (ping == PipesServer.PING) {
+ if (ping == PING.getByte()) {
return true;
}
} catch (IOException e) {
@@ -111,7 +117,7 @@ public class PipesClient implements Closeable {
objectOutputStream.writeObject(t);
}
byte[] bytes = bos.toByteArray();
- output.write(PipesServer.CALL);
+ output.write(CALL.getByte());
output.writeInt(bytes.length);
output.write(bytes);
output.flush();
@@ -128,7 +134,7 @@ public class PipesClient implements Closeable {
} catch (ExecutionException e) {
long elapsed = System.currentTimeMillis() - start;
destroyWithPause();
- if (!process.isAlive() && PipesServer.TIMEOUT_EXIT_CODE == process.exitValue()) {
+ if (!process.isAlive() && TIMEOUT_EXIT_CODE == process.exitValue()) {
LOG.warn("server timeout: {} in {} ms", t.getId(), elapsed);
return PipesResult.TIMEOUT;
}
@@ -167,34 +173,57 @@ public class PipesClient implements Closeable {
}
private PipesResult readResults(FetchEmitTuple t, long start) throws IOException {
- int status = input.read();
+ int statusByte = input.read();
long millis = System.currentTimeMillis() - start;
+ PipesServer.STATUS status = null;
+ try {
+ status = lookup(statusByte);
+ } catch (IllegalArgumentException e) {
+ throw new IOException("problem reading response from server " + statusByte, e);
+ }
switch (status) {
- case PipesServer.OOM:
+ case OOM:
LOG.warn("oom: {} in {} ms", t.getId(), millis);
return PipesResult.OOM;
- case PipesServer.TIMEOUT:
+ case TIMEOUT:
LOG.warn("server response timeout: {} in {} ms", t.getId(), millis);
return PipesResult.TIMEOUT;
- case PipesServer.EMIT_EXCEPTION:
+ case EMIT_EXCEPTION:
LOG.warn("emit exception: {} in {} ms", t.getId(), millis);
return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
- case PipesServer.NO_EMITTER_FOUND:
- LOG.warn("no emitter found: " + t.getId());
- return PipesResult.NO_EMITTER_FOUND;
- case PipesServer.PARSE_SUCCESS:
- case PipesServer.PARSE_EXCEPTION_EMIT:
+ case EMITTER_NOT_FOUND:
+ LOG.warn("emitter not found: {} in {} ms", t.getId(), millis);
+ return readMessage(PipesResult.STATUS.NO_EMITTER_FOUND);
+ case FETCHER_NOT_FOUND:
+ LOG.warn("fetcher not found: {} in {} ms", t.getId(), millis);
+ return readMessage(PipesResult.STATUS.NO_FETCHER_FOUND);
+ case FETCHER_INITIALIZATION_EXCEPTION:
+ LOG.warn("fetcher initialization exception: {} in {} ms", t.getId(), millis);
+ return readMessage(PipesResult.STATUS.FETCHER_INITIALIZATION_EXCEPTION);
+ case FETCH_EXCEPTION:
+ LOG.warn("fetch exception: {} in {} ms", t.getId(), millis);
+ return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
+ case PARSE_SUCCESS:
+ case PARSE_EXCEPTION_EMIT:
LOG.info("parse success: {} in {} ms", t.getId(), millis);
return deserializeEmitData();
- case PipesServer.PARSE_EXCEPTION_NO_EMIT:
+ case PARSE_EXCEPTION_NO_EMIT:
return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
- case PipesServer.EMIT_SUCCESS:
+ case EMIT_SUCCESS:
LOG.info("emit success: {} in {} ms", t.getId(), millis);
return PipesResult.EMIT_SUCCESS;
- case PipesServer.EMIT_SUCCESS_PARSE_EXCEPTION:
+ case EMIT_SUCCESS_PARSE_EXCEPTION:
return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
+ case EMPTY_OUTPUT:
+ return PipesResult.EMPTY_OUTPUT;
+ //READY, CALL, PING and FAILED_TO_START are never valid responses to a call
+ case READY:
+ case CALL:
+ case PING:
+ case FAILED_TO_START:
+ throw new IOException("Not expecting this status: " + status);
default :
- throw new IOException("problem reading response from server " + status);
+ throw new IOException("Need to handle processing for: " + status);
}
}
@@ -239,7 +268,7 @@ public class PipesClient implements Closeable {
int b = input.read();
int read = 1;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- while (read < MAX_BYTES_BEFORE_READY && b != PipesServer.READY) {
+ while (read < MAX_BYTES_BEFORE_READY && b != READY.getByte()) {
if (b == -1) {
throw new RuntimeException("Couldn't start server: " +
"read EOF before 'ready' byte.\n" +
@@ -313,7 +342,7 @@ public class PipesClient implements Closeable {
if (! hasHeadless) {
commandLine.add("-Djava.awt.headless=true");
}
- if (! hasExitOnOOM) {
+ if (hasExitOnOOM) {
LOG.warn("I notice that you have an exit/crash on OOM. If you run heavy external processes " +
"like tesseract, this setting may result in orphaned processes which could be disastrous" +
" for performance.");
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
index 8023d75..39c32e3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java
@@ -22,12 +22,15 @@ public class PipesResult {
public enum STATUS {
CLIENT_UNAVAILABLE_WITHIN_MS,
+ FETCHER_INITIALIZATION_EXCEPTION,
+ FETCH_EXCEPTION,
+ EMPTY_OUTPUT,
PARSE_EXCEPTION_NO_EMIT,
PARSE_EXCEPTION_EMIT, PARSE_SUCCESS,
OOM, TIMEOUT, UNSPECIFIED_CRASH,
NO_EMITTER_FOUND,
EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION,
- INTERRUPTED_EXCEPTION
+ INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND;
}
public static PipesResult CLIENT_UNAVAILABLE_WITHIN_MS =
@@ -36,8 +39,9 @@ public class PipesResult {
public static PipesResult OOM = new PipesResult(STATUS.OOM);
public static PipesResult UNSPECIFIED_CRASH = new PipesResult(STATUS.UNSPECIFIED_CRASH);
public static PipesResult EMIT_SUCCESS = new PipesResult(STATUS.EMIT_SUCCESS);
- public static PipesResult NO_EMITTER_FOUND = new PipesResult(STATUS.NO_EMITTER_FOUND);
public static PipesResult INTERRUPTED_EXCEPTION = new PipesResult(STATUS.INTERRUPTED_EXCEPTION);
+ public static PipesResult EMPTY_OUTPUT =
+ new PipesResult(STATUS.EMPTY_OUTPUT);
private final STATUS status;
private final EmitData emitData;
private final String message;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 7e0ab03..0e117dd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -48,7 +48,6 @@ import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -72,40 +71,43 @@ public class PipesServer implements Runnable {
//it looks like the server crashes with exit value 3 on OOM, for example
public static final int TIMEOUT_EXIT_CODE = 17;
- public static final byte READY = 1;
-
- public static final byte CALL = 2;
-
- public static final byte PING = 3;
-
- public static final byte FAILED_TO_START = 4;
-
- public static final byte PARSE_SUCCESS = 5;
-
- /**
- * This will return the parse exception stack trace
- */
- public static final byte PARSE_EXCEPTION_NO_EMIT = 6;
-
- /**
- * This will return the metadata list
- */
- public static final byte PARSE_EXCEPTION_EMIT = 7;
-
- public static final byte EMIT_SUCCESS = 8;
-
- public static final byte EMIT_SUCCESS_PARSE_EXCEPTION = 9;
-
- public static final byte EMIT_EXCEPTION = 10;
-
- public static final byte NO_EMITTER_FOUND = 11;
-
- public static final byte OOM = 12;
-
- public static final byte TIMEOUT = 13;
+ public enum STATUS {
+ READY,
+ CALL,
+ PING,
+ FAILED_TO_START,
+ FETCHER_NOT_FOUND,
+ EMITTER_NOT_FOUND,
+ FETCHER_INITIALIZATION_EXCEPTION,
+ FETCH_EXCEPTION,
+ PARSE_SUCCESS,
+ PARSE_EXCEPTION_NO_EMIT,
+ PARSE_EXCEPTION_EMIT,
+ EMIT_SUCCESS,
+ EMIT_SUCCESS_PARSE_EXCEPTION,
+ EMIT_EXCEPTION,
+ OOM,
+ TIMEOUT,
+ EMPTY_OUTPUT;
+
+ byte getByte() {
+ return (byte) (ordinal() + 1);
+ }
- public static final byte EMPTY_OUTPUT = 14;
+ public static STATUS lookup(int val) {
+ int i = val - 1;
+ if (i < 0) {
+ throw new IllegalArgumentException("byte must be > 0");
+ }
+ STATUS[] statuses = STATUS.values();
+ if (i >= statuses.length) {
+ throw new IllegalArgumentException("byte with index " +
+ i + " must be < " + statuses.length);
+ }
+ return statuses[i];
+ }
+ }
private final Object[] lock = new Object[0];
private final Path tikaConfigPath;
@@ -186,7 +188,7 @@ public class PipesServer implements Runnable {
} catch (Throwable t) {
LOG.error("couldn't initialize parser", t);
try {
- output.writeByte(FAILED_TO_START);
+ output.writeByte(STATUS.FAILED_TO_START.getByte());
output.flush();
} catch (IOException e) {
LOG.warn("couldn't notify of failure to start", e);
@@ -195,16 +197,14 @@ public class PipesServer implements Runnable {
}
//main loop
try {
- output.write(READY);
- output.flush();
+ write(STATUS.READY);
while (true) {
int request = input.read();
if (request == -1) {
exit(1);
- } else if (request == PING) {
- output.writeByte(PING);
- output.flush();
- } else if (request == CALL) {
+ } else if (request == STATUS.PING.getByte()) {
+ write(STATUS.PING);
+ } else if (request == STATUS.CALL.getByte()) {
parseOne();
} else {
throw new IllegalStateException("Unexpected request");
@@ -238,36 +238,46 @@ public class PipesServer implements Runnable {
}
- private void emit(EmitData emitData, String parseExceptionStack) {
- Emitter emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
- if (emitter == null) {
- write(NO_EMITTER_FOUND, new byte[0]);
+ private void emit(String taskId, EmitData emitData, String parseExceptionStack) {
+ Emitter emitter = null;
+
+ try {
+ emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName());
+ } catch (IllegalArgumentException e) {
+ String noEmitterMsg = getNoEmitterMsg(emitData.getEmitKey().getEmitterName());
+ LOG.warn(noEmitterMsg);
+ write(STATUS.EMITTER_NOT_FOUND, noEmitterMsg);
return;
}
try {
emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList());
} catch (IOException | TikaEmitterException e) {
+ LOG.warn("emit exception " + taskId, e);
String msg = ExceptionUtils.getStackTrace(e);
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
//for now, we're hiding the parse exception if there was also an emit exception
- write(EMIT_EXCEPTION, bytes);
+ write(STATUS.EMIT_EXCEPTION, bytes);
return;
}
if (StringUtils.isBlank(parseExceptionStack)) {
- write(EMIT_SUCCESS);
+ write(STATUS.EMIT_SUCCESS);
} else {
- write(EMIT_SUCCESS_PARSE_EXCEPTION, parseExceptionStack.getBytes(StandardCharsets.UTF_8));
+ write(STATUS.EMIT_SUCCESS_PARSE_EXCEPTION,
+ parseExceptionStack.getBytes(StandardCharsets.UTF_8));
}
}
-
- private void parseOne() throws FetchException {
+ private void parseOne() {
synchronized (lock) {
parsing = true;
since = System.currentTimeMillis();
}
+ FetchEmitTuple t = null;
try {
- actuallyParse();
+ t = readFetchEmitTuple();
+ actuallyParse(t);
+ } catch (OutOfMemoryError e) {
+ handleOOM(t.getId(), e);
} finally {
synchronized (lock) {
parsing = false;
@@ -276,25 +286,37 @@ public class PipesServer implements Runnable {
}
}
- public void actuallyParse() throws FetchException {
- FetchEmitTuple t = readFetchEmitTuple();
+ private void actuallyParse(FetchEmitTuple t) {
List<Metadata> metadataList = null;
- Fetcher fetcher = getFetcher(t.getFetchKey().getFetcherName());
+ Fetcher fetcher = null;
+ try {
+ fetcher = fetcherManager.getFetcher(t.getFetchKey().getFetcherName());
+ } catch (IllegalArgumentException e) {
+ String noFetcherMsg = getNoFetcherMsg(t.getFetchKey().getFetcherName());
+ LOG.warn(noFetcherMsg);
+ write(STATUS.FETCHER_NOT_FOUND, noFetcherMsg);
+ return;
+ } catch (IOException | TikaException e) {
+ LOG.warn("Couldn't initialize fetcher for fetch id '" +
+ t.getId() + "'", e);
+ write(STATUS.FETCHER_INITIALIZATION_EXCEPTION,
+ ExceptionUtils.getStackTrace(e));
+ return;
+ }
Metadata metadata = new Metadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
metadataList = parseMetadata(t, stream, metadata);
} catch (SecurityException e) {
+ LOG.error("security exception " + t.getId(), e);
throw e;
} catch (TikaException | IOException e) {
- LOG.warn("fetch exception", e);
- throw new FetchException(e);
- } catch (OutOfMemoryError e) {
- handleOOM(e);
+ LOG.warn("fetch exception " + t.getId(), e);
+ write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
if (metadataIsEmpty(metadataList)) {
- write(EMPTY_OUTPUT);
+ write(STATUS.EMPTY_OUTPUT);
return;
}
@@ -308,33 +330,51 @@ public class PipesServer implements Runnable {
}
EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
- emit(emitData, stack);
+ emit(t.getId(), emitData, stack);
} else {
write(emitData, stack);
}
} else {
- write(PARSE_EXCEPTION_NO_EMIT, stack.getBytes(StandardCharsets.UTF_8));
+ write(STATUS.PARSE_EXCEPTION_NO_EMIT,
+ stack.getBytes(StandardCharsets.UTF_8));
}
}
- private Fetcher getFetcher(String fetcherName) throws FetchException {
- try {
- return fetcherManager.getFetcher(fetcherName);
- } catch (TikaException | IOException e) {
- LOG.error("can't load fetcher", e);
- throw new FetchException(e);
+ private String getNoFetcherMsg(String fetcherName) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Fetcher '").append(fetcherName).append("'");
+ sb.append(" not found.");
+ sb.append("\nThe configured FetcherManager supports:");
+ int i = 0;
+ for (String f : fetcherManager.getSupported()) {
+ if (i++ > 0) {
+ sb.append(", ");
+ }
+ sb.append(f);
}
+ return sb.toString();
}
- private void handleOOM(OutOfMemoryError oom) {
- try {
- output.writeByte(OOM);
- output.flush();
- } catch (IOException e) {
- //swallow at this point
+ private String getNoEmitterMsg(String emitterName) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Emitter '").append(emitterName).append("'");
+ sb.append(" not found.");
+ sb.append("\nThe configured emitterManager supports:");
+ int i = 0;
+ for (String e : emitterManager.getSupported()) {
+ if (i++ > 0) {
+ sb.append(", ");
+ }
+ sb.append(e);
}
- LOG.error("oom", oom);
+ return sb.toString();
+ }
+
+
+ private void handleOOM(String taskId, OutOfMemoryError oom) {
+ write(STATUS.OOM);
+ LOG.error("oom: " + taskId, oom);
exit(1);
}
@@ -347,7 +387,6 @@ public class PipesServer implements Runnable {
handlerConfig.getMaxEmbeddedResources(),
tikaConfig.getMetadataFilter());
ParseContext parseContext = new ParseContext();
- FetchKey fetchKey = fetchEmitTuple.getFetchKey();
try {
parser.parse(stream, handler, metadata, parseContext);
} catch (SAXException e) {
@@ -359,10 +398,6 @@ public class PipesServer implements Runnable {
throw e;
} catch (Exception e) {
LOG.warn("exception: " + fetchEmitTuple.getId(), e);
- } catch (OutOfMemoryError e) {
- //TODO, maybe return file type gathered so far and then crash?
- //LOG.error("oom: " + fetchKey.getFetchKey());
- throw e;
}
return handler.getMetadataList();
}
@@ -377,9 +412,8 @@ public class PipesServer implements Runnable {
}
}
-
private void exit(int exitCode) {
- LOG.warn("exiting: {}", exitCode);
+ LOG.error("exiting: {}", exitCode);
System.exit(exitCode);
}
@@ -421,22 +455,28 @@ public class PipesServer implements Runnable {
}
private void write(EmitData emitData, String stack) {
+ //TODO -- what do we do with the stack?
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
objectOutputStream.writeObject(emitData);
}
- write(PARSE_SUCCESS, bos.toByteArray());
+ write(STATUS.PARSE_SUCCESS, bos.toByteArray());
} catch (IOException e) {
LOG.error("problem writing emit data (forking process shutdown?)", e);
exit(1);
}
}
- private void write(byte status, byte[] bytes) {
+ private void write(STATUS status, String msg) {
+ byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+ write(status, bytes);
+ }
+
+ private void write(STATUS status, byte[] bytes) {
try {
int len = bytes.length;
- output.write(status);
+ output.write(status.getByte());
output.writeInt(len);
output.write(bytes);
output.flush();
@@ -446,14 +486,13 @@ public class PipesServer implements Runnable {
}
}
- private void write(byte status) {
+ private void write(STATUS status) {
try {
- output.write(status);
+ output.write(status.getByte());
output.flush();
} catch (IOException e) {
LOG.error("problem writing data (forking process shutdown?)", e);
exit(1);
}
}
-
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 0bdb86f..926f8eb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -28,8 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
-import org.xml.sax.SAXException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
@@ -48,6 +47,9 @@ import org.apache.tika.pipes.pipesiterator.PipesIterator;
public class AsyncProcessor implements Closeable {
static final int PARSER_FUTURE_CODE = 1;
+
+ static final AtomicLong TOTAL_PROCESSED = new AtomicLong(0);
+
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitData;
private final ExecutorCompletionService<Integer> executorCompletionService;
@@ -58,7 +60,7 @@ public class AsyncProcessor implements Closeable {
private int finished = 0;
boolean isShuttingDown = false;
- public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
+ public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException {
this.asyncConfig = AsyncConfig.load(tikaConfigPath);
this.fetchEmitTuples = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
this.emitData = new ArrayBlockingQueue<>(100);
@@ -162,6 +164,10 @@ public class AsyncProcessor implements Closeable {
executorService.shutdownNow();
}
+ public long getTotalProcessed() {
+ return TOTAL_PROCESSED.get();
+ }
+
private class FetchEmitWorker implements Callable<Integer> {
private final AsyncConfig asyncConfig;
@@ -175,6 +181,7 @@ public class AsyncProcessor implements Closeable {
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataQueue = emitDataQueue;
}
+
@Override
public Integer call() throws Exception {
@@ -192,19 +199,11 @@ public class AsyncProcessor implements Closeable {
} catch (IOException e) {
result = PipesResult.UNSPECIFIED_CRASH;
}
- switch (result.getStatus()) {
- case PARSE_SUCCESS:
- //TODO -- add timeout, this currently hangs forever
- emitDataQueue.offer(result.getEmitData());
- break;
- case EMIT_SUCCESS:
- break;
- case EMIT_EXCEPTION:
- case UNSPECIFIED_CRASH:
- case OOM:
- case TIMEOUT:
- break;
+ if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS) {
+ //TODO -- add timeout, this currently hangs forever
+ emitDataQueue.offer(result.getEmitData());
}
+ TOTAL_PROCESSED.incrementAndGet();
}
checkActive();
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
index e8e3271..d0ee652 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
@@ -16,9 +16,10 @@
*/
package org.apache.tika.pipes.pipesiterator;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -48,8 +49,9 @@ public class FileSystemPipesIteratorTest {
@Test(timeout = 30000)
public void testBasic() throws Exception {
- Path root = Paths.get(".");
-
+ URL url =
+ FileSystemPipesIteratorTest.class.getResource("/test-documents");
+ Path root = Paths.get(url.toURI());
List<Path> files = listFiles(root);
Set<String> truthSet = new HashSet<>();
for (Path p : files) {
@@ -67,6 +69,11 @@ public class FileSystemPipesIteratorTest {
iteratorSet.add(p.getFetchKey().getFetchKey());
}
- assertEquals(truthSet, iteratorSet);
+ for (String t : truthSet) {
+ assertTrue("missing in iterator set " + t, iteratorSet.contains(t));
+ }
+ for (String i : iteratorSet) {
+ assertTrue("missing in truth set " + i, truthSet.contains(i));
+ }
}
}
diff --git a/tika-core/src/test/resources/test-documents/subdir/example.xml b/tika-core/src/test/resources/test-documents/subdir/example.xml
new file mode 100644
index 0000000..1004305
--- /dev/null
+++ b/tika-core/src/test/resources/test-documents/subdir/example.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<mock>
+ <!-- this file offers all of the options as documentation
+ Parsing should stop at an IOException, of course
+ -->
+
+ <!-- action can be "add" or "set" -->
+ <metadata action="add" name="dc:creator">Nikolai Lobachevsky</metadata>
+ <!-- element is the name of the sax event to write, p=paragraph
+ if the element is not specified, the default is <p> -->
+ <write element="p">hello world! the quick brown fox jumped over the lazy dog</write>
+ <!-- write something to System.out -->
+ <print_out>writing to System.out</print_out>
+ <!-- write something to System.err -->
+ <print_err>writing to System.err</print_err>
+ <!-- hang
+ millis: how many milliseconds to pause. The actual hang time will probably
+ be a bit longer than the value specified. heavy: whether or not the hang should do something computationally expensive.
+ If the value is false, this just does a Thread.sleep(millis).
+ This attribute is optional, with default of heavy=false.
+ pulse_millis: (required if "heavy" is true), how often to check to see
+ whether the thread was interrupted or that the total hang time exceeded the millis
+ interruptible: whether or not the parser will check to see if its thread
+ has been interrupted; this attribute is optional with default of true
+ -->
+
+</mock>
\ No newline at end of file