You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/05/13 13:22:11 UTC

[tika] 01/02: TIKA-1570 -- add a stop method to TikaServerCli

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit ba631c095a67ee9c11502147e9bc4f7735415373
Author: tallison <ta...@apache.org>
AuthorDate: Fri May 13 09:13:33 2022 -0400

    TIKA-1570 -- add a stop method to TikaServerCli
---
 CHANGES.txt                                        |  3 +
 .../org/apache/tika/server/core/TikaServerCli.java | 67 +++++++++++++++++-----
 .../apache/tika/server/core/TikaServerConfig.java  | 10 ++++
 .../tika/server/core/TikaServerWatchDog.java       | 22 ++++++-
 .../server/core/TikaServerIntegrationTest.java     | 33 +++++++++++
 5 files changed, 120 insertions(+), 15 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 39acd5973..4175b99e1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
 Release 2.4.1 - ???
 
+   * Add stop() method to TikaServerCli so that it can be run
+     with Apache Commons Daemon (TIKA-1570).
+
    * Fixed bug in ordering of Parsers during service loading (TIKA-3750).
 
    * Users can expand system properties from the forking
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index 7346b59d8..fe74035fc 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -42,6 +42,9 @@ public class TikaServerCli {
      * This value is set to the server's id in the forked process.
      */
     public static String TIKA_SERVER_ID_ENV = "tika.server.id";
+    private static List<TikaServerWatchDog> WATCHERS = new ArrayList<>();
+
+    private static boolean PREVENT_STOP = false;
 
     private static Options getOptions() {
         Options options = new Options();
@@ -63,23 +66,26 @@ public class TikaServerCli {
 
     public static void main(String[] args) {
         try {
-            execute(args);
+            Options options = getOptions();
+
+            CommandLineParser cliParser = new DefaultParser();
+
+            CommandLine line = cliParser.parse(options, args);
+            if (line.hasOption("help")) {
+                usage(options);
+            }
+            TikaServerConfig tikaServerConfig = TikaServerConfig.load(line);
+            PREVENT_STOP = tikaServerConfig.isPreventStopMethod();
+
+            execute(tikaServerConfig);
         } catch (Exception e) {
             LOG.error("Can't start: ", e);
             System.exit(-1);
         }
     }
 
-    private static void execute(String[] args) throws Exception {
-        Options options = getOptions();
-
-        CommandLineParser cliParser = new DefaultParser();
+    private static void execute(TikaServerConfig tikaServerConfig) throws Exception {
 
-        CommandLine line = cliParser.parse(options, args);
-        if (line.hasOption("help")) {
-            usage(options);
-        }
-        TikaServerConfig tikaServerConfig = TikaServerConfig.load(line);
         if (tikaServerConfig.isNoFork()) {
             noFork(tikaServerConfig);
         } else {
@@ -99,11 +105,11 @@ public class TikaServerCli {
         ExecutorService executorService = Executors.newFixedThreadPool(portIdPairs.size());
         ExecutorCompletionService<WatchDogResult> executorCompletionService =
                 new ExecutorCompletionService<>(executorService);
-        List<TikaServerWatchDog> watchers = new ArrayList<>();
+
         for (PortIdPair p : portIdPairs) {
             TikaServerWatchDog watcher = new TikaServerWatchDog(p.port, p.id, tikaServerConfig);
             executorCompletionService.submit(watcher);
-            watchers.add(watcher);
+            WATCHERS.add(watcher);
         }
 
         int finished = 0;
@@ -118,7 +124,7 @@ public class TikaServerCli {
                 }
             }
         } catch (InterruptedException e) {
-            for (TikaServerWatchDog w : watchers) {
+            for (TikaServerWatchDog w : WATCHERS) {
                 w.shutDown();
             }
             LOG.debug("thread interrupted", e);
@@ -127,6 +133,41 @@ public class TikaServerCli {
             executorService.shutdownNow();
         }
     }
+    /** Stop hook (e.g. for Apache Commons Daemon); does nothing if preventStopMethod is set. */
+    public static void stop(String [] args) {
+        if (PREVENT_STOP) {
+            LOG.info("preventStopMethod was set to true in the server config. I'm not stopping.");
+            return; // honor the config: leave the watchers and the JVM running
+        }
+        try {
+            CommandLine line = new DefaultParser().parse(getStopOptions(), args);
+            if (line.hasOption("preventSystemExit")) {
+                LOG.debug("Seeing 'preventSystemExit' on stop's commandline; not exiting");
+                return;
+            }
+        } catch (org.apache.commons.cli.ParseException e) {
+            LOG.error("Can't parse stop arguments: ", e);
+            System.exit(-1);
+        }
+
+        // close every watcher (and its forked process) before terminating the JVM
+        for (TikaServerWatchDog watcher : WATCHERS) {
+            try {
+                watcher.close();
+            } catch (Exception e) {
+                LOG.warn("Exception trying to close watcher", e);
+            }
+        }
+        System.exit(0);
+    }
+
+    private static Options getStopOptions() {
+        Options options = new Options();
+        options.addOption("preventSystemExit", false,
+                "Prevent the stop method from calling system.exit, " +
+                        "which would terminate the JVM. This is useful for integration tests.");
+        return options;
+    }
 
     private static String[] stripForkedArgs(String[] args) {
         List<String> ret = new ArrayList<>();
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
index 75254a968..aa3d7c242 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
@@ -136,6 +136,8 @@ public class TikaServerConfig extends ConfigBase {
     private String forkedStatusFile;
     private int numRestarts = 0;
 
+    private boolean preventStopMethod = false;
+
     private TlsConfig tlsConfig = new TlsConfig();
     /**
      * Config with only the defaults
@@ -589,6 +591,14 @@ public class TikaServerConfig extends ConfigBase {
         this.maxforkedStartupMillis = maxforkedStartupMillis;
     }
 
+    public void setPreventStopMethod(boolean preventStopMethod) {
+        this.preventStopMethod = preventStopMethod;
+    }
+
+    public boolean isPreventStopMethod() {
+        return preventStopMethod;
+    }
+
     public int[] getPorts() {
         return getPorts(port);
     }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
index 91eaec464..fa8dae289 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
@@ -141,6 +141,14 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                 try {
                     if (mustRestart) {
                         forkedProcess = startForkedProcess(restarts++);
+                        if (forkedProcess == null) {
+                            if (! shutDown) {
+                                throw new IllegalArgumentException("forked process should not be " +
+                                        "null when not in shutdown mode");
+                            } else {
+                                return new WatchDogResult(port, id, restarts);
+                            }
+                        }
                         setForkedStatus(FORKED_STATUS.RUNNING);
                         mustRestart = false;
                     }
@@ -190,6 +198,13 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         }
     }
 
+    public synchronized void close() throws DoNotRestartException, InterruptedException {
+        setForkedStatus(FORKED_STATUS.SHUTTING_DOWN);
+        LOG.debug("received 'close()'; about to shutdown");
+        shutDown();
+        closeForkedProcess(forkedProcess);
+    }
+
     private static void closeForkedProcess(ForkedProcess forkedProcess)
             throws DoNotRestartException, InterruptedException {
         try {
@@ -199,12 +214,12 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         }
     }
 
-    private ForkedProcess startForkedProcess(int restarts) throws Exception {
+    private synchronized ForkedProcess startForkedProcess(int restarts) throws Exception {
         int consecutiveRestarts = 0;
         //if there's a bind exception, retry for 5 seconds to give the OS
         //a chance to release the port
         int maxBind = 5;
-        while (consecutiveRestarts < maxBind) {
+        while (consecutiveRestarts < maxBind && ! shutDown) {
             try {
                 ForkedProcess forkedProcess = new ForkedProcess(restarts);
                 FORKED_PROCESSES.add(forkedProcess);
@@ -219,6 +234,9 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                 }
             }
         }
+        if (shutDown) {
+            return null;
+        }
         throw new RuntimeException("Couldn't start forked process");
     }
 
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
index aa9629f3e..bffbb0ed7 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
@@ -506,6 +506,39 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
         }
     }
 
+    @Test
+    @Disabled("figure out how to test this with the forked process")
+    public void testSystemExitViaStopMethod() throws Exception {
+
+        Thread serverThread = new Thread() {
+            @Override
+            public void run() {
+                TikaServerCli.main(new String[]{"-p", INTEGRATION_TEST_PORT,});
+            }
+
+            //Custom override of the interrupt method.
+            //Overriding it (rather than actually interrupting the thread) gives us
+            // an easy way to invoke our stop command.
+            //We pass in the preventSystemExit option to stop the call to System.exit,
+            // which would terminate the JVM and cause a test failure.
+            @Override
+            public void interrupt() {
+                TikaServerCli.stop(new String[]{"-preventSystemExit"});
+            }
+        };
+        serverThread.start();
+        awaitServerStartup();
+        serverThread.interrupt();
+
+        //give some time for the server to crash/kill itself
+        Thread.sleep(2000);
+
+        try {
+            testBaseline();
+        } finally {
+            serverThread.interrupt();
+        }
+    }
 
     @Test
     @Disabled("turn this into a real test")