You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2018/11/13 19:08:48 UTC

[tika] 02/02: TIKA-2778 -- shutting down tika-batch mode should not require typing anything into the parent process's stdin. Instead, require an interrupt and/or kill signal to the parent, and then make sure the child processes are stopped as well.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 30c3d8104a51f015416382995435a4785059f07c
Author: TALLISON <ta...@apache.org>
AuthorDate: Tue Nov 13 14:08:26 2018 -0500

    TIKA-2778 -- shutting down tika-batch mode should not require
    typing anything into the parent process's stdin.  Instead, require
    an interrupt and/or kill signal to the parent, and then make sure
    the child processes are stopped as well.
---
 CHANGES.txt                                        |  4 ++
 .../java/org/apache/tika/batch/BatchProcess.java   | 21 ++++---
 .../apache/tika/batch/BatchProcessDriverCLI.java   |  2 +-
 .../java/org/apache/tika/batch/Interrupter.java    | 72 ++++++++++++++--------
 .../apache/tika/batch/InterrupterFutureResult.java |  3 +-
 .../tika/batch/builders/BatchProcessBuilder.java   |  6 +-
 .../tika/batch/builders/InterrupterBuilder.java    |  4 +-
 7 files changed, 71 insertions(+), 41 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 68f1c6c..7ea3ab8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,10 @@ Release 2.0.0 - ???
 
 Release 1.20 - ???
 
+   * tika-app in batch mode now requires an interrupt or
+     kill signal to the parent process to stop the parent
+     and the child processes (TIKA-2778).
+
    * Bulk upgrade of dependencies (TIKA-2775).
 
    * Improve language id efficiency in tika-eval (TIKA-2777).
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
index 82a9c52..2cd40ff 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java
@@ -21,7 +21,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
@@ -63,7 +62,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
         MAIN_LOOP_EXCEPTION,
         CRAWLER_TIMED_OUT,
         TIMED_OUT_CONSUMER,
-        USER_INTERRUPTION,
+        PARENT_SHUTDOWN,
         BATCH_PROCESS_ALIVE_TOO_LONG,
     }
 
@@ -163,6 +162,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
         } finally {
             shutdownConsumersManager();
         }
+        LOG.trace("finishing up");
         return result;
     }
 
@@ -194,6 +194,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
                 if (futureResult != null) {
                     state.removed++;
                     IFileProcessorFutureResult result = futureResult.get();
+                    LOG.trace("result: "+result);
                     if (result instanceof FileConsumerFutureResult) {
                         state.consumersRemoved++;
                     } else if (result instanceof FileResourceCrawlerFutureResult) {
@@ -203,14 +204,13 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
                             break;
                         }
                     } else if (result instanceof InterrupterFutureResult) {
-                        causeForTermination = CAUSE_FOR_TERMINATION.USER_INTERRUPTION;
+                        causeForTermination = CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN;
                         break;
                     } else if (result instanceof TimeoutFutureResult) {
                         causeForTermination = CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER;
                         break;
                     } //only thing left should be StatusReporterResult
                 }
-
                 if (state.consumersRemoved >= state.numConsumers) {
                     causeForTermination = CAUSE_FOR_TERMINATION.COMPLETED_NORMALLY;
                     break;
@@ -244,7 +244,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
         //TODO: figure out safe way to shutdown resource crawler
         //if it isn't.  Does it need to add poison at this point?
         //fileResourceCrawler.pleaseShutdown();
-
+        LOG.trace("about to shutdown");
         //Step 1: prevent uncalled threads from being started
         ex.shutdown();
 
@@ -255,8 +255,9 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
         }
         //The resourceCrawler should shutdown now.  No need for poison.
         fileResourceCrawler.shutDownNoPoison();
-        //if there are any active/asked to shutdown consumers, await termination
-        //this can happen if a user interrupts the process
+        //if there are any active/asked-to-shutdown consumers, wait
+        //a bit for those parsers to finish.
+        //This can happen if the parent process dies
         //of if the crawler stops early, or ...
         politelyAwaitTermination(state.causeForTermination);
 
