You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 15:00:02 UTC
[tika] 03/03: TIKA-3929 -- add a crash option for PipesReporter
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 46901964b2221fd882b2c550e784f766f2b726bd
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 09:59:46 2022 -0500
TIKA-3929 -- add a crash option for PipesReporter
---
CHANGES.txt | 4 +++
.../apache/tika/pipes/CompositePipesReporter.java | 14 +++++++++
.../apache/tika/pipes/LoggingPipesReporter.java | 10 +++++++
.../java/org/apache/tika/pipes/PipesReporter.java | 24 +++++++++++++++
.../apache/tika/pipes/async/AsyncProcessor.java | 3 +-
.../org/apache/tika/pipes/async/AsyncStatus.java | 18 ++++++++---
.../org/apache/tika/pipes/async/MockReporter.java | 10 +++++++
.../reporters/fs/FileSystemStatusReporter.java | 35 ++++++++++++++++++++--
.../opensearch/OpenSearchPipesReporter.java | 10 +++++++
9 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f8c546d24..c7995493b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+Release 2.6.1 - ???
+
+ * Downgraded logging in PipesClient for each parse from info to debug.
+
Release 2.6.0 - 11/3/2022
* Add optional Siegfried detector (TIKA-3901).
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
index da34f3f98..4f78b6be8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
@@ -37,6 +37,20 @@ public class CompositePipesReporter extends PipesReporter implements Initializab
}
+ /**
+ * Forwards the crash to every configured reporter.
+ * <p>
+ * Every reporter is notified even if an earlier one throws; the first
+ * {@link RuntimeException} is rethrown afterwards, with any later failures
+ * attached as suppressed exceptions.
+ *
+ * @param t the throwable that caused the crash
+ */
+ @Override
+ public void error(Throwable t) {
+ RuntimeException failure = null;
+ for (PipesReporter reporter : pipesReporters) {
+ try {
+ reporter.error(t);
+ } catch (RuntimeException e) {
+ //keep going so that one broken reporter cannot hide the crash from the others
+ if (failure == null) {
+ failure = e;
+ } else {
+ failure.addSuppressed(e);
+ }
+ }
+ }
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ /**
+ * Forwards the crash message to every configured reporter.
+ * <p>
+ * Every reporter is notified even if an earlier one throws; the first
+ * {@link RuntimeException} is rethrown afterwards, with any later failures
+ * attached as suppressed exceptions.
+ *
+ * @param msg a message describing the crash
+ */
+ @Override
+ public void error(String msg) {
+ RuntimeException failure = null;
+ for (PipesReporter reporter : pipesReporters) {
+ try {
+ reporter.error(msg);
+ } catch (RuntimeException e) {
+ //keep going so that one broken reporter cannot hide the crash from the others
+ if (failure == null) {
+ failure = e;
+ } else {
+ failure.addSuppressed(e);
+ }
+ }
+ }
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
@Field
public void setPipesReporters(List<PipesReporter> pipesReporters) {
this.pipesReporters = pipesReporters;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
index bf7eb45c3..5f00880ba 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
@@ -30,4 +30,14 @@ public class LoggingPipesReporter extends PipesReporter {
public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
LOGGER.debug("{} {} {}", t, result, elapsed);
}
+
+ /**
+ * Logs the crash message at ERROR level.
+ *
+ * @param msg a message describing the crash
+ */
+ @Override
+ public void error(String msg) {
+ LOGGER.error("error {}", msg);
+ }
+
+ /**
+ * Logs the crash, including the stack trace of {@code t}, at ERROR level.
+ *
+ * @param t the throwable that caused the crash
+ */
+ @Override
+ public void error(Throwable t) {
+ LOGGER.error("pipes error", t);
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
index 32a7c61a6..18db3fe1d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
@@ -39,8 +39,20 @@ public abstract class PipesReporter implements Closeable {
public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
}
+
+ @Override
+ public void error(String msg) {
+ //no-op
+ }
+
+ @Override
+ public void error(Throwable t) {
+ //no-op
+ }
};
+ /**
+ * Reports the outcome of processing one fetch-emit tuple.
+ * <p>
+ * This may still be called after a crash has been signalled via {@code error(...)};
+ * implementers are responsible for ignoring such late reports if that is the
+ * desired behavior.
+ *
+ * @param t the tuple that was processed
+ * @param result the result of processing {@code t}
+ * @param elapsed time spent processing {@code t} (units not stated here; presumably
+ * milliseconds, TODO confirm)
+ */
public abstract void report(FetchEmitTuple t, PipesResult result, long elapsed);
@@ -69,4 +81,16 @@ public abstract class PipesReporter implements Closeable {
public void close() throws IOException {
//no-op
}
+
+ /**
+ * Called if the process has crashed.
+ * <p>
+ * Implementers should not rely on {@link #close()} being called after this.
+ * The default implementation is a no-op, so that subclasses written before this
+ * method existed keep compiling and do not fail with {@code AbstractMethodError}.
+ *
+ * @param t the throwable that caused the crash
+ */
+ public void error(Throwable t) {
+ //no-op by default
+ }
+
+ /**
+ * Called if the process has crashed.
+ * <p>
+ * Implementers should not rely on {@link #close()} being called after this.
+ * The default implementation is a no-op, so that subclasses written before this
+ * method existed keep compiling and do not fail with {@code AbstractMethodError}.
+ *
+ * @param msg a message describing the crash
+ */
+ public void error(String msg) {
+ //no-op by default
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 476e4df58..7a71f08c9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -117,7 +117,7 @@ public class AsyncProcessor implements Closeable {
}
} catch (Exception e) {
executorService.shutdownNow();
- asyncConfig.getPipesReporter().close();
+ asyncConfig.getPipesReporter().error(e);
throw e;
}
}
@@ -222,6 +222,7 @@ public class AsyncProcessor implements Closeable {
}
} catch (ExecutionException e) {
LOG.error("execution exception", e);
+ asyncConfig.getPipesReporter().error(e);
throw new RuntimeException(e);
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
index 30408f04a..46a58ff2b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
@@ -22,22 +22,24 @@ import java.util.Map;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.utils.StringUtils;
public class AsyncStatus {
+ /**
+ * Coarse state of an async run.
+ */
public enum ASYNC_STATUS {
+ /** Initial value of a newly created {@code AsyncStatus}. */
STARTED,
- COMPLETED
- //CRASHED TODO: need to figure out how to set this?
+ /** Written when reporting finishes without a crash (e.g. by FileSystemStatusReporter#close()). */
+ COMPLETED,
+ /** Intended to mark a run that crashed; details are available from getCrashMessage(). */
+ CRASHED
}
private final Instant started;
-
private Instant lastUpdate;
private TotalCountResult totalCountResult = new TotalCountResult(0, TotalCountResult.STATUS.NOT_COMPLETED);
private Map<PipesResult.STATUS, Long> statusCounts = new HashMap<>();
private ASYNC_STATUS asyncStatus = ASYNC_STATUS.STARTED;
+ private String crashMessage = StringUtils.EMPTY;
+
public AsyncStatus() {
started = Instant.now();
lastUpdate = started;
@@ -51,6 +53,10 @@ public class AsyncStatus {
this.asyncStatus = status;
}
+ /**
+ * Records that the async run has crashed.
+ * <p>
+ * Sets the status to {@link ASYNC_STATUS#CRASHED}, stores the crash message
+ * (a {@code null} message is stored as an empty string) and refreshes the
+ * last-update timestamp. Without setting the status here, a serialized
+ * status would still say {@code STARTED} after a crash.
+ *
+ * @param msg description of the crash, e.g. a stack trace
+ */
+ public void updateCrash(String msg) {
+ this.crashMessage = (msg == null) ? StringUtils.EMPTY : msg;
+ this.asyncStatus = ASYNC_STATUS.CRASHED;
+ this.lastUpdate = Instant.now();
+ }
+
public Instant getStarted() {
return started;
}
@@ -71,10 +77,14 @@ public class AsyncStatus {
return asyncStatus;
}
+ /**
+ * Returns the crash message recorded via {@link #updateCrash(String)}.
+ *
+ * @return the crash message, or an empty string if {@code updateCrash} has not been called
+ */
+ public String getCrashMessage() {
+ return this.crashMessage;
+ }
+
+ /**
+ * Debugging representation: start/last-update timestamps, total count, per-status
+ * counts, async status and crash message.
+ * <p>
+ * NOTE(review): this change renames the {@code status=} label to {@code asyncStatus=};
+ * check whether anything parses this string.
+ */
@Override
public String toString() {
return "AsyncStatus{" + "started=" + started + ", lastUpdate=" + lastUpdate +
", totalCountResult=" + totalCountResult + ", statusCounts=" + statusCounts +
- ", status=" + asyncStatus + '}';
+ ", asyncStatus=" + asyncStatus + ", crashMessage='" + crashMessage + '\'' + '}';
}
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index 112ace4c9..b8197bd82 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -30,6 +30,16 @@ public class MockReporter extends PipesReporter {
}
+ @Override
+ public void error(String msg) {
+ //intentionally a no-op for tests
+ }
+
+ @Override
+ public void error(Throwable t) {
+ //intentionally a no-op for tests
+ }
+
@Field
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
index 92c3d7675..b48745a6c 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
@@ -44,6 +44,7 @@ import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.async.AsyncStatus;
import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.utils.ExceptionUtils;
/**
* This is intended to write summary statistics to disk
@@ -67,6 +68,8 @@ public class FileSystemStatusReporter extends PipesReporter
private long reportUpdateMillis = 1000;
+ private volatile boolean crashed = false;
+
Thread reporterThread;
private ConcurrentHashMap<PipesResult.STATUS, LongAdder> counts = new ConcurrentHashMap<>();
private AsyncStatus asyncStatus = new AsyncStatus();
@@ -114,7 +117,15 @@ public class FileSystemStatusReporter extends PipesReporter
try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) {
objectMapper.writeValue(writer, asyncStatus);
} catch (IOException e) {
- e.printStackTrace();
+ LOG.warn("couldn't write report", e);
+ }
+ }
+
+ /**
+ * Records {@code crashMessage} on this reporter's {@link AsyncStatus} and immediately
+ * rewrites the status file with it. An IOException while writing is logged, not thrown.
+ * <p>
+ * NOTE(review): this method does not itself set the status to CRASHED; which status the
+ * file shows depends on {@link AsyncStatus#updateCrash(String)}.
+ */
+ private synchronized void crash(String crashMessage) {
+ asyncStatus.updateCrash(crashMessage);
+ try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) {
+ objectMapper.writeValue(writer, asyncStatus);
+ } catch (IOException e) {
LOG.warn("couldn't write report", e);
}
}
@@ -137,13 +148,33 @@ public class FileSystemStatusReporter extends PipesReporter
@Override
public void close() throws IOException {
LOG.debug("finishing and writing last report");
+ interruptThread();
+ //if error() already wrote a crash report, don't overwrite it with COMPLETED
+ if (! crashed) {
+ report(AsyncStatus.ASYNC_STATUS.COMPLETED);
+ }
+ }
+
+ /**
+ * Interrupts the background reporter thread and waits up to one second for it to exit.
+ * Does nothing if the reporter thread was never created.
+ */
+ private void interruptThread() {
+ if (reporterThread == null) {
+ return;
+ }
reporterThread.interrupt();
try {
reporterThread.join(1000);
} catch (InterruptedException e) {
+ //NOTE(review): the interrupt flag is not restored here; re-setting it before the
+ //final status write may make the FileChannel-backed write fail with
+ //ClosedByInterruptException. Confirm before changing.
//swallow
}
- report(AsyncStatus.ASYNC_STATUS.COMPLETED);
+ }
+
+ /**
+ * Marks this reporter as crashed, stops the reporter thread and writes the stack
+ * trace of {@code t} as the crash message. After this, {@link #close()} will not
+ * write a COMPLETED report.
+ */
+ @Override
+ public void error(Throwable t) {
+ crashed = true;
+ interruptThread();
+ crash(ExceptionUtils.getStackTrace(t));
+ }
+
+ /**
+ * Marks this reporter as crashed, stops the reporter thread and writes {@code msg}
+ * as the crash message. After this, {@link #close()} will not write a COMPLETED report.
+ */
+ @Override
+ public void error(String msg) {
+ crashed = true;
+ interruptThread();
+ crash(msg);
}
@Override
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
index 8945678d6..7dbe13621 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
@@ -102,6 +102,16 @@ public class OpenSearchPipesReporter extends PipesReporter implements Initializa
}
}
+ /**
+ * Only logs the crash message at ERROR level; nothing is sent to OpenSearch.
+ *
+ * @param msg a message describing the crash
+ */
+ @Override
+ public void error(String msg) {
+ LOG.error("crashed {}", msg);
+ }
+
+ /**
+ * Only logs the crash (with stack trace) at ERROR level; nothing is sent to OpenSearch.
+ *
+ * @param t the throwable that caused the crash
+ */
+ @Override
+ public void error(Throwable t) {
+ LOG.error("crashed", t);
+ }
+
private boolean shouldReport(PipesResult result) {
if (includeStatus.size() > 0) {
if (includeStatus.contains(result.getStatus().name())) {