You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/10/27 13:50:15 UTC

[tika] branch main updated (cd96631 -> 3eaf766)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git.


    from cd96631  TIKA-3585 -- general updates for main
     new fb5e9af  remove duplicated dependency
     new a78bca9  TIKA-3581 -- align tika-server-config-default.xml with TikaServerConfig.
     new 3eaf766  TIKA-3586 -- fix race condition when starting multiple servers on multiple ports

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   3 +
 tika-parsers/tika-parsers-ml/tika-dl/pom.xml       |   5 --
 .../apache/tika/server/core/TikaServerConfig.java  |  18 ++--
 .../tika/server/core/TikaServerWatchDog.java       | 100 +++++++++++++++------
 .../main/resources/tika-server-config-default.xml  |  26 ++----
 .../tika/server/core/TikaServerConfigTest.java     |  26 ++++++
 6 files changed, 124 insertions(+), 54 deletions(-)

[tika] 03/03: TIKA-3586 -- fix race condition when starting multiple servers on multiple ports

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 3eaf766b323b2a3d804268ae0e2bba6bd8e08117
Author: tballison <ta...@apache.org>
AuthorDate: Wed Oct 27 09:50:04 2021 -0400

    TIKA-3586 -- fix race condition when starting multiple servers on multiple ports
---
 CHANGES.txt                                        |   3 +
 .../tika/server/core/TikaServerWatchDog.java       | 100 +++++++++++++++------
 2 files changed, 78 insertions(+), 25 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 368faca..97cb1c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
 Release 2.1.1 - ???
 
+   * Fix race condition when starting multiple forked
+     servers on multiple ports (TIKA-3586).
+
    * Add timeout per task to be configured via headers
      for tika-server's legacy endpoints /tika and /rmeta.
      Note that this timeout must not be greater than taskTimeoutMillis (TIKA-3582).
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
index 192f520..b23c6ba 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerWatchDog.java
@@ -39,7 +39,9 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -51,8 +53,8 @@ import org.apache.tika.utils.ProcessUtils;
 public class TikaServerWatchDog implements Callable<WatchDogResult> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerWatchDog.class);
-    private static Thread SHUTDOWN_HOOK = null;
-
+    private static Set<Process> PROCESSES = ConcurrentHashMap.newKeySet();
+    private static Set<ForkedProcess> FORKED_PROCESSES = ConcurrentHashMap.newKeySet();
     private final int port;
     private final String id;
     private final TikaServerConfig tikaServerConfig;
@@ -64,6 +66,23 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
     private volatile boolean shutDown = false;
 
 
+    static {
+        Thread shutdownHook = new Thread(() -> {
+            //prioritize destroying processes
+            for (Process process : PROCESSES) {
+                process.destroyForcibly();
+            }
+            //once that's done, try to clean up tmp files too
+            for (ForkedProcess forkedProcess : FORKED_PROCESSES) {
+                try {
+                    forkedProcess.close();
+                } catch (DoNotRestartException | InterruptedException e) {
+                    //swallow
+                }
+            }
+        });
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
 
     TikaServerWatchDog(int port, String id, TikaServerConfig tikaServerConfig) {
         this.port = port;
@@ -90,17 +109,20 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         gobbler.start();
     }
 
-    private static synchronized void destroyForkedForcibly(Process process) {
+    private static synchronized void destroyForkedForcibly(Process process)
+            throws InterruptedException {
+
         process = process.destroyForcibly();
         try {
             boolean destroyed = process.waitFor(60, TimeUnit.SECONDS);
+
             if (!destroyed) {
                 LOG.error("Forked process still alive after 60 seconds. " +
                         "Shutting down the forking process.");
                 System.exit(1);
             }
-        } catch (InterruptedException e) {
-            //swallow
+        } finally {
+            PROCESSES.remove(process);
         }
     }
 
@@ -127,19 +149,19 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                     if (exited) {
                         LOG.info("forked process exited with exit value {}",
                                 forkedProcess.process.exitValue());
-                        forkedProcess.close();
+                        closeForkedProcess(forkedProcess);
                         mustRestart = true;
                     } else {
                         ForkedStatus status = forkedProcess.readStatus();
                         if (status.status == FORKED_STATUS.FAILED_COMMUNICATION.ordinal()) {
                             LOG.info("failed to read from status file. Restarting now.");
-                            forkedProcess.close();
+                            closeForkedProcess(forkedProcess);
                             mustRestart = true;
                         } else if (status.status == FORKED_STATUS.SHUTTING_DOWN.ordinal()) {
                             LOG.info("Forked process is in shutting down mode.  Will wait a bit");
                             forkedProcess.process.waitFor(tikaServerConfig.getTaskTimeoutMillis(),
                                     TimeUnit.MILLISECONDS);
-                            forkedProcess.close();
+                            closeForkedProcess(forkedProcess);
                             mustRestart = true;
                         } else {
                             long elapsed = Duration.between(Instant.ofEpochMilli(status.timestamp),
@@ -149,7 +171,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                                         "{} ms have elapsed since forked process " +
                                                 "last updated status. " +
                                                 "Shutting down and restarting.", elapsed);
-                                forkedProcess.close();
+                                closeForkedProcess(forkedProcess);
                                 mustRestart = true;
                             }
                         }
@@ -163,11 +185,20 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
             LOG.debug("about to shutdown");
             if (forkedProcess != null) {
                 LOG.info("about to shutdown process");
-                forkedProcess.close();
+                closeForkedProcess(forkedProcess);
             }
         }
     }
 
