You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/09/18 18:13:34 UTC

[tika] 01/11: TIKA-2725 -- checkpoint commit ... basic child process is started...need to integrate actual statuswatcher, etc.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tika.git

commit bf1b2418d47e9fa47e0634825341921d7edd1ee8
Author: TALLISON <ta...@apache.org>
AuthorDate: Fri Sep 7 19:16:32 2018 -0400

    TIKA-2725 -- checkpoint commit ... basic child process is started...need to integrate actual statuswatcher, etc.
    
    # Conflicts:
    #	tika-parsers/src/test/resources/test-documents/testPST.pst
    #	tika-parsers/src/test/resources/test-documents/testPST_variousBodyTypes.pst
---
 .../tika/server/FileCountExceededException.java    |   9 ++
 .../java/org/apache/tika/server/ServerStatus.java  |  98 +++++++++++++++
 .../apache/tika/server/ServerStatusWatcher.java    |  76 ++++++++++++
 .../java/org/apache/tika/server/TaskStatus.java    |  41 +++++++
 .../java/org/apache/tika/server/TikaServerCli.java | 132 ++++++++++++++++++++-
 .../apache/tika/server/ServerIntegrationTest.java  |  73 ++++++++++++
 .../org/apache/tika/server/ServerStatusTest.java   | 100 ++++++++++++++++
 7 files changed, 524 insertions(+), 5 deletions(-)