@@ -287,6 +288,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
                 if (result instanceof FileConsumerFutureResult) {
                     FileConsumerFutureResult consumerResult = (FileConsumerFutureResult) result;
                     FileStarted fileStarted = consumerResult.getFileStarted();
+                    LOG.trace("file started "+fileStarted);
                     if (fileStarted != null
                             && fileStarted.getElapsedMillis() > timeoutThresholdMillis) {
                         LOG.warn("{} caused a file processor to hang or crash. You may need to remove "
@@ -305,7 +307,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
         }
         //do we need to restart?
         String restartMsg = null;
-        if (state.causeForTermination == CAUSE_FOR_TERMINATION.USER_INTERRUPTION
+        if (state.causeForTermination == CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN
                 || state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
             //do not restart!!!
         } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
@@ -325,7 +327,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
         } else if (! fileResourceCrawler.isQueueEmpty()) {
             restartMsg = "Resources still exist for processing";
         }
-
+        LOG.trace("restart msg: "+restartMsg);
         int exitStatus = getExitStatus(state.causeForTermination, restartMsg);
 
         //need to re-check, report, mark timed out consumers
@@ -342,6 +344,7 @@ public class BatchProcess implements Callable<ParallelFileProcessingResult> {
             processed += c.getNumResourcesConsumed();
             numExceptions += c.getNumHandledExceptions();
         }
+        LOG.trace("returning "+state.causeForTermination);
         return new
             ParallelFileProcessingResult(considered, added, processed, numExceptions,
                 elapsed, exitStatus, state.causeForTermination.toString());
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
index aa8ebf8..a1b19f6 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
@@ -230,7 +230,7 @@ public class BatchProcessDriverCLI {
     private void stop() {
         if (process != null) {
             LOG.trace("destroying a non-null process");
-            process.destroy();
+            process.destroyForcibly();
         }
 
         receivedRestartMsg = false;
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java b/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
index c1e31ba..eea1308 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
@@ -1,5 +1,3 @@
-package org.apache.tika.batch;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,44 +14,70 @@ package org.apache.tika.batch;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.tika.batch;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
+
+import java.io.InputStream;
 import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 
 /**
- * Class that waits for input on System.in.  If the user enters a keystroke on 
- * System.in, this will send a signal to the FileResourceRunner to shutdown gracefully.
- *
+ * Class that waits for input on System.in.  If this reads
+ * EOF or if there is an exception from the parent's IO,
+ * this will send a signal to shutdown the child process.
  * <p>
- * In the future, this may implement a common IInterrupter interface for more flexibility.
+ *     This will call System.exit(-1) if the process
+ *     doesn't stop after {@link #pauseOnEarlyTermination}
+ *     milliseconds.
+ * </p>
+ *
+ *
  */
 public class Interrupter implements Callable<IFileProcessorFutureResult> {
     private static final Logger LOG = LoggerFactory.getLogger(Interrupter.class);
 
-	public IFileProcessorFutureResult call(){
+    private static final long EXTRA_GRACE_PERIOD_MILLIS = 1000;
+    private final long pauseOnEarlyTermination;
+
+    public Interrupter(long pauseOnEarlyTermination) {
+        this.pauseOnEarlyTermination = pauseOnEarlyTermination;
+    }
+
+	public IFileProcessorFutureResult call() {
 		try{
-			BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, UTF_8));
-			while (true){
-				if (reader.ready()){
-					reader.readLine();
-					break;
-				} else {
-					Thread.sleep(1000);
-				}
+			InputStream is = System.in;
+			int byt = is.read();
+			while (byt > -1){
+				byt = is.read();
 			}
-		} catch (InterruptedException e){
-		    //canceller was interrupted
-		} catch (IOException e){
-            LOG.error("IOException from STDIN in CommandlineInterrupter.");
+		} catch (Throwable e) {
+            LOG.warn("Exception from STDIN in CommandlineInterrupter.", e);
 		}
+		new Thread(new Doomsday()).start();
 		return new InterrupterFutureResult();
 	}
+
+    private class Doomsday implements Runnable {
+        @Override
+        public void run() {
+            if (pauseOnEarlyTermination < 0) {
+                return;
+            }
+            long start = System.currentTimeMillis();
+            long elapsed = System.currentTimeMillis()-start;
+            while (elapsed < (pauseOnEarlyTermination+EXTRA_GRACE_PERIOD_MILLIS)) {
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    return;
+                }
+                elapsed = System.currentTimeMillis()-start;
+            }
+            LOG.error("Interrupter timed out; now calling System.exit.");
+            System.exit(-1);
+        }
+    }
 }
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java b/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
index c4d3704..ec49507 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
@@ -1,5 +1,3 @@
-package org.apache.tika.batch;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,6 +14,7 @@ package org.apache.tika.batch;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.tika.batch;
 
 public class InterrupterFutureResult implements IFileProcessorFutureResult {
 
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java b/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
index 0ebfd15..26b944e 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/builders/BatchProcessBuilder.java
@@ -133,7 +133,7 @@ public class BatchProcessBuilder {
 
         reporter = buildReporter(crawler, consumersManager, keyNodes.get("reporter"), runtimeAttributes);
 
-        interrupter = buildInterrupter(keyNodes.get("interrupter"), runtimeAttributes);
+        interrupter = buildInterrupter(keyNodes.get("interrupter"), pauseOnEarlyTerminationMillis, runtimeAttributes);
 
         BatchProcess proc = new BatchProcess(
                 crawler, consumersManager, reporter, interrupter);
@@ -153,7 +153,7 @@ public class BatchProcessBuilder {
         return proc;
     }
 
-    private Interrupter buildInterrupter(Node node, Map<String, String> runtimeAttributes) {
+    private Interrupter buildInterrupter(Node node, long pauseOnEarlyTermination, Map<String, String> runtimeAttributes) {
         Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
         String className = attrs.get("builderClass");
         if (className == null) {
@@ -161,7 +161,7 @@ public class BatchProcessBuilder {
         }
         InterrupterBuilder builder = ClassLoaderUtil.buildClass(InterrupterBuilder.class, className);
 
-        return builder.build(node, runtimeAttributes);
+        return builder.build(node, pauseOnEarlyTermination, runtimeAttributes);
 
     }
 
diff --git a/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java b/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
index d7223cd..d02b48c 100644
--- a/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
+++ b/tika-batch/src/main/java/org/apache/tika/batch/builders/InterrupterBuilder.java
@@ -26,7 +26,7 @@ import org.w3c.dom.Node;
  */
 public class InterrupterBuilder {
 
-    public Interrupter build(Node n, Map<String, String> commandlineArguments) {
-        return new Interrupter();
+    public Interrupter build(Node n, long pauseOnEarlyTermination, Map<String, String> commandlineArguments) {
+        return new Interrupter(pauseOnEarlyTermination);
     }
 }