You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/11/19 19:50:07 UTC
[tika] branch master updated: TIKA-2785 -- switch communication
from child to parent to a shared memory-mapped file in -spawnChild mode in
tika-server.
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/master by this push:
new a88b8ab TIKA-2785 -- switch communication from child to parent to a shared memory-mapped file in -spawnChild mode in tika-server.
a88b8ab is described below
commit a88b8ab64c75869c08701aad9607fc56020c2e1b
Author: TALLISON <ta...@apache.org>
AuthorDate: Mon Nov 19 14:49:53 2018 -0500
TIKA-2785 -- switch communication from child to parent to a shared memory-mapped file in -spawnChild mode in tika-server.
---
CHANGES.txt | 3 +
.../org/apache/tika/io/MappedBufferCleaner.java | 159 ++++++++++++
.../java/org/apache/tika/server/ServerStatus.java | 16 +-
.../apache/tika/server/ServerStatusWatcher.java | 61 +++--
.../org/apache/tika/server/ServerTimeouts.java | 20 ++
.../java/org/apache/tika/server/TikaServerCli.java | 26 +-
.../org/apache/tika/server/TikaServerWatchDog.java | 274 +++++++++++++--------
.../tika/server/TikaServerIntegrationTest.java | 67 +++--
.../src/test/resources/logging/log4j_child.xml | 8 +-
.../{heavy_hand_100.xml => heavy_hang_100.xml} | 2 +-
10 files changed, 473 insertions(+), 163 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 63b76e0..843b25b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Release 2.0.0 - ???
Release 1.20 - ???
+ * Switch child-to-parent communication to a shared memory-mapped
+ file in tika-server's -spawnChild mode (TIKA-2785).
+
* Fix bug in tika-server when run in legacy mode (not -spawnChild)
that caused it to return 503 on documents submitted after
it hit an OutOfMemoryError (TIKA-2776).
diff --git a/tika-core/src/main/java/org/apache/tika/io/MappedBufferCleaner.java b/tika-core/src/main/java/org/apache/tika/io/MappedBufferCleaner.java
new file mode 100644
index 0000000..1eda6d8
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/io/MappedBufferCleaner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.io;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Objects;
+import static java.lang.invoke.MethodHandles.Lookup;
+import static java.lang.invoke.MethodHandles.constant;
+import static java.lang.invoke.MethodHandles.dropArguments;
+import static java.lang.invoke.MethodHandles.filterReturnValue;
+import static java.lang.invoke.MethodHandles.guardWithTest;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.lang.invoke.MethodType.methodType;
+
+/**
+ * Utility for unmapping memory-mapped buffers, where supported; copied/pasted from the Apache Lucene/Solr project.
+ */
+public class MappedBufferCleaner {
+ /** Reference to a BufferCleaner that does unmapping; {@code null} if not supported. */
+ private static final BufferCleaner CLEANER;
+
+ /**
+ * <code>true</code>, if this platform supports unmapping mmapped files.
+ */
+ public static final boolean UNMAP_SUPPORTED;
+
+ /**
+ * if {@link #UNMAP_SUPPORTED} is {@code false}, this contains the reason why unmapping is not supported.
+ */
+ public static final String UNMAP_NOT_SUPPORTED_REASON;
+
+ static {
+ final Object hack = AccessController.doPrivileged((PrivilegedAction<Object>) MappedBufferCleaner::unmapHackImpl);
+ if (hack instanceof BufferCleaner) {
+ CLEANER = (BufferCleaner) hack;
+ UNMAP_SUPPORTED = true;
+ UNMAP_NOT_SUPPORTED_REASON = null;
+ } else {
+ CLEANER = null;
+ UNMAP_SUPPORTED = false;
+ UNMAP_NOT_SUPPORTED_REASON = hack.toString();
+ }
+ }
+
+ /**
+ * If a cleaner is available, this buffer will be cleaned.
+ * Otherwise, this is a no-op.
+ *
+ * @param b buffer to clean; no-op if buffer is null
+ * @throws IOException if the buffer could not be unmapped
+ */
+ public static void freeBuffer(ByteBuffer b) throws IOException {
+ if (CLEANER != null && b != null) {
+ CLEANER.freeBuffer("", b);
+ }
+ }
+ //Copied/pasted from Lucene's MMapDirectory
+ private interface BufferCleaner {
+ void freeBuffer(String resourceDescription, ByteBuffer b) throws IOException;
+ }
+
+ //Needs access to private APIs in DirectBuffer, sun.misc.Cleaner, and sun.misc.Unsafe to enable hack
+ private static Object unmapHackImpl() {
+ final Lookup lookup = lookup();
+ try {
+ try {
+ // *** sun.misc.Unsafe unmapping (Java 9+) ***
+ final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
+ // first check if Unsafe has the right method, otherwise we can give up
+ // without doing any security critical stuff:
+ final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner",
+ methodType(void.class, ByteBuffer.class));
+ // fetch the unsafe instance and bind it to the virtual MH:
+ final Field f = unsafeClass.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ final Object theUnsafe = f.get(null);
+ return newBufferCleaner(ByteBuffer.class, unmapper.bindTo(theUnsafe));
+ } catch (SecurityException se) {
+ // rethrow to report errors correctly (we need to catch it here, as we also catch RuntimeException below!):
+ throw se;
+ } catch (ReflectiveOperationException | RuntimeException e) {
+ // *** sun.misc.Cleaner unmapping (Java 8) ***
+ final Class<?> directBufferClass = Class.forName("java.nio.DirectByteBuffer");
+
+ final Method m = directBufferClass.getMethod("cleaner");
+ m.setAccessible(true);
+ final MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
+ final Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
+
+ /* "Compile" a MH that basically is equivalent to the following code:
+ * void unmapper(ByteBuffer byteBuffer) {
+ * sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner();
+ * if (Objects.nonNull(cleaner)) {
+ * cleaner.clean();
+ * } else {
+ * noop(cleaner); // the noop is needed because MethodHandles#guardWithTest always needs ELSE
+ * }
+ * }
+ */
+ final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
+ final MethodHandle nonNullTest = lookup.findStatic(Objects.class, "nonNull", methodType(boolean.class, Object.class))
+ .asType(methodType(boolean.class, cleanerClass));
+ final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
+ final MethodHandle unmapper = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop))
+ .asType(methodType(void.class, ByteBuffer.class));
+ return newBufferCleaner(directBufferClass, unmapper);
+ }
+ } catch (SecurityException se) {
+ return "Unmapping is not supported, because not all required permissions are given to the Tika JAR file: " + se +
+ " [Please grant at least the following permissions: RuntimePermission(\"accessClassInPackage.sun.misc\") " +
+ " and ReflectPermission(\"suppressAccessChecks\")]";
+ } catch (ReflectiveOperationException | RuntimeException e) {
+ return "Unmapping is not supported on this platform, because internal Java APIs are not compatible with this Lucene version: " + e;
+ }
+ }
+
+ private static BufferCleaner newBufferCleaner(final Class<?> unmappableBufferClass, final MethodHandle unmapper) {
+ assert Objects.equals(methodType(void.class, ByteBuffer.class), unmapper.type());
+ return (String resourceDescription, ByteBuffer buffer) -> {
+ if (!buffer.isDirect()) {
+ throw new IllegalArgumentException("unmapping only works with direct buffers");
+ }
+ if (!unmappableBufferClass.isInstance(buffer)) {
+ throw new IllegalArgumentException("buffer is not an instance of " + unmappableBufferClass.getName());
+ }
+ final Throwable error = AccessController.doPrivileged((PrivilegedAction<Throwable>) () -> {
+ try {
+ unmapper.invokeExact(buffer);
+ return null;
+ } catch (Throwable t) {
+ return t;
+ }
+ });
+ if (error != null) {
+ throw new IOException("Unable to unmap the mapped buffer: " + resourceDescription, error);
+ }
+ };
+ }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
index f7843fb..255ce70 100644
--- a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
@@ -39,12 +39,14 @@ public class ServerStatus {
}
public enum STATUS {
- OPERATING(0),
- HIT_MAX(1),
- TIMEOUT(2),
- ERROR(3),
- PARENT_REQUESTED_SHUTDOWN(4),
- PARENT_EXCEPTION(5);
+ INITIALIZING(0),
+ OPERATING(1),
+ HIT_MAX(2),
+ TIMEOUT(3),
+ ERROR(4),
+ PARENT_REQUESTED_SHUTDOWN(5),
+ PARENT_EXCEPTION(6),
+ OFF(7);
private final int shutdownCode;
@@ -63,7 +65,7 @@ public class ServerStatus {
int getShutdownCode() {
return shutdownCode;
}
- byte getByte() { return (byte) shutdownCode;}
+ int getInt() { return shutdownCode;}
}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
index 8023e94..a00c1a5 100644
--- a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
@@ -21,9 +21,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
@@ -33,39 +36,38 @@ public class ServerStatusWatcher implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ServerStatusWatcher.class);
private final ServerStatus serverStatus;
private final DataInputStream fromParent;
- private final DataOutputStream toParent;
private final long maxFiles;
private final ServerTimeouts serverTimeouts;
+ private final FileChannel childStatusChannel;
+ private final MappedByteBuffer toParent;
private volatile Instant lastPing = null;
public ServerStatusWatcher(ServerStatus serverStatus,
- InputStream inputStream, OutputStream outputStream,
+ InputStream inputStream, Path childStatusFile,
long maxFiles,
- ServerTimeouts serverTimeouts) {
+ ServerTimeouts serverTimeouts) throws IOException {
this.serverStatus = serverStatus;
this.maxFiles = maxFiles;
this.serverTimeouts = serverTimeouts;
-
+ this.childStatusChannel = FileChannel.open(childStatusFile,
+ StandardOpenOption.DSYNC, StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE, StandardOpenOption.READ,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ this.toParent= childStatusChannel.map(FileChannel.MapMode.READ_WRITE,
+ 0, 16);//8 for timestamp long, 4 for status int, 4 for numactivetasks int
+ serverStatus.setStatus(ServerStatus.STATUS.OPERATING);
+ writeStatus();
this.fromParent = new DataInputStream(inputStream);
- this.toParent = new DataOutputStream(outputStream);
Thread statusWatcher = new Thread(new StatusWatcher());
statusWatcher.setDaemon(true);
statusWatcher.start();
+
}
@Override
public void run() {
- //let parent know child is alive
- try {
- toParent.writeByte(ServerStatus.STATUS.OPERATING.getByte());
- toParent.flush();
- } catch (Exception e) {
- LOG.warn("Exception writing startup ping to parent", e);
- serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION);
- shutdown(ServerStatus.STATUS.PARENT_EXCEPTION);
- }
byte directive = (byte)-1;
while (true) {
@@ -83,8 +85,7 @@ public class ServerStatusWatcher implements Runnable {
checkForTaskTimeouts();
}
try {
- toParent.writeByte(serverStatus.getStatus().getByte());
- toParent.flush();
+ writeStatus();
} catch (Exception e) {
LOG.warn("Exception writing to parent", e);
serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION);
@@ -94,9 +95,9 @@ public class ServerStatusWatcher implements Runnable {
LOG.info("Parent requested shutdown");
serverStatus.setStatus(ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN);
shutdown(ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN);
- } else if (directive == ServerStatus.DIRECTIVES.PING_ACTIVE_SERVER_TASKS.getByte()) { try {
- toParent.writeInt(serverStatus.getTasks().size());
- toParent.flush();
+ } else if (directive == ServerStatus.DIRECTIVES.PING_ACTIVE_SERVER_TASKS.getByte()) {
+ try {
+ writeStatus();
} catch (Exception e) {
LOG.warn("Exception writing to parent", e);
serverStatus.setStatus(ServerStatus.STATUS.PARENT_EXCEPTION);
@@ -106,6 +107,12 @@ public class ServerStatusWatcher implements Runnable {
}
}
+ private void writeStatus() throws IllegalArgumentException {
+ toParent.putLong(0, Instant.now().toEpochMilli());
+ toParent.putInt(8, serverStatus.getStatus().getInt());
+ toParent.putInt(12, serverStatus.getTasks().size());
+ }
+
private void checkForHitMaxFiles() {
if (maxFiles < 0) {
return;
@@ -134,7 +141,17 @@ public class ServerStatusWatcher implements Runnable {
}
private void shutdown(ServerStatus.STATUS status) {
- LOG.info("Shutting down child process with status: " +status.name());
+
+ toParent.putLong(0, Instant.now().toEpochMilli());
+ toParent.putInt(8, serverStatus.getStatus().getInt());
+ toParent.putInt(12, 0);
+ toParent.force();
+ try {
+ childStatusChannel.close();
+ } catch (IOException e) {
+ LOG.warn("problem closing status channel", e);
+ }
+ LOG.info("Shutting down child process with status: {}", status.name());
System.exit(status.getShutdownCode());
}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java b/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java
index 34df411..17959bf 100644
--- a/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerTimeouts.java
@@ -52,6 +52,11 @@ public class ServerTimeouts {
*/
public static final long DEFAULT_TASK_TIMEOUT_MILLIS = 120000;
+ /**
+ * Number of milliseconds to wait for the child process to start up
+ */
+ public static final long DEFAULT_CHILD_STARTUP_MILLIS = 120000;
+
private int maxRestarts = -1;
private long taskTimeoutMillis = DEFAULT_TASK_TIMEOUT_MILLIS;
@@ -60,6 +65,8 @@ public class ServerTimeouts {
private long pingPulseMillis = DEFAULT_PING_PULSE_MILLIS;
+ private long maxChildStartupMillis = DEFAULT_CHILD_STARTUP_MILLIS;
+
/**
* How long to wait for a task before shutting down the child server process
@@ -113,4 +120,17 @@ public class ServerTimeouts {
public void setMaxRestarts(int maxRestarts) {
this.maxRestarts = maxRestarts;
}
+
+ /**
+ * Maximum time in millis to allow for the child process to start up
+ * or restart
+ * @return maximum child startup time in milliseconds
+ */
+ public long getMaxChildStartupMillis() {
+ return maxChildStartupMillis;
+ }
+
+ public void setMaxChildStartupMillis(long maxChildStartupMillis) {
+ this.maxChildStartupMillis = maxChildStartupMillis;
+ }
}
diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
index 970acd2..63ddc24 100644
--- a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
+++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
@@ -19,7 +19,7 @@ package org.apache.tika.server;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
-import java.io.PrintStream;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -86,7 +86,8 @@ public class TikaServerCli {
private static final List<String> ONLY_IN_SPAWN_CHILD_MODE =
Arrays.asList(new String[] { "taskTimeoutMillis", "taskPulseMillis",
- "pingTimeoutMillis", "pingPulseMillis", "maxFiles", "javaHome", "maxRestarts"});
+ "pingTimeoutMillis", "pingPulseMillis", "maxFiles", "javaHome", "maxRestarts",
+ "childStatusFile", "maxChildStartupMillis", "tmpFilePrefix"});
private static Options getOptions() {
Options options = new Options();
@@ -106,11 +107,16 @@ public class TikaServerCli {
options.addOption("taskPulseMillis", true, "Only in spawn child mode: how often to check if a task has timed out.");
options.addOption("pingTimeoutMillis", true, "Only in spawn child mode: how long to wait to wait for a ping and/or ping response.");
options.addOption("pingPulseMillis", true, "Only in spawn child mode: how often to check if a ping has timed out.");
+ options.addOption("maxChildStartupMillis", true, "Only in spawn child mode: Maximum number of millis to wait for the child process to startup.");
options.addOption("maxRestarts", true, "Only in spawn child mode: how many times to restart child process, default is -1 (always restart)");
options.addOption("maxFiles", true, "Only in spawn child mode: shutdown server after this many files -- use only in 'spawnChild' mode");
- options.addOption("javaHome", true, "Override system property JAVA_HOME for calling java for the child process");
- options.addOption("child", false, "this process is a child process -- EXPERT -- " +
- "should normally only be invoked by parent process");
+ options.addOption("javaHome", true, "Only in spawn child mode: override system property JAVA_HOME for calling java for the child process");
+ options.addOption("child", false, "Only in spawn child mode: this process is a child process -- do not use this! " +
+ "Should only be invoked by parent process");
+ options.addOption("childStatusFile", true, "Only in spawn child mode: temporary file used as mmap to communicate " +
+ "with parent process -- do not use this! Should only be invoked by parent process.");
+ options.addOption("tmpFilePrefix", true, "Only in spawn child mode: prefix for temp file - for debugging only");
+
return options;
}
@@ -261,7 +267,6 @@ public class TikaServerCli {
if (line.hasOption("child")) {
serverStatus = new ServerStatus();
//redirect!!!
- PrintStream out = System.out;
InputStream in = System.in;
System.setIn(new ByteArrayInputStream(new byte[0]));
System.setOut(System.err);
@@ -272,10 +277,10 @@ public class TikaServerCli {
}
ServerTimeouts serverTimeouts = configureServerTimeouts(line);
-
+ String childStatusFile = line.getOptionValue("childStatusFile");
Thread serverThread =
new Thread(new ServerStatusWatcher(serverStatus, in,
- out, maxFiles, serverTimeouts));
+ Paths.get(childStatusFile), maxFiles, serverTimeouts));
serverThread.start();
} else {
@@ -362,6 +367,11 @@ public class TikaServerCli {
serverTimeouts.setMaxRestarts(Integer.parseInt(line.getOptionValue("maxRestarts")));
}
+ if (line.hasOption("maxChildStartupMillis")) {
+ serverTimeouts.setMaxChildStartupMillis(
+ Long.parseLong(line.getOptionValue("maxChildStartupMillis")));
+ }
+
return serverTimeouts;
}
diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java
index 5bf9bf3..4c4028e 100644
--- a/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java
+++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerWatchDog.java
@@ -17,23 +17,23 @@
package org.apache.tika.server;
+import org.apache.tika.io.MappedBufferCleaner;
import org.apache.tika.utils.ProcessUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
public class TikaServerWatchDog {
@@ -45,6 +45,7 @@ public class TikaServerWatchDog {
}
private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class);
+ private static final String DEFAULT_CHILD_STATUS_FILE_PREFIX = "tika-server-child-process-mmap-";
private Object[] childStatusLock = new Object[0];
private volatile CHILD_STATUS childStatus = CHILD_STATUS.INITIALIZING;
@@ -54,10 +55,49 @@ public class TikaServerWatchDog {
public void execute(String[] args, ServerTimeouts serverTimeouts) throws Exception {
LOG.info("server watch dog is starting up");
+ startPingTimer(serverTimeouts);
+
+ try {
+ childProcess = new ChildProcess(args, serverTimeouts);
+ setChildStatus(CHILD_STATUS.RUNNING);
+ int restarts = 0;
+ while (true) {
+ if (!childProcess.ping()) {
+ LOG.debug("bad ping, initializing");
+ setChildStatus(CHILD_STATUS.INITIALIZING);
+ lastPing = null;
+ childProcess.close();
+ LOG.debug("About to restart the child process");
+ childProcess = new ChildProcess(args, serverTimeouts);
+ LOG.info("Successfully restarted child process -- {} restarts so far)", restarts);
+ setChildStatus(CHILD_STATUS.RUNNING);
+ restarts++;
+ if (serverTimeouts.getMaxRestarts() > -1 && restarts >= serverTimeouts.getMaxRestarts()) {
+ LOG.warn("hit max restarts: "+restarts+". Stopping now");
+ break;
+ }
+ }
+ Thread.sleep(serverTimeouts.getPingPulseMillis());
+ }
+ } catch (InterruptedException e) {
+ //interrupted...shutting down
+ } finally {
+ setChildStatus(CHILD_STATUS.SHUTTING_DOWN);
+ LOG.debug("about to shutdown");
+ if (childProcess != null) {
+ LOG.info("about to shutdown process");
+ childProcess.close();
+ }
+ }
+ }
+
+ private void startPingTimer(ServerTimeouts serverTimeouts) {
//if the child thread is in stop-the-world mode, and isn't
- //responding to the ping, this thread checks to make sure
- //that the parent ping is sent and received often enough
- //If it isn't, this force destroys the child process.
+ //reading the ping, this thread checks to make sure
+ //that the parent ping is sent often enough.
+ //The write() in ping() could block.
+ //If there isn't a successful ping often enough,
+ //this force destroys the child process.
Thread pingTimer = new Thread(new Runnable() {
@Override
public void run() {
@@ -74,10 +114,13 @@ public class TikaServerWatchDog {
Process processToDestroy = null;
try {
processToDestroy = childProcess.process;
+ LOG.warn("{} ms have elapsed since last successful ping. Destroying child now",
+ elapsed);
+ destroyChildForcibly(processToDestroy);
+ childProcess.close();
} catch (NullPointerException e) {
//ignore
}
- destroyChildForcibly(processToDestroy);
}
}
try {
@@ -91,36 +134,7 @@ public class TikaServerWatchDog {
);
pingTimer.setDaemon(true);
pingTimer.start();
- try {
- childProcess = new ChildProcess(args);
- setChildStatus(CHILD_STATUS.RUNNING);
- int restarts = 0;
- while (true) {
- if (!childProcess.ping()) {
- setChildStatus(CHILD_STATUS.INITIALIZING);
- lastPing = null;
- childProcess.close();
- LOG.info("About to restart the child process");
- childProcess = new ChildProcess(args);
- LOG.info("Successfully restarted child process -- {} restarts so far)", ++restarts);
- setChildStatus(CHILD_STATUS.RUNNING);
- restarts++;
- if (serverTimeouts.getMaxRestarts() > -1 && restarts >= serverTimeouts.getMaxRestarts()) {
- LOG.warn("hit max restarts: "+restarts+". Stopping now");
- break;
- }
- }
- Thread.sleep(serverTimeouts.getPingPulseMillis());
- }
- } catch (InterruptedException e) {
- //interrupted...shutting down
- } finally {
- setChildStatus(CHILD_STATUS.SHUTTING_DOWN);
- if (childProcess != null) {
- childProcess.close();
- }
- }
}
private void setChildStatus(CHILD_STATUS status) {
@@ -191,55 +205,70 @@ public class TikaServerWatchDog {
private class ChildProcess {
private Thread SHUTDOWN_HOOK = null;
- Process process;
- DataInputStream fromChild;
- DataOutputStream toChild;
-
+ private final Process process;
+ private final FileChannel fromChildChannel;
+ private final MappedByteBuffer fromChild;
+ private final DataOutputStream toChild;
+ private final ServerTimeouts serverTimeouts;
+ private final Path childStatusFile;
+ private ChildProcess(String[] args, ServerTimeouts serverTimeouts) throws Exception {
+ String prefix = DEFAULT_CHILD_STATUS_FILE_PREFIX;
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-tmpFilePrefix")) {
+ prefix = args[i+1];
+ }
+ }
+ this.childStatusFile = Files.createTempFile(prefix, "");
+ this.serverTimeouts = serverTimeouts;
+ this.process = startProcess(args, childStatusFile);
+
+ //wait for file to be written/initialized by child process
+ Instant start = Instant.now();
+ long elapsed = Duration.between(start, Instant.now()).toMillis();
+ while (Files.size(childStatusFile) < 12
+ && elapsed < serverTimeouts.getMaxChildStartupMillis()) {
+ if (!process.isAlive()) {
+ close();
+ throw new RuntimeException("Failed to start child process");
+ }
+ Thread.sleep(50);
+ elapsed = Duration.between(start, Instant.now()).toMillis();
+ }
- private ChildProcess(String[] args) throws Exception {
- this.process = startProcess(args);
+ if (elapsed > serverTimeouts.getMaxChildStartupMillis()) {
+ close();
+ throw new RuntimeException("Child process failed to start after "+elapsed + " (ms)");
+ }
+ this.fromChildChannel = FileChannel.open(childStatusFile,
+ StandardOpenOption.READ,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ this.fromChild = fromChildChannel.map(
+ FileChannel.MapMode.READ_ONLY, 0, 12);
- this.fromChild = new DataInputStream(process.getInputStream());
this.toChild = new DataOutputStream(process.getOutputStream());
- //if logger's debug=true, there can be a bunch of stuff that
- //was written to the process's inputstream _before_
- //we did the redirect.
- //These bytes need to be read from fromChild before the child has actually
- //started...allow 64,000 bytes...completely arbitrary.
- //this is admittedly hacky...If the logger writes 0, we'd
- //interpret that as "OPERATING"...need to figure out
- //better way to siphon statically written bytes before
- //we do the redirect of streams.
- int maxStartBytes = 64000;
- int status = fromChild.readByte();
- int read = 0;
- while (status > -1 && read < maxStartBytes && status != ServerStatus.STATUS.OPERATING.getByte()) {
- status = fromChild.readByte();
- read++;
- }
- if (status != ServerStatus.STATUS.OPERATING.getByte()) {
- try {
- ServerStatus.STATUS currStatus = ServerStatus.STATUS.lookup(status);
- throw new IOException("bad status from child process: "+
- currStatus);
- } catch (ArrayIndexOutOfBoundsException e) {
- //swallow
+ elapsed = Duration.between(start, Instant.now()).toMillis();
+ //wait for child process to write something to the file
+ while (elapsed < serverTimeouts.getMaxChildStartupMillis()) {
+ int status = fromChild.getInt(8);
+ if (status == ServerStatus.STATUS.OPERATING.getInt()) {
+ break;
}
- int len = process.getInputStream().available();
- byte[] msg = new byte[len+1];
- msg[0] = (byte)status;
- process.getInputStream().read(msg, 1, len);
-
- throw new IOException(
- "Unrecognized status code; message:\n"+new String(msg, StandardCharsets.UTF_8));
-
+ Thread.sleep(50);
+ elapsed = Duration.between(start, Instant.now()).toMillis();
+ }
+ if (elapsed > serverTimeouts.getMaxChildStartupMillis()) {
+ close();
+ throw new RuntimeException("Child process failed to start after "+elapsed + " (ms)");
}
lastPing = Instant.now();
}
public boolean ping() {
- lastPing = Instant.now();
+ if (!process.isAlive()) {
+ LOG.debug("process is not alive");
+ return false;
+ }
try {
toChild.writeByte(ServerStatus.DIRECTIVES.PING.getByte());
toChild.flush();
@@ -247,50 +276,95 @@ public class TikaServerWatchDog {
LOG.warn("Exception pinging child process", e);
return false;
}
+ long lastUpdate = -1;
+ int status = -1;
try {
- byte status = fromChild.readByte();
- if (status != ServerStatus.STATUS.OPERATING.getByte()) {
- LOG.warn("Received status from child: {}",
- ServerStatus.STATUS.lookup(status));
- return false;
- }
- } catch (Exception e) {
+ lastUpdate = fromChild.getLong(0);
+ status = fromChild.getInt(8);
+ } catch (IndexOutOfBoundsException e) {
+ //something went wrong with the tmp file
LOG.warn("Exception receiving status from child", e);
return false;
}
+
+ if (status != ServerStatus.STATUS.OPERATING.getInt()) {
+ LOG.warn("Received non-operating status from child: {}",
+ ServerStatus.STATUS.lookup(status));
+ return false;
+ }
+
+ long elapsedSinceLastUpdate =
+ Duration.between(Instant.ofEpochMilli(lastUpdate), Instant.now()).toMillis();
+ LOG.trace("last update: {}, elapsed:{}, status:{}", lastUpdate, elapsedSinceLastUpdate, status);
+
+ if (elapsedSinceLastUpdate >
+ serverTimeouts.getPingTimeoutMillis()) {
+ //child hasn't written a status update in a longer time than allowed
+ LOG.warn("Child's last update exceeded ping timeout: {} (ms) with status {}",
+ elapsedSinceLastUpdate, status);
+ return false;
+ }
+
+ lastPing = Instant.now();
return true;
}
private void close() {
+
try {
- toChild.writeByte(ServerStatus.DIRECTIVES.SHUTDOWN.getByte());
- toChild.flush();
- } catch (Exception e) {
- LOG.warn("Exception asking child to shutdown", e);
- }
- //TODO: add a gracefully timed shutdown routine
- try {
- fromChild.close();
- } catch (Exception e) {
- LOG.warn("Problem shutting down reader from child", e);
+ if (toChild != null) {
+ toChild.writeByte(ServerStatus.DIRECTIVES.SHUTDOWN.getByte());
+ toChild.flush();
+ }
+ } catch (IOException e) {
+ LOG.debug("Exception asking child to shutdown", e);
}
try {
- toChild.close();
- } catch (Exception e) {
- LOG.warn("Problem shutting down writer to child", e);
+ if (toChild != null) {
+ toChild.close();
+ }
+ } catch (IOException e) {
+ LOG.debug("Problem shutting down writer to child", e);
}
destroyChildForcibly(process);
+ try {
+ MappedBufferCleaner.freeBuffer(fromChild);
+ } catch (IOException e) {
+ LOG.warn("problem freeing buffer");
+ }
+ try {
+ if (fromChildChannel != null) {
+ fromChildChannel.close();
+ }
+ } catch (IOException e) {
+ LOG.debug("Problem closing child channel", e);
+ }
+ if (childStatusFile != null) {
+ try {
+ if (Files.isRegularFile(childStatusFile)) {
+ Files.delete(childStatusFile);
+ }
+ } catch (IOException e) {
+ LOG.warn("problem deleting child status file", e);
+ }
+ }
+
}
- private Process startProcess(String[] args) throws IOException {
+ private Process startProcess(String[] args, Path childStatusFile) throws IOException {
+
ProcessBuilder builder = new ProcessBuilder();
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
+ builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
List<String> argList = new ArrayList<>();
String javaPath = extractJavaPath(args);
List<String> jvmArgs = extractJVMArgs(args);
List<String> childArgs = extractArgs(args);
+ childArgs.add("-childStatusFile");
+ childArgs.add(ProcessUtils.escapeCommandLine(childStatusFile.toAbsolutePath().toString()));
+
argList.add(javaPath);
if (! jvmArgs.contains("-cp") && ! jvmArgs.contains("--classpath")) {
String cp = System.getProperty("java.class.path");
@@ -304,10 +378,11 @@ public class TikaServerWatchDog {
LOG.debug("child process commandline: " +argList.toString());
builder.command(argList);
Process process = builder.start();
+
if (SHUTDOWN_HOOK != null) {
Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
}
- SHUTDOWN_HOOK = new Thread(() -> process.destroyForcibly());
+ SHUTDOWN_HOOK = new Thread(() -> this.close());
Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
return process;
@@ -323,7 +398,6 @@ public class TikaServerWatchDog {
"Shutting down the parent.");
System.exit(1);
}
-
} catch (InterruptedException e) {
//swallow
}
diff --git a/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java
index b23d178..2f8cd84 100644
--- a/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java
+++ b/tika-server/src/test/java/org/apache/tika/server/TikaServerIntegrationTest.java
@@ -39,6 +39,7 @@ import java.security.Permission;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -83,7 +84,7 @@ public class TikaServerIntegrationTest extends TikaTest {
@Before
public void setUp() throws Exception {
- SecurityManager existingSecurityManager = System.getSecurityManager();
+ existingSecurityManager = System.getSecurityManager();
System.setSecurityManager(new SecurityManager() {
@Override
public void checkExit(int status) {
@@ -121,7 +122,8 @@ public class TikaServerIntegrationTest extends TikaTest {
new String[]{
"-maxFiles", "2000",
"-spawnChild",
- "-p", INTEGRATION_TEST_PORT
+ "-p", INTEGRATION_TEST_PORT,
+ "-tmpFilePrefix", "basic-"
});
}
};
@@ -131,8 +133,6 @@ public class TikaServerIntegrationTest extends TikaTest {
} finally {
serverThread.interrupt();
}
-
-
}
@Test
@@ -143,7 +143,7 @@ public class TikaServerIntegrationTest extends TikaTest {
public void run() {
TikaServerCli.main(
new String[]{
- "-p", INTEGRATION_TEST_PORT,
+ "-p", INTEGRATION_TEST_PORT
});
}
};
@@ -181,7 +181,9 @@ public class TikaServerIntegrationTest extends TikaTest {
new String[]{
"-spawnChild", "-JXmx256m",
"-p", INTEGRATION_TEST_PORT,
- "-pingPulseMillis", "100"
+ "-pingPulseMillis", "100",
+ "-tmpFilePrefix", "tika-server-oom"
+
});
}
};
@@ -217,7 +219,9 @@ public class TikaServerIntegrationTest extends TikaTest {
TikaServerCli.main(
new String[]{
"-spawnChild",
- "-p", INTEGRATION_TEST_PORT
+ "-p", INTEGRATION_TEST_PORT,
+ "-tmpFilePrefix", "tika-server-systemexit"
+
});
}
};
@@ -252,7 +256,9 @@ public class TikaServerIntegrationTest extends TikaTest {
new String[]{
"-spawnChild", "-p", INTEGRATION_TEST_PORT,
"-taskTimeoutMillis", "10000", "-taskPulseMillis", "500",
- "-pingPulseMillis", "500"
+ "-pingPulseMillis", "500",
+ "-tmpFilePrefix", "tika-server-timeoutok"
+
});
}
};
@@ -285,7 +291,9 @@ public class TikaServerIntegrationTest extends TikaTest {
new String[]{
"-spawnChild", "-p", INTEGRATION_TEST_PORT,
"-taskTimeoutMillis", "10000", "-taskPulseMillis", "500",
- "-pingPulseMillis", "500"
+ "-pingPulseMillis", "500",
+ "-tmpFilePrefix", "tika-server-timeout"
+
});
}
};
@@ -317,7 +325,9 @@ public class TikaServerIntegrationTest extends TikaTest {
TikaServerCli.main(
new String[]{
"-spawnChild", "-JXms20m", "-JXmx10m",
- "-p", INTEGRATION_TEST_PORT
+ "-p", INTEGRATION_TEST_PORT,
+ "-tmpFilePrefix", "tika-server-badargs"
+
});
}
};
@@ -344,7 +354,9 @@ public class TikaServerIntegrationTest extends TikaTest {
"-spawnChild",
"-p", INTEGRATION_TEST_PORT,
"-taskTimeoutMillis", "10000", "-taskPulseMillis", "500",
- "-pingPulseMillis", "100"
+ "-pingPulseMillis", "100",
+ "-tmpFilePrefix", "tika-server-stderr"
+
});
}
};
@@ -368,7 +380,7 @@ public class TikaServerIntegrationTest extends TikaTest {
}
@Test
- @Ignore("TIKA-2784")
+ @Ignore("This works, but prints too much junk to the console. Figure out how to gobble/redirect.")
public void testStaticStdErrOutBasic() throws Exception {
final AtomicInteger i = new AtomicInteger();
Thread serverThread = new Thread() {
@@ -415,7 +427,8 @@ public class TikaServerIntegrationTest extends TikaTest {
"-p", INTEGRATION_TEST_PORT,
"-taskTimeoutMillis", "10000", "-taskPulseMillis", "500",
"-pingPulseMillis", "100", "-maxRestarts", "0",
- "-JDlog4j.configuration=file:"+ LOG_FILE.toAbsolutePath()
+ "-JDlog4j.configuration=file:"+ LOG_FILE.toAbsolutePath(),
+ "-tmpFilePrefix", "tika-server-stderrlogging"
});
}
};
@@ -464,7 +477,7 @@ public class TikaServerIntegrationTest extends TikaTest {
@Ignore("turn this into a real test")
public void testMaxFiles() throws Exception {
//this isn't a real regression test yet.
- //Can watch logs for confirmation.
+ //Can watch logs at least for confirmation of behavior
//TODO: convert to real test
Thread serverThread = new Thread() {
@Override
@@ -473,22 +486,33 @@ public class TikaServerIntegrationTest extends TikaTest {
new String[]{
"-maxFiles", "10",
"-spawnChild",
+ "-taskTimeoutMillis", "10000", "-taskPulseMillis", "500",
"-p", INTEGRATION_TEST_PORT
});
}
};
serverThread.start();
awaitServerStartup();
+ Random r = new Random();
for (int i = 0; i < 100; i++) {
System.out.println("FILE # "+i);
boolean ex = false;
Response response = null;
+ String file = TEST_RECURSIVE_DOC;
try {
+ if (r.nextFloat() < 0.01) {
+ file = TEST_SYSTEM_EXIT;
+ } else if (r.nextFloat() < 0.015) {
+ file = TEST_OOM;
+ } else if (r.nextFloat() < 0.02) {
+ file = TEST_HEAVY_HANG;
+ }
+ System.out.println("about to process: "+file);
response = WebClient
.create(endPoint + META_PATH)
.accept("application/json")
.put(ClassLoader
- .getSystemResourceAsStream(TEST_RECURSIVE_DOC));
+ .getSystemResourceAsStream(file));
} catch (Exception e) {
ex = true;
}
@@ -500,12 +524,13 @@ public class TikaServerIntegrationTest extends TikaTest {
System.out.println("done awaiting");
continue;
}
- Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
- List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
- assertEquals(12, metadataList.size());
- assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION));
- assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content"));
-
+ if (file.equals(TEST_RECURSIVE_DOC)) {
+ Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
+ List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
+ assertEquals(12, metadataList.size());
+ assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION));
+ assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content"));
+ }
//assertEquals("a38e6c7b38541af87148dee9634cb811", metadataList.get(10).get("X-TIKA:digest:MD5"));
}
serverThread.interrupt();
diff --git a/tika-server/src/test/resources/logging/log4j_child.xml b/tika-server/src/test/resources/logging/log4j_child.xml
index 7391faf..ef09a2f 100644
--- a/tika-server/src/test/resources/logging/log4j_child.xml
+++ b/tika-server/src/test/resources/logging/log4j_child.xml
@@ -17,12 +17,12 @@
<param name="ConversionPattern" value="%m%n"/>
</layout>
</appender>
- <logger name="org.apache.tika" additivity="true">
- <level value="debug"/>
+ <logger name="org.apache" additivity="true">
+ <level value="info"/>
<appender-ref ref="stdout"/>
</logger>
- <logger name="org.apache.tika" additivity="true">
- <level value="debug"/>
+ <logger name="org.apache.cxf" additivity="true">
+ <level value="info"/>
<appender-ref ref="stderr"/>
</logger>
diff --git a/tika-server/src/test/resources/mock/heavy_hand_100.xml b/tika-server/src/test/resources/mock/heavy_hang_100.xml
similarity index 93%
rename from tika-server/src/test/resources/mock/heavy_hand_100.xml
rename to tika-server/src/test/resources/mock/heavy_hang_100.xml
index f1f5b67..b1413bc 100644
--- a/tika-server/src/test/resources/mock/heavy_hand_100.xml
+++ b/tika-server/src/test/resources/mock/heavy_hang_100.xml
@@ -21,5 +21,5 @@
<mock>
<metadata action="add" name="author">Nikolai Lobachevsky</metadata>
<write element="p">some content</write>
- <hang millis="30000" heavy="true" pulse_millis="100" />
+ <hang millis="100" heavy="true" pulse_millis="50" />
</mock>
\ No newline at end of file