diff --git a/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java
new file mode 100644
index 0000000..9920556
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/FileCountExceededException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+/**
+ * Thrown by {@link ServerStatus#start} when tika-server has reached the
+ * maximum number of files it is configured to process.
+ */
+public class FileCountExceededException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception without a detail message.
+     */
+    public FileCountExceededException() {
+        super();
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message description of the limit that was exceeded
+     */
+    public FileCountExceededException(String message) {
+        super(message);
+    }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
new file mode 100644
index 0000000..861007d
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatus.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tracks the tasks that are currently running in tika-server and the
+ * server's overall {@link STATUS}.  All public methods synchronize on
+ * this instance.
+ */
+public class ServerStatus {
+
+    /**
+     * Overall server state.  Each value carries a shutdown code, exposed
+     * via {@link #getShutdownCode()}.
+     */
+    enum STATUS {
+        OPEN(0),
+        HIT_MAX(1),
+        TIMEOUT(2),
+        ERROR(3),
+        PARENT_REQUESTED_SHUTDOWN(4);
+
+        private final int shutdownCode;
+
+        STATUS(int shutdownCode) {
+            this.shutdownCode = shutdownCode;
+        }
+
+        int getShutdownCode() {
+            return shutdownCode;
+        }
+    }
+
+    /**
+     * Kinds of work that can be registered via {@link #start}.
+     */
+    enum TASK {
+        PARSE,
+        UNZIP,
+        DETECT,
+        METADATA
+    }
+
+    private final int maxFilesToProcess;
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final Map<Integer, TaskStatus> tasks = new HashMap<>();
+
+    private STATUS status = STATUS.OPEN;
+
+    /**
+     * @param maxFilesToProcess maximum number of files to accept before
+     *                          {@link #start} refuses new work; a value
+     *                          {@code <= 0} means "no limit"
+     */
+    public ServerStatus(int maxFilesToProcess) {
+        this.maxFilesToProcess = maxFilesToProcess;
+    }
+
+    /**
+     * Registers a newly started task.
+     *
+     * @param task     the kind of work being started
+     * @param fileName name of the file being processed; may be {@code null}
+     * @return the task id to pass to {@link #complete(int)}
+     * @throws FileCountExceededException if accepting this task would exceed
+     *         {@code maxFilesToProcess}, or the id counter reached
+     *         {@link Integer#MAX_VALUE}; the status is set to
+     *         {@link STATUS#HIT_MAX} before throwing
+     */
+    public synchronized int start(TASK task, String fileName) throws FileCountExceededException {
+        int taskId = counter.incrementAndGet();
+        //'>' lets exactly maxFilesToProcess tasks start; the next call trips
+        //the limit (with '>=' only maxFilesToProcess - 1 tasks could start)
+        if (taskId == Integer.MAX_VALUE ||
+                (maxFilesToProcess > 0 && taskId > maxFilesToProcess)) {
+            setStatus(STATUS.HIT_MAX);
+            throw new FileCountExceededException();
+        }
+        tasks.put(taskId, new TaskStatus(task, Instant.now(), fileName));
+        return taskId;
+    }
+
+    /**
+     * Removes the task from the collection of currently running tasks.
+     *
+     * @param taskId id returned by {@link #start}
+     * @throws IllegalArgumentException if there is no task by that taskId in the collection
+     */
+    public synchronized void complete(int taskId) throws IllegalArgumentException {
+        TaskStatus removed = tasks.remove(taskId);
+        if (removed == null) {
+            throw new IllegalArgumentException("TaskId is not in map:"+taskId);
+        }
+    }
+
+    /**
+     * @param status the new overall server status
+     */
+    public synchronized void setStatus(STATUS status) {
+        this.status = status;
+    }
+
+    /**
+     * @return the current overall server status
+     */
+    public synchronized STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * @return a snapshot copy of the running tasks, keyed by task id;
+     *         later changes to this ServerStatus are not reflected in it
+     */
+    public synchronized Map<Integer, TaskStatus> getTasks() {
+        return new HashMap<>(tasks);
+    }
+
+    /**
+     * @return the number of times {@link #start} has been called,
+     *         including calls that threw {@link FileCountExceededException}
+     */
+    public synchronized int getFilesProcessed() {
+        return counter.get();
+    }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
new file mode 100644
index 0000000..24b1ddb
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/ServerStatusWatcher.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.server;
+
+import org.apache.tika.server.resource.TranslateResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Callable;
+
+/**
+ * Polls a {@link ServerStatus} every {@code pulseMillis} milliseconds.  A task
+ * running longer than {@code timeoutMillis} sets the status to TIMEOUT; once the
+ * status is no longer OPEN, the JVM is terminated via {@link System#exit} with
+ * the status' shutdown code.
+ */
+public class ServerStatusWatcher implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServerStatusWatcher.class);
+
+    private final ServerStatus serverStatus;
+    private final long timeoutMillis;
+    private final long pulseMillis;
+
+    /**
+     * @param serverStatus  status to watch and update
+     * @param timeoutMillis maximum time, in milliseconds, a single task may run
+     * @param pulseMillis   delay, in milliseconds, between checks
+     */
+    public ServerStatusWatcher(ServerStatus serverStatus, long timeoutMillis, long pulseMillis) {
+        this.serverStatus = serverStatus;
+        this.timeoutMillis = timeoutMillis;
+        this.pulseMillis = pulseMillis;
+    }
+
+    @Override
+    public void run() {
+        ServerStatus.STATUS status = serverStatus.getStatus();
+        while (status == ServerStatus.STATUS.OPEN) {
+            try {
+                Thread.sleep(pulseMillis);
+            } catch (InterruptedException e) {
+                //cooperative cancellation: preserve the interrupt flag and stop watching
+                Thread.currentThread().interrupt();
+                LOG.warn("status watcher interrupted; no longer monitoring");
+                return;
+            }
+            checkForTimeouts();
+            status = serverStatus.getStatus();
+        }
+        //the loop only exits once the status is no longer OPEN
+        LOG.warn("child process shutting down with status: {}", status);
+        System.exit(status.getShutdownCode());
+    }
+
+    /**
+     * Sets the status to TIMEOUT, and logs an error, for every running task
+     * whose elapsed time exceeds {@code timeoutMillis}.
+     */
+    private void checkForTimeouts() {
+        Instant now = Instant.now();
+        for (TaskStatus task : serverStatus.getTasks().values()) {
+            //Duration.between(start, end): start must come first, else the result is negative
+            long millisElapsed = Duration.between(task.started, now).toMillis();
+            if (millisElapsed > timeoutMillis) {
+                serverStatus.setStatus(ServerStatus.STATUS.TIMEOUT);
+                if (task.fileName.isPresent()) {
+                    LOG.error("Timeout task {}, millis elapsed {}, file {}",
+                            task.task, millisElapsed, task.fileName.get());
+                } else {
+                    LOG.error("Timeout task {}, millis elapsed {}",
+                            task.task, millisElapsed);
+                }
+            }
+        }
+    }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java b/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java
new file mode 100644
index 0000000..1637d7d
--- /dev/null
+++ b/tika-server/src/main/java/org/apache/tika/server/TaskStatus.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Immutable record of one running task: what kind of work it is, when it
+ * started and, if known, which file it is processing.
+ */
+public class TaskStatus {
+    final ServerStatus.TASK task;
+    final Instant started;
+    final Optional<String> fileName;
+
+    /**
+     * @param task     kind of work; must not be {@code null}
+     * @param started  start time; must not be {@code null}
+     * @param fileName file being processed, or {@code null} if unknown
+     */
+    TaskStatus(ServerStatus.TASK task, Instant started, String fileName) {
+        this.task = Objects.requireNonNull(task, "task");
+        this.started = Objects.requireNonNull(started, "started");
+        this.fileName = Optional.ofNullable(fileName);
+    }
+
+    @Override
+    public String toString() {
+        return "TaskStatus{task=" + task + ", started=" + started
+                + ", fileName=" + fileName.orElse("<none>") + "}";
+    }
+}
diff --git a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
index 03d582e..af8fd8f 100644
--- a/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
+++ b/tika-server/src/main/java/org/apache/tika/server/TikaServerCli.java
@@ -17,6 +17,7 @@
 
 package org.apache.tika.server;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -62,6 +63,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TikaServerCli {
+
+
+    //used in spawn-child mode
+    private static final long PULSE_MILLIS = 100;
+    private static final int DEFAULT_MAX_FILES = -1;
+    private static final long DEFAULT_TIME_OUT_MS = 60000;
+    private static final long DEFAULT_PULSE_MS = 500;
+    private static Thread SHUTDOWN_HOOK = null;
+
+
     public static final int DEFAULT_PORT = 9998;
     private static final int DEFAULT_DIGEST_MARK_LIMIT = 20*1024*1024;
     public static final String DEFAULT_HOST = "localhost";
@@ -88,14 +99,135 @@ public class TikaServerCli {
         options.addOption("?", "help", false, "this help message");
         options.addOption("enableUnsecureFeatures", false, "this is required to enable fileUrl.");
         options.addOption("enableFileUrl", false, "allows user to pass in fileUrl instead of InputStream.");
-
+        options.addOption("spawnChild", false, "whether or not to spawn a child process for robustness");
+        //these options carry a value, so hasArg must be true for getOptionValue() to return it
+        options.addOption("maxFiles", true, "shutdown server after this many files -- use only in 'spawnChild' mode");
+        options.addOption("timeoutMS", true, "timeout in milliseconds for a single task -- use only in 'spawnChild' mode");
+        options.addOption("pulseMS", true, "how often, in milliseconds, to check for timed-out tasks -- use only in 'spawnChild' mode");
         return options;
     }
 
     public static void main(String[] args) {
         LOG.info("Starting {} server", new Tika());
+        try {
+            execute(args);
+        } catch (Exception e) {
+            LOG.error("Can't start", e);
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Runs in spawn-child mode if {@code -spawnChild}/{@code --spawnChild}
+     * is among the arguments, otherwise starts the server in this process.
+     */
+    private static void execute(String[] args) throws Exception {
+        boolean spawnChild = false;
+        for (String arg : args) {
+            if ("-spawnChild".equals(arg) || "--spawnChild".equals(arg)) {
+                spawnChild = true;
+                break;
+            }
+        }
+        if (spawnChild) {
+            spawnChild(args);
+        } else {
+            executeLegacy(args);
+        }
+    }
 
+    /**
+     * Starts the server in a child JVM and restarts it whenever it exits,
+     * unless it exits with the PARENT_REQUESTED_SHUTDOWN code.  Interrupting
+     * the calling thread destroys the child and returns.
+     */
+    private static void spawnChild(String[] args) throws Exception {
+        Process child = start(args);
         try {
+            while (true) {
+                Thread.sleep(PULSE_MILLIS);
+                if (child.isAlive()) {
+                    continue;
+                }
+                int exitValue = child.exitValue();
+                if (exitValue == ServerStatus.STATUS.PARENT_REQUESTED_SHUTDOWN.getShutdownCode()) {
+                    //the child shut down on purpose: stop instead of polling a dead process forever
+                    LOG.info("child exited with code ({}) -- not restarting", exitValue);
+                    return;
+                }
+                LOG.warn("child exited with code ({}) -- restarting, now", exitValue);
+                child = start(args);
+            }
+        } catch (InterruptedException e) {
+            //interrupted...shutting down
+            Thread.currentThread().interrupt();
+        } finally {
+            child.destroyForcibly();
+        }
+    }
+
+    /**
+     * Launches a child JVM running this class with every argument except the
+     * -J and spawnChild ones.  Each {@code -Jxyz} argument is passed to the JVM as
+     * {@code -xyz}; if no classpath was given that way, the parent's classpath is used.
+     */
+    private static Process start(String[] args) throws IOException {
+        List<String> jvmArgs = extractJVMArgs(args);
+        List<String> childArgs = extractArgs(args);
+        if (! jvmArgs.contains("-cp") && ! jvmArgs.contains("-classpath")
+                && ! jvmArgs.contains("--class-path") && ! jvmArgs.contains("--classpath")) {
+            jvmArgs.add("-cp");
+            jvmArgs.add(System.getProperty("java.class.path"));
+        }
+        List<String> argList = new ArrayList<>();
+        argList.add("java");
+        argList.addAll(jvmArgs);
+        argList.add("org.apache.tika.server.TikaServerCli");
+        argList.addAll(childArgs);
+
+        ProcessBuilder builder = new ProcessBuilder(argList);
+        builder.inheritIO();
+        Process process = builder.start();
+
+        //only the current child should be destroyed when the parent JVM exits
+        if (SHUTDOWN_HOOK != null) {
+            Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
+        }
+        SHUTDOWN_HOOK = new Thread(() -> process.destroy());
+        Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
+        return process;
+    }
+
+    /**
+     * @return the arguments for the child process: everything except
+     *         {@code -J} JVM arguments and the spawnChild flag
+     */
+    private static List<String> extractArgs(String[] args) {
+        List<String> argList = new ArrayList<>();
+        for (int i = 0; i < args.length; i++) {
+            if (args[i].startsWith("-J") || args[i].equals("-spawnChild") || args[i].equals("--spawnChild")) {
+                continue;
+            }
+            argList.add(args[i]);
+        }
+        return argList;
+    }
+
+    /**
+     * @return the {@code -J} arguments with the "-J" prefix replaced by "-",
+     *         e.g. {@code -JXmx1g} becomes {@code -Xmx1g}
+     */
+    private static List<String> extractJVMArgs(String[] args) {
+        List<String> jvmArgs = new ArrayList<>();
+        for (int i = 0; i < args.length; i++) {
+            if (args[i].startsWith("-J")) {
+                jvmArgs.add("-"+args[i].substring(2));
+            }
+        }
+        return jvmArgs;
+    }
+
+    private static void executeLegacy(String[] args) throws Exception {
             Options options = getOptions();
 
             CommandLineParser cliParser = new GnuParser();
@@ -196,6 +307,21 @@ public class TikaServerCli {
                 inputStreamFactory = new DefaultInputStreamFactory();
             }
 
+            int maxFiles = DEFAULT_MAX_FILES;
+            if (line.hasOption("maxFiles")) {
+                maxFiles = Integer.parseInt(line.getOptionValue("maxFiles"));
+            }
+
+            long timeoutMS = DEFAULT_TIME_OUT_MS;
+            if (line.hasOption("timeoutMS")) {
+                timeoutMS = Long.parseLong(line.getOptionValue("timeoutMS"));
+            }
+            long pulseMS = DEFAULT_PULSE_MS;
+            if (line.hasOption("pulseMS")) {
+                pulseMS = Long.parseLong(line.getOptionValue("pulseMS"));
+            }
+            ServerStatus serverStatus = new ServerStatus(maxFiles);
+            new Thread(new ServerStatusWatcher(serverStatus, timeoutMS, pulseMS)).start();
             TikaResource.init(tika, digester, inputStreamFactory);
             JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
 
@@ -241,9 +367,5 @@ public class TikaServerCli {
             manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory);
             sf.create();
             LOG.info("Started Apache Tika server at {}", url);
-        } catch (Exception ex) {
-            LOG.error("Can't start", ex);
-            System.exit(-1);
-        }
     }
 }
diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java
new file mode 100644
index 0000000..8568c6c
--- /dev/null
+++ b/tika-server/src/test/java/org/apache/tika/server/ServerIntegrationTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.tika.TikaTest;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.OfficeOpenXMLExtended;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.junit.Test;
+
+import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Starts tika-server in spawn-child mode and sends a document to it over HTTP.
+ */
+public class ServerIntegrationTest extends TikaTest {
+    private static final String TEST_RECURSIVE_DOC = "test_recursive_embedded.docx";
+    private static final String META_PATH = "/rmeta";
+    protected static final String endPoint =
+            "http://localhost:" + TikaServerCli.DEFAULT_PORT;
+    //upper bound on how long to wait for the child server to come up
+    private static final long MAX_STARTUP_MILLIS = 60000;
+    private static final long STARTUP_POLL_MILLIS = 500;
+
+    @Test
+    public void testBasic() throws Exception {
+        Thread serverThread = new Thread() {
+            @Override
+            public void run() {
+                TikaServerCli.main(
+                        new String[]{
+                                "-spawnChild", "-p", Integer.toString(TikaServerCli.DEFAULT_PORT)
+                        });
+            }
+        };
+        serverThread.start();
+        try {
+            awaitServerStartup();
+            Response response = WebClient
+                    .create(endPoint + META_PATH)
+                    .accept("application/json")
+                    .put(ClassLoader
+                            .getSystemResourceAsStream(TEST_RECURSIVE_DOC));
+            List<Metadata> metadataList;
+            try (Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8)) {
+                metadataList = JsonMetadataList.fromJson(reader);
+            }
+            assertEquals(12, metadataList.size());
+            assertEquals("Microsoft Office Word", metadataList.get(0).get(OfficeOpenXMLExtended.APPLICATION));
+            assertContains("plundered our seas", metadataList.get(6).get("X-TIKA:content"));
+
+            //assertEquals("a38e6c7b38541af87148dee9634cb811", metadataList.get(10).get("X-TIKA:digest:MD5"));
+        } finally {
+            //the parent's spawnChild loop destroys the child when interrupted
+            serverThread.interrupt();
+            serverThread.join(MAX_STARTUP_MILLIS);
+        }
+    }
+
+    /**
+     * Polls the server until it answers an HTTP request, instead of
+     * sleeping for a fixed period.
+     *
+     * @throws AssertionError if the server is not reachable within MAX_STARTUP_MILLIS
+     */
+    private static void awaitServerStartup() throws InterruptedException {
+        long deadline = System.currentTimeMillis() + MAX_STARTUP_MILLIS;
+        while (true) {
+            try {
+                WebClient.create(endPoint + "/tika").get().close();
+                return;
+            } catch (RuntimeException e) {
+                //typically a connection failure while the child is still starting
+                if (System.currentTimeMillis() > deadline) {
+                    throw new AssertionError("server did not start within "
+                            + MAX_STARTUP_MILLIS + " ms", e);
+                }
+            }
+            Thread.sleep(STARTUP_POLL_MILLIS);
+        }
+    }
+}
diff --git a/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java
new file mode 100644
index 0000000..23880ff
--- /dev/null
+++ b/tika-server/src/test/java/org/apache/tika/server/ServerStatusTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ServerStatusTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadId() throws Exception {
+        ServerStatus status = new ServerStatus(-1);
+        status.complete(2);
+    }
+
+    @Test(timeout = 60000)
+    public void testBasicMultiThreading() throws Exception {
+        //make sure that synchronization is basically working
+        int numThreads = 100;
+        int filesToProcess = 100;
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);
+        try {
+            ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
+            ServerStatus serverStatus = new ServerStatus(-1);
+            for (int i = 0; i < numThreads; i++) {
+                completionService.submit(new MockTask(serverStatus, filesToProcess));
+            }
+            int totalProcessed = 0;
+            for (int finished = 0; finished < numThreads; finished++) {
+                //take() blocks until a task finishes; get() rethrows any failure in it
+                Future<Integer> future = completionService.take();
+                totalProcessed += future.get();
+            }
+            assertEquals(numThreads*filesToProcess, totalProcessed);
+            assertEquals(0, serverStatus.getTasks().size());
+            assertEquals(totalProcessed, serverStatus.getFilesProcessed());
+        } finally {
+            //don't leak the pool's threads, even if an assertion or a task fails
+            service.shutdownNow();
+        }
+    }
+
+    /**
+     * Simulates a worker that starts and completes {@code filesToProcess}
+     * tasks with random pauses, calling ServerStatus' start/complete/status/tasks
+     * methods concurrently with other workers.  Static so it does not capture
+     * the enclosing test instance.
+     */
+    private static class MockTask implements Callable<Integer> {
+        private final Random r = new Random();
+        private final ServerStatus serverStatus;
+        private final int filesToProcess;
+
+        MockTask(ServerStatus serverStatus, int filesToProcess) {
+            this.serverStatus = serverStatus;
+            this.filesToProcess = filesToProcess;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            int processed = 0;
+            for (int i = 0; i < filesToProcess; i++) {
+                sleepRandom(200);
+                int taskId = serverStatus.start(ServerStatus.TASK.PARSE, null);
+                sleepRandom(100);
+                serverStatus.complete(taskId);
+                processed++;
+                serverStatus.getStatus();
+                sleepRandom(10);
+                serverStatus.setStatus(ServerStatus.STATUS.OPEN);
+                sleepRandom(20);
+                Map<Integer, TaskStatus> tasks = serverStatus.getTasks();
+                assertNotNull(tasks);
+            }
+            return processed;
+        }
+
+        private void sleepRandom(int millis) throws InterruptedException {
+            Thread.sleep(r.nextInt(millis));
+        }
+    }
+}