You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 15:00:02 UTC

[tika] 03/03: TIKA-3929 -- add a crash option for PipesReporter

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 46901964b2221fd882b2c550e784f766f2b726bd
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 09:59:46 2022 -0500

    TIKA-3929 -- add a crash option for PipesReporter
---
 CHANGES.txt                                        |  4 +++
 .../apache/tika/pipes/CompositePipesReporter.java  | 14 +++++++++
 .../apache/tika/pipes/LoggingPipesReporter.java    | 10 +++++++
 .../java/org/apache/tika/pipes/PipesReporter.java  | 24 +++++++++++++++
 .../apache/tika/pipes/async/AsyncProcessor.java    |  3 +-
 .../org/apache/tika/pipes/async/AsyncStatus.java   | 18 ++++++++---
 .../org/apache/tika/pipes/async/MockReporter.java  | 10 +++++++
 .../reporters/fs/FileSystemStatusReporter.java     | 35 ++++++++++++++++++++--
 .../opensearch/OpenSearchPipesReporter.java        | 10 +++++++
 9 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f8c546d24..c7995493b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+Release 2.6.1 - ???
+
+   * Downgraded logging in PipesClient for each parse from info to debug.
+
 Release 2.6.0 - 11/3/2022
 
    * Add optional Siegfried detector (TIKA-3901).
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
index da34f3f98..4f78b6be8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
@@ -37,6 +37,20 @@ public class CompositePipesReporter extends PipesReporter implements Initializab
 
     }
 
+    @Override
+    public void error(Throwable t) {
+        for (PipesReporter reporter : pipesReporters) {
+            reporter.error(t);
+        }
+    }
+
+    @Override
+    public void error(String msg) {
+        for (PipesReporter reporter : pipesReporters) {
+            reporter.error(msg);
+        }
+    }
+
     @Field
     public void setPipesReporters(List<PipesReporter> pipesReporters) {
         this.pipesReporters = pipesReporters;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
index bf7eb45c3..5f00880ba 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
@@ -30,4 +30,14 @@ public class LoggingPipesReporter extends PipesReporter {
     public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
         LOGGER.debug("{} {} {}", t, result, elapsed);
     }
+
+    @Override
+    public void error(Throwable t) {
+        LOGGER.error("pipes error", t);
+    }
+
+    @Override
+    public void error(String msg) {
+        LOGGER.error("error {}", msg);
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
index 32a7c61a6..18db3fe1d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
@@ -39,8 +39,20 @@ public abstract class PipesReporter implements Closeable {
         public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
 
         }
+
+        @Override
+        public void error(Throwable t) {
+
+        }
+
+        @Override
+        public void error(String msg) {
+
+        }
     };
 
+    //Implementers are responsible for preventing reporting after
+    //crashes if that is the desired behavior.
     public abstract void report(FetchEmitTuple t, PipesResult result, long elapsed);
 
 
@@ -69,4 +81,16 @@ public abstract class PipesReporter implements Closeable {
     public void close() throws IOException {
         //no-op
     }
+
+    /**
+     * Called when the process has crashed; {@link #close()} may not be called afterwards.
+     * @param t the throwable that caused the crash
+     */
+    public abstract void error(Throwable t);
+
+    /**
+     * Called when the process has crashed; {@link #close()} may not be called afterwards.
+     * @param msg a description of the crash
+     */
+    public abstract void error(String msg);
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 476e4df58..7a71f08c9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -117,7 +117,7 @@ public class AsyncProcessor implements Closeable {
             }
         } catch (Exception e) {
             executorService.shutdownNow();
-            asyncConfig.getPipesReporter().close();
+            asyncConfig.getPipesReporter().error(e);
             throw e;
         }
     }
@@ -222,6 +222,7 @@ public class AsyncProcessor implements Closeable {
                 }
             } catch (ExecutionException e) {
                 LOG.error("execution exception", e);
+                asyncConfig.getPipesReporter().error(e);
                 throw new RuntimeException(e);
             }
         }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
