You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/09/18 18:13:34 UTC
[tika] 01/11: TIKA-2725 -- checkpoint commit ... basic child
process is started...need to integrate actual statuswatcher, etc.
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tika.git
commit bf1b2418d47e9fa47e0634825341921d7edd1ee8
Author: TALLISON <ta...@apache.org>
AuthorDate: Fri Sep 7 19:16:32 2018 -0400
TIKA-2725 -- checkpoint commit ... basic child process is started...need to integrate actual statuswatcher, etc.
# Conflicts:
# tika-parsers/src/test/resources/test-documents/testPST.pst
# tika-parsers/src/test/resources/test-documents/testPST_variousBodyTypes.pst
---
.../tika/server/FileCountExceededException.java | 9 ++
.../java/org/apache/tika/server/ServerStatus.java | 98 +++++++++++++++
.../apache/tika/server/ServerStatusWatcher.java | 76 ++++++++++++
.../java/org/apache/tika/server/TaskStatus.java | 41 +++++++
.../java/org/apache/tika/server/TikaServerCli.java | 132 ++++++++++++++++++++-
.../apache/tika/server/ServerIntegrationTest.java | 73 ++++++++++++
.../org/apache/tika/server/ServerStatusTest.java | 100 ++++++++++++++++
7 files changed, 524 insertions(+), 5 deletions(-)
diff --git a/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java
new file mode 100644
index 0000000..9920556
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java
@@ -0,0 +1,9 @@
+package org.apache.tika.server;
+
+/**
+ * Checked exception signalling that tika-server has been asked to start
+ * work on more files than it is allowed to process.
+ * <p>
+ * It is thrown by {@link ServerStatus#start(ServerStatus.TASK, String)}, which
+ * also switches the server status to {@code HIT_MAX} before throwing.
+ */
+public class FileCountExceededException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception without a detail message.
+     */
+    public FileCountExceededException() {
+        super();
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message human-readable description of the limit that was hit
+     */
+    public FileCountExceededException(String message) {
+        super(message);
+    }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
new file mode 100644
index 0000000..861007d
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Shared bookkeeping for a running server: the current lifecycle
+ * {@link STATUS}, the number of tasks started so far, and the tasks that
+ * are currently in flight.
+ * <p>
+ * Every public method is {@code synchronized} on this instance.
+ */
+public class ServerStatus {
+
+    /**
+     * Lifecycle state of the server. Each state carries the code that is
+     * passed to {@code System.exit} when the process shuts down in that state.
+     */
+    enum STATUS {
+        OPEN(0),
+        HIT_MAX(1),
+        TIMEOUT(2),
+        ERROR(3),
+        PARENT_REQUESTED_SHUTDOWN(4);
+
+        private final int shutdownCode;
+
+        STATUS(int shutdownCode) {
+            this.shutdownCode = shutdownCode;
+        }
+
+        int getShutdownCode() {
+            return shutdownCode;
+        }
+    }
+
+    /**
+     * Kind of work a task performs.
+     */
+    enum TASK {
+        PARSE,
+        UNZIP,
+        DETECT,
+        METADATA
+    }
+
+    private final int maxFilesToProcess;
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final Map<Integer, TaskStatus> tasks = new HashMap<>();
+
+    private STATUS status = STATUS.OPEN;
+
+    /**
+     * @param maxFilesToProcess maximum number of tasks that may be started;
+     *                          values of zero or less disable the limit
+     */
+    public ServerStatus(int maxFilesToProcess) {
+        this.maxFilesToProcess = maxFilesToProcess;
+    }
+
+    /**
+     * Registers a new running task and returns its id.
+     * <p>
+     * Exactly {@code maxFilesToProcess} tasks are accepted when a limit is
+     * configured; the next request is rejected. A rejected request does not
+     * consume an id, so {@link #getFilesProcessed()} only counts accepted
+     * tasks and the counter can never wrap around.
+     *
+     * @param task     kind of work being started
+     * @param fileName name of the file being processed; may be {@code null}
+     * @return id to hand back to {@link #complete(int)}
+     * @throws FileCountExceededException if the configured maximum has already
+     *         been reached or no more ids are available; the status is set to
+     *         {@link STATUS#HIT_MAX} before this is thrown
+     */
+    public synchronized int start(TASK task, String fileName) throws FileCountExceededException {
+        int alreadyStarted = counter.get();
+        // Integer.MAX_VALUE itself is never handed out as an id.
+        boolean idsExhausted = alreadyStarted >= Integer.MAX_VALUE - 1;
+        boolean limitReached = maxFilesToProcess > 0 && alreadyStarted >= maxFilesToProcess;
+        if (idsExhausted || limitReached) {
+            setStatus(STATUS.HIT_MAX);
+            throw new FileCountExceededException();
+        }
+        int taskId = counter.incrementAndGet();
+        tasks.put(taskId, new TaskStatus(task, Instant.now(), fileName));
+        return taskId;
+    }
+
+    /**
+     * Removes the task from the collection of currently running tasks.
+     *
+     * @param taskId id previously returned by {@link #start(TASK, String)}
+     * @throws IllegalArgumentException if there is no task by that taskId in the collection
+     */
+    public synchronized void complete(int taskId) throws IllegalArgumentException {
+        TaskStatus removed = tasks.remove(taskId);
+        if (removed == null) {
+            throw new IllegalArgumentException("TaskId is not in map:"+taskId);
+        }
+    }
+
+    /**
+     * @param status new lifecycle state of the server
+     */
+    public synchronized void setStatus(STATUS status) {
+        this.status = status;
+    }
+
+    /**
+     * @return current lifecycle state of the server
+     */
+    public synchronized STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * @return a copy of the currently running tasks keyed by task id; changes
+     *         to the returned map do not affect this object
+     */
+    public synchronized Map<Integer, TaskStatus> getTasks() {
+        return new HashMap<>(tasks);
+    }
+
+    /**
+     * @return number of tasks that have been started so far (completed or not)
+     */
+    public synchronized int getFilesProcessed() {
+        return counter.get();
+    }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
new file mode 100644
index 0000000..24b1ddb
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.server;
+
+import org.apache.tika.server.resource.TranslateResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Callable;
+
+/**
+ * Watchdog that polls a {@link ServerStatus} and terminates the JVM once the
+ * status is no longer {@code OPEN}.
+ * <p>
+ * On every pulse it also inspects the running tasks and switches the status
+ * to {@code TIMEOUT} when any of them has been running for longer than the
+ * configured timeout.
+ */
+public class ServerStatusWatcher implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServerStatusWatcher.class);
+
+    private final ServerStatus serverStatus;
+    private final long timeoutMillis;
+    private final long pulseMillis;
+
+    /**
+     * @param serverStatus  status object to poll and update
+     * @param timeoutMillis maximum time in milliseconds a task may run
+     * @param pulseMillis   time in milliseconds to sleep between checks
+     */
+    public ServerStatusWatcher(ServerStatus serverStatus, long timeoutMillis, long pulseMillis) {
+        this.serverStatus = serverStatus;
+        this.timeoutMillis = timeoutMillis;
+        this.pulseMillis = pulseMillis;
+    }
+
+    /**
+     * Loops while the status is {@code OPEN}, then logs the status and calls
+     * {@code System.exit} with its shutdown code. If the status is already
+     * something other than {@code OPEN} on entry, the exit happens immediately.
+     */
+    @Override
+    public void run() {
+        ServerStatus.STATUS status = serverStatus.getStatus();
+        while (status == ServerStatus.STATUS.OPEN) {
+            try {
+                Thread.sleep(pulseMillis);
+            } catch (InterruptedException ignored) {
+                // Deliberately ignored: the watchdog keeps running and an
+                // interrupt merely triggers an earlier check. Restoring the
+                // interrupt flag here would make every later sleep fail at once.
+            }
+            checkForTimeouts();
+            status = serverStatus.getStatus();
+        }
+        // The loop only ends once the status has left OPEN.
+        LOG.warn("child process shutting down with status: {}", status);
+        System.exit(status.getShutdownCode());
+    }
+
+    /**
+     * Marks the server as timed out and logs every running task whose age
+     * exceeds {@code timeoutMillis}.
+     */
+    private void checkForTimeouts() {
+        Instant now = Instant.now();
+        for (TaskStatus running : serverStatus.getTasks().values()) {
+            // Start first, end second: the reverse order yields a negative
+            // duration that can never exceed the timeout.
+            long millisElapsed = Duration.between(running.started, now).toMillis();
+            if (millisElapsed <= timeoutMillis) {
+                continue;
+            }
+            serverStatus.setStatus(ServerStatus.STATUS.TIMEOUT);
+            if (running.fileName.isPresent()) {
+                LOG.error("Timeout task {}, millis elapsed {}, file {}",
+                        running.task.toString(), Long.toString(millisElapsed), running.fileName.get());
+            } else {
+                LOG.error("Timeout task {}, millis elapsed {}",
+                        running.task.toString(), Long.toString(millisElapsed));
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java b/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java
new file mode 100644
index 0000000..1637d7d
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Immutable description of one running task: what kind of work it is, when
+ * it started, and (optionally) which file it is working on.
+ */
+public class TaskStatus {
+    final ServerStatus.TASK task;
+    final Instant started;
+    final Optional<String> fileName;
+
+    /**
+     * @param task     kind of work being performed
+     * @param started  instant at which the task started
+     * @param fileName name of the file being processed; {@code null} is
+     *                 stored as an empty {@link Optional}
+     */
+    TaskStatus(ServerStatus.TASK task, Instant started, String fileName) {
+        this.task = task;
+        this.started = started;
+        this.fileName = Optional.ofNullable(fileName);
+    }
+
+    /**
+     * @return a description listing the task kind, start instant and file
+     *         name ({@code null} when no file name was given)
+     */
+    @Override
+    public String toString() {
+        return "TaskStatus{task=" + task
+                + ", started=" + started
+                + ", fileName=" + fileName.orElse(null)
+                + "}";
+    }
+
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
index 03d582e..af8fd8f 100644
--- a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
+++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
@@ -17,6 +17,7 @@
package org.apache.tika.server;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -62,6 +63,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TikaServerCli {
+
+
+ //used in spawn-child mode
+ // Milliseconds the parent sleeps between checks on the child process in spawnChild().
+ private static final long PULSE_MILLIS = 100;
+ // File limit handed to ServerStatus when no maxFiles option is given;
+ // ServerStatus only enforces limits greater than zero.
+ private static final int DEFAULT_MAX_FILES = -1;
+ // Task timeout in milliseconds handed to ServerStatusWatcher when no timeoutMS option is given.
+ private static final long DEFAULT_TIME_OUT_MS = 60000;
+ // Polling interval in milliseconds handed to ServerStatusWatcher when no pulseMS option is given.
+ private static final long DEFAULT_PULSE_MS = 500;
+ // Shutdown hook that destroys the current child process; start() replaces it
+ // every time a new child is launched. Null until the first child is started.
+ private static Thread SHUTDOWN_HOOK = null;
+
+
public static final int DEFAULT_PORT = 9998;
private static final int DEFAULT_DIGEST_MARK_LIMIT = 20*1024*1024;
public static final String DEFAULT_HOST = "localhost";
@@ -88,14 +99,114 @@ public class TikaServerCli {
options.addOption("?", "help", false, "this help message");
options.addOption("enableUnsecureFeatures", false, "this is required to enable fileUrl.");
options.addOption("enableFileUrl", false, "allows user to pass in fileUrl instead of InputStream.");
-
+ options.addOption("spawnChild", false, "whether or not to spawn a child process for robustness");
+ options.addOption("maxFiles", false, "shutdown server after this many files -- use only in 'spawnChild' mode");
return options;
}
+    /**
+     * Command-line entry point. Delegates to {@code execute(args)}; any
+     * exception is logged and the JVM exits with status {@code -1}.
+     *
+     * @param args command-line arguments
+     */
public static void main(String[] args) {
LOG.info("Starting {} server", new Tika());
+        try {
+            execute(args);
+        } catch (Exception e) {
+            // The logger records the full stack trace; printing it a second
+            // time to stderr would duplicate it and bypass the log config.
+            LOG.error("Can't start", e);
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Chooses the run mode: when {@code -spawnChild} or {@code --spawnChild}
+     * is among the arguments the server is run in a child process, otherwise
+     * it is started directly in this JVM.
+     *
+     * @param args command-line arguments
+     * @throws Exception whatever the selected mode throws
+     */
+    private static void execute(String[] args) throws Exception {
+        boolean childRequested = Arrays.stream(args)
+                .anyMatch(arg -> "-spawnChild".equals(arg) || "--spawnChild".equals(arg));
+        if (!childRequested) {
+            executeLegacy(args);
+            return;
+        }
+        spawnChild(args);
+    }
+    /**
+     * Runs the server in a child process and supervises it.
+     * <p>
+     * Every {@code PULSE_MILLIS} milliseconds the child is checked. If it has
+     * exited with the {@code PARENT_REQUESTED_SHUTDOWN} code, supervision ends
+     * and this method returns; with any other exit code a new child is
+     * started. Interrupting the calling thread also ends supervision. The
+     * current child is forcibly destroyed on the way out in every case.
+     *
+     * @param args command-line arguments of the parent process
+     * @throws Exception if a child process cannot be started
+     */
+    private static void spawnChild(String[] args) throws Exception {
+        Process child = start(args);
try {
+            while (true) {
+                Thread.sleep(PULSE_MILLIS);
+
+                if (child.isAlive()) {
+                    continue;
+                }
+                int exitValue = child.exitValue();
+                if (exitValue == ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN.getShutdownCode()) {
+                    // The child stopped because this process asked it to, so
+                    // there is nothing left to supervise; looping on would
+                    // poll a dead process forever.
+                    return;
+                }
+                LOG.warn("child exited with code ({}) -- restarting, now", Integer.toString(exitValue));
+                child = start(args);
+            }
+        } catch (InterruptedException e) {
+            //interrupted...shutting down; keep the flag visible to the caller
+            Thread.currentThread().interrupt();
+        } finally {
+            child.destroyForcibly();
+        }
+    }
+
+    /**
+     * Launches a new child JVM running {@code org.apache.tika.server.TikaServerCli}.
+     * <p>
+     * The command line is {@code java}, the JVM arguments taken from the
+     * {@code -J} prefixed entries of {@code args}, the main class, and the
+     * remaining arguments with the spawn-child flag removed. Unless a class
+     * path option is among the JVM arguments, this JVM's
+     * {@code java.class.path} is passed on via {@code -cp}. The child inherits
+     * this process's standard streams, and a shutdown hook that destroys it
+     * replaces the hook registered for any previous child.
+     *
+     * @param args command-line arguments of the parent process
+     * @return the started child process
+     * @throws IOException if the process cannot be started
+     */
+    private static Process start(String[] args) throws IOException {
+        List<String> jvmArgs = extractJVMArgs(args);
+        // -cp, -classpath and --class-path are the launcher's spellings;
+        // --classpath is kept only because earlier versions checked for it.
+        boolean classPathGiven = jvmArgs.contains("-cp")
+                || jvmArgs.contains("-classpath")
+                || jvmArgs.contains("--class-path")
+                || jvmArgs.contains("--classpath");
+        if (! classPathGiven) {
+            jvmArgs.add("-cp");
+            jvmArgs.add(System.getProperty("java.class.path"));
+        }
+
+        List<String> argList = new ArrayList<>();
+        argList.add("java");
+        argList.addAll(jvmArgs);
+        argList.add("org.apache.tika.server.TikaServerCli");
+        argList.addAll(extractArgs(args));
+
+        ProcessBuilder builder = new ProcessBuilder();
+        builder.inheritIO();
+        builder.command(argList);
+
+        Process process = builder.start();
+
+        // Only the most recent child needs to be cleaned up at JVM exit.
+        if (SHUTDOWN_HOOK != null) {
+            Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
+        }
+        SHUTDOWN_HOOK = new Thread(() -> process.destroy());
+        Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
+        return process;
+    }
+
+    /**
+     * Collects the arguments that are passed on to the child process: every
+     * entry of {@code args}, in order, except those starting with {@code -J}
+     * and the {@code -spawnChild} / {@code --spawnChild} flag.
+     *
+     * @param args command-line arguments of the parent process
+     * @return new mutable list of the arguments for the child
+     */
+    private static List<String> extractArgs(String[] args) {
+        List<String> forChild = new ArrayList<>();
+        for (String arg : args) {
+            boolean parentOnly = arg.startsWith("-J")
+                    || arg.equals("-spawnChild")
+                    || arg.equals("--spawnChild");
+            if (! parentOnly) {
+                forChild.add(arg);
+            }
+        }
+        return forChild;
+    }
+
+    /**
+     * Collects the JVM arguments for the child process. Each entry of
+     * {@code args} that starts with {@code -J} is kept, in order, with the
+     * {@code -J} prefix replaced by a single {@code -}
+     * (for example {@code -JXmx1g} becomes {@code -Xmx1g}).
+     *
+     * @param args command-line arguments of the parent process
+     * @return new mutable list of the JVM arguments
+     */
+    private static List<String> extractJVMArgs(String[] args) {
+        List<String> forJvm = new ArrayList<>();
+        for (String arg : args) {
+            if (! arg.startsWith("-J")) {
+                continue;
+            }
+            String withoutPrefix = arg.substring(2);
+            forJvm.add("-" + withoutPrefix);
+        }
+        return forJvm;
+    }
+
+ private static void executeLegacy(String[] args) throws Exception {
Options options = getOptions();
CommandLineParser cliParser = new GnuParser();
@@ -196,6 +307,21 @@ public class TikaServerCli {
inputStreamFactory = new DefaultInputStreamFactory();
}
+ int maxFiles = DEFAULT_MAX_FILES;
+ if (line.hasOption("maxFiles")) {
+ maxFiles = Integer.parseInt(line.getOptionValue("maxFiles"));
+ }
+
+ long timeoutMS = DEFAULT_TIME_OUT_MS;
+ if (line.hasOption("timeoutMS")) {
+ timeoutMS = Long.parseLong(line.getOptionValue("timeoutMS"));
+ }
+ long pulseMS = DEFAULT_PULSE_MS;
+ if (line.hasOption("pulseMS")) {
+ pulseMS = Long.parseLong(line.getOptionValue("pulseMS"));
+ }
+ ServerStatus serverStatus = new ServerStatus(maxFiles);
+ new Thread(new ServerStatusWatcher(serverStatus, timeoutMS, pulseMS)).start();
TikaResource.init(tika, digester, inputStreamFactory);
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
@@ -241,9 +367,5 @@ public class TikaServerCli {
manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory);
sf.create();
LOG.info("Started Apache Tika server at {}", url);
- } catch (Exception ex) {
- LOG.error("Can't start", ex);
- System.exit(-1);
- }
}
}
diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java
new file mode 100644
index 0000000..8568c6c
--- /dev/null
+++ b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.tika.TikaTest;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.OfficeOpenXMLExtended;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.junit.Test;
+
+import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test that starts the server in spawn-child mode on the default
+ * port and sends one document to the {@code /rmeta} endpoint.
+ */
+public class ServerIntegrationTest extends TikaTest {
+    private static final String TEST_RECURSIVE_DOC = "test_recursive_embedded.docx";
+    private static final String META_PATH = "/rmeta";
+    protected static final String endPoint =
+            "http://localhost:" + TikaServerCli.DEFAULT_PORT;
+
+    /**
+     * Starts the server on a background thread, waits a fixed time for it to
+     * come up, PUTs the test document and checks the returned metadata list.
+     * The server thread is interrupted afterwards whether or not the
+     * assertions pass, so a failing run does not leave the server behind.
+     */
+    @Test
+    public void testBasic() throws Exception {
+
+        Thread serverThread = new Thread() {
+            @Override
+            public void run() {
+                TikaServerCli.main(
+                        new String[]{
+                                "-spawnChild", "-p", Integer.toString(TikaServerCli.DEFAULT_PORT)
+                        });
+            }
+        };
+        serverThread.start();
+        try {
+            //test for the server being available...rather than this sleep call
+            Thread.sleep(20000);
+            Response response = WebClient
+                    .create(endPoint + META_PATH)
+                    .accept("application/json")
+                    .put(ClassLoader
+                            .getSystemResourceAsStream(TEST_RECURSIVE_DOC));
+            List<Metadata> metadataList;
+            // Close the response stream once the JSON has been read.
+            try (Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8)) {
+                metadataList = JsonMetadataList.fromJson(reader);
+            }
+            assertEquals(12, metadataList.size());
+            assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION));
+            assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content"));
+
+            //assertEquals("a38e6c7b38541af87148dee9634cb811", metadataList.get(10).get("X-TIKA:digest:MD5"));
+        } finally {
+            serverThread.interrupt();
+        }
+    }
+}
diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java
new file mode 100644
index 0000000..23880ff
--- /dev/null
+++ b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Unit tests for {@link ServerStatus}.
+ */
+public class ServerStatusTest {
+
+    /**
+     * Completing an id that was never started must be rejected.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadId() throws Exception {
+        ServerStatus status = new ServerStatus(-1);
+        status.complete(2);
+    }
+
+    /**
+     * Hammers one {@link ServerStatus} from many threads and checks that the
+     * bookkeeping adds up afterwards.
+     */
+    @Test(timeout = 60000)
+    public void testBasicMultiThreading() throws Exception {
+        //make sure that synchronization is basically working
+        int numThreads = 100;
+        int filesToProcess = 100;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);
+        try {
+            ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
+            ServerStatus serverStatus = new ServerStatus(-1);
+            for (int i = 0; i < numThreads; i++) {
+                completionService.submit(new MockTask(serverStatus, filesToProcess));
+            }
+            int totalProcessed = 0;
+            // take() blocks until a task has finished, so one call per task
+            // collects every result.
+            for (int finished = 0; finished < numThreads; finished++) {
+                Future<Integer> future = completionService.take();
+                totalProcessed += future.get();
+            }
+            assertEquals(numThreads*filesToProcess, totalProcessed);
+            assertEquals(0, serverStatus.getTasks().size());
+            assertEquals(totalProcessed, serverStatus.getFilesProcessed());
+        } finally {
+            // Release the worker threads even when an assertion fails.
+            service.shutdownNow();
+        }
+    }
+
+    /**
+     * Worker that starts and completes {@code filesToProcess} tasks with
+     * random pauses in between, exercising the other accessors as well.
+     */
+    private static class MockTask implements Callable<Integer> {
+        private final Random r = new Random();
+        private final ServerStatus serverStatus;
+        private final int filesToProcess;
+
+        public MockTask(ServerStatus serverStatus, int filesToProcess) {
+            this.serverStatus = serverStatus;
+            this.filesToProcess = filesToProcess;
+        }
+
+        /**
+         * @return number of tasks this worker started and completed
+         */
+        @Override
+        public Integer call() throws Exception {
+            int processed = 0;
+            for (int i = 0; i < filesToProcess; i++) {
+                sleepRandom(200);
+                int taskId = serverStatus.start(ServerStatus.TASK.PARSE, null);
+                sleepRandom(100);
+                serverStatus.complete(taskId);
+                processed++;
+                serverStatus.getStatus();
+                sleepRandom(10);
+                serverStatus.setStatus(ServerStatus.STATUS.OPEN);
+                sleepRandom(20);
+                Map<Integer, TaskStatus> tasks = serverStatus.getTasks();
+                assertNotNull(tasks);
+            }
+            return processed;
+        }
+
+        /**
+         * Sleeps for a random time of less than {@code millis} milliseconds.
+         */
+        private void sleepRandom(int millis) throws InterruptedException {
+            int sleep = r.nextInt(millis);
+            Thread.sleep(sleep);
+        }
+    }
+}