You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/10/27 13:50:18 UTC
[tika] 03/03: TIKA-3586 -- fix race condition when starting
multiple servers on multiple ports
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 3eaf766b323b2a3d804268ae0e2bba6bd8e08117
Author: tballison <ta...@apache.org>
AuthorDate: Wed Oct 27 09:50:04 2021 -0400
TIKA-3586 -- fix race condition when starting multiple servers on multiple ports
---
CHANGES.txt | 3 +
.../tika/server/core/TikaServerWatchDog.java | 100 +++++++++++++++------
2 files changed, 78 insertions(+), 25 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 368faca..97cb1c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
Release 2.1.1 - ???
+ * Fix race condition when starting multiple forked
+ servers on multiple ports (TIKA-3586).
+
* Add timeout per task to be configured via headers
for tika-server's legacy endpoints /tika and /rmeta.
Note that this timeout cannot be greater than taskTimeoutMillis (TIKA-3582).
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
index 192f520..b23c6ba 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
@@ -39,7 +39,9 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -51,8 +53,8 @@ import org.apache.tika.utils.ProcessUtils;
public class TikaServerWatchDog implements Callable<WatchDogResult> {
private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class);
- private static Thread SHUTDOWN_HOOK = null;
-
+ private static Set<Process> PROCESSES = ConcurrentHashMap.newKeySet();
+ private static Set<ForkedProcess> FORKED_PROCESSES = ConcurrentHashMap.newKeySet();
private final int port;
private final String id;
private final TikaServerConfig tikaServerConfig;
@@ -64,6 +66,23 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
private volatile boolean shutDown = false;
+ static {
+ Thread shutdownHook = new Thread(() -> {
+ //prioritize destroying processes
+ for (Process process : PROCESSES) {
+ process.destroyForcibly();
+ }
+ //once that's done, try to clean up tmp files too
+ for (ForkedProcess forkedProcess : FORKED_PROCESSES) {
+ try {
+ forkedProcess.close();
+ } catch (DoNotRestartException | InterruptedException e) {
+ //swallow
+ }
+ }
+ });
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
TikaServerWatchDog(int port, String id, TikaServerConfig tikaServerConfig) {
this.port = port;
@@ -90,17 +109,20 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
gobbler.start();
}
- private static synchronized void destroyForkedForcibly(Process process) {
+ private static synchronized void destroyForkedForcibly(Process process)
+ throws InterruptedException {
+
process = process.destroyForcibly();
try {
boolean destroyed = process.waitFor(60, TimeUnit.SECONDS);
+
if (!destroyed) {
LOG.error("Forked process still alive after 60 seconds. " +
"Shutting down the forking process.");
System.exit(1);
}
- } catch (InterruptedException e) {
- //swallow
+ } finally {
+ PROCESSES.remove(process);
}
}
@@ -127,19 +149,19 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
if (exited) {
LOG.info("forked process exited with exit value {}",
forkedProcess.process.exitValue());
- forkedProcess.close();
+ closeForkedProcess(forkedProcess);
mustRestart = true;
} else {
ForkedStatus status = forkedProcess.readStatus();
if (status.status == FORKED_STATUS.FAILED_COMMUNICATION.ordinal()) {
LOG.info("failed to read from status file. Restarting now.");
- forkedProcess.close();
+ closeForkedProcess(forkedProcess);
mustRestart = true;
} else if (status.status == FORKED_STATUS.SHUTTING_DOWN.ordinal()) {
LOG.info("Forked process is in shutting down mode. Will wait a bit");
forkedProcess.process.waitFor(tikaServerConfig.getTaskTimeoutMillis(),
TimeUnit.MILLISECONDS);
- forkedProcess.close();
+ closeForkedProcess(forkedProcess);
mustRestart = true;
} else {
long elapsed = Duration.between(Instant.ofEpochMilli(status.timestamp),
@@ -149,7 +171,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
"{} ms have elapsed since forked process " +
"last updated status. " +
"Shutting down and restarting.", elapsed);
- forkedProcess.close();
+ closeForkedProcess(forkedProcess);
mustRestart = true;
}
}
@@ -163,11 +185,20 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
LOG.debug("about to shutdown");
if (forkedProcess != null) {
LOG.info("about to shutdown process");
- forkedProcess.close();
+ closeForkedProcess(forkedProcess);
}
}
}
+ private static void closeForkedProcess(ForkedProcess forkedProcess)
+ throws DoNotRestartException, InterruptedException {
+ try {
+ forkedProcess.close();
+ } finally {
+ FORKED_PROCESSES.remove(forkedProcess);
+ }
+ }
+
private ForkedProcess startForkedProcess(int restarts) throws Exception {
int consecutiveRestarts = 0;
//if there's a bind exception, retry for 5 seconds to give the OS
@@ -175,7 +206,9 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
int maxBind = 5;
while (consecutiveRestarts < maxBind) {
try {
- return new ForkedProcess(restarts);
+ ForkedProcess forkedProcess = new ForkedProcess(restarts);
+ FORKED_PROCESSES.add(forkedProcess);
+ return forkedProcess;
} catch (BindException e) {
LOG.warn("WatchDog observes bind exception on retry {}. " +
"Will retry {} times.", consecutiveRestarts, maxBind);
@@ -186,7 +219,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
}
}
}
- throw new RuntimeException("Couldn't start child process");
+ throw new RuntimeException("Couldn't start forked process");
}
@@ -307,7 +340,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
return new ForkedStatus(-1, FORKED_STATUS.FAILED_COMMUNICATION.ordinal(), -1);
}
- private void close() throws DoNotRestartException {
+ private void close() throws DoNotRestartException, InterruptedException {
try {
if (!process.isAlive()) {
@@ -374,25 +407,42 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
builder.environment().put("TIKA_CONFIG", tikaConfigEnv);
}
Process process = builder.start();
+ PROCESSES.add(process);
//redirect stdout to parent stderr to avoid error msgs
//from maven during build: Corrupted STDOUT by directly writing to
// native stream in forked
redirectIO(process.getInputStream(), System.err);
redirectIO(process.getErrorStream(), System.err);
- if (SHUTDOWN_HOOK != null) {
- Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
+ return process;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
}
- SHUTDOWN_HOOK = new Thread(() -> {
- try {
- this.close();
- } catch (DoNotRestartException e) {
- LOG.error("should not restart", e);
- shutDown();
- }
- });
- Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
- return process;
+ ForkedProcess that = (ForkedProcess) o;
+
+ if (!process.equals(that.process)) {
+ return false;
+ }
+ if (!forkedStatusFile.equals(that.forkedStatusFile)) {
+ return false;
+ }
+ return statusBuffer.equals(that.statusBuffer);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = process.hashCode();
+ result = 31 * result + forkedStatusFile.hashCode();
+ result = 31 * result + statusBuffer.hashCode();
+ return result;
}
}
+
}