You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 14:59:59 UTC

[tika] branch main updated (0ccc3bf5e -> 46901964b)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


    from 0ccc3bf5e TIKA-3917: update spring
     new c8b4c697c downgrade logging from info -> debug in PipesClient
     new 95445d6dd Merge remote-tracking branch 'origin/main'
     new 46901964b TIKA-3929 -- add a crash option for PipesReporter

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  4 +++
 .../apache/tika/pipes/CompositePipesReporter.java  | 14 +++++++++
 .../apache/tika/pipes/LoggingPipesReporter.java    | 10 +++++++
 .../java/org/apache/tika/pipes/PipesClient.java    |  4 +--
 .../java/org/apache/tika/pipes/PipesReporter.java  | 24 +++++++++++++++
 .../apache/tika/pipes/async/AsyncProcessor.java    |  3 +-
 .../org/apache/tika/pipes/async/AsyncStatus.java   | 18 ++++++++---
 .../org/apache/tika/pipes/async/MockReporter.java  | 10 +++++++
 .../reporters/fs/FileSystemStatusReporter.java     | 35 ++++++++++++++++++++--
 .../opensearch/OpenSearchPipesReporter.java        | 10 +++++++
 10 files changed, 123 insertions(+), 9 deletions(-)


[tika] 01/03: downgrade logging from info -> debug in PipesClient

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit c8b4c697c9e3098720af7fe8220d14155cc48175
Author: tballison <ta...@apache.org>
AuthorDate: Wed Nov 16 17:13:35 2022 -0500

    downgrade logging from info -> debug in PipesClient
---
 tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 137388102..6068d519a 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -291,13 +291,13 @@ public class PipesClient implements Closeable {
                 return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
             case PARSE_SUCCESS:
                 //there may have been a parse exception, but the parse didn't crash
-                LOG.info("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
+                LOG.debug("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
                         millis);
                 return deserializeEmitData();
             case PARSE_EXCEPTION_NO_EMIT:
                 return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
             case EMIT_SUCCESS:
-                LOG.info("pipesClientId={} emit success: {} in {} ms", pipesClientId, t.getId(),
+                LOG.debug("pipesClientId={} emit success: {} in {} ms", pipesClientId, t.getId(),
                         millis);
                 return PipesResult.EMIT_SUCCESS;
             case EMIT_SUCCESS_PARSE_EXCEPTION:


[tika] 02/03: Merge remote-tracking branch 'origin/main'

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 95445d6dd699ab9e426db233c48baf2b711743de
Merge: c8b4c697c 0ccc3bf5e
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 09:59:29 2022 -0500

    Merge remote-tracking branch 'origin/main'

 tika-parent/pom.xml | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)


[tika] 03/03: TIKA-3929 -- add a crash option for PipesReporter

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 46901964b2221fd882b2c550e784f766f2b726bd
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 09:59:46 2022 -0500

    TIKA-3929 -- add a crash option for PipesReporter
---
 CHANGES.txt                                        |  4 +++
 .../apache/tika/pipes/CompositePipesReporter.java  | 14 +++++++++
 .../apache/tika/pipes/LoggingPipesReporter.java    | 10 +++++++
 .../java/org/apache/tika/pipes/PipesReporter.java  | 24 +++++++++++++++
 .../apache/tika/pipes/async/AsyncProcessor.java    |  3 +-
 .../org/apache/tika/pipes/async/AsyncStatus.java   | 18 ++++++++---
 .../org/apache/tika/pipes/async/MockReporter.java  | 10 +++++++
 .../reporters/fs/FileSystemStatusReporter.java     | 35 ++++++++++++++++++++--
 .../opensearch/OpenSearchPipesReporter.java        | 10 +++++++
 9 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f8c546d24..c7995493b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+Release 2.6.1 - ???
+
+   * Downgraded logging in PipesClient for each parse from info to debug.
+
 Release 2.6.0 - 11/3/2022
 
    * Add optional Siegfried detector (TIKA-3901).
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
index da34f3f98..4f78b6be8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
@@ -37,6 +37,20 @@ public class CompositePipesReporter extends PipesReporter implements Initializab
 
     }
 
+    @Override
+    public void error(Throwable t) {
+        for (PipesReporter reporter : pipesReporters) {
+            reporter.error(t);
+        }
+    }
+
+    @Override
+    public void error(String msg) {
+        for (PipesReporter reporter : pipesReporters) {
+            reporter.error(msg);
+        }
+    }
+
     @Field
     public void setPipesReporters(List<PipesReporter> pipesReporters) {
         this.pipesReporters = pipesReporters;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
index bf7eb45c3..5f00880ba 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java
@@ -30,4 +30,14 @@ public class LoggingPipesReporter extends PipesReporter {
     public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
         LOGGER.debug("{} {} {}", t, result, elapsed);
     }
+
+    @Override
+    public void error(Throwable t) {
+        LOGGER.error("pipes error", t);
+    }
+
+    @Override
+    public void error(String msg) {
+        LOGGER.error("error {}", msg);
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
index 32a7c61a6..18db3fe1d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
@@ -39,8 +39,20 @@ public abstract class PipesReporter implements Closeable {
         public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
 
         }
+
+        @Override
+        public void error(Throwable t) {
+
+        }
+
+        @Override
+        public void error(String msg) {
+
+        }
     };
 
+    //Implementers are responsible for preventing reporting after
+    //crashes if that is the desired behavior.
     public abstract void report(FetchEmitTuple t, PipesResult result, long elapsed);
 
 
@@ -69,4 +81,16 @@ public abstract class PipesReporter implements Closeable {
     public void close() throws IOException {
         //no-op
     }
+
+    /**
+     * This is called if the process has crashed.
+     * Implementers should not rely on close() to be called after this.
+     * @param t
+     */
+    public abstract void error(Throwable t);
+    /**
+     * This is called if the process has crashed.
+     * Implementers should not rely on close() to be called after this.
+     * @param msg
+     */public abstract void error(String msg);
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 476e4df58..7a71f08c9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -117,7 +117,7 @@ public class AsyncProcessor implements Closeable {
             }
         } catch (Exception e) {
             executorService.shutdownNow();
-            asyncConfig.getPipesReporter().close();
+            asyncConfig.getPipesReporter().error(e);
             throw e;
         }
     }
@@ -222,6 +222,7 @@ public class AsyncProcessor implements Closeable {
                 }
             } catch (ExecutionException e) {
                 LOG.error("execution exception", e);
+                asyncConfig.getPipesReporter().error(e);
                 throw new RuntimeException(e);
             }
         }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
