You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/10/26 14:27:11 UTC

[tika] branch main updated: TIKA-3584 -- allow retries for forked process startup timeouts; and do not swallow InterruptedExceptions

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new c544010  TIKA-3584 -- allow retries for forked process startup timeouts; and do not swallow InterruptedExceptions
c544010 is described below

commit c5440102b7317314f757dfcbabc11905995c568b
Author: tballison <ta...@apache.org>
AuthorDate: Tue Oct 26 10:21:42 2021 -0400

    TIKA-3584 -- allow retries for forked process startup timeouts; and do not swallow InterruptedExceptions
---
 .../java/org/apache/tika/pipes/PipesClient.java    | 153 +++++++++++----------
 .../org/apache/tika/pipes/PipesConfigBase.java     |  13 ++
 .../java/org/apache/tika/pipes/PipesParser.java    |   6 +-
 .../java/org/apache/tika/pipes/PipesServer.java    |   3 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    |   6 +-
 .../tika/server/core/resource/PipesResource.java   |   2 +-
 6 files changed, 102 insertions(+), 81 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 52999f8..79dcc4f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -54,27 +54,25 @@ public class PipesClient implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);
     private static final int MAX_BYTES_BEFORE_READY = 20000;
     private static AtomicInteger CLIENT_COUNTER = new AtomicInteger(0);
-
-    private Process process;
     private final PipesConfigBase pipesConfig;
-    private DataOutputStream output;
-    private DataInputStream input;
     private final int pipesClientId;
     private final ExecutorService executorService = Executors.newFixedThreadPool(1);
+    private Process process;
+    private DataOutputStream output;
+    private DataInputStream input;
+    private int filesProcessed = 0;
 
     public PipesClient(PipesConfigBase pipesConfig) {
         this.pipesConfig = pipesConfig;
         this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
     }
 
