You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/11/13 19:08:48 UTC
[tika] 02/02: TIKA-2778 -- the shutdown method for tika-batch mode
should not be typing anything on stdin of the parent process. Rather,
require an interrupt and/or kill signal and then make sure the children are
stopped as well.
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 30c3d8104a51f015416382995435a4785059f07c
Author: TALLISON <ta...@apache.org>
AuthorDate: Tue Nov 13 14:08:26 2018 -0500
TIKA-2778 -- the shutdown method for tika-batch mode should not be
typing anything on stdin of the parent process. Rather, require
an interrupt and/or kill signal and then make sure the children are
stopped as well.
---
CHANGES.txt | 4 ++
.../java/org/apache/tika/batch/BatchProcess.java | 21 ++++---
.../apache/tika/batch/BatchProcessDriverCLI.java | 2 +-
.../java/org/apache/tika/batch/Interrupter.java | 72 ++++++++++++++--------
.../apache/tika/batch/InterrupterFutureResult.java | 3 +-
.../tika/batch/builders/BatchProcessBuilder.java | 6 +-
.../tika/batch/builders/InterrupterBuilder.java | 4 +-
7 files changed, 71 insertions(+), 41 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 68f1c6c..7ea3ab8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,10 @@ Release 2.0.0 - ???
Release 1.20 - ???
+ * tika-app in batch mode now requires an interrupt or
+ kill signal to the parent process to stop the parent
+ and the child processes (TIKA-2778).
+
* Bulk upgrade of dependencies (TIKA-2775).
* Improve language id efficiency in tika-eval (TIKA-2777).
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
index 82a9c52..2cd40ff 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
@@ -21,7 +21,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
@@ -63,7 +62,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
MAIN_LOOP_EXCEPTION,
CRAWLER_TIMED_OUT,
TIMED_OUT_CONSUMER,
- USER_INTERRUPTION,
+ PARENT_SHUTDOWN,
BATCH_PROCESS_ALIVE_TOO_LONG,
}
@@ -163,6 +162,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
} finally {
shutdownConsumersManager();
}
+ LOG.trace("finishing up");
return result;
}
@@ -194,6 +194,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
if (futureResult != null) {
state.removed++;
IFileProcessorFutureResult result = futureResult.get();
+ LOG.trace("result: "+result);
if (result instanceof FileConsumerFutureResult) {
state.consumersRemoved++;
} else if (result instanceof FileResourceCrawlerFutureResult) {
@@ -203,14 +204,13 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
break;
}
} else if (result instanceof InterrupterFutureResult) {
- causeForTermination = CAUSE_FOR_TERMINATION.USER_INTERRUPTION;
+ causeForTermination = CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN;
break;
} else if (result instanceof TimeoutFutureResult) {
causeForTermination = CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER;
break;
} //only thing left should be StatusReporterResult
}
-
if (state.consumersRemoved >= state.numConsumers) {
causeForTermination = CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY;
break;
@@ -244,7 +244,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
//TODO: figure out safe way to shutdown resource crawler
//if it isn't. Does it need to add poison at this point?
//fileResourceCrawler.pleaseShutdown();
-
+ LOG.trace("about to shutdown");
//Step 1: prevent uncalled threads from being started
ex.shutdown();
@@ -255,8 +255,9 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
}
//The resourceCrawler should shutdown now. No need for poison.
fileResourceCrawler.shutDownNoPoison();
- //if there are any active/asked to shutdown consumers, await termination
- //this can happen if a user interrupts the process
+ //if there are any active/asked-to-shutdown consumers, wait
+ //a bit for those parsers to finish.
+ //This can happen if the parent process dies
//or if the crawler stops early, or ...
politelyAwaitTermination(state.causeForTermination);
@@ -287,6 +288,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
if (result instanceof FileConsumerFutureResult) {
FileConsumerFutureResult consumerResult = (FileConsumerFutureResult) result;
FileStarted fileStarted = consumerResult.getFileStarted();
+ LOG.trace("file started "+fileStarted);
if (fileStarted != null
&& fileStarted.getElapsedMillis() > timeoutThresholdMillis) {
LOG.warn("{} caused a file processor to hang or crash. You may need to remove "
@@ -305,7 +307,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
}
//do we need to restart?
String restartMsg = null;
- if (state.causeForTermination == CAUSE_FOR_TERMINATION.USER_INTERRUPTION
+ if (state.causeForTermination == CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN
|| state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
//do not restart!!!
} else if (state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
@@ -325,7 +327,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
} else if (! fileResourceCrawler.isQueueEmpty()) {
restartMsg = "Resources still exist for processing";
}
-
+ LOG.trace("restart msg: "+restartMsg);
int exitStatus = getExitStatus(state.causeForTermination, restartMsg);
//need to re-check, report, mark timed out consumers
@@ -342,6 +344,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
processed += c.getNumResourcesConsumed();
numExceptions += c.getNumHandledExceptions();
}
+ LOG.trace("returning "+state.causeForTermination);
return new
ParallelFileProcessingResult(considered, added, processed, numExceptions,
elapsed, exitStatus, state.causeForTermination.toString());
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
index aa8ebf8..a1b19f6 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
@@ -230,7 +230,7 @@ public class BatchProcessDriverCLI {
private void stop() {
if (process != null) {
LOG.trace("destroying a non-null process");
- process.destroy();
+ process.destroyForcibly();
}
receivedRestartMsg = false;
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java b/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
index c1e31ba..eea1308 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
@@ -1,5 +1,3 @@
-package org.apache.tika.batch;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,44 +14,70 @@ package org.apache.tika.batch;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.tika.batch;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
+
+import java.io.InputStream;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
- * Class that waits for input on System.in. If the user enters a keystroke on
- * System.in, this will send a signal to the FileResourceRunner to shutdown gracefully.
- *
+ * Class that waits for input on System.in. If this reads
+ * EOF or if there is an exception from the parent's IO,
+ * this will send a signal to shutdown the child process.
* <p>
- * In the future, this may implement a common IInterrupter interface for more flexibility.
+ * This will call System.exit(-1) if the process
+ * doesn't stop after {@link #pauseOnEarlyTermination}
+ * milliseconds.
+ * </p>
+ *
+ *
*/
public class Interrupter implements Callable<IFileProcessorFutureResult> {
private static final Logger LOG = LoggerFactory.getLogger(Interrupter.class);
- public IFileProcessorFutureResult call(){
+ private static final long EXTRA_GRACE_PERIOD_MILLIS = 1000;
+ private final long pauseOnEarlyTermination;
+
+ public Interrupter(long pauseOnEarlyTermination) {
+ this.pauseOnEarlyTermination = pauseOnEarlyTermination;
+ }
+
+ public IFileProcessorFutureResult call() {
try{
- BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, UTF_8));
- while (true){
- if (reader.ready()){
- reader.readLine();
- break;
- } else {
- Thread.sleep(1000);
- }
+ InputStream is = System.in;
+ int byt = is.read();
+ while (byt > -1){
+ byt = is.read();
}
- } catch (InterruptedException e){
- //canceller was interrupted
- } catch (IOException e){
- LOG.error("IOException from STDIN in CommandlineInterrupter.");
+ } catch (Throwable e) {
+ LOG.warn("Exception from STDIN in CommandlineInterrupter.", e);
}
+ new Thread(new Doomsday()).start();
return new InterrupterFutureResult();
}
+
+ private class Doomsday implements Runnable {
+ @Override
+ public void run() {
+ if (pauseOnEarlyTermination < 0) {
+ return;
+ }
+ long start = System.currentTimeMillis();
+ long elapsed = System.currentTimeMillis()-start;
+ while (elapsed < (pauseOnEarlyTermination+EXTRA_GRACE_PERIOD_MILLIS)) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ return;
+ }
+ elapsed = System.currentTimeMillis()-start;
+ }
+ LOG.error("Interrupter timed out; now calling System.exit.");
+ System.exit(-1);
+ }
+ }
}
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java b/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
index c4d3704..ec49507 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
@@ -1,5 +1,3 @@
-package org.apache.tika.batch;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,6 +14,7 @@ package org.apache.tika.batch;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.tika.batch;
public class InterrupterFutureResult implements IFileProcessorFutureResult {
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java b/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
index 0ebfd15..26b944e 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
@@ -133,7 +133,7 @@ public class BatchProcessBuilder {
reporter = buildReporter(crawler, consumersManager, keyNodes.get("reporter"), runtimeAttributes);
- interrupter = buildInterrupter(keyNodes.get("interrupter"), runtimeAttributes);
+ interrupter = buildInterrupter(keyNodes.get("interrupter"), pauseOnEarlyTerminationMillis, runtimeAttributes);
BatchProcess proc = new BatchProcess(
crawler, consumersManager, reporter, interrupter);
@@ -153,7 +153,7 @@ public class BatchProcessBuilder {
return proc;
}
- private Interrupter buildInterrupter(Node node, Map<String, String> runtimeAttributes) {
+ private Interrupter buildInterrupter(Node node, long pauseOnEarlyTermination, Map<String, String> runtimeAttributes) {
Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
String className = attrs.get("builderClass");
if (className == null) {
@@ -161,7 +161,7 @@ public class BatchProcessBuilder {
}
InterrupterBuilder builder = ClassLoaderUtil.buildClass(InterrupterBuilder.class, className);
- return builder.build(node, runtimeAttributes);
+ return builder.build(node, pauseOnEarlyTermination, runtimeAttributes);
}
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java b/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
index d7223cd..d02b48c 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
@@ -26,7 +26,7 @@ import org.w3c.dom.Node;
*/
public class InterrupterBuilder {
- public Interrupter build(Node n, Map<String, String> commandlineArguments) {
- return new Interrupter();
+ public Interrupter build(Node n, long pauseOnEarlyTermination, Map<String, String> commandlineArguments) {
+ return new Interrupter(pauseOnEarlyTermination);
}
}