+    private static void closeForkedProcess(ForkedProcess forkedProcess)
+            throws DoNotRestartException, InterruptedException {
+        try {
+            forkedProcess.close();
+        } finally {
+            FORKED_PROCESSES.remove(forkedProcess);
+        }
+    }
+
     private ForkedProcess startForkedProcess(int restarts) throws Exception {
         int consecutiveRestarts = 0;
         //if there's a bind exception, retry for 5 seconds to give the OS
@@ -175,7 +206,9 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
         int maxBind = 5;
         while (consecutiveRestarts < maxBind) {
             try {
-                return new ForkedProcess(restarts);
+                ForkedProcess forkedProcess = new ForkedProcess(restarts);
+                FORKED_PROCESSES.add(forkedProcess);
+                return forkedProcess;
             } catch (BindException e) {
                 LOG.warn("WatchDog observes bind exception on retry {}. " +
                         "Will retry {} times.", consecutiveRestarts, maxBind);
@@ -186,7 +219,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                 }
             }
         }
-        throw new RuntimeException("Couldn't start child process");
+        throw new RuntimeException("Couldn't start forked process");
     }
 
 
@@ -307,7 +340,7 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
             return new ForkedStatus(-1, FORKED_STATUS.FAILED_COMMUNICATION.ordinal(), -1);
         }
 