index 30408f04a..46a58ff2b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java
@@ -22,22 +22,24 @@ import java.util.Map;
 
 import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.utils.StringUtils;
 
 public class AsyncStatus {
 
     public enum ASYNC_STATUS {
         STARTED,
-        COMPLETED
-        //CRASHED TODO: need to figure out how to set this?
+        COMPLETED,
+        CRASHED
     }
     private final Instant started;
 
-
     private Instant lastUpdate;
     private TotalCountResult totalCountResult = new TotalCountResult(0, TotalCountResult.STATUS.NOT_COMPLETED);
     private Map<PipesResult.STATUS, Long> statusCounts = new HashMap<>();
     private ASYNC_STATUS asyncStatus = ASYNC_STATUS.STARTED;
 
+    private String crashMessage = StringUtils.EMPTY;
+
     public AsyncStatus() {
         started = Instant.now();
         lastUpdate = started;
@@ -51,6 +53,10 @@ public class AsyncStatus {
         this.asyncStatus = status;
     }
 
+    public void updateCrash(String msg) {
+        this.crashMessage = msg;
+    }
+
     public Instant getStarted() {
         return started;
     }
@@ -71,10 +77,14 @@ public class AsyncStatus {
         return asyncStatus;
     }
 
+    public String getCrashMessage() {
+        return crashMessage;
+    }
+
     @Override
     public String toString() {
         return "AsyncStatus{" + "started=" + started + ", lastUpdate=" + lastUpdate +
                 ", totalCountResult=" + totalCountResult + ", statusCounts=" + statusCounts +
-                ", status=" + asyncStatus + '}';
+                ", asyncStatus=" + asyncStatus + ", crashMessage='" + crashMessage + '\'' + '}';
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
index 112ace4c9..b8197bd82 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -30,6 +30,16 @@ public class MockReporter extends PipesReporter {
 
     }
 
+    @Override
+    public void error(Throwable t) {
+
+    }
+
+    @Override
+    public void error(String msg) {
+
+    }
+
     @Field
     public void setEndpoint(String endpoint) {
         this.endpoint = endpoint;
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
index 92c3d7675..b48745a6c 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java
@@ -44,6 +44,7 @@ import org.apache.tika.pipes.PipesReporter;
 import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.async.AsyncStatus;
 import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+import org.apache.tika.utils.ExceptionUtils;
 
 /**
  * This is intended to write summary statistics to disk
@@ -67,6 +68,8 @@ public class FileSystemStatusReporter extends PipesReporter
 
     private long reportUpdateMillis = 1000;
 
+    private volatile boolean crashed = false;
+
     Thread reporterThread;
     private ConcurrentHashMap<PipesResult.STATUS, LongAdder> counts = new ConcurrentHashMap<>();
     private AsyncStatus asyncStatus = new AsyncStatus();
@@ -114,7 +117,15 @@ public class FileSystemStatusReporter extends PipesReporter
         try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) {
             objectMapper.writeValue(writer, asyncStatus);
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.warn("couldn't write report", e);
+        }
+    }
+
+    private synchronized void crash(String crashMessage) {
+        asyncStatus.updateCrash(crashMessage);
+        try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) {
+            objectMapper.writeValue(writer, asyncStatus);
+        } catch (IOException e) {
             LOG.warn("couldn't write report", e);
         }
     }
@@ -137,13 +148,33 @@ public class FileSystemStatusReporter extends PipesReporter
     @Override
     public void close() throws IOException {
         LOG.debug("finishing and writing last report");
+        interuptThread();
+        if (! crashed) {
+            report(AsyncStatus.ASYNC_STATUS.COMPLETED);
+        }
+    }
+
+    private void interuptThread() {
         reporterThread.interrupt();
         try {
             reporterThread.join(1000);
         } catch (InterruptedException e) {
             //swallow
         }
-        report(AsyncStatus.ASYNC_STATUS.COMPLETED);
+    }
+
+    @Override
+    public void error(Throwable t) {
+        crashed = true;
+        interuptThread();
+        crash(ExceptionUtils.getStackTrace(t));
+    }
+
+    @Override
+    public void error(String msg) {
+        crashed = true;
+        interuptThread();
+        crash(msg);
     }
 
     @Override
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
index 8945678d6..7dbe13621 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
@@ -102,6 +102,16 @@ public class OpenSearchPipesReporter extends PipesReporter implements Initializa
         }
     }
 
+    @Override
+    public void error(Throwable t) {
+        LOG.error("crashed", t);
+    }
+
+    @Override
+    public void error(String msg) {
+        LOG.error("crashed {}", msg);
+    }
+
     private boolean shouldReport(PipesResult result) {
         if (includeStatus.size() > 0) {
             if (includeStatus.contains(result.getStatus().name())) {