You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2015/03/23 17:09:12 UTC
svn commit: r1668673 [2/6] - in /tika/trunk: ./ tika-app/
tika-app/src/main/java/org/apache/tika/cli/ tika-app/src/main/resources/
tika-app/src/test/java/org/apache/tika/cli/ tika-batch/ tika-batch/src/
tika-batch/src/main/ tika-batch/src/main/examples...
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,392 @@
+package org.apache.tika.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.io.IOUtils;
+
+public class BatchProcessDriverCLI {
+
+ /**
+ * This relies on an special exit values of 254 (do not restart),
+ * 0 ended correctly, 253 ended with exception (do restart)
+ */
+ public static final int PROCESS_RESTART_EXIT_CODE = 253;
+ //make sure this is above 255 to avoid stopping on system errors
+ //that is, if there is a system error (e.g. 143), you
+ //should restart the process.
+ public static final int PROCESS_NO_RESTART_EXIT_CODE = 254;
+ public static final int PROCESS_COMPLETED_SUCCESSFULLY = 0;
+ private static Logger logger = Logger.getLogger(BatchProcessDriverCLI.class);
+
+ private int maxProcessRestarts = -1;
+ private long pulseMillis = 1000;
+
+ //how many times to wait pulseMillis milliseconds if a restart
+ //message has been received through stdout, but the
+ //child process has not yet exited
+ private int waitNumLoopsAfterRestartmessage = 60;
+
+
+ private volatile boolean userInterrupted = false;
+ private boolean receivedRestartMsg = false;
+ private Process process = null;
+
+ private StreamGobbler errorWatcher = null;
+ private StreamGobbler outGobbler = null;
+ private InterruptWriter interruptWriter = null;
+ private final InterruptWatcher interruptWatcher =
+ new InterruptWatcher(System.in);
+
+ private Thread errorWatcherThread = null;
+ private Thread outGobblerThread = null;
+ private Thread interruptWriterThread = null;
+ private final Thread interruptWatcherThread = new Thread(interruptWatcher);
+
+ private final String[] commandLine;
+ private int numRestarts = 0;
+ private boolean redirectChildProcessToStdOut = true;
+
+ public BatchProcessDriverCLI(String[] commandLine){
+ this.commandLine = tryToReadMaxRestarts(commandLine);
+ }
+
+ private String[] tryToReadMaxRestarts(String[] commandLine) {
+ List<String> args = new ArrayList<String>();
+ for (int i = 0; i < commandLine.length; i++) {
+ String arg = commandLine[i];
+ if (arg.equals("-maxRestarts")) {
+ if (i == commandLine.length-1) {
+ throw new IllegalArgumentException("Must specify an integer after \"-maxRestarts\"");
+ }
+ String restartNumString = commandLine[i+1];
+ try {
+ maxProcessRestarts = Integer.parseInt(restartNumString);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Must specify an integer after \"-maxRestarts\" arg.");
+ }
+ i++;
+ } else {
+ args.add(arg);
+ }
+ }
+ return args.toArray(new String[args.size()]);
+ }
+
+ public void execute() throws Exception {
+
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ logger.trace("about to start");
+ start();
+ int loopsAfterRestartMessageReceived = 0;
+ while (!userInterrupted) {
+ Integer exit = null;
+ try {
+ logger.trace("about to check exit value");
+ exit = process.exitValue();
+ logger.trace("exit value:" + exit);
+ stop();
+ } catch (IllegalThreadStateException e) {
+ //hasn't exited
+ logger.trace("process has not exited; IllegalThreadStateException");
+ }
+
+ logger.trace("Before sleep:" +
+ " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+
+ //Even if the process has exited,
+ //wait just a little bit to make sure that
+ //mustRestart hasn't been set to true
+ try {
+ Thread.sleep(pulseMillis);
+ } catch (InterruptedException e) {
+ logger.trace("interrupted exception during sleep");
+ }
+ logger.trace("After sleep:" +
+ " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+ //if we've gotten the message via stdout to restart
+ //but the process hasn't exited yet, give it another
+ //chance
+ if (receivedRestartMsg && exit == null) {
+ loopsAfterRestartMessageReceived++;
+ logger.trace("Must restart, still not exited; loops after restart: " +
+ loopsAfterRestartMessageReceived);
+ continue;
+ }
+ if (loopsAfterRestartMessageReceived > waitNumLoopsAfterRestartmessage) {
+ logger.trace("About to try to restart because:" +
+ " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+ logger.warn("Restarting after exceeded wait loops waiting for exit: "+
+ loopsAfterRestartMessageReceived);
+ boolean restarted = restart(exit, receivedRestartMsg);
+ if (!restarted) {
+ break;
+ }
+ } else if (exit != null && exit != BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE
+ && exit != BatchProcessDriverCLI.PROCESS_COMPLETED_SUCCESSFULLY) {
+ logger.trace("About to try to restart because:" +
+ " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+
+ if (exit != null && exit == BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE) {
+ logger.info("Restarting on expected restart code");
+ } else {
+ logger.warn("Restarting on unexpected restart code: "+exit);
+ }
+ boolean restarted = restart(exit, receivedRestartMsg);
+ if (!restarted) {
+ break;
+ }
+ } else if (exit != null && (exit == PROCESS_COMPLETED_SUCCESSFULLY
+ || exit == BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE)) {
+ logger.trace("Will not restart: "+exit);
+ break;
+ }
+ }
+ logger.trace("about to call shutdown driver now");
+ shutdownDriverNow();
+ }
+
+ private void shutdownDriverNow() {
+ if (process != null) {
+ for (int i = 0; i < 10; i++) {
+
+ logger.trace("trying to shut down: "+i);
+ try {
+ int exit = process.exitValue();
+ logger.trace("trying to stop:"+exit);
+ stop();
+ interruptWatcherThread.interrupt();
+ return;
+ } catch (IllegalThreadStateException e) {
+ //hasn't exited
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ //swallow
+ }
+ }
+ logger.error("Process didn't stop after 10 seconds after shutdown. " +
+ "I am forcefully killing it.");
+ }
+ interruptWatcherThread.interrupt();
+ }
+
+ public int getNumRestarts() {
+ return numRestarts;
+ }
+
+ public boolean getUserInterrupted() {
+ return userInterrupted;
+ }
+
+ /**
+ * Tries to restart (stop and then start) the child process
+ * @return whether or not this was successful, will be false if numRestarts >= maxProcessRestarts
+ * @throws Exception
+ */
+ private boolean restart(Integer exitValue, boolean receivedRestartMsg) throws Exception {
+ if (maxProcessRestarts > -1 && numRestarts >= maxProcessRestarts) {
+ logger.warn("Hit the maximum number of process restarts. Driver is shutting down now.");
+ stop();
+ return false;
+ }
+ logger.warn("Must restart process (exitValue="+exitValue+" numRestarts="+numRestarts+
+ " receivedRestartMessage="+receivedRestartMsg+")");
+ stop();
+ start();
+ numRestarts++;
+ return true;
+ }
+
+ private void stop() {
+ if (process != null) {
+ logger.trace("destroying a non-null process");
+ process.destroy();
+ }
+
+ receivedRestartMsg = false;
+ //interrupt the writer thread first
+ interruptWriterThread.interrupt();
+
+ errorWatcher.stopGobblingAndDie();
+ outGobbler.stopGobblingAndDie();
+ errorWatcherThread.interrupt();
+ outGobblerThread.interrupt();
+ }
+
+ private void start() throws Exception {
+ ProcessBuilder builder = new ProcessBuilder(commandLine);
+ builder.directory(new File("."));
+ process = builder.start();
+
+ errorWatcher = new StreamWatcher(process.getErrorStream());
+ errorWatcherThread = new Thread(errorWatcher);
+ errorWatcherThread.start();
+
+ outGobbler = new StreamGobbler(process.getInputStream());
+ outGobblerThread = new Thread(outGobbler);
+ outGobblerThread.start();
+
+ interruptWriter = new InterruptWriter(process.getOutputStream());
+ interruptWriterThread = new Thread(interruptWriter);
+ interruptWriterThread.start();
+
+ }
+
+ public void setRedirectChildProcessToStdOut(boolean redirectChildProcessToStdOut) {
+ this.redirectChildProcessToStdOut = redirectChildProcessToStdOut;
+ }
+
+ /**
+ * Class to watch stdin from the driver for anything that is typed.
+ * This will currently cause an interrupt if anything followed by
+ * a return key is entered. We may want to add an "Are you sure?" dialogue.
+ */
+ private class InterruptWatcher implements Runnable {
+ private BufferedReader reader;
+
+ private InterruptWatcher(InputStream is) {
+ reader = new BufferedReader(new InputStreamReader(is, IOUtils.UTF_8));
+ }
+
+ @Override
+ public void run() {
+ try {
+ //this will block.
+ //as soon as it reads anything,
+ //set userInterrupted to true and stop
+ reader.readLine();
+ userInterrupted = true;
+ } catch (IOException e) {
+ //swallow
+ }
+ }
+ }
+
+ /**
+ * Class that writes to the child process
+ * to force an interrupt in the child process.
+ */
+ private class InterruptWriter implements Runnable {
+ private final Writer writer;
+
+ private InterruptWriter(OutputStream os) {
+ this.writer = new OutputStreamWriter(os, IOUtils.UTF_8);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ Thread.sleep(500);
+ if (userInterrupted) {
+ writer.write(String.format(Locale.ENGLISH, "Ave atque vale!%n"));
+ writer.flush();
+ }
+ }
+ } catch (IOException e) {
+ //swallow
+ } catch (InterruptedException e) {
+ //job is done, ok
+ }
+ }
+ }
+
+ private class StreamGobbler implements Runnable {
+ //plagiarized from org.apache.oodt's StreamGobbler
+ protected final BufferedReader reader;
+ protected boolean running = true;
+
+ private StreamGobbler(InputStream is) {
+ this.reader = new BufferedReader(new InputStreamReader(new BufferedInputStream(is),
+ IOUtils.UTF_8));
+ }
+
+ @Override
+ public void run() {
+ String line = null;
+ try {
+ logger.trace("gobbler starting to read");
+ while ((line = reader.readLine()) != null && this.running) {
+ if (redirectChildProcessToStdOut) {
+ System.out.println("BatchProcess:"+line);
+ }
+ }
+ } catch (IOException e) {
+ logger.trace("gobbler io exception");
+ //swallow ioe
+ }
+ logger.trace("gobbler done");
+ }
+
+ private void stopGobblingAndDie() {
+ logger.trace("stop gobbling");
+ running = false;
+ IOUtils.closeQuietly(reader);
+ }
+ }
+
+ private class StreamWatcher extends StreamGobbler implements Runnable {
+ //plagiarized from org.apache.oodt's StreamGobbler
+
+ private StreamWatcher(InputStream is){
+ super(is);
+ }
+
+ @Override
+ public void run() {
+ String line = null;
+ try {
+ logger.trace("watcher starting to read");
+ while ((line = reader.readLine()) != null && this.running) {
+ if (line.startsWith(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString())) {
+ receivedRestartMsg = true;
+ }
+ logger.info("BatchProcess: "+line);
+ }
+ } catch (IOException e) {
+ logger.trace("watcher io exception");
+ //swallow ioe
+ }
+ logger.trace("watcher done");
+ }
+ }
+
+
+ public static void main(String[] args) throws Exception {
+
+ BatchProcessDriverCLI runner = new BatchProcessDriverCLI(args);
+ runner.execute();
+ System.out.println("FSBatchProcessDriver has gracefully completed");
+ System.exit(0);
+ }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,80 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Wraps the collection of consumers and offers hooks for initializing and
+ * shutting down shared resources (e.g. a db connection, index, writer, etc.).
+ * <p>
+ * Both hooks, {@link #init()} and {@link #shutdown()}, do nothing here;
+ * subclasses override them as needed.
+ */
+public abstract class ConsumersManager {
+
+    //upper bound, in milliseconds, on how long either init() or
+    //shutdown() may take; defaults to one minute
+    private long consumersManagerMaxMillis = 60 * 1000;
+
+    private final List<FileResourceConsumer> consumers;
+
+    /**
+     * @param consumers the consumers to manage; callers of
+     *                  {@link #getConsumers()} receive a read-only view of this list
+     */
+    public ConsumersManager(List<FileResourceConsumer> consumers) {
+        List<FileResourceConsumer> readOnlyView = Collections.unmodifiableList(consumers);
+        this.consumers = readOnlyView;
+    }
+
+    /**
+     * @return an unmodifiable view of the managed consumers
+     */
+    public List<FileResourceConsumer> getConsumers() {
+        return this.consumers;
+    }
+
+    /**
+     * Hook that BatchProcess calls before submitting the threads.
+     * Does nothing by default.
+     */
+    public void init() {
+    }
+
+    /**
+     * Hook that BatchProcess calls immediately before closing.
+     * Does nothing by default.
+     * <p>
+     * Beware! By the time this runs, some of the consumers may have hung
+     * or may not have completed.
+     */
+    public void shutdown() {
+    }
+
+    /**
+     * {@link org.apache.tika.batch.BatchProcess} throws an exception if
+     * init() or shutdown() does not complete within this many milliseconds.
+     *
+     * @return the maximum time, in milliseconds, allowed for init() or shutdown()
+     */
+    public long getConsumersManagerMaxMillis() {
+        return this.consumersManagerMaxMillis;
+    }
+
+    /**
+     * Sets the time limit applied to init() and shutdown().
+     *
+     * @param consumersManagerMaxMillis maximum number of milliseconds
+     *                                  to allow for init() or shutdown()
+     * @see #getConsumersManagerMaxMillis()
+     */
+    public void setConsumersManagerMaxMillis(long consumersManagerMaxMillis) {
+        this.consumersManagerMaxMillis = consumersManagerMaxMillis;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,37 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Result reported by a finished {@link FileResourceConsumer}. It records the
+ * file the consumer was working on (null if none) and how many files it
+ * processed.
+ */
+class FileConsumerFutureResult implements IFileProcessorFutureResult {
+
+    private final int filesProcessed;
+    private final FileStarted fileStarted;
+
+    /**
+     * @param fs             the file in progress when the consumer stopped; may be null
+     * @param filesProcessed number of files the consumer processed
+     */
+    public FileConsumerFutureResult(FileStarted fs, int filesProcessed) {
+        this.filesProcessed = filesProcessed;
+        fileStarted = fs;
+    }
+
+    /** @return the file in progress when the consumer stopped, or null */
+    public FileStarted getFileStarted() {
+        return this.fileStarted;
+    }
+
+    /** @return number of files the consumer processed */
+    public int getFilesProcessed() {
+        return this.filesProcessed;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,68 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.Property;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * A logical "file" handled by the batch framework.
+ * <p>
+ * Abstracting over the origin of the bytes lets the same code process files
+ * from different sources: file system, database, etc.
+ */
+public interface FileResource {
+
+    /**
+     * Metadata key for the literal, lowercased extension of a file.
+     * The extension may or may not have any relationship to the actual
+     * type of the file.
+     */
+    Property FILE_EXTENSION = Property.internalText("tika:file_ext");
+
+    /**
+     * Returns an id used only in logging, to identify which file may have
+     * caused problems.
+     * <p>
+     * Unique ids make debugging easier, but uniqueness is not required.
+     * For example, the batch processors never use this id as a hash key.
+     *
+     * @return an id for this FileResource
+     */
+    String getResourceId();
+
+    /**
+     * Returns the metadata available before the file is parsed.
+     * <p>
+     * This is typically "external" metadata, i.e. what is known about the
+     * file from outside information. Examples are the file name, file size,
+     * file location and data stream. It is not file-internal metadata.
+     *
+     * @return pre-parse Metadata for this resource
+     */
+    Metadata getMetadata();
+
+    /**
+     * Opens an InputStream over the contents of this resource.
+     *
+     * @return an InputStream for the FileResource
+     * @throws IOException if an I/O error occurs while opening the stream
+     */
+    InputStream openInputStream() throws IOException;
+
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,380 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This is a base class for file consumers. The
+ * goal of this class is to abstract out the multithreading
+ * and recordkeeping components.
+ * <p>
+ * A consumer repeatedly polls the shared queue and hands each
+ * {@link FileResource} to {@link #processFileResource(FileResource)}. It
+ * stops when any of the following happens:
+ * <ul>
+ * <li>it takes a {@link PoisonFileResource} off the queue;</li>
+ * <li>it is asked to shut down;</li>
+ * <li>its thread is interrupted;</li>
+ * <li>it is timed out by {@link #checkForTimedOutMillis(long)};</li>
+ * <li>it waits longer than its maximum consecutive wait for a new resource.</li>
+ * </ul>
+ */
+public abstract class FileResourceConsumer implements Callable<IFileProcessorFutureResult> {
+
+    private static enum STATE {
+        NOT_YET_STARTED,
+        ACTIVELY_CONSUMING,
+        SWALLOWED_POISON,
+        THREAD_INTERRUPTED,
+        EXCEEDED_MAX_CONSEC_WAIT_MILLIS,
+        ASKED_TO_SHUTDOWN,
+        TIMED_OUT,
+        CONSUMER_EXCEPTION,
+        CONSUMER_ERROR,
+        COMPLETED
+    }
+
+    //element/attribute names used in the structured time-out log entry
+    public static final String TIME_OUT = "timeout";
+    public static final String ELAPSED_MILLIS = "elapsedMS";
+
+    //hands out sequential consumer ids, starting at 0
+    private static final AtomicInteger numConsumers = new AtomicInteger(-1);
+    protected static Logger logger = Logger.getLogger(FileResourceConsumer.class);
+
+    //maximum total time to wait for the next resource to appear on the queue;
+    //a value <= 0 means wait indefinitely
+    private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes
+
+    private final ArrayBlockingQueue<FileResource> fileQueue;
+
+    private final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
+    private final int consumerId;
+
+    //used to lock checks on state to prevent races between the consumer
+    //thread and other threads that time it out or ask it to shut down
+    private final Object lock = new Object();
+
+    //this records the file that is currently
+    //being processed. It is null if no file is currently being processed.
+    //volatile: it is written by the consumer thread without holding the lock
+    //and read by other threads (getCurrentFile, checkForTimedOutMillis)
+    private volatile FileStarted currentFile = null;
+
+    //total number of files consumed; volatile so that reporter
+    //sees the latest
+    private volatile int numResourcesConsumed = 0;
+
+    //total number of exceptions that were handled by subclasses;
+    //volatile so that reporter sees the latest
+    private volatile int numHandledExceptions = 0;
+
+    //after this has been set to ACTIVELY_CONSUMING,
+    //this should only be set by setEndedState.
+    private volatile STATE currentState = STATE.NOT_YET_STARTED;
+
+    /**
+     * @param fileQueue queue shared with the crawler from which resources are taken
+     */
+    public FileResourceConsumer(ArrayBlockingQueue<FileResource> fileQueue) {
+        this.fileQueue = fileQueue;
+        consumerId = numConsumers.incrementAndGet();
+    }
+
+    /**
+     * Consumes resources from the queue until a stop condition is reached.
+     *
+     * @return the file being processed when consumption stopped (null if none)
+     *         and the number of resources successfully consumed
+     */
+    @Override
+    public IFileProcessorFutureResult call() {
+        //only start consuming if nothing has happened yet; a pleaseShutdown()
+        //issued before this thread started must not be overwritten
+        synchronized (lock) {
+            if (currentState == STATE.NOT_YET_STARTED) {
+                currentState = STATE.ACTIVELY_CONSUMING;
+            }
+        }
+
+        try {
+            FileResource fileResource = getNextFileResource();
+            while (fileResource != null) {
+                logger.debug("file consumer is about to process: " + fileResource.getResourceId());
+                boolean consumed = _processFileResource(fileResource);
+                logger.debug("file consumer has finished processing: " + fileResource.getResourceId());
+
+                if (consumed) {
+                    numResourcesConsumed++;
+                }
+                fileResource = getNextFileResource();
+            }
+        } catch (InterruptedException e) {
+            setEndedState(STATE.THREAD_INTERRUPTED);
+            //restore the interrupt status for whoever owns this thread
+            Thread.currentThread().interrupt();
+        }
+
+        setEndedState(STATE.COMPLETED);
+        return new FileConsumerFutureResult(currentFile, numResourcesConsumed);
+    }
+
+
+    /**
+     * Main piece of code that needs to be implemented. Clients
+     * are responsible for closing streams and handling the exceptions
+     * that they'd like to handle.
+     * <p>
+     * Unchecked throwables can be thrown past this, of course. When an unchecked
+     * throwable is thrown, this logs the error, and then rethrows the exception.
+     * Clients/subclasses should make sure to catch and handle everything they can.
+     * <p>
+     * The design goal is that the whole process should close up and shutdown soon after
+     * an unchecked exception or error is thrown.
+     * <p>
+     * Make sure to call {@link #incrementHandledExceptions()} appropriately in
+     * your implementation of this method.
+     *
+     * @param fileResource resource to process
+     * @return whether or not a file was successfully processed
+     */
+    public abstract boolean processFileResource(FileResource fileResource);
+
+
+    /**
+     * Make sure to call this appropriately!
+     */
+    protected void incrementHandledExceptions() {
+        numHandledExceptions++;
+    }
+
+
+    /**
+     * Returns whether the consumer could still process a file or is still
+     * processing one (NOT_YET_STARTED, ACTIVELY_CONSUMING or ASKED_TO_SHUTDOWN).
+     * Returns false if the calling thread is interrupted.
+     *
+     * @return whether this consumer is still active
+     */
+    public boolean isStillActive() {
+        if (Thread.currentThread().isInterrupted()) {
+            return false;
+        } else if (currentState == STATE.NOT_YET_STARTED ||
+                currentState == STATE.ACTIVELY_CONSUMING ||
+                currentState == STATE.ASKED_TO_SHUTDOWN) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Records the resource as the current file, runs the subclass hook, and
+     * clears the current file again on normal completion.
+     */
+    private boolean _processFileResource(FileResource fileResource) {
+        currentFile = new FileStarted(fileResource.getResourceId());
+        boolean consumed = false;
+        try {
+            consumed = processFileResource(fileResource);
+        } catch (RuntimeException e) {
+            setEndedState(STATE.CONSUMER_EXCEPTION);
+            throw e;
+        } catch (Error e) {
+            setEndedState(STATE.CONSUMER_ERROR);
+            throw e;
+        }
+        //if anything is thrown from processFileResource, then the fileStarted
+        //will remain what it was right before the exception was thrown.
+        currentFile = null;
+        return consumed;
+    }
+
+    /**
+     * This politely asks the consumer to shutdown.
+     * Before processing another file, the consumer will check to see
+     * if it has been asked to terminate.
+     * <p>
+     * This offers another method for politely requesting
+     * that a FileResourceConsumer stop processing
+     * besides passing it {@link org.apache.tika.batch.PoisonFileResource}.
+     */
+    public void pleaseShutdown() {
+        setEndedState(STATE.ASKED_TO_SHUTDOWN);
+    }
+
+    /**
+     * Returns the name and start time of a file that is currently being processed.
+     * If no file is currently being processed, this will return null.
+     *
+     * @return FileStarted or null
+     */
+    public FileStarted getCurrentFile() {
+        return currentFile;
+    }
+
+    /** @return number of resources this consumer has successfully consumed */
+    public int getNumResourcesConsumed() {
+        return numResourcesConsumed;
+    }
+
+    /** @return number of exceptions reported via {@link #incrementHandledExceptions()} */
+    public int getNumHandledExceptions() {
+        return numHandledExceptions;
+    }
+
+    /**
+     * Checks to see if the currentFile being processed (if there is one)
+     * should be timed out (still being worked on after staleThresholdMillis).
+     * <p>
+     * If the consumer should be timed out, this will return the currentFile and
+     * set the state to TIMED_OUT.
+     * <p>
+     * This returns null in any of these cases:
+     * <ul>
+     * <li>the consumer was already timed out earlier;</li>
+     * <li>it is not processing a file;</li>
+     * <li>it has been working on the current file for less than staleThresholdMillis;</li>
+     * <li>staleThresholdMillis is negative.</li>
+     * </ul>
+     *
+     * @param staleThresholdMillis threshold to determine whether the consumer has gone stale.
+     * @return null or the file started that triggered the stale condition
+     */
+    public FileStarted checkForTimedOutMillis(long staleThresholdMillis) {
+        //if there isn't a current file, don't bother obtaining lock
+        if (currentFile == null) {
+            return null;
+        }
+        //if threshold is < 0, don't even look.
+        if (staleThresholdMillis < 0) {
+            return null;
+        }
+        synchronized (lock) {
+            //check again once the lock has been obtained
+            if (currentState != STATE.ACTIVELY_CONSUMING
+                    && currentState != STATE.ASKED_TO_SHUTDOWN) {
+                return null;
+            }
+            FileStarted tmp = currentFile;
+            if (tmp == null) {
+                return null;
+            }
+            if (tmp.getElapsedMillis() > staleThresholdMillis) {
+                setEndedState(STATE.TIMED_OUT);
+                logWithResourceId(Level.FATAL, TIME_OUT,
+                        tmp.getResourceId(), ELAPSED_MILLIS, Long.toString(tmp.getElapsedMillis()));
+                return tmp;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Same as {@link #logWithResourceId(Level, String, String, Throwable, String...)}
+     * without a throwable.
+     */
+    protected void logWithResourceId(Level level, String type, String resourceId, String... attrs) {
+        logWithResourceId(level, type, resourceId, (Throwable) null, attrs);
+    }
+
+    /**
+     * Use this for structured output that captures resourceId and other attributes.
+     * The entry is logged as a small XML document: an element named {@code type}
+     * with a resourceId attribute, one attribute per name/value pair in
+     * {@code attrs}, and the stack trace of {@code t} (if any) as text.
+     *
+     * @param level      level
+     * @param type       entity name for exception
+     * @param resourceId resourceId string
+     * @param t          throwable can be null
+     * @param attrs      (array of key0, value0, key1, value1, etc.); a trailing
+     *                   key without a value is ignored
+     */
+    protected void logWithResourceId(Level level, String type, String resourceId, Throwable t, String... attrs) {
+
+        StringWriter writer = new StringWriter();
+        try {
+            XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(writer);
+            xml.writeStartDocument();
+            xml.writeStartElement(type);
+            xml.writeAttribute("resourceId", resourceId);
+            if (attrs != null) {
+                //name/value pairs alternate: name0 at 0, val0 at 1, name1 at 2, val1 at 3, etc.
+                //so step by two; stepping by one would write values as attribute names
+                for (int i = 0; i < attrs.length - 1; i += 2) {
+                    xml.writeAttribute(attrs[i], attrs[i + 1]);
+                }
+            }
+            if (t != null) {
+                StringWriter stackWriter = new StringWriter();
+                PrintWriter printWriter = new PrintWriter(stackWriter);
+                t.printStackTrace(printWriter);
+                printWriter.flush();
+                xml.writeCharacters(stackWriter.toString());
+            }
+            xml.writeEndElement();
+            xml.writeEndDocument();
+            xml.flush();
+            xml.close();
+        } catch (XMLStreamException e) {
+            //report the XML failure itself, not the (possibly null) throwable being logged
+            logger.error("error writing xml stream for: " + resourceId, e);
+        }
+
+        logger.log(level, writer.toString());
+    }
+
+    /**
+     * Polls the queue (1 second at a time) for the next resource.
+     *
+     * @return the next resource, or null if the consumer should stop (interrupted,
+     *         no longer ACTIVELY_CONSUMING, swallowed poison, or waited too long)
+     * @throws InterruptedException if interrupted while polling the queue
+     */
+    private FileResource getNextFileResource() throws InterruptedException {
+        FileResource fileResource = null;
+        long start = System.currentTimeMillis();
+        while (fileResource == null) {
+            //check to see if thread is interrupted before polling
+            if (Thread.currentThread().isInterrupted()) {
+                setEndedState(STATE.THREAD_INTERRUPTED);
+                logger.debug("Consumer thread was interrupted.");
+                break;
+            }
+
+            synchronized (lock) {
+                //need to lock here to prevent race condition with other threads setting state
+                if (currentState != STATE.ACTIVELY_CONSUMING) {
+                    logger.debug("Consumer already closed because of: " + currentState.toString());
+                    break;
+                }
+            }
+            fileResource = fileQueue.poll(1L, TimeUnit.SECONDS);
+            if (fileResource != null) {
+                if (fileResource instanceof PoisonFileResource) {
+                    setEndedState(STATE.SWALLOWED_POISON);
+                    fileResource = null;
+                }
+                break;
+            }
+            logger.debug(consumerId + " is waiting for file and the queue size is: " + fileQueue.size());
+
+            long elapsed = System.currentTimeMillis() - start;
+            if (maxConsecWaitInMillis > 0 && elapsed > maxConsecWaitInMillis) {
+                setEndedState(STATE.EXCEEDED_MAX_CONSEC_WAIT_MILLIS);
+                break;
+            }
+        }
+        return fileResource;
+    }
+
+    /**
+     * Closes {@code closeable} if it is non-null, logging (not throwing) any IOException.
+     */
+    protected void close(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Flushes {@code closeable} if it is {@link Flushable}, then closes it.
+     * IOExceptions are logged, not thrown. Does nothing for null.
+     */
+    protected void flushAndClose(Closeable closeable) {
+        if (closeable == null) {
+            return;
+        }
+        if (closeable instanceof Flushable) {
+            try {
+                ((Flushable) closeable).flush();
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+        close(closeable);
+    }
+
+    //do not overwrite a finished state except if
+    //not yet started, actively consuming or shutting down. This should
+    //represent the initial cause; all subsequent calls
+    //to set will be ignored!!!
+    private void setEndedState(STATE cause) {
+        synchronized (lock) {
+            if (currentState == STATE.NOT_YET_STARTED ||
+                    currentState == STATE.ACTIVELY_CONSUMING ||
+                    currentState == STATE.ASKED_TO_SHUTDOWN) {
+                currentState = cause;
+            }
+        }
+    }
+
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,269 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.metadata.Metadata;
+
+public abstract class FileResourceCrawler implements Callable<IFileProcessorFutureResult> {
+
+ protected final static int SKIPPED = 0;
+ protected final static int ADDED = 1;
+ protected final static int STOP_NOW = 2;
+
+ private volatile boolean hasCompletedCrawling = false;
+ private volatile boolean shutDownNoPoison = false;
+ private volatile boolean isActive = true;
+ private volatile boolean timedOut = false;
+
+ //how long to pause if can't add to queue
+ private static final long PAUSE_INCREMENT_MILLIS = 1000;
+
+ protected static Logger logger = Logger.getLogger(FileResourceCrawler.class.toString());
+
+ private int maxFilesToAdd = -1;
+ private int maxFilesToConsider = -1;
+
+ private final ArrayBlockingQueue<FileResource> queue;
+ private final int numConsumers;
+
+
+ private long maxConsecWaitInMillis = 300000;//300,000ms = 5 minutes
+ private DocumentSelector documentSelector = null;
+
+ //number of files added to queue
+ private int added = 0;
+ //number of files considered including those that were rejected by documentSelector
+ private int considered = 0;
+
+ /**
+ * @param queue shared queue
+ * @param numConsumers number of consumers (needs to know how many poisons to add when done)
+ */
+ public FileResourceCrawler(ArrayBlockingQueue<FileResource> queue, int numConsumers) {
+ this.queue = queue;
+ this.numConsumers = numConsumers;
+ }
+
+ /**
+ * Implement this to control the addition of FileResources. Call {@link #tryToAdd}
+ * to add FileResources to the queue.
+ *
+ * @throws InterruptedException
+ */
+ public abstract void start() throws InterruptedException;
+
+ public FileResourceCrawlerFutureResult call() {
+ try {
+ start();
+ } catch (InterruptedException e) {
+ //this can be triggered by shutdownNow in BatchProcess
+ logger.info("InterruptedException in FileCrawler: " + e.getMessage());
+ } catch (Exception e) {
+ logger.error("Exception in FileResourceCrawler: " + e.getMessage());
+ } finally {
+ isActive = false;
+ }
+
+ try {
+ shutdown();
+ } catch (InterruptedException e) {
+ //swallow
+ }
+
+ return new FileResourceCrawlerFutureResult(considered, added);
+ }
+
+ /**
+ *
+ * @param fileResource resource to add
+ * @return int status of the attempt (SKIPPED, ADDED, STOP_NOW) to add the resource to the queue.
+ * @throws InterruptedException
+ */
+ protected int tryToAdd(FileResource fileResource) throws InterruptedException {
+
+ if (maxFilesToAdd > -1 && added >= maxFilesToAdd) {
+ return STOP_NOW;
+ }
+
+ if (maxFilesToConsider > -1 && considered > maxFilesToConsider) {
+ return STOP_NOW;
+ }
+
+ boolean isAdded = false;
+ if (select(fileResource.getMetadata())) {
+ long totalConsecutiveWait = 0;
+ while (queue.offer(fileResource, 1L, TimeUnit.SECONDS) == false) {
+
+ logger.info("FileResourceCrawler is pausing. Queue is full: " + queue.size());
+ Thread.sleep(PAUSE_INCREMENT_MILLIS);
+ totalConsecutiveWait += PAUSE_INCREMENT_MILLIS;
+ if (maxConsecWaitInMillis > -1 && totalConsecutiveWait > maxConsecWaitInMillis) {
+ timedOut = true;
+ logger.error("Crawler had to wait longer than max consecutive wait time.");
+ throw new InterruptedException("FileResourceCrawler had to wait longer than max consecutive wait time.");
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ logger.info("FileResourceCrawler shutting down because of interrupted thread.");
+ throw new InterruptedException("FileResourceCrawler interrupted.");
+ }
+ }
+ isAdded = true;
+ added++;
+ } else {
+ logger.debug("crawler did not select: "+fileResource.getResourceId());
+ }
+ considered++;
+ return (isAdded)?ADDED:SKIPPED;
+ }
+
+ //Warning! Depending on the value of maxConsecWaitInMillis
+ //this could try forever in vain to add poison to the queue.
+ private void shutdown() throws InterruptedException{
+ logger.debug("FileResourceCrawler entering shutdown");
+ if (hasCompletedCrawling || shutDownNoPoison) {
+ return;
+ }
+ int i = 0;
+ long start = new Date().getTime();
+ while (queue.offer(new PoisonFileResource(), 1L, TimeUnit.SECONDS)) {
+ if (shutDownNoPoison) {
+ logger.debug("quitting the poison loop because shutDownNoPoison is now true");
+ return;
+ }
+ if (Thread.currentThread().isInterrupted()) {
+ logger.debug("thread interrupted while trying to add poison");
+ return;
+ }
+ long elapsed = new Date().getTime() - start;
+ if (maxConsecWaitInMillis > -1 && elapsed > maxConsecWaitInMillis) {
+ logger.error("Crawler timed out while trying to add poison");
+ return;
+ }
+ logger.debug("added "+i+" number of PoisonFileResource(s)");
+ if (i++ >= numConsumers) {
+ break;
+ }
+
+ }
+ hasCompletedCrawling = true;
+ }
+
+ /**
+ * If the crawler stops for any reason, it is no longer active.
+ *
+ * @return whether crawler is active or not
+ */
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public void setMaxConsecWaitInMillis(long maxConsecWaitInMillis) {
+ this.maxConsecWaitInMillis = maxConsecWaitInMillis;
+ }
+ public void setDocumentSelector(DocumentSelector documentSelector) {
+ this.documentSelector = documentSelector;
+ }
+
+ public int getConsidered() {
+ return considered;
+ }
+
+ protected boolean select(Metadata m) {
+ return documentSelector.select(m);
+ }
+
+ /**
+ * Maximum number of files to add. If {@link #maxFilesToAdd} < 0 (default),
+ * then this crawler will add all documents.
+ *
+ * @param maxFilesToAdd maximum number of files to add to the queue
+ */
+ public void setMaxFilesToAdd(int maxFilesToAdd) {
+ this.maxFilesToAdd = maxFilesToAdd;
+ }
+
+
+ /**
+ * Maximum number of files to consider. A file is considered
+ * whether or not the DocumentSelector selects a document.
+ * <p/>
+ * If {@link #maxFilesToConsider} < 0 (default), then this crawler
+ * will add all documents.
+ *
+ * @param maxFilesToConsider maximum number of files to consider adding to the queue
+ */
+ public void setMaxFilesToConsider(int maxFilesToConsider) {
+ this.maxFilesToConsider = maxFilesToConsider;
+ }
+
+ /**
+ * Use sparingly. This synchronizes on the queue!
+ * @return whether this queue contains any non-poison file resources
+ */
+ public boolean isQueueEmpty() {
+ int size= 0;
+ synchronized(queue) {
+ for (FileResource aQueue : queue) {
+ if (!(aQueue instanceof PoisonFileResource)) {
+ size++;
+ }
+ }
+ }
+ return size == 0;
+ }
+
+ /**
+ * Returns whether the crawler timed out while trying to add a resource
+ * to the queue.
+ * <p/>
+ * If the crawler timed out while trying to add poison, this is not
+ * set to true.
+ *
+ * @return whether this was timed out or not
+ */
+ public boolean wasTimedOut() {
+ return timedOut;
+ }
+
+ /**
+ *
+ * @return number of files that this crawler added to the queue
+ */
+ public int getAdded() {
+ return added;
+ }
+
+ /**
+ * Set to true to shut down the FileResourceCrawler without
+ * adding poison. Do this only if you've already called another mechanism
+ * to request that consumers shut down. This prevents a potential deadlock issue
+ * where the crawler is trying to add to the queue, but it is full.
+ *
+ * @return
+ */
+ public void shutDownNoPoison() {
+ this.shutDownNoPoison = true;
+ }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,37 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Result returned by {@link FileResourceCrawler#call()}. It holds how many
+ * resources the crawler considered and how many it added to the queue.
+ */
+class FileResourceCrawlerFutureResult implements IFileProcessorFutureResult {
+
+    //resources added to the queue
+    private final int added;
+    //resources examined, whether or not they were selected
+    private final int considered;
+
+    protected FileResourceCrawlerFutureResult(int considered, int added) {
+        this.added = added;
+        this.considered = considered;
+    }
+
+    protected int getAdded() {
+        return added;
+    }
+
+    protected int getConsidered() {
+        return considered;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,113 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Date;
+
+/**
+ * Simple immutable record of the time when a FileResource's processing started.
+ * Two instances are equal when both the resource id and the start time match.
+ */
+class FileStarted {
+
+    private final String resourceId;
+    private final long started;
+
+    /**
+     * Initializes a new FileStarted with {@link #resourceId} and the
+     * current time ({@link System#currentTimeMillis()}) as {@link #started}.
+     *
+     * @param resourceId string for unique resource id
+     */
+    public FileStarted(String resourceId) {
+        this(resourceId, System.currentTimeMillis());
+    }
+
+    /**
+     * @param resourceId string for unique resource id
+     * @param started start time; {@link #getElapsedMillis()} subtracts it from
+     *                {@link System#currentTimeMillis()}, so it should use that clock
+     */
+    public FileStarted(String resourceId, long started) {
+        this.resourceId = resourceId;
+        this.started = started;
+    }
+
+    /**
+     * @return id of resource
+     */
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    /**
+     * @return time at which processing on this file started
+     */
+    public long getStarted() {
+        return started;
+    }
+
+    /**
+     * @return milliseconds elapsed since the start of processing of this
+     * file resource
+     */
+    public long getElapsedMillis() {
+        return System.currentTimeMillis() - started;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result
+                + ((resourceId == null) ? 0 : resourceId.hashCode());
+        result = prime * result + (int) (started ^ (started >>> 32));
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof FileStarted)) {
+            //also covers obj == null
+            return false;
+        }
+        FileStarted other = (FileStarted) obj;
+        boolean sameId = (resourceId == null)
+                ? other.resourceId == null
+                : resourceId.equals(other.resourceId);
+        return sameId && started == other.started;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("FileStarted [resourceId=");
+        builder.append(resourceId);
+        builder.append(", started=");
+        builder.append(started);
+        builder.append("]");
+        return builder.toString();
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,26 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Marker interface for the values returned by the batch-processing Callables
+ * (for example the crawler, the status reporter and the interrupter), so that
+ * each one can report its own kind of result.
+ */
+public interface IFileProcessorFutureResult {
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,57 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.Callable;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.io.IOUtils;
+
+
+/**
+ * Class that waits for input on System.in. If the user enters a keystroke on
+ * System.in, this will send a signal to the FileResourceRunner to shutdown gracefully.
+ *
+ * <p>
+ * In the future, this may implement a common IInterrupter interface for more flexibility.
+ */
+public class Interrupter implements Callable<IFileProcessorFutureResult> {
+
+ private Logger logger = Logger.getLogger(Interrupter.class);
+ public IFileProcessorFutureResult call(){
+ try{
+ BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, IOUtils.UTF_8));
+ while (true){
+ if (reader.ready()){
+ reader.readLine();
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ } catch (InterruptedException e){
+ //canceller was interrupted
+ } catch (IOException e){
+ logger.error("IOException from STDIN in CommandlineInterrupter.");
+ }
+ return new InterrupterFutureResult();
+ }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,22 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Empty result returned by {@link Interrupter#call()} when it finishes.
+ */
+public class InterrupterFutureResult implements IFileProcessorFutureResult {
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,29 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.metadata.Metadata;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Supplies an {@link OutputStream} for a resource described by a {@link Metadata}.
+ */
+public interface OutputStreamFactory {
+
+    /**
+     * @param metadata metadata describing the resource (presumably used to choose
+     *                 where the output goes; confirm against implementations)
+     * @return stream to write output to
+     * @throws IOException if the stream cannot be opened
+     */
+    OutputStream getOutputStream(Metadata metadata) throws IOException;
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,100 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Immutable summary of a batch run. It records the counts of resources
+ * considered, added and consumed, the elapsed seconds, the intended exit
+ * status, and the cause for termination.
+ */
+public class ParallelFileProcessingResult {
+    private final int considered;
+    private final int added;
+    private final int consumed;
+    private final double secondsElapsed;
+    private final int exitStatus;
+    private final String causeForTermination;
+
+    public ParallelFileProcessingResult(int considered, int added, int consumed, double secondsElapsed,
+                                        int exitStatus,
+                                        String causeForTermination) {
+        this.considered = considered;
+        this.added = added;
+        this.consumed = consumed;
+        this.secondsElapsed = secondsElapsed;
+        this.exitStatus = exitStatus;
+        this.causeForTermination = causeForTermination;
+    }
+
+    /**
+     * Returns the number of file resources considered.
+     * If a filter causes the crawler to ignore a number of resources,
+     * this number could be higher than that returned by {@link #getConsumed()}.
+     *
+     * @return number of file resources considered
+     */
+    public int getConsidered() {
+        return considered;
+    }
+
+    /**
+     * @return number of resources added to the queue
+     */
+    public int getAdded() {
+        return added;
+    }
+
+    /**
+     * @return number of resources that were tried to be consumed. There
+     * may have been an exception.
+     */
+    public int getConsumed() {
+        return consumed;
+    }
+
+    /**
+     * @return description of the cause for the termination of the batch
+     * process, as supplied to the constructor
+     */
+    public String getCauseForTermination() {
+        return causeForTermination;
+    }
+
+    /**
+     *
+     * @return seconds elapsed since the start of the batch processing
+     */
+    public double secondsElapsed() {
+        return secondsElapsed;
+    }
+
+    /**
+     *
+     * @return intended exit status of the process
+     */
+    public int getExitStatus() {
+        return exitStatus;
+    }
+
+    @Override
+    public String toString() {
+        return "ParallelFileProcessingResult{" +
+                "considered=" + considered +
+                ", added=" + added +
+                ", consumed=" + consumed +
+                ", secondsElapsed=" + secondsElapsed +
+                ", exitStatus=" + exitStatus +
+                ", causeForTermination='" + causeForTermination + '\'' +
+                '}';
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,27 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.parser.Parser;
+
+/**
+ * Supplies a {@link Parser} for a given {@link TikaConfig}.
+ */
+public interface ParserFactory {
+
+    /**
+     * @param config Tika configuration to use when creating the parser
+     * @return parser to use
+     */
+    Parser getParser(TikaConfig config);
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,54 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.metadata.Metadata;
+
+import java.io.InputStream;
+
+/**
+ * Sentinel the crawler puts on the queue to tell a consumer to shut down.
+ * It carries no data: every accessor returns {@code null}.
+ */
+class PoisonFileResource implements FileResource {
+
+    /**
+     * @return {@code null}; a poison pill has no metadata
+     */
+    @Override
+    public Metadata getMetadata() {
+        return null;
+    }
+
+    /**
+     * @return {@code null}; a poison pill has no content
+     */
+    @Override
+    public InputStream openInputStream() {
+        return null;
+    }
+
+    /**
+     * @return {@code null}; a poison pill has no id
+     */
+    @Override
+    public String getResourceId() {
+        return null;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,227 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.text.NumberFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tika.util.DurationFormatUtils;
+
+/**
+ * Basic class to use for reporting status from both the crawler and the consumers.
+ * This wakes up roughly every {@link #sleepMillis} and reports a status summary
+ * (by default via log.info) until it is interrupted.
+ */
+public class StatusReporter implements Callable<IFileProcessorFutureResult> {
+
+    private final Log logger = LogFactory.getLog(StatusReporter.class);
+
+    //require references to these so that the
+    //StatusReporter can query them when it wakes up
+    private final ConsumersManager consumersManager;
+    private final FileResourceCrawler crawler;
+
+    //local time that the StatusReporter started
+    private final long start;
+    //how long to sleep between reporting intervals
+    private long sleepMillis = 1000;
+
+    //how long before considering a parse "stale" (potentially hung forever)
+    private long staleThresholdMillis = 100000;
+
+    private volatile boolean isShuttingDown = false;
+
+    /**
+     * Initialize with the crawler and consumers
+     *
+     * @param crawler crawler to ping at intervals
+     * @param consumersManager consumers to ping at intervals
+     */
+    public StatusReporter(FileResourceCrawler crawler, ConsumersManager consumersManager) {
+        this.consumersManager = consumersManager;
+        this.crawler = crawler;
+        start = new Date().getTime();
+    }
+
+    /**
+     * Override for different behavior.
+     * <p/>
+     * This reports the string at the info level to this class' logger.
+     *
+     * @param s string to report
+     */
+    protected void report(String s) {
+        logger.info(s);
+    }
+
+    /**
+     * Startup the reporter. This loops until interrupted and reports after
+     * each sleep interval.
+     *
+     * @return a {@link StatusReporterFutureResult}
+     */
+    @Override
+    public IFileProcessorFutureResult call() {
+        NumberFormat numberFormat = NumberFormat.getNumberInstance(Locale.ROOT);
+        try {
+            while (true) {
+                Thread.sleep(sleepMillis);
+                reportProgress(numberFormat);
+                reportStale();
+                reportStillAlive(numberFormat);
+                reportCrawler(numberFormat);
+                if (isShuttingDown) {
+                    report("Process is shutting down now.");
+                }
+            }
+        } catch (InterruptedException e) {
+            //interruption is how the reporter is stopped; preserve the flag
+            Thread.currentThread().interrupt();
+        }
+        return new StatusReporterFutureResult();
+    }
+
+    //Reports documents processed (with a rate once it is meaningful) and handled exceptions.
+    private void reportProgress(NumberFormat numberFormat) {
+        int cnt = getRoughCountConsumed();
+        int exceptions = getRoughCountExceptions();
+        long elapsed = new Date().getTime() - start;
+        double elapsedSecs = (double) elapsed / (double) 1000;
+        //only report a rate after 5 secs or 100 docs; never divide by zero seconds
+        int avg = (elapsedSecs > 0 && (elapsedSecs > 5 || cnt > 100))
+                ? (int) ((double) cnt / elapsedSecs) : -1;
+
+        String elapsedString = DurationFormatUtils.formatMillis(elapsed);
+        String docsPerSec = avg > -1 ? String.format(Locale.ROOT,
+                " (%s docs per sec)",
+                numberFormat.format(avg)) : "";
+        report(String.format(
+                Locale.ROOT,
+                "Processed %s documents in %s%s.",
+                numberFormat.format(cnt), elapsedString, docsPerSec));
+        if (exceptions == 1) {
+            report("There has been one handled exception.");
+        } else {
+            report(String.format(Locale.ROOT,
+                    "There have been %s handled exceptions.",
+                    numberFormat.format(exceptions)));
+        }
+    }
+
+    //Reports how many consumers are still active.
+    private void reportStillAlive(NumberFormat numberFormat) {
+        int stillAlive = getStillAlive();
+        if (stillAlive == 1) {
+            report("There is one file processor still active.");
+        } else {
+            report("There are " + numberFormat.format(stillAlive) + " file processors still active.");
+        }
+    }
+
+    //Reports the crawler's counts (from one snapshot of each) and whether it has finished.
+    private void reportCrawler(NumberFormat numberFormat) {
+        int crawled = crawler.getConsidered();
+        int added = crawler.getAdded();
+        String msg;
+        if (crawled == 1) {
+            //trailing space separates this from the "and it has added" clause
+            msg = "The directory crawler has considered 1 file, ";
+        } else {
+            msg = "The directory crawler has considered " +
+                    numberFormat.format(crawled) + " files, ";
+        }
+        if (added == 1) {
+            msg += "and it has added 1 file.";
+        } else {
+            msg += "and it has added " +
+                    numberFormat.format(added) + " files.";
+        }
+        msg += "\n";
+        report(msg);
+
+        if (!crawler.isActive()) {
+            report("The directory crawler has completed its crawl.\n");
+        }
+    }
+
+    /**
+     * Set the amount of time to sleep between reports.
+     * @param sleepMillis length to sleep btwn reports in milliseconds
+     */
+    public void setSleepMillis(long sleepMillis) {
+        this.sleepMillis = sleepMillis;
+    }
+
+    /**
+     * Set the amount of time in milliseconds to use as the threshold for determining
+     * a stale parse.
+     *
+     * @param staleThresholdMillis threshold for determining whether or not to report a stale
+     */
+    public void setStaleThresholdMillis(long staleThresholdMillis) {
+        this.staleThresholdMillis = staleThresholdMillis;
+    }
+
+    //Reports every consumer whose current file has been processing longer than the threshold.
+    private void reportStale() {
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            FileStarted fs = consumer.getCurrentFile();
+            if (fs == null) {
+                continue;
+            }
+            long elapsed = fs.getElapsedMillis();
+            if (elapsed > staleThresholdMillis) {
+                String elapsedString = Double.toString((double) elapsed / (double) 1000);
+                report("A thread has been working on " + fs.getResourceId() +
+                        " for " + elapsedString + " seconds.");
+            }
+        }
+    }
+
+    /*
+     * This returns a rough (unsynchronized) count of resources consumed.
+     */
+    private int getRoughCountConsumed() {
+        int ret = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            ret += consumer.getNumResourcesConsumed();
+        }
+        return ret;
+    }
+
+    //Counts the consumers that report themselves as still active.
+    private int getStillAlive() {
+        int ret = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            if (consumer.isStillActive()) {
+                ret++;
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * This returns a rough (unsynchronized) count of caught/handled exceptions.
+     * @return rough count of exceptions
+     */
+    public int getRoughCountExceptions() {
+        int ret = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            ret += consumer.getNumHandledExceptions();
+        }
+        return ret;
+    }
+
+    /**
+     * Set whether the main process is in the process of shutting down.
+     * @param isShuttingDown true to add a "shutting down" line to each report
+     */
+    public void setIsShuttingDown(boolean isShuttingDown) {
+        this.isShuttingDown = isShuttingDown;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,23 @@
+package org.apache.tika.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Empty result type that a StatusReporter returns when it finishes.
+ * It declares no fields or methods; it only exists so that the reporter's
+ * completion can be expressed as an {@link IFileProcessorFutureResult}.
+ */
+public class StatusReporterFutureResult implements IFileProcessorFutureResult {
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,38 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResource;
+import org.w3c.dom.Node;
+
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Base class for builders that construct a {@link ConsumersManager}.
+ */
+public abstract class AbstractConsumersBuilder {
+
+    /**
+     * Returns a default number of consumers: one fewer than the number of
+     * processors available to the JVM, but never less than one.
+     *
+     * @return default number of consumers, always at least 1
+     */
+    public static int getDefaultNumConsumers() {
+        // leave one processor free where possible, but always run at least one consumer
+        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
+    }
+
+    /**
+     * Builds a {@link ConsumersManager}.
+     *
+     * @param node DOM node to build from (presumably the consumers element of the
+     *             batch configuration XML; confirm against implementations)
+     * @param runtimeAttributes string key/value attributes supplied at runtime
+     *                          (assumed to override configured values; confirm)
+     * @param queue queue of {@link FileResource}s, presumably the queue the built
+     *              consumers read from (confirm against implementations)
+     * @return the constructed consumers manager
+     */
+    public abstract ConsumersManager build(Node node, Map<String, String> runtimeAttributes,
+                                           ArrayBlockingQueue<FileResource> queue);
+}