You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/06/10 16:23:14 UTC

[tika] branch main updated: TIKA-3441 -- increase chances that TesseractOCRParser will not orphan tesseract process and ensure that tika-server never enters an infinite loop on a bind exception.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 51fb33f  TIKA-3441 -- increase chances that TesseractOCRParser will not orphan tesseract process and ensure that tika-server never enters an infinite loop on a bind exception.
51fb33f is described below

commit 51fb33f3067bda551905056110bcf251d4bb3d23
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jun 10 12:22:52 2021 -0400

    TIKA-3441 -- increase chances that TesseractOCRParser will not orphan tesseract process and ensure that tika-server never enters an infinite loop on a bind exception.
---
 .../tika/parser/AbstractExternalProcessParser.java | 57 ++++++++++++++++++++++
 .../java/org/apache/tika/pipes/PipesClient.java    |  4 +-
 .../apache/tika/parser/ocr/TesseractOCRParser.java |  9 +++-
 .../apache/tika/server/core/TikaServerProcess.java | 45 ++++++++++++-----
 .../tika/server/core/TikaServerWatchDog.java       | 36 ++++++++++++--
 5 files changed, 132 insertions(+), 19 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/parser/AbstractExternalProcessParser.java b/tika-core/src/main/java/org/apache/tika/parser/AbstractExternalProcessParser.java
new file mode 100644
index 0000000..090c91a
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/parser/AbstractExternalProcessParser.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.parser;
+
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstract base class for parsers that call external processes.  This
+ * adds one more layer of 'hope' that processes won't be orphaned if
+ * the jvm has to be restarted.  This does not guarantee that the
+ * processes won't be orphaned in case of, e.g. kill -9, but it
+ * increases the chances that external processes won't be orphaned
+ * under normal circumstances or if the jvm itself exits.
+ * <p>
+ * Subclasses should {@link #register(Process)} each process right after
+ * starting it and {@link #release(String)} it once they have destroyed it;
+ * a jvm shutdown hook forcibly destroys every process still registered.
+ *
+ * @since Apache Tika 1.27
+ */
+public abstract class AbstractExternalProcessParser extends AbstractParser {
+
+    /**
+     * Serial version UID.
+     */
+    private static final long serialVersionUID = 7186985395903074255L;
+
+    /**
+     * Registered processes, keyed by the id handed out by {@link #register(Process)}.
+     */
+    private static final ConcurrentHashMap<String, Process> PROCESS_MAP = new ConcurrentHashMap<>();
+
+    static {
+        //best effort: forcibly destroy whatever is still registered when the jvm shuts down
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            PROCESS_MAP.forEachValue(1, Process::destroyForcibly);
+        }));
+    }
+
+    /**
+     * Registers a started process so that the shutdown hook forcibly
+     * destroys it if the jvm exits before it is released.
+     *
+     * @param p the started process; must not be null
+     * @return the id to pass to {@link #release(String)}
+     */
+    protected String register(Process p) {
+        Objects.requireNonNull(p, "process");
+        String id = UUID.randomUUID().toString();
+        PROCESS_MAP.put(id, p);
+        return id;
+    }
+
+    /**
+     * Removes a process from the registry.  This does not destroy the
+     * process; callers remain responsible for that.
+     *
+     * @param id the id returned by {@link #register(Process)}; may be null
+     * @return the process registered under {@code id}, or null if there was
+     * none or {@code id} was null
+     */
+    protected Process release(String id) {
+        if (id == null) {
+            return null;
+        }
+        return PROCESS_MAP.remove(id);
+    }
+}
+
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 330e5db..c4551a3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -290,7 +290,9 @@ public class PipesClient implements Closeable {
             commandLine.add("-Djava.awt.headless=true");
         }
