You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/06/10 14:54:06 UTC

[tika] branch branch_1x updated: TIKA-3441 -- improve likelihood that tesseract processes will be shutdown on jvm restart.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch branch_1x
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/branch_1x by this push:
     new 1224f88  TIKA-3441 -- improve likelihood that tesseract processes will be shutdown on jvm restart.
1224f88 is described below

commit 1224f881a1dc54282281cdf2fc9ecf3f3e429393
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jun 10 10:52:56 2021 -0400

    TIKA-3441 -- improve likelihood that tesseract processes will be shutdown on jvm restart.
---
 CHANGES.txt                                        |  6 +++
 .../tika/parser/AbstractExternalProcessParser.java | 57 ++++++++++++++++++++++
 .../apache/tika/parser/ocr/TesseractOCRParser.java |  8 ++-
 .../org/apache/tika/server/TikaServerWatchDog.java | 19 +++++---
 .../tika/server/TikaServerIntegrationTest.java     | 55 +++++++++++++++++++++
 5 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d0df2e5..ac566ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,11 @@
 Release 1.27 - ???
 
+   * Prevent rare infinite loop in tika-server's -spawnChild mode
+     when restart fails because of failure to bind to the port (TIKA-3441).
+
+   * Improve likelihood that tesseract will not be orphaned on
+     jvm restart in tika-server (TIKA-3441).
+
    * Deprecate experimental PDFPreflightParser (TIKA-3437).
 
    * Apply encoding detection to zip entry names via Ryan421 (TIKA-3374).
diff --git a/tika-core/src/main/java/org/apache/tika/parser/AbstractExternalProcessParser.java b/tika-core/src/main/java/org/apache/tika/parser/AbstractExternalProcessParser.java
new file mode 100644
index 0000000..090c91a
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/parser/AbstractExternalProcessParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.parser;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstract base class for parsers that call external processes.  This
+ * adds one more layer of 'hope' that processes won't be orphaned if
+ * the jvm has to be restarted.  This does not guarantee that the
+ * processes won't be orphaned in case of, e.g. kill -9, but this
+ * increases the chances that under normal circumstances or if the jvm
+ * itself exits, that external processes won't be orphaned.
+ *
+ * @since Apache Tika 1.27
+ */
+public abstract class AbstractExternalProcessParser extends AbstractParser {
+
+    /**
+     * Serial version UID.
+     */
+    private static final long serialVersionUID = 7186985395903074255L;
+
+    private static final ConcurrentHashMap<String, Process> PROCESS_MAP = new ConcurrentHashMap<>();
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            PROCESS_MAP.forEachValue(1, Process::destroyForcibly);
+        }));
+    }
+
+    protected String register(Process p) {
+        String id = UUID.randomUUID().toString();
+        PROCESS_MAP.put(id, p);
+        return id;
+    }
+
+    protected Process release(String id) {
+        return PROCESS_MAP.remove(id);
+    }
+}
+
diff --git a/tika-parsers/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java b/tika-parsers/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
index fa52248..5a3aa67 100644
--- a/tika-parsers/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
+++ b/tika-parsers/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
@@ -33,6 +33,7 @@ import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.mime.MediaType;
 import org.apache.tika.mime.MediaTypeRegistry;
+import org.apache.tika.parser.AbstractExternalProcessParser;
 import org.apache.tika.parser.AbstractParser;
 import org.apache.tika.parser.CompositeParser;
 import org.apache.tika.parser.ParseContext;
@@ -99,7 +100,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  *
  *
  */
