You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tika.apache.org by ta...@apache.org on 2021/10/27 13:50:18 UTC

[tika] 03/03: TIKA-3586 -- fix race condition when starting multiple servers on multiple ports

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 3eaf766b323b2a3d804268ae0e2bba6bd8e08117
Author: tballison <ta...@apache.org>
AuthorDate: Wed Oct 27 09:50:04 2021 -0400

    TIKA-3586 -- fix race condition when starting multiple servers on multiple ports
---
 CHANGES.txt                                        |   3 +
 .../tika/server/core/TikaServerWatchDog.java       | 100 +++++++++++++++------
 2 files changed, 78 insertions(+), 25 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 368faca..97cb1c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
 Release 2.1.1 - ???
 
+   * Fix race condition when starting multiple forked
+     servers on multiple ports (TIKA-3586).
+
    * Add timeout per task to be configured via headers
      for tika-server's legacy endpoints /tika and /rmeta.
      Note that this timeout greater than taskTimeoutMillis (TIKA-3582).
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
index 192f520..b23c6ba 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
@@ -39,7 +39,9 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -51,8 +53,8 @@ import org.apache.tika.utils.ProcessUtils;
 public class TikaServerWatchDog implements Callable<WatchDogResult> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class);
-    private static Thread SHUTDOWN_HOOK = null;
-
+    private static Set<Process> PROCESSES = ConcurrentHashMap.newKeySet();
+    private static Set<ForkedProcess> FORKED_PROCESSES = ConcurrentHashMap.newKeySet();
     private final int port;
     private final String id;
     private final TikaServerConfig tikaServerConfig;
@@ -64,6 +66,23 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
     private volatile boolean shutDown = false;
 
 
+    static {
+        Thread shutdownHook = new Thread(() -> {
+            //prioritize destroying processes
+            for (Process process : PROCESSES) {
+                process.destroyForcibly();
+            }
+            //once that's done, try to clean up tmp files too
+            for (ForkedProcess forkedProcess : FORKED_PROCESSES) {
+                try {
+                    forkedProcess.close();
+                } catch (DoNotRestartException | InterruptedException e) {
+                    //swallow
+                }
+            }
+        });
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
 
     TikaServerWatchDog(int port, String id, TikaServerConfig tikaServerConfig) {
         this.port = port;
@@ -90,17 +109,20 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         gobbler.start();
     }
 
-    private static synchronized void destroyForkedForcibly(Process process) {
+    private static synchronized void destroyForkedForcibly(Process process)
+            throws InterruptedException {
+
         process = process.destroyForcibly();
         try {
             boolean destroyed = process.waitFor(60, TimeUnit.SECONDS);
+
             if (!destroyed) {
                 LOG.error("Forked process still alive after 60 seconds. " +
                         "Shutting down the forking process.");
                 System.exit(1);
             }
-        } catch (InterruptedException e) {
-            //swallow
+        } finally {
+            PROCESSES.remove(process);
         }
     }
 
@@ -127,19 +149,19 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                     if (exited) {
                         LOG.info("forked process exited with exit value {}",
                                 forkedProcess.process.exitValue());
-                        forkedProcess.close();
+                        closeForkedProcess(forkedProcess);
                         mustRestart = true;
                     } else {
                         ForkedStatus status = forkedProcess.readStatus();
                         if (status.status == FORKED_STATUS.FAILED_COMMUNICATION.ordinal()) {
                             LOG.info("failed to read from status file. Restarting now.");
-                            forkedProcess.close();
+                            closeForkedProcess(forkedProcess);
                             mustRestart = true;
                         } else if (status.status == FORKED_STATUS.SHUTTING_DOWN.ordinal()) {
                             LOG.info("Forked process is in shutting down mode.  Will wait a bit");
                             forkedProcess.process.waitFor(tikaServerConfig.getTaskTimeoutMillis(),
                                     TimeUnit.MILLISECONDS);
-                            forkedProcess.close();
+                            closeForkedProcess(forkedProcess);
                             mustRestart = true;
                         } else {
                             long elapsed = Duration.between(Instant.ofEpochMilli(status.timestamp),
@@ -149,7 +171,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                                         "{} ms have elapsed since forked process " +
                                                 "last updated status. " +
                                                 "Shutting down and restarting.", elapsed);
-                                forkedProcess.close();
+                                closeForkedProcess(forkedProcess);
                                 mustRestart = true;
                             }
                         }
@@ -163,11 +185,20 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
             LOG.debug("about to shutdown");
             if (forkedProcess != null) {
                 LOG.info("about to shutdown process");
-                forkedProcess.close();
+                closeForkedProcess(forkedProcess);
             }
         }
     }
 
+    private static void closeForkedProcess(ForkedProcess forkedProcess)
+            throws DoNotRestartException, InterruptedException {
+        try {
+            forkedProcess.close();
+        } finally {
+            FORKED_PROCESSES.remove(forkedProcess);
+        }
+    }
+
     private ForkedProcess startForkedProcess(int restarts) throws Exception {
         int consecutiveRestarts = 0;
         //if there's a bind exception, retry for 5 seconds to give the OS
@@ -175,7 +206,9 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         int maxBind = 5;
         while (consecutiveRestarts < maxBind) {
             try {
-                return new ForkedProcess(restarts);
+                ForkedProcess forkedProcess = new ForkedProcess(restarts);
+                FORKED_PROCESSES.add(forkedProcess);
+                return forkedProcess;
             } catch (BindException e) {
                 LOG.warn("WatchDog observes bind exception on retry {}. " +
                         "Will retry {} times.", consecutiveRestarts, maxBind);
@@ -186,7 +219,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                 }
             }
         }
-        throw new RuntimeException("Couldn't start child process");
+        throw new RuntimeException("Couldn't start forked process");
     }
 
 
@@ -307,7 +340,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
             return new ForkedStatus(-1, FORKED_STATUS.FAILED_COMMUNICATION.ordinal(), -1);
         }
 
-        private void close() throws DoNotRestartException {
+        private void close() throws DoNotRestartException, InterruptedException {
 
             try {
                 if (!process.isAlive()) {
@@ -374,25 +407,42 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                 builder.environment().put("TIKA_CONFIG", tikaConfigEnv);
             }
             Process process = builder.start();
+            PROCESSES.add(process);
             //redirect stdout to parent stderr to avoid error msgs
             //from maven during build: Corrupted STDOUT by directly writing to
             // native stream in forked
             redirectIO(process.getInputStream(), System.err);
             redirectIO(process.getErrorStream(), System.err);
-            if (SHUTDOWN_HOOK != null) {
-                Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
+            return process;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
             }
-            SHUTDOWN_HOOK = new Thread(() -> {
-                try {
-                    this.close();
-                } catch (DoNotRestartException e) {
-                    LOG.error("should not restart", e);
-                    shutDown();
-                }
-            });
-            Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
 
-            return process;
+            ForkedProcess that = (ForkedProcess) o;
+
+            if (!process.equals(that.process)) {
+                return false;
+            }
+            if (!forkedStatusFile.equals(that.forkedStatusFile)) {
+                return false;
+            }
+            return statusBuffer.equals(that.statusBuffer);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = process.hashCode();
+            result = 31 * result + forkedStatusFile.hashCode();
+            result = 31 * result + statusBuffer.hashCode();
+            return result;
         }
     }
+
 }