index 30408f04a..46a58ff2b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
@@ -22,22 +22,24 @@ import java.util.Map;
 
 import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.utils.StringUtils;
 
 public class AsyncStatus {
 
     public enum ASYNC_STATUS {
         STARTED,
-        COMPLETED
-        //CRASHED TODO: need to figure out how to set this?
+        COMPLETED,
+        CRASHED
     }
     private final Instant started;
 
-
     private Instant lastUpdate;
     private TotalCountResult totalCountResult = new TotalCountResult(0, TotalCountResult.STATUS.NOT_COMPLETED);
     private Map<PipesResult.STATUS, Long> statusCounts = new HashMap<>();
     private ASYNC_STATUS asyncStatus = ASYNC_STATUS.STARTED;
 
+    private String crashMessage = StringUtils.EMPTY;
+
     public AsyncStatus() {
         started = Instant.now();
         lastUpdate = started;
@@ -51,6 +53,15 @@ public class AsyncStatus {
         this.asyncStatus = status;
     }
 
+    /**
+     * Marks the run as {@link ASYNC_STATUS#CRASHED} and records the crash message.
+     * @param msg a description of the crash
+     */
+    public void updateCrash(String msg) {
+        this.asyncStatus = ASYNC_STATUS.CRASHED;
+        this.crashMessage = msg;
+    }
+
     public Instant getStarted() {
         return started;
     }
@@ -71,10 +82,14 @@ public class AsyncStatus {
         return asyncStatus;
     }
 
+    public String getCrashMessage() {
+        return crashMessage;
+    }
+
     @Override
     public String toString() {
         return "AsyncStatus{" + "started=" + started + ", lastUpdate=" + lastUpdate +
                 ", totalCountResult=" + totalCountResult + ", statusCounts=" + statusCounts +
-                ", status=" + asyncStatus + '}';
+                ", asyncStatus=" + asyncStatus + ", crashMessage='" + crashMessage + '\'' + '}';
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index 112ace4c9..b8197bd82 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -30,6 +30,16 @@ public class MockReporter extends PipesReporter {
 
     }
 
+    @Override
+    public void error(Throwable t) {
+
+    }
+
+    @Override
+    public void error(String msg) {
+
+    }
+
     @Field
     public void setEndpoint(String endpoint) {
         this.endpoint = endpoint;
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
index 92c3d7675..b48745a6c 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
@@ -44,6 +44,7 @@ import org.apache.tika.pipes.PipesReporter;
 import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.async.AsyncStatus;
 import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.utils.ExceptionUtils;
 
 /**
  * This is intended to write summary statistics to disk
@@ -67,6 +68,8 @@ public class FileSystemStatusReporter extends PipesReporter
 
     private long reportUpdateMillis = 1000;
 
+    private volatile boolean crashed = false;
+
     Thread reporterThread;
     private ConcurrentHashMap<PipesResult.STATUS, LongAdder> counts = new ConcurrentHashMap<>();
     private AsyncStatus asyncStatus = new AsyncStatus();
@@ -114,7 +117,15 @@ public class FileSystemStatusReporter extends PipesReporter
         try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) {
             objectMapper.writeValue(writer, asyncStatus);
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.warn("couldn't write report", e);
+        }
+    }
+
+    private synchronized void crash(String crashMessage) {
+        asyncStatus.updateCrash(crashMessage);
+        try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) {
+            objectMapper.writeValue(writer, asyncStatus);
+        } catch (IOException e) {
             LOG.warn("couldn't write report", e);
         }
     }
@@ -137,13 +148,35 @@ public class FileSystemStatusReporter extends PipesReporter
     @Override
     public void close() throws IOException {
         LOG.debug("finishing and writing last report");
+        interruptThread();
+        // after a crash, keep the crash report written by error() instead of marking COMPLETED
+        if (!crashed) {
+            report(AsyncStatus.ASYNC_STATUS.COMPLETED);
+        }
+    }
+
+    // Interrupts reporterThread and waits up to one second for it to exit.
+    private void interruptThread() {
         reporterThread.interrupt();
         try {
             reporterThread.join(1000);
         } catch (InterruptedException e) {
             //swallow
         }
-        report(AsyncStatus.ASYNC_STATUS.COMPLETED);
+    }
+
+    @Override
+    public void error(Throwable t) {
+        crashed = true;
+        interruptThread();
+        crash(ExceptionUtils.getStackTrace(t));
+    }
+
+    @Override
+    public void error(String msg) {
+        crashed = true;
+        interruptThread();
+        crash(msg);
     }
 
     @Override
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
index 8945678d6..7dbe13621 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
@@ -102,6 +102,16 @@ public class OpenSearchPipesReporter extends PipesReporter implements Initializa
         }
     }
 
+    @Override
+    public void error(Throwable t) {
+        LOG.error("crashed", t);
+    }
+
+    @Override
+    public void error(String msg) {
+        LOG.error("crashed {}", msg);
+    }
+
     private boolean shouldReport(PipesResult result) {
         if (includeStatus.size() > 0) {
             if (includeStatus.contains(result.getStatus().name())) {