-public class TesseractOCRParser extends AbstractParser implements Initializable {
+public class TesseractOCRParser extends AbstractExternalProcessParser implements Initializable {
     private static final Logger LOG = LoggerFactory.getLogger(TesseractOCRParser.class);
 
     private static volatile boolean HAS_WARNED = false;
@@ -531,13 +532,18 @@ public class TesseractOCRParser extends AbstractParser implements Initializable
         ProcessBuilder pb = new ProcessBuilder(cmd);
         setEnv(config, pb);
         Process process = null;
+        String id = null;
         try {
             process = pb.start();
+            id = register(process);
             runOCRProcess(process, config.getTimeout());
         } finally {
             if (process != null) {
                 process.destroyForcibly();
             }
+            if (id != null) {
+                release(id);
+            }
         }
     }
 
diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java
index 03b023c..10f9b04 100644
--- a/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java
+++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java
@@ -58,6 +58,9 @@ public class TikaServerWatchDog {
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class);
     private static final String DEFAULT_CHILD_STATUS_FILE_PREFIX = "tika-server-child-process-mmap-";
 
+    //this is the shutdown hook for the child (forked) process
+    private static Thread SHUTDOWN_HOOK = null;
+
     private Object[] childStatusLock = new Object[0];
     private volatile CHILD_STATUS childStatus = CHILD_STATUS.INITIALIZING;
     private volatile Instant lastPing = null;
@@ -251,7 +254,6 @@ public class TikaServerWatchDog {
     }
 
     private class ChildProcess {
-        private Thread SHUTDOWN_HOOK = null;
 
         private final Process process;
         private final DataOutputStream toChild;
@@ -439,12 +441,12 @@ public class TikaServerWatchDog {
             //redirect stdout to parent stderr to avoid error msgs
             //from maven during build: Corrupted STDOUT by directly writing to native stream in forked
             redirectIO(process.getInputStream(), System.err);
-            if (SHUTDOWN_HOOK != null) {
-                Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
-            }
+            Thread oldHook = SHUTDOWN_HOOK;
             SHUTDOWN_HOOK = new Thread(() -> this.close());
             Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
-
+            if (oldHook != null) {
+                Runtime.getRuntime().removeShutdownHook(oldHook);
+            }
             return process;
         }
     }
@@ -470,9 +472,12 @@ public class TikaServerWatchDog {
     }
 
     private static synchronized void destroyChildForcibly(Process process) {
-        process = process.destroyForcibly();
 
         try {
+            //wait for process to stop itself -- this can help prevent
+            // orphaned processes, e.g.tesseract
+            process.waitFor(10, TimeUnit.SECONDS);
+            process = process.destroyForcibly();
             boolean destroyed = process.waitFor(60, TimeUnit.SECONDS);
             if (! destroyed) {
                 LOG.error("Child process still alive after 60 seconds. " +
@@ -490,6 +495,8 @@ public class TikaServerWatchDog {
         } catch (InterruptedException e) {
             LOG.warn("interrupted exception while trying to destroy the forked process");
             //swallow
+        } finally {
+            process.destroyForcibly();
         }
     }
 
diff --git a/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java
index abce3cf..78ea1fd 100644
--- a/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java
+++ b/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java
@@ -637,4 +637,59 @@ public class TikaServerIntegrationTest extends TikaTest {
         assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION));
         assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content"));
     }
+
+    /*
+      this is useful for manual tests that spawned tesseract processes are correctly cleaned up
+      fairly often. :(
+
+      java -jar tika-server-1.27-SNAPSHOT.jar -taskTimeoutMillis 20000 -spawnChild
+      -c tika-config-ocr-only.xml -p 9999 -JXmx256m
+
+
+    @Test
+    public void loadTest() throws Exception {
+        List<Thread> threads = new ArrayList<>();
+        //this should tie up tesseract for longer than -taskTimeoutMillis
+        Path largePDF = Paths.get("..../testPDF_childAttachments.pdf");
+        //this should cause an oom
+        Path largeDocx = Paths.get("..../mobydick.docx");
+        for (int t = 0; t < 6; t++) {
+            final int num = t;
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+
+                    Response response = null;
+                    for (int i = 0; i < 10000; i++) {
+                        try {
+                            if (num < 5) {
+                                response = WebClient.create(endPoint + META_PATH).accept("application/json")
+                                        .put(Files.newInputStream(largePDF));
+                            } else if ( num == 5) {
+                                Thread.sleep(8000);
+                                response = WebClient.create(endPoint + META_PATH).accept("application/json")
+                                        .put(Files.newInputStream(largeDocx));
+                            }
+
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException interruptedException) {
+                                interruptedException.printStackTrace();
+                            }
+                            //oom may or may not cause an exception depending
+                            //on the timing
+                        }
+                    }
+                }
+            });
+            threads.add(thread);
+            thread.start();
+        }
+        for (Thread t : threads) {
+            t.join();
+        }
+    }
+    */
 }