You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/10/26 14:27:11 UTC
[tika] branch main updated: TIKA-3584 -- allow retries for forked process startup timeouts; and do not swallow InterruptedExceptions
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new c544010 TIKA-3584 -- allow retries for forked process startup timeouts; and do not swallow InterruptedExceptions
c544010 is described below
commit c5440102b7317314f757dfcbabc11905995c568b
Author: tballison <ta...@apache.org>
AuthorDate: Tue Oct 26 10:21:42 2021 -0400
TIKA-3584 -- allow retries for forked process startup timeouts; and do not swallow InterruptedExceptions
---
.../java/org/apache/tika/pipes/PipesClient.java | 153 +++++++++++----------
.../org/apache/tika/pipes/PipesConfigBase.java | 13 ++
.../java/org/apache/tika/pipes/PipesParser.java | 6 +-
.../java/org/apache/tika/pipes/PipesServer.java | 3 +-
.../apache/tika/pipes/async/AsyncProcessor.java | 6 +-
.../tika/server/core/resource/PipesResource.java | 2 +-
6 files changed, 102 insertions(+), 81 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 52999f8..79dcc4f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -54,27 +54,25 @@ public class PipesClient implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);
private static final int MAX_BYTES_BEFORE_READY = 20000;
private static AtomicInteger CLIENT_COUNTER = new AtomicInteger(0);
-
- private Process process;
private final PipesConfigBase pipesConfig;
- private DataOutputStream output;
- private DataInputStream input;
private final int pipesClientId;
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
+ private Process process;
+ private DataOutputStream output;
+ private DataInputStream input;
+ private int filesProcessed = 0;
public PipesClient(PipesConfigBase pipesConfig) {
this.pipesConfig = pipesConfig;
this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
}
- private int filesProcessed = 0;
-
public int getFilesProcessed() {
return filesProcessed;
}
private boolean ping() {
- if (process == null || ! process.isAlive()) {
+ if (process == null || !process.isAlive()) {
return false;
}
try {
@@ -98,19 +96,27 @@ public class PipesClient implements Closeable {
executorService.shutdownNow();
}
- public PipesResult process(FetchEmitTuple t) throws IOException {
- if (! ping()) {
- restart();
- }
- if (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
- filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess()) {
+ public PipesResult process(FetchEmitTuple t) throws IOException, InterruptedException {
+
+ if (!ping() || (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
+ filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess())) {
LOG.info("restarting server after hitting max files: " + filesProcessed);
- restart();
+ boolean successfulRestart = false;
+ while (!successfulRestart) {
+ try {
+ restart();
+ successfulRestart = true;
+ } catch (TimeoutException e) {
+ LOG.warn("couldn't restart within {} ms (startupTimeoutMillis)",
+ pipesConfig.getStartupTimeoutMillis());
+ Thread.sleep(pipesConfig.getSleepOnStartupTimeoutMillis());
+ }
+ }
}
return actuallyProcess(t);
}
- private PipesResult actuallyProcess(FetchEmitTuple t) {
+ private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedException {
long start = System.currentTimeMillis();
FutureTask<PipesResult> futureTask = new FutureTask<>(() -> {
@@ -140,7 +146,7 @@ public class PipesClient implements Closeable {
return futureTask.get(pipesConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
process.destroyForcibly();
- return PipesResult.INTERRUPTED_EXCEPTION;
+ throw e;
} catch (ExecutionException e) {
LOG.error("pipesClientId=" + pipesClientId + " execution exception", e);
long elapsed = System.currentTimeMillis() - start;
@@ -150,38 +156,31 @@ public class PipesClient implements Closeable {
elapsed);
return PipesResult.TIMEOUT;
}
- try {
- process.waitFor(500, TimeUnit.MILLISECONDS);
- if (process.isAlive()) {
- LOG.warn("pipesClientId={} crash: {} in {} ms with no exit code available",
- pipesClientId, t.getId(), elapsed);
- } else {
- LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}",
- pipesClientId, t.getId(),
- elapsed, process.exitValue());
- }
- } catch (InterruptedException interruptedException) {
- //swallow
+ process.waitFor(500, TimeUnit.MILLISECONDS);
+ if (process.isAlive()) {
+ LOG.warn("pipesClientId={} crash: {} in {} ms with no exit code available",
+ pipesClientId, t.getId(), elapsed);
+ } else {
+ LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}", pipesClientId,
+ t.getId(), elapsed, process.exitValue());
}
return PipesResult.UNSPECIFIED_CRASH;
} catch (TimeoutException e) {
long elapsed = System.currentTimeMillis() - start;
process.destroyForcibly();
- LOG.warn("pipesClientId={} client timeout: {} in {} ms",
- pipesClientId, t.getId(), elapsed);
+ LOG.warn("pipesClientId={} client timeout: {} in {} ms", pipesClientId, t.getId(),
+ elapsed);
return PipesResult.TIMEOUT;
} finally {
futureTask.cancel(true);
}
}
- private void destroyWithPause() {
+ private void destroyWithPause() throws InterruptedException {
//wait just a little bit to let process end to get exit value
//if there's a timeout on the server side
try {
process.waitFor(200, TimeUnit.MILLISECONDS);
- } catch (InterruptedException interruptedException) {
- //swallow
} finally {
process.destroyForcibly();
}
@@ -202,33 +201,39 @@ public class PipesClient implements Closeable {
LOG.warn("pipesClientId={} oom: {} in {} ms", pipesClientId, t.getId(), millis);
return PipesResult.OOM;
case TIMEOUT:
- LOG.warn("pipesClientId={} server response timeout: {} in {} ms", pipesClientId, t.getId(),
- millis);
+ LOG.warn("pipesClientId={} server response timeout: {} in {} ms", pipesClientId,
+ t.getId(), millis);
return PipesResult.TIMEOUT;
case EMIT_EXCEPTION:
- LOG.warn("pipesClientId={} emit exception: {} in {} ms", pipesClientId, t.getId(), millis);
+ LOG.warn("pipesClientId={} emit exception: {} in {} ms", pipesClientId, t.getId(),
+ millis);
return readMessage(PipesResult.STATUS.EMIT_EXCEPTION);
case EMITTER_NOT_FOUND:
- LOG.warn("pipesClientId={} emitter not found: {} in {} ms", pipesClientId, t.getId(), millis);
+ LOG.warn("pipesClientId={} emitter not found: {} in {} ms", pipesClientId,
+ t.getId(), millis);
return readMessage(PipesResult.STATUS.NO_EMITTER_FOUND);
case FETCHER_NOT_FOUND:
- LOG.warn("pipesClientId={} fetcher not found: {} in {} ms", pipesClientId, t.getId(), millis);
+ LOG.warn("pipesClientId={} fetcher not found: {} in {} ms", pipesClientId,
+ t.getId(), millis);
return readMessage(PipesResult.STATUS.NO_FETCHER_FOUND);
case FETCHER_INITIALIZATION_EXCEPTION:
- LOG.warn("pipesClientId={} fetcher initialization exception: {} in {} ms", pipesClientId,
- t.getId(), millis);
+ LOG.warn("pipesClientId={} fetcher initialization exception: {} in {} ms",
+ pipesClientId, t.getId(), millis);
return readMessage(PipesResult.STATUS.FETCHER_INITIALIZATION_EXCEPTION);
case FETCH_EXCEPTION:
- LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(), millis);
+ LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(),
+ millis);
return readMessage(PipesResult.STATUS.FETCH_EXCEPTION);
case PARSE_SUCCESS:
//there may have been a parse exception, but the parse didn't crash
- LOG.info("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(), millis);
+ LOG.info("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(),
+ millis);
return deserializeEmitData();
case PARSE_EXCEPTION_NO_EMIT:
return readMessage(PipesResult.STATUS.PARSE_EXCEPTION_NO_EMIT);
case EMIT_SUCCESS:
- LOG.info("pipesClientId={} emit success: {} in {} ms", pipesClientId, t.getId(), millis);
+ LOG.info("pipesClientId={} emit success: {} in {} ms", pipesClientId, t.getId(),
+ millis);
return PipesResult.EMIT_SUCCESS;
case EMIT_SUCCESS_PARSE_EXCEPTION:
return readMessage(PipesResult.STATUS.EMIT_SUCCESS_PARSE_EXCEPTION);
@@ -240,7 +245,7 @@ public class PipesClient implements Closeable {
case PING:
case FAILED_TO_START:
throw new IOException("Not expecting this status: " + status);
- default :
+ default:
throw new IOException("Need to handle procesing for: " + status);
}
@@ -258,16 +263,15 @@ public class PipesClient implements Closeable {
int length = input.readInt();
byte[] bytes = new byte[length];
input.readFully(bytes);
- try (ObjectInputStream objectInputStream =
- new ObjectInputStream(new ByteArrayInputStream(bytes))) {
- EmitData emitData = (EmitData)objectInputStream.readObject();
+ try (ObjectInputStream objectInputStream = new ObjectInputStream(
+ new ByteArrayInputStream(bytes))) {
+ EmitData emitData = (EmitData) objectInputStream.readObject();
String stack = getStack(emitData);
if (StringUtils.isBlank(stack)) {
return new PipesResult(emitData);
} else {
- return new PipesResult(
- emitData, stack);
+ return new PipesResult(emitData, stack);
}
} catch (ClassNotFoundException e) {
LOG.error("class not found exception deserializing data", e);
@@ -278,14 +282,13 @@ public class PipesClient implements Closeable {
}
private String getStack(EmitData emitData) {
- if (emitData.getMetadataList() == null ||
- emitData.getMetadataList().size() < 1) {
+ if (emitData.getMetadataList() == null || emitData.getMetadataList().size() < 1) {
return StringUtils.EMPTY;
}
return emitData.getMetadataList().get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
}
- private void restart() throws IOException {
+ private void restart() throws IOException, InterruptedException, TimeoutException {
if (process != null) {
process.destroyForcibly();
LOG.info("restarting process");
@@ -298,26 +301,26 @@ public class PipesClient implements Closeable {
input = new DataInputStream(process.getInputStream());
output = new DataOutputStream(process.getOutputStream());
//wait for ready signal
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
int b = input.read();
int read = 1;
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
while (read < MAX_BYTES_BEFORE_READY && b != READY.getByte()) {
if (b == -1) {
- throw new RuntimeException("Couldn't start server: " +
- "read EOF before 'ready' byte.\n" +
- " Make absolutely certain that your logger is not writing to stdout.");
+ throw new RuntimeException(
+ "Couldn't start server: " + "read EOF before 'ready' byte.\n" +
+ " Make absolutely certain that your logger is not writing to stdout.");
}
bos.write(b);
b = input.read();
read++;
}
if (read >= MAX_BYTES_BEFORE_READY) {
- throw new RuntimeException("Couldn't start server: read too many bytes before " +
- "'ready' byte.\n" +
- " Make absolutely certain that your logger is not writing to stdout.\n" +
- " Message read: " + new String(bos.toByteArray(),
- StandardCharsets.ISO_8859_1));
+ throw new RuntimeException(
+ "Couldn't start server: read too many bytes before " + "'ready' byte.\n" +
+ " Make absolutely certain that your logger is not writing to stdout.\n" +
+ " Message read: " +
+ new String(bos.toByteArray(), StandardCharsets.ISO_8859_1));
}
if (bos.size() > 0) {
LOG.warn("From forked process before start byte: {}",
@@ -330,15 +333,17 @@ public class PipesClient implements Closeable {
futureTask.get(pipesConfig.getStartupTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
process.destroyForcibly();
- return;
+ throw e;
} catch (ExecutionException e) {
LOG.error("couldn't start server", e);
process.destroyForcibly();
throw new RuntimeException(e);
} catch (TimeoutException e) {
- LOG.error("couldn't start server in time", e);
+ LOG.error("didn't receive ready byte from server within StartupTimeoutMillis {};" +
+ " so far, I've read {}", pipesConfig.getStartupTimeoutMillis(),
+ new String(bos.toByteArray(), StandardCharsets.ISO_8859_1), e);
process.destroyForcibly();
- throw new RuntimeException(e);
+ throw e;
} finally {
futureTask.cancel(true);
}
@@ -380,26 +385,28 @@ public class PipesClient implements Closeable {
List<String> commandLine = new ArrayList<>();
String javaPath = pipesConfig.getJavaPath();
commandLine.add(ProcessUtils.escapeCommandLine(javaPath));
- if (! hasClassPath) {
+ if (!hasClassPath) {
commandLine.add("-cp");
commandLine.add(System.getProperty("java.class.path"));
}
- if (! hasHeadless) {
+ if (!hasHeadless) {
commandLine.add("-Djava.awt.headless=true");
}
if (hasExitOnOOM) {
- LOG.warn("I notice that you have an exit/crash on OOM. If you run heavy external processes " +
- "like tesseract, this setting may result in orphaned processes which could be disastrous" +
- " for performance.");
+ LOG.warn(
+ "I notice that you have an exit/crash on OOM. If you run heavy external processes " +
+ "like tesseract, this setting may result in orphaned processes which could be disastrous" +
+ " for performance.");
}
- if (! hasLog4j) {
- commandLine.add("-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
+ if (!hasLog4j) {
+ commandLine.add(
+ "-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
}
commandLine.add("-DpipesClientId=" + pipesClientId);
commandLine.addAll(configArgs);
commandLine.add("org.apache.tika.pipes.PipesServer");
- commandLine.add(
- ProcessUtils.escapeCommandLine(pipesConfig.getTikaConfig().toAbsolutePath().toString()));
+ commandLine.add(ProcessUtils.escapeCommandLine(
+ pipesConfig.getTikaConfig().toAbsolutePath().toString()));
commandLine.add(Long.toString(pipesConfig.getMaxForEmitBatchBytes()));
commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index a02654f..bf6a6bb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -48,6 +48,7 @@ public class PipesConfigBase extends ConfigBase {
private long maxForEmitBatchBytes = DEFAULT_MAX_FOR_EMIT_BATCH;
private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
+ private long sleepOnStartupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
private int numClients = DEFAULT_NUM_CLIENTS;
@@ -99,6 +100,10 @@ public class PipesConfigBase extends ConfigBase {
return ret;
}
+ public void setStartupTimeoutMillis(long startupTimeoutMillis) {
+ this.startupTimeoutMillis = startupTimeoutMillis;
+ }
+
public void setForkedJvmArgs(List<String> jvmArgs) {
this.forkedJvmArgs = Collections.unmodifiableList(jvmArgs);
}
@@ -158,4 +163,12 @@ public class PipesConfigBase extends ConfigBase {
public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
this.maxForEmitBatchBytes = maxForEmitBatchBytes;
}
+
+ public long getSleepOnStartupTimeoutMillis() {
+ return sleepOnStartupTimeoutMillis;
+ }
+
+ public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
+ this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
index 59bf633..8446983 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesParser.java
@@ -40,7 +40,9 @@ public class PipesParser implements Closeable {
clients.add(client);
}
}
- public PipesResult parse(FetchEmitTuple t) throws PipesException, IOException {
+
+ public PipesResult parse(FetchEmitTuple t) throws InterruptedException,
+ PipesException, IOException {
PipesClient client = null;
try {
client = clientQueue.poll(pipesConfig.getMaxWaitForClientMillis(),
@@ -49,8 +51,6 @@ public class PipesParser implements Closeable {
return PipesResult.CLIENT_UNAVAILABLE_WITHIN_MS;
}
return client.process(t);
- } catch (InterruptedException e) {
- throw new PipesException(e);
} finally {
if (client != null) {
clientQueue.offer(client);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 6e03430..a14019b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -171,6 +171,7 @@ public class PipesServer implements Runnable {
}
}
+ @Override
public void run() {
try {
while (true) {
@@ -188,7 +189,7 @@ public class PipesServer implements Runnable {
Thread.sleep(checkForTimeoutMs);
}
} catch (InterruptedException e) {
- //swallow
+ LOG.debug("interrupted");
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 84fdba4..4014c8f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -153,7 +153,7 @@ public class AsyncProcessor implements Closeable {
}
}
- public synchronized boolean checkActive() {
+ public synchronized boolean checkActive() throws InterruptedException {
Future<Integer> future = executorCompletionService.poll();
if (future != null) {
@@ -174,8 +174,8 @@ public class AsyncProcessor implements Closeable {
default :
throw new IllegalArgumentException("Don't recognize this future code: " + i);
}
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
+ } catch (ExecutionException e) {
+ LOG.error("execution exception", e);
throw new RuntimeException(e);
}
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
index d7589d8..cc7be06 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java
@@ -93,7 +93,7 @@ public class PipesResource {
}
private Map<String, String> processTuple(FetchEmitTuple fetchEmitTuple)
- throws PipesException, IOException {
+ throws InterruptedException, PipesException, IOException {
PipesResult pipesResult = pipesParser.parse(fetchEmitTuple);
switch (pipesResult.getStatus()) {