-        private void close() throws DoNotRestartException {
+        private void close() throws DoNotRestartException, InterruptedException {
 
             try {
                 if (!process.isAlive()) {
@@ -374,25 +407,42 @@ public class TikaServerWatchDog implements Callable<WatchDogResult> {
                 builder.environment().put("TIKA_CONFIG", tikaConfigEnv);
             }
             Process process = builder.start();
+            PROCESSES.add(process);
             //redirect stdout to parent stderr to avoid error msgs
             //from maven during build: Corrupted STDOUT by directly writing to
             // native stream in forked
             redirectIO(process.getInputStream(), System.err);
             redirectIO(process.getErrorStream(), System.err);
-            if (SHUTDOWN_HOOK != null) {
-                Runtime.getRuntime().removeShutdownHook(SHUTDOWN_HOOK);
+            return process;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
             }
-            SHUTDOWN_HOOK = new Thread(() -> {
-                try {
-                    this.close();
-                } catch (DoNotRestartException e) {
-                    LOG.error("should not restart", e);
-                    shutDown();
-                }
-            });
-            Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
 
-            return process;
+            ForkedProcess that = (ForkedProcess) o;
+
+            if (!process.equals(that.process)) {
+                return false;
+            }
+            if (!forkedStatusFile.equals(that.forkedStatusFile)) {
+                return false;
+            }
+            return statusBuffer.equals(that.statusBuffer);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = process.hashCode();
+            result = 31 * result + forkedStatusFile.hashCode();
+            result = 31 * result + statusBuffer.hashCode();
+            return result;
         }
     }
+
 }

[tika] 02/03: TIKA-3581 -- align tika-server-config-default.xml with TikaServerConfig.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit a78bca9005ba32994652959e7a4d57e98c12d7ef
Author: tballison <ta...@apache.org>
AuthorDate: Wed Oct 27 09:20:28 2021 -0400

    TIKA-3581 -- align tika-server-config-default.xml with TikaServerConfig.
---
 .../apache/tika/server/core/TikaServerConfig.java  | 18 ++++++++++-----
 .../main/resources/tika-server-config-default.xml  | 26 +++++++---------------
 .../tika/server/core/TikaServerConfigTest.java     | 26 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
index 2a67db9..b69c361 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
@@ -73,8 +73,8 @@ public class TikaServerConfig extends ConfigBase {
                     " your emitter endpoints.  See CVE-2015-3271.\n" +
                     "Please make sure you know what you are doing.";
     private static final List<String> ONLY_IN_FORK_MODE = Arrays.asList(
-            new String[]{"taskTimeoutMillis", "taskPulseMillis", "pingTimeoutMillis",
-                    "pingPulseMillis", "maxFiles", "javaHome", "maxRestarts", "numRestarts",
+            new String[]{"taskTimeoutMillis", "taskPulseMillis",
+                    "maxFiles", "javaPath", "maxRestarts", "numRestarts",
                     "forkedStatusFile", "maxForkedStartupMillis", "tmpFilePrefix"});
 
         /*
@@ -100,7 +100,8 @@ public class TikaServerConfig extends ConfigBase {
     private String cors = "";
     private boolean returnStackTrace = false;
     private boolean noFork = false;
-    private String tempFilePrefix = "apache-tika-server-forked-tmp-"; //can be set for debugging
+    //TODO: make parameterizable for debugging
+    private String tempFilePrefix = "apache-tika-server-forked-tmp-";
     private Set<String> supportedFetchers = new HashSet<>();
     private Set<String> supportedEmitters = new HashSet<>();
     private List<String> forkedJvmArgs = new ArrayList<>();
@@ -109,6 +110,7 @@ public class TikaServerConfig extends ConfigBase {
     private String host = DEFAULT_HOST;
     private int digestMarkLimit = DEFAULT_DIGEST_MARK_LIMIT;
     private String digest = "";
+    private String javaPath = "java";
     //debug or info only
     private String logLevel = "";
     private Path configPath;
@@ -259,7 +261,7 @@ public class TikaServerConfig extends ConfigBase {
         return noFork;
     }
 
-    private void setNoFork(boolean noFork) {
+    public void setNoFork(boolean noFork) {
         this.noFork = noFork;
     }
 
@@ -360,7 +362,11 @@ public class TikaServerConfig extends ConfigBase {
      * @return
      */
     public String getJavaPath() {
-        return "java";
+        return javaPath;
+    }
+
+    public void setJavaPath(String javaPath) {
+        this.javaPath = javaPath;
     }
 
     public List<String> getForkedJvmArgs() {
@@ -514,7 +520,7 @@ public class TikaServerConfig extends ConfigBase {
         return idBase;
     }
 
-    private void setId(String id) {
+    public void setId(String id) {
         this.idBase = id;
     }
 
diff --git a/tika-server/tika-server-core/src/main/resources/tika-server-config-default.xml b/tika-server/tika-server-core/src/main/resources/tika-server-config-default.xml
index e7a3698..56a2889 100644
--- a/tika-server/tika-server-core/src/main/resources/tika-server-config-default.xml
+++ b/tika-server/tika-server-core/src/main/resources/tika-server-config-default.xml
@@ -33,7 +33,7 @@
           forked server has to restart, it will maintain its original id.
           If not specified, a UUID will be generated.
           -->
-      <id></id>
+      <id>my-id</id>
       <!-- whether or not to allow CORS requests. Set to 'all' if you
           want to allow all CORS requests. Set to NONE or leave blank
            if you do not want to enable CORS. -->
@@ -46,12 +46,10 @@
           spooling to disc...only if digest is selected -->
       <digestMarkLimit>1000000</digestMarkLimit>
       <!-- request URI log level 'debug' or 'info' -->
-      <log>info</log>
+      <logLevel>info</logLevel>
       <!-- whether or not to include the stacktrace when a parse exception happens
           in the data returned to the user -->
-      <includeStack>false</includeStack>
-      <!-- whether or not to enable the status endpoint -->
-      <status>false</status>
+      <returnStackTrace>false</returnStackTrace>
       <!-- If set to 'true', this runs tika server "in process"
           in the legacy 1.x mode.
           This means that the server will be susceptible to infinite loops
@@ -61,20 +59,13 @@
           (this was called -spawnChild mode in 1.x).
           nofork=false is the default in 2.x
       -->
-      <nofork>false</nofork>
+      <noFork>false</noFork>
       <!-- maximum time to allow per parse before shutting down and restarting
           the forked parser. Not allowed if nofork=true. -->
       <taskTimeoutMillis>300000</taskTimeoutMillis>
       <!-- how often to check whether a parse has timed out.
           Not allowed if nofork=true. -->
       <taskPulseMillis>10000</taskPulseMillis>
-      <!-- maximum time to allow for a response from the forked process
-          before shutting it down and restarting it.
-          Not allowed if nofork=true. -->
-      <pingTimeoutMillis>60000</pingTimeoutMillis>
-      <!-- how often to check whether the fork process needs to be restarted
-          Not allowed if nofork=true. -->
-      <pingPulseMillis>10000</pingPulseMillis>
       <!-- maximum amount of time to wait for a forked process to
           start up.
           Not allowed if nofork=true. -->
@@ -88,12 +79,11 @@
           memory leaks.
           Not allowed if nofork=true. -->
       <maxFiles>100000</maxFiles>
-      <!-- if you want to specify a specific javaHome for
-          the forked process.
+      <!-- if you want to specify a specific javaPath for
+          the forked process.  This should be the full path
+          including the executable, e.g.: /usr/bin/java
           Not allowed if nofork=true. -->
-      <javaHome></javaHome>
-      <!-- this is for debugging only -->
-      <tmpFilePrefix></tmpFilePrefix>
+      <javaPath>java</javaPath>
     </params>
   </server>
 </properties>
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
index a9716ad..0f9a25c 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
@@ -27,10 +27,12 @@ import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.junit.jupiter.api.Test;
 
 import org.apache.tika.config.TikaConfigTest;
+import org.apache.tika.utils.ProcessUtils;
 
 public class TikaServerConfigTest {
 
@@ -72,4 +74,28 @@ public class TikaServerConfigTest {
         assertTrue(config.getSupportedFetchers().contains("fsf"));
         assertTrue(config.getSupportedEmitters().contains("fse"));
     }
+
+    @Test
+    public void testPorts() throws Exception {
+        CommandLineParser parser = new DefaultParser();
+        Path path = Paths.get(TikaConfigTest.class.getResource(
+                "/configs/tika-config-server.xml").toURI());
+        CommandLine commandLine =
+                parser.parse(
+                        new Options()
+                                .addOption(Option.builder("p").longOpt("port").hasArg().build())
+                                .addOption(Option.builder("c").longOpt("config").hasArg().build()
+                                ),
+                        new String[]{
+                                "-p", "9994-9999",
+                                "-c",
+                                ProcessUtils.escapeCommandLine(path.toAbsolutePath().toString())
+                        });
+        TikaServerConfig config = TikaServerConfig
+                .load(commandLine);
+        int[] ports = config.getPorts();
+        assertEquals(6, ports.length);
+        assertEquals(9994, ports[0]);
+        assertEquals(9999, ports[5]);
+    }
 }

[tika] 01/03: remove duplicated dependency

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit fb5e9afcd87ed0bf04eab4f2175cc902d1fabab9
Author: tballison <ta...@apache.org>
AuthorDate: Tue Oct 26 15:14:38 2021 -0400

    remove duplicated dependency
---
 tika-parsers/tika-parsers-ml/tika-dl/pom.xml | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/tika-parsers/tika-parsers-ml/tika-dl/pom.xml b/tika-parsers/tika-parsers-ml/tika-dl/pom.xml
index f448328..77b558b 100644
--- a/tika-parsers/tika-parsers-ml/tika-dl/pom.xml
+++ b/tika-parsers/tika-parsers-ml/tika-dl/pom.xml
@@ -97,11 +97,6 @@
       <version>${dl4j.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.datavec</groupId>
-      <artifactId>datavec-data-image</artifactId>
-      <version>${dl4j.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>