-        if (! hasExitOnOOM) {
-            //warn
+        if (hasExitOnOOM) {
+            LOG.warn("I notice that you have an exit/crash on OOM. If you run heavy external processes " +
+                    "like tesseract, this setting may result in orphaned processes which could be disastrous" +
+                    " for performance.");
         }
         if (! hasLog4j) {
             commandLine.add("-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
diff --git a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
index 8cb9b20..d32d470 100644
--- a/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
+++ b/tika-parsers/tika-parsers-standard/tika-parsers-standard-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
@@ -65,7 +65,7 @@ import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.Property;
 import org.apache.tika.mime.MediaType;
-import org.apache.tika.parser.AbstractParser;
+import org.apache.tika.parser.AbstractExternalProcessParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.external.ExternalParser;
 import org.apache.tika.sax.OfflineContentHandler;
@@ -85,7 +85,7 @@ import org.apache.tika.utils.XMLReaderUtils;
  * parseContext.set(TesseractOCRConfig.class, config);<br>
  * </p>
  */
-public class TesseractOCRParser extends AbstractParser implements Initializable {
+public class TesseractOCRParser extends AbstractExternalProcessParser implements Initializable {
 
     public static final String TESS_META = "tess:";
     public static final Property IMAGE_ROTATION = Property.externalRealSeq(TESS_META + "rotation");
@@ -342,13 +342,18 @@ public class TesseractOCRParser extends AbstractParser implements Initializable
         setEnv(pb);
 
         Process process = null;
+        String id = null;
         try {
             process = pb.start();
+            id = register(process);
             runOCRProcess(process, config.getTimeoutSeconds());
         } finally {
             if (process != null) {
                 process.destroyForcibly();
             }
+            if (id != null) {
+                release(id);
+            }
         }
     }
 
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index c01c046..2321686 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -20,6 +20,7 @@ package org.apache.tika.server.core;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.BindException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -87,6 +88,8 @@ public class TikaServerProcess {
     public static final Set<String> LOG_LEVELS = new HashSet<>(Arrays.asList("debug", "info"));
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerProcess.class);
     public static int DO_NOT_RESTART_EXIT_VALUE = -100;
+    //exit value used when creating the server fails with a BindException; the watchdog retries on it
+    public static final int BIND_EXCEPTION = 42;
 
     private static Options getOptions() {
         Options options = new Options();
@@ -120,7 +123,7 @@ public class TikaServerProcess {
             LOG.debug("forked config: {}", tikaServerConfig);
 
             ServerDetails serverDetails = initServer(tikaServerConfig);
-            startServer(serverDetails);
+            startServer(serverDetails, tikaServerConfig);
 
         } catch (Exception e) {
             LOG.error("Can't start: ", e);
@@ -128,15 +131,41 @@ public class TikaServerProcess {
         }
     }
 
-    private static void startServer(ServerDetails serverDetails) throws Exception {
+    private static boolean isBindException(Throwable e) {
+        //iterative, depth-capped walk so that a cyclic cause chain cannot overflow the stack
+        Throwable t = e;
+        for (int depth = 0; t != null && depth < 100; depth++, t = t.getCause()) {
+            if (t instanceof BindException) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void startServer(ServerDetails serverDetails, TikaServerConfig tikaServerConfig) throws Exception {
 
         try {
             //start the server
             Server server = serverDetails.sf.create();
         } catch (ServiceConstructionException e) {
             LOG.warn("exception starting server", e);
+            if (isBindException(e)) {
+                System.exit(BIND_EXCEPTION);
+            }
             System.exit(DO_NOT_RESTART_EXIT_VALUE);
         }
+        //start the status watcher only once the server has been created successfully
+        if (! tikaServerConfig.isNoFork()) {
+            //hand the real stdin to the status watcher; System.in becomes an empty stream
+            InputStream in = System.in;
+            System.setIn(new ByteArrayInputStream(new byte[0]));
+
+            String forkedStatusFile = tikaServerConfig.getForkedStatusFile();
+            Thread serverThread = new Thread(new ServerStatusWatcher(serverDetails.serverStatus, in,
+                    Paths.get(forkedStatusFile), tikaServerConfig));
+            serverThread.start();
+        }
+
         LOG.info("Started Apache Tika server {} at {}", serverDetails.serverId, serverDetails.url);
     }
 
@@ -199,17 +228,7 @@ public class TikaServerProcess {
             serverStatus = new ServerStatus(serverId, 0, true);
         } else {
             serverStatus = new ServerStatus(serverId, tikaServerConfig.getNumRestarts(), false);
-            //redirect!!!
-            InputStream in = System.in;
-            System.setIn(new ByteArrayInputStream(new byte[0]));
             System.setOut(System.err);
-
-            String forkedStatusFile = tikaServerConfig.getForkedStatusFile();
-            Thread serverThread = new Thread(
-                    new ServerStatusWatcher(serverStatus, in, Paths.get(forkedStatusFile),
-                            tikaServerConfig));
-
-            serverThread.start();
         }
         TikaResource.init(tika, tikaServerConfig, digester, inputStreamFactory, serverStatus);
         JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
@@ -240,6 +259,7 @@ public class TikaServerProcess {
         details.sf = sf;
         details.url = url;
         details.serverId = serverId;
+        details.serverStatus = serverStatus;
         return details;
     }
 
@@ -467,5 +487,6 @@ public class TikaServerProcess {
         JAXRSServerFactoryBean sf;
         String serverId;
         String url;
+        ServerStatus serverStatus;
     }
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
index 4d85021..192f520 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
+import java.net.BindException;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -50,8 +51,8 @@ import org.apache.tika.utils.ProcessUtils;
 public class TikaServerWatchDog implements Callable<WatchDogResult> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class);
-    private static final String DEFAULT_FORKED_STATUS_FILE_PREFIX =
-            "tika-server-forked-process-mmap-";
+    private static Thread SHUTDOWN_HOOK = null;
+
     private final int port;
     private final String id;
     private final TikaServerConfig tikaServerConfig;
@@ -61,6 +62,9 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
     private ForkedProcess forkedProcess = null;
     private int restarts = 0;
     private volatile boolean shutDown = false;
+
+
+
     TikaServerWatchDog(int port, String id, TikaServerConfig tikaServerConfig) {
         this.port = port;
         this.id = id;
@@ -114,7 +118,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
 
                 try {
                     if (mustRestart) {
-                        forkedProcess = new ForkedProcess(restarts++);
+                        forkedProcess = startForkedProcess(restarts++);
                         setForkedStatus(FORKED_STATUS.RUNNING);
                         mustRestart = false;
                     }
@@ -164,6 +168,28 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         }
     }
 
+    /**
+     * Starts a forked process.  If it exits with {@link TikaServerProcess#BIND_EXCEPTION},
+     * this makes up to five attempts in total, one second apart, to give the OS a chance
+     * to release the port.  If every attempt fails to bind, the last {@link BindException}
+     * is rethrown; any other exception propagates immediately.
+     */
+    private ForkedProcess startForkedProcess(int restarts) throws Exception {
+        int maxBind = 5;
+        for (int attempt = 1; ; attempt++) {
+            try {
+                return new ForkedProcess(restarts);
+            } catch (BindException e) {
+                if (attempt >= maxBind) {
+                    throw e;
+                }
+                LOG.warn("WatchDog observes bind exception on attempt {} of {}. " +
+                        "Will retry in one second.", attempt, maxBind);
+                Thread.sleep(1000);
+            }
+        }
+    }
+
     public void shutDown() {
         shutDown = true;
     }
@@ -212,7 +238,6 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         //        private final DataOutputStream toForked;
         private final Path forkedStatusFile;
         private final ByteBuffer statusBuffer = ByteBuffer.allocate(16);
-        private Thread SHUTDOWN_HOOK = null;
 
         private ForkedProcess(int numRestarts) throws Exception {
             String prefix = tikaServerConfig.getTempFilePrefix();
@@ -242,6 +267,9 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
             }
             if (!process.isAlive()) {
                 close();
+                if (process.exitValue() == TikaServerProcess.BIND_EXCEPTION) {
+                    throw new BindException("couldn't bind");
+                }
                 throw new RuntimeException("Failed to start forked process -- forked is not alive");
             }
             if (!Files.exists(forkedStatusFile)) {