-    private int filesProcessed = 0;
-
     public int getFilesProcessed() {
         return filesProcessed;
     }
 
     private boolean ping() {
-        if (process == null || ! process.isAlive()) {
+        if (process == null || !process.isAlive()) {
             return false;
         }
         try {
@@ -98,19 +96,27 @@ public class PipesClient implements Closeable {
         executorService.shutdownNow();
     }
 
-    public PipesResult process(FetchEmitTuple t) throws IOException {
-        if (! ping()) {
-            restart();
-        }
-        if (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
-                filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess()) {
+    public PipesResult process(FetchEmitTuple t) throws IOException, InterruptedException {
+
+        if (!ping() || (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
+                filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess())) {
             LOG.info("restarting server after hitting max files: " + filesProcessed);
-            restart();
+            boolean successfulRestart = false;
+            while (!successfulRestart) {
+                try {
+                    restart();
+                    successfulRestart = true;
+                } catch (TimeoutException e) {
+                    LOG.warn("couldn't restart within {} ms (startupTimeoutMillis)",
+                            pipesConfig.getStartupTimeoutMillis());
+                    Thread.sleep(pipesConfig.getSleepOnStartupTimeoutMillis());
+                }
+            }
         }
         return actuallyProcess(t);
     }
 
-    private PipesResult actuallyProcess(FetchEmitTuple t) {
+    private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedException {
         long start = System.currentTimeMillis();
         FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
 
@@ -140,7 +146,7 @@ public class PipesClient implements Closeable {
             return futureTask.get(pipesConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             process.destroyForcibly();
-            return PipesResult.INTERRUPTED_EXCEPTION;
+            throw e;
         } catch (ExecutionException e) {
             LOG.error("pipesClientId=" + pipesClientId + " execution exception", e);
             long elapsed = System.currentTimeMillis() - start;
@@ -150,38 +156,31 @@ public class PipesClient implements Closeable {
                         elapsed);
                 return PipesResult.TIMEOUT;
             }
-            try {
-                process.waitFor(500, TimeUnit.MILLISECONDS);
-                if (process.isAlive()) {
-                    LOG.warn("pipesClientId={} crash: {} in {} ms with no exit code available",
-                            pipesClientId, t.getId(), elapsed);
-                } else {
-                    LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}",
-                            pipesClientId, t.getId(),
-                            elapsed, process.exitValue());
-                }
-            } catch (InterruptedException interruptedException) {
-                //swallow
+            process.waitFor(500, TimeUnit.MILLISECONDS);
+            if (process.isAlive()) {
+                LOG.warn("pipesClientId={} crash: {} in {} ms with no exit code available",
+                        pipesClientId, t.getId(), elapsed);
+            } else {
+                LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}", pipesClientId,
+                        t.getId(), elapsed, process.exitValue());
             }
             return PipesResult.UNSPECIFIED_CRASH;
         } catch (TimeoutException e) {
             long elapsed = System.currentTimeMillis() - start;
             process.destroyForcibly();
-            LOG.warn("pipesClientId={} client timeout: {} in {} ms",
-                    pipesClientId, t.getId(), elapsed);
+            LOG.warn("pipesClientId={} client timeout: {} in {} ms", pipesClientId, t.getId(),
+                    elapsed);
             return PipesResult.TIMEOUT;
         } finally {
             futureTask.cancel(true);
         }
     }
 
-    private void destroyWithPause() {
+    private void destroyWithPause() throws InterruptedException {
         //wait just a little bit to let process end to get exit value
         //if there's a timeout on the server side
         try {
             process.waitFor(200, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException interruptedException) {
-            //swallow
         } finally {
             process.destroyForcibly();
         }
@@ -202,33 +201,39 @@ public class PipesClient implements Closeable {
                 LOG.warn("pipesClientId={} oom: {} in {} ms", pipesClientId, t.getId(), millis);
                 return PipesResult.OOM;
             case TIMEOUT:
-                LOG.warn("pipesClientId={} server response timeout: {} in {} ms", pipesClientId, t.getId(),
-                        millis);
+                LOG.warn("pipesClientId={} server response timeout: {} in {} ms", pipesClientId,
+                        t.getId(), millis);
                 return PipesResult.TIMEOUT;
             case EMIT_EXCEPTION:
-                LOG.warn("pipesClientId={} emit exception: {} in {} ms", pipesClientId, t.getId(), millis);
+                LOG.warn("pipesClientId={} emit exception: {} in {} ms", pipesClientId, t.getId(),
+                        millis);
                 return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
             case EMITTER_NOT_FOUND:
-                LOG.warn("pipesClientId={} emitter not found: {} in {} ms", pipesClientId, t.getId(), millis);
+                LOG.warn("pipesClientId={} emitter not found: {} in {} ms", pipesClientId,
+                        t.getId(), millis);
                 return readMessage(PipesResult.STATUS.NO_EMITTER_FOUND);
             case FETCHER_NOT_FOUND:
-                LOG.warn("pipesClientId={} fetcher not found: {} in {} ms", pipesClientId, t.getId(), millis);
+                LOG.warn("pipesClientId={} fetcher not found: {} in {} ms", pipesClientId,
+                        t.getId(), millis);
                 return readMessage(PipesResult.STATUS.NO_FETCHER_FOUND);
             case FETCHER_INITIALIZATION_EXCEPTION:
-                LOG.warn("pipesClientId={} fetcher initialization exception: {} in {} ms", pipesClientId,
-                        t.getId(), millis);
+                LOG.warn("pipesClientId={} fetcher initialization exception: {} in {} ms",
+                        pipesClientId, t.getId(), millis);
                 return readMessage(PipesResult.STATUS.FETCHER_INITIALIZATION_EXCEPTION);
             case FETCH_EXCEPTION:
-                LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(), millis);
+                LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(),
+                        millis);
                 return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
             case PARSE_SUCCESS:
                 //there may have been a parse exception, but the parse didn't crash
-                LOG.info("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(), millis);
+                LOG.info("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
+                        millis);
                 return deserializeEmitData();
             case PARSE_EXCEPTION_NO_EMIT:
                 return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
             case EMIT_SUCCESS:
-                LOG.info("pipesClientId={} emit success: {} in {} ms", pipesClientId, t.getId(), millis);
+                LOG.info("pipesClientId={} emit success: {} in {} ms", pipesClientId, t.getId(),
+                        millis);
                 return PipesResult.EMIT_SUCCESS;
             case EMIT_SUCCESS_PARSE_EXCEPTION:
                 return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
@@ -240,7 +245,7 @@ public class PipesClient implements Closeable {
             case PING:
             case FAILED_TO_START:
                 throw new IOException("Not expecting this status: " + status);
-            default :
+            default:
                 throw new IOException("Need to handle procesing for: " + status);
         }
 
@@ -258,16 +263,15 @@ public class PipesClient implements Closeable {
         int length = input.readInt();
         byte[] bytes = new byte[length];
         input.readFully(bytes);
-        try (ObjectInputStream objectInputStream =
-                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
-            EmitData emitData = (EmitData)objectInputStream.readObject();
+        try (ObjectInputStream objectInputStream = new ObjectInputStream(
+                new ByteArrayInputStream(bytes))) {
+            EmitData emitData = (EmitData) objectInputStream.readObject();
 
             String stack = getStack(emitData);
             if (StringUtils.isBlank(stack)) {
                 return new PipesResult(emitData);
             } else {
-                return new PipesResult(
-                        emitData, stack);
+                return new PipesResult(emitData, stack);
             }
         } catch (ClassNotFoundException e) {
             LOG.error("class not found exception deserializing data", e);
@@ -278,14 +282,13 @@ public class PipesClient implements Closeable {
     }
 
     private String getStack(EmitData emitData) {
-        if (emitData.getMetadataList() == null ||
-                emitData.getMetadataList().size() < 1) {
+        if (emitData.getMetadataList() == null || emitData.getMetadataList().size() < 1) {
             return StringUtils.EMPTY;
         }
         return emitData.getMetadataList().get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
     }
 
-    private void restart() throws IOException {
+    private void restart() throws IOException, InterruptedException, TimeoutException {
         if (process != null) {
             process.destroyForcibly();
             LOG.info("restarting process");
@@ -298,26 +301,26 @@ public class PipesClient implements Closeable {
         input = new DataInputStream(process.getInputStream());
         output = new DataOutputStream(process.getOutputStream());
         //wait for ready signal
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
         FutureTask<Integer> futureTask = new FutureTask<>(() -> {
             int b = input.read();
             int read = 1;
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
             while (read < MAX_BYTES_BEFORE_READY && b != READY.getByte()) {
                 if (b == -1) {
-                    throw new RuntimeException("Couldn't start server: " +
-                            "read EOF before 'ready' byte.\n" +
-                            " Make absolutely certain that your logger is not writing to stdout.");
+                    throw new RuntimeException(
+                            "Couldn't start server: " + "read EOF before 'ready' byte.\n" +
+                                    " Make absolutely certain that your logger is not writing to stdout.");
                 }
                 bos.write(b);
                 b = input.read();
                 read++;
             }
             if (read >= MAX_BYTES_BEFORE_READY) {
-                throw new RuntimeException("Couldn't start server: read too many bytes before " +
-                        "'ready' byte.\n" +
-                        " Make absolutely certain that your logger is not writing to stdout.\n" +
-                        " Message read: " + new String(bos.toByteArray(),
-                        StandardCharsets.ISO_8859_1));
+                throw new RuntimeException(
+                        "Couldn't start server: read too many bytes before " + "'ready' byte.\n" +
+                                " Make absolutely certain that your logger is not writing to stdout.\n" +
+                                " Message read: " +
+                                new String(bos.toByteArray(), StandardCharsets.ISO_8859_1));
             }
             if (bos.size() > 0) {
                 LOG.warn("From forked process before start byte: {}",
@@ -330,15 +333,17 @@ public class PipesClient implements Closeable {
             futureTask.get(pipesConfig.getStartupTimeoutMillis(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             process.destroyForcibly();
-            return;
+            throw e;
         } catch (ExecutionException e) {
             LOG.error("couldn't start server", e);
             process.destroyForcibly();
             throw new RuntimeException(e);
         } catch (TimeoutException e) {
-            LOG.error("couldn't start server in time", e);
+            LOG.error("didn't receive ready byte from server within StartupTimeoutMillis {};" +
+                            " so far, I've read {}", pipesConfig.getStartupTimeoutMillis(),
+                    new String(bos.toByteArray(), StandardCharsets.ISO_8859_1), e);
             process.destroyForcibly();
-            throw new RuntimeException(e);
+            throw e;
         } finally {
             futureTask.cancel(true);
         }
@@ -380,26 +385,28 @@ public class PipesClient implements Closeable {
         List<String> commandLine = new ArrayList<>();
         String javaPath = pipesConfig.getJavaPath();
         commandLine.add(ProcessUtils.escapeCommandLine(javaPath));
-        if (! hasClassPath) {
+        if (!hasClassPath) {
             commandLine.add("-cp");
             commandLine.add(System.getProperty("java.class.path"));
         }
-        if (! hasHeadless) {
+        if (!hasHeadless) {
             commandLine.add("-Djava.awt.headless=true");
         }
         if (hasExitOnOOM) {
-            LOG.warn("I notice that you have an exit/crash on OOM. If you run heavy external processes " +
-                    "like tesseract, this setting may result in orphaned processes which could be disastrous" +
-                    " for performance.");
+            LOG.warn(
+                    "I notice that you have an exit/crash on OOM. If you run heavy external processes " +
+                            "like tesseract, this setting may result in orphaned processes which could be disastrous" +
+                            " for performance.");
         }
-        if (! hasLog4j) {
-            commandLine.add("-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
+        if (!hasLog4j) {
+            commandLine.add(
+                    "-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
         }
         commandLine.add("-DpipesClientId=" + pipesClientId);
         commandLine.addAll(configArgs);
         commandLine.add("org.apache.tika.pipes.PipesServer");
-        commandLine.add(
-                ProcessUtils.escapeCommandLine(pipesConfig.getTikaConfig().toAbsolutePath().toString()));
+        commandLine.add(ProcessUtils.escapeCommandLine(
+                pipesConfig.getTikaConfig().toAbsolutePath().toString()));
 
         commandLine.add(Long.toString(pipesConfig.getMaxForEmitBatchBytes()));
         commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index a02654f..bf6a6bb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -48,6 +48,7 @@ public class PipesConfigBase extends ConfigBase {
     private long maxForEmitBatchBytes = DEFAULT_MAX_FOR_EMIT_BATCH;
     private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
     private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
+    private long sleepOnStartupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
 
     private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
     private int numClients = DEFAULT_NUM_CLIENTS;
@@ -99,6 +100,10 @@ public class PipesConfigBase extends ConfigBase {
         return ret;
     }
 
+    public void setStartupTimeoutMillis(long startupTimeoutMillis) {
+        this.startupTimeoutMillis = startupTimeoutMillis;
+    }
+
     public void setForkedJvmArgs(List<String> jvmArgs) {
         this.forkedJvmArgs = Collections.unmodifiableList(jvmArgs);
     }
@@ -158,4 +163,12 @@ public class PipesConfigBase extends ConfigBase {
     public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
         this.maxForEmitBatchBytes = maxForEmitBatchBytes;
     }
+
+    public long getSleepOnStartupTimeoutMillis() {
+        return sleepOnStartupTimeoutMillis;
+    }
+
+    public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
+        this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
index 59bf633..8446983 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
@@ -40,7 +40,9 @@ public class PipesParser implements Closeable {
             clients.add(client);
         }
     }
-    public PipesResult parse(FetchEmitTuple t) throws PipesException, IOException {
+
+    public PipesResult parse(FetchEmitTuple t) throws InterruptedException,
+            PipesException, IOException {
         PipesClient client = null;
         try {
             client = clientQueue.poll(pipesConfig.getMaxWaitForClientMillis(),
@@ -49,8 +51,6 @@ public class PipesParser implements Closeable {
                 return PipesResult.CLIENT_UNAVAILABLE_WITHIN_MS;
             }
             return client.process(t);
-        } catch (InterruptedException e) {
-            throw new PipesException(e);
         } finally {
             if (client != null) {
                 clientQueue.offer(client);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 6e03430..a14019b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -171,6 +171,7 @@ public class PipesServer implements Runnable {
         }
     }
 
+    @Override
     public void run() {
         try {
             while (true) {
@@ -188,7 +189,7 @@ public class PipesServer implements Runnable {
                 Thread.sleep(checkForTimeoutMs);
             }
         } catch (InterruptedException e) {
-            //swallow
+            LOG.debug("interrupted");
         }
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 84fdba4..4014c8f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -153,7 +153,7 @@ public class AsyncProcessor implements Closeable {
         }
     }
 
-    public synchronized boolean checkActive() {
+    public synchronized boolean checkActive() throws InterruptedException {
 
         Future<Integer> future = executorCompletionService.poll();
         if (future != null) {
@@ -174,8 +174,8 @@ public class AsyncProcessor implements Closeable {
                     default :
                         throw new IllegalArgumentException("Don't recognize this future code: " + i);
                 }
-            } catch (InterruptedException | ExecutionException e) {
-                e.printStackTrace();
+            } catch (ExecutionException e) {
+                LOG.error("execution exception", e);
                 throw new RuntimeException(e);
             }
         }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index d7589d8..cc7be06 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -93,7 +93,7 @@ public class PipesResource {
     }
 
     private Map<String, String> processTuple(FetchEmitTuple fetchEmitTuple)
-            throws PipesException, IOException {
+            throws InterruptedException, PipesException, IOException {
 
         PipesResult pipesResult = pipesParser.parse(fetchEmitTuple);
         switch (pipesResult.getStatus()) {