You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2015/03/23 17:09:12 UTC

svn commit: r1668673 [2/6] - in /tika/trunk: ./ tika-app/ tika-app/src/main/java/org/apache/tika/cli/ tika-app/src/main/resources/ tika-app/src/test/java/org/apache/tika/cli/ tika-batch/ tika-batch/src/ tika-batch/src/main/ tika-batch/src/main/examples...

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,392 @@
+package org.apache.tika.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.io.IOUtils;
+
+public class BatchProcessDriverCLI {
+
+    /**
+     * Exit codes of the child process that this driver gives special meaning to:
+     * 0 ended correctly (do not restart), 254 do not restart,
+     * 253 ended with exception (do restart).
+     */
+    public static final int PROCESS_RESTART_EXIT_CODE = 253;
+    //keep this value distinct from exit codes caused by system errors;
+    //that is, if there is a system error (e.g. 143), you
+    //should restart the process.
+    public static final int PROCESS_NO_RESTART_EXIT_CODE = 254;
+    public static final int PROCESS_COMPLETED_SUCCESSFULLY = 0;
+    private static final Logger logger = Logger.getLogger(BatchProcessDriverCLI.class);
+
+    //maximum number of restarts; -1 means no limit
+    private int maxProcessRestarts = -1;
+    //how long execute() sleeps between checks of the child process
+    private long pulseMillis = 1000;
+
+    //how many times to wait pulseMillis milliseconds if a restart
+    //message has been received through stdout, but the
+    //child process has not yet exited
+    private int waitNumLoopsAfterRestartmessage = 60;
+
+
+    //set by the InterruptWatcher thread, read by the other threads
+    private volatile boolean userInterrupted = false;
+    //set by the StreamWatcher thread and read by the thread running execute(),
+    //so it has to be volatile to be reliably visible across threads
+    private volatile boolean receivedRestartMsg = false;
+    private Process process = null;
+
+    private StreamGobbler errorWatcher = null;
+    private StreamGobbler outGobbler = null;
+    private InterruptWriter interruptWriter = null;
+    private final InterruptWatcher interruptWatcher =
+            new InterruptWatcher(System.in);
+
+    private Thread errorWatcherThread = null;
+    private Thread outGobblerThread = null;
+    private Thread interruptWriterThread = null;
+    private final Thread interruptWatcherThread = new Thread(interruptWatcher);
+
+    //command line of the child process, with any "-maxRestarts" option removed
+    private final String[] commandLine;
+    private int numRestarts = 0;
+    //whether the child's stdout is echoed to this process's stdout
+    private boolean redirectChildProcessToStdOut = true;
+
+    /**
+     * Builds a driver for the given child-process command line.
+     * A "-maxRestarts" option and the integer that follows it, if present,
+     * are consumed by the driver and are not passed on to the child process.
+     *
+     * @param commandLine command line for the child process, optionally
+     *                    containing "-maxRestarts" followed by an integer
+     * @throws IllegalArgumentException if "-maxRestarts" is not followed by an integer
+     */
+    public BatchProcessDriverCLI(String[] commandLine){
+        this.commandLine = tryToReadMaxRestarts(commandLine);
+    }
+
+    /**
+     * Removes the "-maxRestarts" option and its value from the command line
+     * and stores the value in maxProcessRestarts.  All other arguments are
+     * kept in their original order.
+     *
+     * @param commandLine raw command line handed to the driver
+     * @return the command line without "-maxRestarts" and its value
+     * @throws IllegalArgumentException if "-maxRestarts" is the last argument
+     *         or is followed by something that is not an integer
+     */
+    private String[] tryToReadMaxRestarts(String[] commandLine) {
+        List<String> args = new ArrayList<String>();
+        for (int i = 0; i < commandLine.length; i++) {
+            String arg = commandLine[i];
+            if (arg.equals("-maxRestarts")) {
+                if (i == commandLine.length-1) {
+                    throw new IllegalArgumentException("Must specify an integer after \"-maxRestarts\"");
+                }
+                String restartNumString = commandLine[i+1];
+                try {
+                    maxProcessRestarts = Integer.parseInt(restartNumString);
+                } catch (NumberFormatException e) {
+                    //keep the parse failure as the cause so that the offending
+                    //value shows up in the stack trace
+                    throw new IllegalArgumentException(
+                            "Must specify an integer after \"-maxRestarts\" arg.", e);
+                }
+                //skip the value that was just consumed
+                i++;
+            } else {
+                args.add(arg);
+            }
+        }
+        return args.toArray(new String[args.size()]);
+    }
+
+    /**
+     * Starts the child process and watches it until it is finished for good,
+     * restarting it when needed.
+     * <p>
+     * Every pulseMillis milliseconds this checks whether the child has exited.
+     * Exit codes PROCESS_COMPLETED_SUCCESSFULLY and PROCESS_NO_RESTART_EXIT_CODE
+     * end the loop; any other exit code triggers a restart.  If a restart message
+     * was received but the child still has not exited after
+     * waitNumLoopsAfterRestartmessage pulses, the child is restarted anyway.
+     * The loop also ends when the user interrupts the driver or when
+     * restart() reports that no restart took place.
+     *
+     * @throws Exception if the child process cannot be started or restarted
+     */
+    public void execute() throws Exception {
+
+        interruptWatcherThread.setDaemon(true);
+        interruptWatcherThread.start();
+        logger.trace("about to start");
+        start();
+        int loopsAfterRestartMessageReceived = 0;
+        while (!userInterrupted) {
+            Integer exit = null;
+            try {
+                logger.trace("about to check exit value");
+                exit = process.exitValue();
+                logger.trace("exit value:" + exit);
+                stop();
+            } catch (IllegalThreadStateException e) {
+                //hasn't exited
+                logger.trace("process has not exited; IllegalThreadStateException");
+            }
+
+            logger.trace("Before sleep:" +
+                        " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+
+            //Even if the process has exited,
+            //wait just a little bit to make sure that
+            //mustRestart hasn't been set to true
+            try {
+                Thread.sleep(pulseMillis);
+            } catch (InterruptedException e) {
+                logger.trace("interrupted exception during sleep");
+            }
+            logger.trace("After sleep:" +
+                    " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+            //if we've gotten the message via stdout to restart
+            //but the process hasn't exited yet, give it another
+            //chance -- but only until the wait limit is used up;
+            //without that bound a hung child would never be restarted
+            if (receivedRestartMsg && exit == null &&
+                    loopsAfterRestartMessageReceived <= waitNumLoopsAfterRestartmessage) {
+                loopsAfterRestartMessageReceived++;
+                logger.trace("Must restart, still not exited; loops after restart: " +
+                            loopsAfterRestartMessageReceived);
+                continue;
+            }
+            if (loopsAfterRestartMessageReceived > waitNumLoopsAfterRestartmessage) {
+                logger.trace("About to try to restart because:" +
+                        " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+                logger.warn("Restarting after exceeded wait loops waiting for exit: "+
+                        loopsAfterRestartMessageReceived);
+                boolean restarted = restart(exit, receivedRestartMsg);
+                //the count belongs to the old child; without this reset every
+                //following pulse would trigger yet another restart
+                loopsAfterRestartMessageReceived = 0;
+                if (!restarted) {
+                    break;
+                }
+            } else if (exit != null && exit != BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE
+                    && exit != BatchProcessDriverCLI.PROCESS_COMPLETED_SUCCESSFULLY) {
+                logger.trace("About to try to restart because:" +
+                            " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
+
+                if (exit == BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE) {
+                    logger.info("Restarting on expected restart code");
+                } else {
+                    logger.warn("Restarting on unexpected restart code: "+exit);
+                }
+                boolean restarted = restart(exit, receivedRestartMsg);
+                //start counting from scratch for the new child
+                loopsAfterRestartMessageReceived = 0;
+                if (!restarted) {
+                    break;
+                }
+            } else if (exit != null && (exit == PROCESS_COMPLETED_SUCCESSFULLY
+                    || exit == BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE)) {
+                logger.trace("Will not restart: "+exit);
+                break;
+            }
+        }
+        logger.trace("about to call shutdown driver now");
+        shutdownDriverNow();
+    }
+
+    /**
+     * Shuts the driver down.  If there is a child process, this waits up to
+     * ten seconds (ten checks, one second apart) for it to exit on its own;
+     * if it still has not exited, the child is destroyed via stop().
+     * The stdin watcher thread is interrupted in every case.
+     */
+    private void shutdownDriverNow() {
+        if (process != null) {
+            for (int i = 0; i < 10; i++) {
+
+                logger.trace("trying to shut down: "+i);
+                try {
+                    int exit = process.exitValue();
+                    logger.trace("trying to stop:"+exit);
+                    stop();
+                    interruptWatcherThread.interrupt();
+                    return;
+                } catch (IllegalThreadStateException e) {
+                    //hasn't exited
+                }
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    //swallow
+                }
+            }
+            logger.error("Process didn't stop after 10 seconds after shutdown. " +
+                    "I am forcefully killing it.");
+            //do what the message says: stop() destroys the child process
+            //and shuts down the threads attached to its streams
+            stop();
+        }
+        interruptWatcherThread.interrupt();
+    }
+
+    /**
+     * @return how many times the child process has been restarted so far
+     */
+    public int getNumRestarts() {
+        return this.numRestarts;
+    }
+
+    /**
+     * @return whether the stdin watcher has flagged a user interrupt
+     */
+    public boolean getUserInterrupted() {
+        return this.userInterrupted;
+    }
+
+    /**
+     * Stops the current child process and starts a new one, unless the
+     * restart limit has been reached.
+     *
+     * @param exitValue exit value of the old child, or null; only used in the log message
+     * @param receivedRestartMsg whether a restart message was seen; only used in the log message
+     * @return true if a new child was started; false if maxProcessRestarts is
+     *         set (greater than -1) and numRestarts has already reached it
+     * @throws Exception if thrown by start() while launching the new child
+     */
+    private boolean restart(Integer exitValue, boolean receivedRestartMsg) throws Exception {
+        boolean restartsAreLimited = maxProcessRestarts > -1;
+        if (restartsAreLimited && numRestarts >= maxProcessRestarts) {
+            logger.warn("Hit the maximum number of process restarts. Driver is shutting down now.");
+            stop();
+            return false;
+        }
+
+        StringBuilder msg = new StringBuilder("Must restart process (exitValue=");
+        msg.append(exitValue);
+        msg.append(" numRestarts=").append(numRestarts);
+        msg.append(" receivedRestartMessage=").append(receivedRestartMsg);
+        msg.append(")");
+        logger.warn(msg.toString());
+
+        stop();
+        start();
+        numRestarts++;
+        return true;
+    }
+
+    /**
+     * Destroys the child process (if there is one), clears the
+     * restart-message flag and shuts down the three threads that are
+     * attached to the child's stdin, stdout and stderr.
+     * <p>
+     * NOTE(review): the thread and gobbler fields are used without null checks,
+     * so this presumably relies on start() having completed at least once -- confirm.
+     */
+    private void stop() {
+        if (process != null) {
+            logger.trace("destroying a non-null process");
+            process.destroy();
+        }
+
+        //any restart message belonged to the process that was just destroyed
+        receivedRestartMsg = false;
+        //interrupt the writer thread first
+        interruptWriterThread.interrupt();
+
+        //ask the readers to stop and close their streams, then interrupt their threads
+        errorWatcher.stopGobblingAndDie();
+        outGobbler.stopGobblingAndDie();
+        errorWatcherThread.interrupt();
+        outGobblerThread.interrupt();
+    }
+
+    /**
+     * Launches the child process in the current directory and starts one
+     * thread each for watching its stderr, draining its stdout and writing
+     * to its stdin.
+     *
+     * @throws Exception if the process cannot be started
+     */
+    private void start() throws Exception {
+        process = new ProcessBuilder(commandLine)
+                .directory(new File("."))
+                .start();
+
+        //stderr: watched for the restart message
+        errorWatcher = new StreamWatcher(process.getErrorStream());
+        errorWatcherThread = new Thread(errorWatcher);
+        errorWatcherThread.start();
+
+        //stdout: drained, optionally echoed
+        outGobbler = new StreamGobbler(process.getInputStream());
+        outGobblerThread = new Thread(outGobbler);
+        outGobblerThread.start();
+
+        //stdin of the child: used to pass on a user interrupt
+        interruptWriter = new InterruptWriter(process.getOutputStream());
+        interruptWriterThread = new Thread(interruptWriter);
+        interruptWriterThread.start();
+
+    }
+
+    public void setRedirectChildProcessToStdOut(boolean redirectChildProcessToStdOut) {
+        this.redirectChildProcessToStdOut = redirectChildProcessToStdOut;
+    }
+
+    /**
+     * Class to watch stdin from the driver for anything that is typed.
+     * This will currently cause an interrupt if anything followed by
+     * a return key is entered.  We may want to add an "Are you sure?" dialogue.
+     * <p>
+     * NOTE(review): readLine() also returns (with null) at end of stream, so a
+     * closed or empty stdin sets userInterrupted as well -- confirm that this
+     * is intended.
+     */
+    private class InterruptWatcher implements Runnable {
+        //reader over the stream to watch, decoded as UTF-8
+        private BufferedReader reader;
+
+        private InterruptWatcher(InputStream is) {
+            reader = new BufferedReader(new InputStreamReader(is, IOUtils.UTF_8));
+        }
+
+        /**
+         * Blocks on a single readLine() and then flags the user interrupt.
+         * An IOException leaves the flag untouched.
+         */
+        @Override
+        public void run() {
+            try {
+                //this will block.
+                //as soon as it reads anything,
+                //set userInterrupted to true and stop
+                reader.readLine();
+                userInterrupted = true;
+            } catch (IOException e) {
+                //swallow
+            }
+        }
+    }
+
+    /**
+     * Passes a user interrupt on to the child process: wakes up every
+     * 500 milliseconds and, once userInterrupted is set, writes a farewell
+     * line to the child's stdin on every wake-up.  Ends when its thread is
+     * interrupted or when writing fails.
+     */
+    private class InterruptWriter implements Runnable {
+        private final Writer writer;
+
+        private InterruptWriter(OutputStream os) {
+            this.writer = new OutputStreamWriter(os, IOUtils.UTF_8);
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (;;) {
+                    Thread.sleep(500);
+                    if (!userInterrupted) {
+                        continue;
+                    }
+                    writer.write(String.format(Locale.ENGLISH, "Ave atque vale!%n"));
+                    writer.flush();
+                }
+            } catch (InterruptedException e) {
+                //job is done, ok
+            } catch (IOException e) {
+                //swallow
+            }
+        }
+    }
+
+    /**
+     * Drains a stream of the child process line by line so that the child
+     * does not block on a full pipe.  Lines are echoed to stdout with a
+     * "BatchProcess:" prefix if redirectChildProcessToStdOut is set.
+     */
+    private class StreamGobbler implements Runnable {
+        //plagiarized from org.apache.oodt's StreamGobbler
+        protected final BufferedReader reader;
+        //cleared by stopGobblingAndDie() from another thread than the one
+        //that runs run(), hence volatile
+        protected volatile boolean running = true;
+
+        private StreamGobbler(InputStream is) {
+            this.reader = new BufferedReader(new InputStreamReader(new BufferedInputStream(is),
+                    IOUtils.UTF_8));
+        }
+
+        /**
+         * Reads until end of stream, until {@link #stopGobblingAndDie()} has
+         * been called, or until reading fails.
+         */
+        @Override
+        public void run() {
+            String line = null;
+            try {
+                logger.trace("gobbler starting to read");
+                while ((line = reader.readLine()) != null && this.running) {
+                    if (redirectChildProcessToStdOut) {
+                        System.out.println("BatchProcess:"+line);
+                    }
+                }
+            } catch (IOException e) {
+                logger.trace("gobbler io exception");
+                //swallow ioe
+            }
+            logger.trace("gobbler done");
+        }
+
+        /**
+         * Asks the read loop to end and closes the underlying reader.
+         */
+        private void stopGobblingAndDie() {
+            logger.trace("stop gobbling");
+            running = false;
+            IOUtils.closeQuietly(reader);
+        }
+    }
+
+    private class StreamWatcher extends StreamGobbler implements Runnable {
+        //plagiarized from org.apache.oodt's StreamGobbler
+
+        private StreamWatcher(InputStream is){
+            super(is);
+        }
+
+        @Override
+        public void run() {
+            String line = null;
+            try {
+                logger.trace("watcher starting to read");
+                while ((line = reader.readLine()) != null && this.running) {
+                    if (line.startsWith(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString())) {
+                        receivedRestartMsg = true;
+                    }
+                    logger.info("BatchProcess: "+line);
+                }
+            } catch (IOException e) {
+                logger.trace("watcher io exception");
+                //swallow ioe
+            }
+            logger.trace("watcher done");
+        }
+    }
+
+
+    /**
+     * Command line entry point: runs a driver over the given arguments,
+     * prints a completion message and exits with status 0.
+     *
+     * @param args command line of the child process, optionally with "-maxRestarts"
+     * @throws Exception if thrown while the driver is executing
+     */
+    public static void main(String[] args) throws Exception {
+        new BatchProcessDriverCLI(args).execute();
+        System.out.println("FSBatchProcessDriver has gracefully completed");
+        System.exit(0);
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,80 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Simple wrapper around a collection of consumers that allows
+ * for initializing and shutting down shared resources
+ * (e.g. db connection, index, writer, etc.)
+ */
+public abstract class ConsumersManager {
+
+    //maximum time to allow the ConsumersManager for either init()
+    //or shutdown()
+    private long consumersManagerMaxMillis = 60000;
+    //unmodifiable view of the list handed to the constructor
+    private final List<FileResourceConsumer> consumers;
+
+    /**
+     * @param consumers the consumers to manage; stored as an unmodifiable view
+     */
+    public ConsumersManager(List<FileResourceConsumer> consumers) {
+        this.consumers = Collections.unmodifiableList(consumers);
+    }
+    /**
+     * Get the consumers
+     * @return the consumers, as an unmodifiable list
+     */
+    public List<FileResourceConsumer> getConsumers() {
+        return consumers;
+    }
+
+    /**
+     * This is called by BatchProcess before submitting the threads.
+     * The default implementation does nothing.
+     */
+    public void init(){
+
+    }
+
+    /**
+     * This is called by BatchProcess immediately before closing.
+     * Beware! Some of the consumers may have hung or may not
+     * have completed.
+     * The default implementation does nothing.
+     */
+    public void shutdown(){
+
+    }
+
+    /**
+     * {@link org.apache.tika.batch.BatchProcess} will throw an exception
+     * if the ConsumersManager doesn't complete init() or shutdown()
+     * within this amount of time.
+     * @return the maximum time in milliseconds allowed for init() or shutdown()
+     */
+    public long getConsumersManagerMaxMillis() {
+        return consumersManagerMaxMillis;
+    }
+
+    /**
+     * See {@link #getConsumersManagerMaxMillis()}.
+     *
+     * @param consumersManagerMaxMillis maximum number of milliseconds
+     *                                  to allow for init() or shutdown()
+     */
+    public void setConsumersManagerMaxMillis(long consumersManagerMaxMillis) {
+        this.consumersManagerMaxMillis = consumersManagerMaxMillis;
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,37 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Immutable result of a file consumer: the FileStarted value it was created
+ * with and the number of files processed.
+ */
+class FileConsumerFutureResult implements IFileProcessorFutureResult {
+
+  private final FileStarted fileStarted;
+  private final int filesProcessed;
+  
+  /**
+   * @param fs the FileStarted to report
+   * @param filesProcessed number of files processed
+   */
+  public FileConsumerFutureResult(FileStarted fs, int filesProcessed) {
+    this.fileStarted = fs;
+    this.filesProcessed = filesProcessed;
+  }
+  
+  /**
+   * @return the FileStarted passed to the constructor
+   */
+  public FileStarted getFileStarted() {
+    return fileStarted;
+  }
+  
+  /**
+   * @return the number of files processed, as passed to the constructor
+   */
+  public int getFilesProcessed() {
+    return filesProcessed;
+  }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,68 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.Property;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * A logical "file".  Implementations let the batch code handle files
+ * in the same way no matter where they come from: file system,
+ * database, etc.
+ */
+public interface FileResource {
+
+  //The literal lowercased extension of a file.  This may or may not
+  //have any relationship to the actual type of the file.
+  Property FILE_EXTENSION = Property.internalText("tika:file_ext");
+
+  /**
+   * Returns an id that is only used in logging, to identify which file
+   * may have caused problems.  Unique ids are best for debugging, but
+   * uniqueness is not required: the batch processors never use this id
+   * as a hashkey, for example.
+   *
+   * @return an id for a FileResource
+   */
+  String getResourceId();
+
+  /**
+   * Returns the metadata that is available before the file is parsed.
+   * This will typically be "external" metadata: file name,
+   * file size, file location, data stream, etc.  That is, things
+   * that are known about the file from outside information, not
+   * file-internal metadata.
+   *
+   * @return Metadata
+   */
+  Metadata getMetadata();
+
+  /**
+   * Opens a stream over the content of this resource.
+   *
+   * @return an InputStream for the FileResource
+   * @throws java.io.IOException
+   */
+  InputStream openInputStream() throws IOException;
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,380 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This is a base class for file consumers. The
+ * goal of this class is to abstract out the multithreading
+ * and recordkeeping components.
+ * <p/>
+ */
+public abstract class FileResourceConsumer implements Callable<IFileProcessorFutureResult> {
+
+    //Lifecycle states of a consumer.  isStillActive() treats NOT_YET_STARTED,
+    //ACTIVELY_CONSUMING and ASKED_TO_SHUTDOWN as "still active".
+    private static enum STATE {
+        NOT_YET_STARTED,
+        ACTIVELY_CONSUMING,
+        SWALLOWED_POISON,
+        THREAD_INTERRUPTED,
+        EXCEEDED_MAX_CONSEC_WAIT_MILLIS,
+        ASKED_TO_SHUTDOWN,
+        TIMED_OUT,
+        CONSUMER_EXCEPTION,
+        CONSUMER_ERROR,
+        COMPLETED
+    }
+
+    //log message type and attribute name used when a file times out
+    public static String TIME_OUT = "timeout";
+    public static String ELAPSED_MILLIS = "elapsedMS";
+
+    //source of consumer ids; starts at -1 so that the first consumer gets id 0
+    private static AtomicInteger numConsumers = new AtomicInteger(-1);
+    protected static Logger logger = Logger.getLogger(FileResourceConsumer.class);
+
+    private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes
+
+    private final ArrayBlockingQueue<FileResource> fileQueue;
+
+    private final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
+    private final int consumerId;
+
+    //used to lock checks on state (see checkForTimedOutMillis)
+    private final Object lock = new Object();
+
+    //this records the file that is currently
+    //being processed.  It is null if no file is currently being processed.
+    //volatile because it is written by the consuming thread without holding
+    //the lock and is read without the lock in getCurrentFile() and in the
+    //fast path of checkForTimedOutMillis()
+    private volatile FileStarted currentFile = null;
+
+    //total number of files consumed; volatile so that reporter
+    //sees the latest
+    private volatile int numResourcesConsumed = 0;
+
+    //total number of exceptions that were handled by subclasses;
+    //volatile so that reporter sees the latest
+    private volatile int numHandledExceptions = 0;
+
+    //after this has been set to ACTIVELY_CONSUMING,
+    //this should only be set by setEndedState.
+    private volatile STATE currentState = STATE.NOT_YET_STARTED;
+
+    /**
+     * Creates a consumer that takes its files from the given queue and
+     * assigns it the next consumer id.
+     *
+     * @param fileQueue queue to take file resources from
+     */
+    public FileResourceConsumer(ArrayBlockingQueue<FileResource> fileQueue) {
+        this.consumerId = numConsumers.incrementAndGet();
+        this.fileQueue = fileQueue;
+    }
+
+    /**
+     * Processes file resources one after the other until
+     * getNextFileResource() returns null or the thread is interrupted,
+     * counting every resource that was reported as consumed.
+     *
+     * @return a result holding the current file (if any) and the number
+     *         of resources consumed
+     */
+    public IFileProcessorFutureResult call() {
+        currentState = STATE.ACTIVELY_CONSUMING;
+
+        try {
+            for (FileResource next = getNextFileResource();
+                 next != null;
+                 next = getNextFileResource()) {
+                logger.debug("file consumer is about to process: " + next.getResourceId());
+                boolean wasConsumed = _processFileResource(next);
+                logger.debug("file consumer has finished processing: " + next.getResourceId());
+
+                if (wasConsumed) {
+                    numResourcesConsumed++;
+                }
+            }
+        } catch (InterruptedException e) {
+            setEndedState(STATE.THREAD_INTERRUPTED);
+        }
+
+        setEndedState(STATE.COMPLETED);
+        FileStarted lastStarted = currentFile;
+        return new FileConsumerFutureResult(lastStarted, numResourcesConsumed);
+    }
+
+
+    /**
+     * Main piece of code that needs to be implemented.  Clients
+     * are responsible for closing streams and handling the exceptions
+     * that they'd like to handle.
+     * <p>
+     * Unchecked throwables can be thrown past this, of course.  When a
+     * RuntimeException or Error is thrown, this class records an ended state
+     * via setEndedState and then rethrows it.
+     * Clients/subclasses should make sure to catch and handle everything they can.
+     * <p>
+     * The design goal is that the whole process should close up and shutdown soon after
+     * an unchecked exception or error is thrown.
+     * <p>
+     * Make sure to call {@link #incrementHandledExceptions()} appropriately in
+     * your implementation of this method.
+     *
+     * @param fileResource resource to process
+     * @return whether or not a file was successfully processed
+     */
+    public abstract boolean processFileResource(FileResource fileResource);
+
+
+    /**
+     * Adds one to the number of exceptions that were handled by the subclass.
+     * Make sure to call this appropriately!
+     * <p>
+     * NOTE(review): the increment of the volatile counter is not atomic;
+     * presumably this is only called from the consumer's own thread -- confirm.
+     */
+    protected void incrementHandledExceptions() {
+        numHandledExceptions++;
+    }
+
+
+    /**
+     * Returns whether the consumer could still process a file or is still
+     * processing one, i.e. whether its state is NOT_YET_STARTED,
+     * ACTIVELY_CONSUMING or ASKED_TO_SHUTDOWN.  Always returns false if
+     * the calling thread has been interrupted.
+     *
+     * @return whether this consumer is still active
+     */
+    public boolean isStillActive() {
+        if (Thread.currentThread().isInterrupted()) {
+            return false;
+        }
+        return currentState == STATE.NOT_YET_STARTED
+                || currentState == STATE.ACTIVELY_CONSUMING
+                || currentState == STATE.ASKED_TO_SHUTDOWN;
+    }
+
+    /**
+     * Wraps {@link #processFileResource(FileResource)} with the bookkeeping
+     * for the current file: sets currentFile before the call and clears it
+     * after a normal return.  A RuntimeException or Error sets the matching
+     * ended state and is rethrown.
+     *
+     * @param fileResource resource to process
+     * @return whatever processFileResource returned
+     */
+    private boolean _processFileResource(FileResource fileResource) {
+        currentFile = new FileStarted(fileResource.getResourceId());
+        boolean consumed = false;
+        try {
+            consumed = processFileResource(fileResource);
+        } catch (RuntimeException e) {
+            setEndedState(STATE.CONSUMER_EXCEPTION);
+            throw e;
+        } catch (Error e) {
+            setEndedState(STATE.CONSUMER_ERROR);
+            throw e;
+        }
+        //if anything is thrown from processFileResource, then the fileStarted
+        //will remain what it was right before the exception was thrown.
+        currentFile = null;
+        return consumed;
+    }
+
+    /**
+     * This politely asks the consumer to shut down by requesting
+     * the ASKED_TO_SHUTDOWN state via setEndedState.
+     * Before processing another file, the consumer will check to see
+     * if it has been asked to terminate.
+     * <p>
+     * This offers another method for politely requesting
+     * that a FileResourceConsumer stop processing
+     * besides passing it {@link org.apache.tika.batch.PoisonFileResource}.
+     */
+    public void pleaseShutdown() {
+        setEndedState(STATE.ASKED_TO_SHUTDOWN);
+    }
+
+    /**
+     * Returns the FileStarted record of the file that is being processed
+     * right now, or null if no file is being processed.
+     *
+     * @return FileStarted or null
+     */
+    public FileStarted getCurrentFile() {
+        return this.currentFile;
+    }
+
+    /**
+     * @return the current value of this consumer's numResourcesConsumed counter
+     */
+    public int getNumResourcesConsumed() {
+        return numResourcesConsumed;
+    }
+
+    /**
+     * @return the current value of this consumer's numHandledExceptions counter
+     */
+    public int getNumHandledExceptions() {
+        return numHandledExceptions;
+    }
+
+    /**
+     * Checks to see if the currentFile being processed (if there is one)
+     * should be timed out (still being worked on after staleThresholdMillis).
+     * <p>
+     * If the consumer should be timed out, this will return the currentFile and
+     * set the state to TIMED_OUT.
+     * <p>
+     * If the consumer was already timed out earlier or
+     * is not processing a file or has been working on a file
+     * for less than #staleThresholdMillis, then this will return null.
+     * <p>
+     * @param staleThresholdMillis threshold to determine whether the consumer has gone stale.
+     * @return null or the file started that triggered the stale condition
+     */
+    public FileStarted checkForTimedOutMillis(long staleThresholdMillis) {
+        //cheap checks before contending for the lock: no file is being
+        //processed, or a negative threshold means "don't even look"
+        if (currentFile == null || staleThresholdMillis < 0) {
+            return null;
+        }
+        synchronized(lock) {
+            //look again now that the lock is held
+            boolean mayBeProcessing = currentState == STATE.ACTIVELY_CONSUMING
+                    || currentState == STATE.ASKED_TO_SHUTDOWN;
+            if (!mayBeProcessing) {
+                return null;
+            }
+            FileStarted candidate = currentFile;
+            if (candidate == null) {
+                return null;
+            }
+            if (candidate.getElapsedMillis() <= staleThresholdMillis) {
+                return null;
+            }
+            //stale: mark the consumer as timed out and report the offending file
+            setEndedState(STATE.TIMED_OUT);
+            logWithResourceId(Level.FATAL, TIME_OUT,
+                    candidate.getResourceId(), ELAPSED_MILLIS, Long.toString(candidate.getElapsedMillis()));
+            return candidate;
+        }
+    }
+
+    /**
+     * Convenience overload for structured log entries that have no Throwable to record;
+     * delegates to the full overload with a null Throwable.
+     *
+     * @param level level
+     * @param type entity name for the log entry
+     * @param resourceId resourceId string
+     * @param attrs (array of key0, value0, key1, value1, etc.)
+     */
+    protected void logWithResourceId(Level level, String type, String resourceId, String... attrs) {
+        logWithResourceId(level, type, resourceId, null, attrs);
+    }
+
+    /**
+     * Use this for structured output that captures resourceId and other attributes.
+     *
+     * @param level level
+     * @param type entity name for exception
+     * @param resourceId resourceId string
+     * @param t throwable can be null
+     * @param attrs (array of key0, value0, key1, value1, etc.)
+     */
+    protected void logWithResourceId(Level level, String type, String resourceId, Throwable t, String... attrs) {
+
+        StringWriter writer = new StringWriter();
+        try {
+            XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(writer);
+            xml.writeStartDocument();
+            xml.writeStartElement(type);
+            xml.writeAttribute("resourceId", resourceId);
+            if (attrs != null) {
+                //attrs holds alternating names and values: name0 at 0, val0 at 1, name1 at 2, val1 at 3, etc.
+                //Step by two so that a value is never written as an attribute name;
+                //a trailing name without a value is ignored.
+                for (int i = 0; i < attrs.length - 1; i += 2) {
+                    xml.writeAttribute(attrs[i], attrs[i + 1]);
+                }
+            }
+            if (t != null) {
+                //the stack trace becomes the text content of the element
+                StringWriter stackWriter = new StringWriter();
+                PrintWriter printWriter = new PrintWriter(stackWriter);
+                t.printStackTrace(printWriter);
+                xml.writeCharacters(stackWriter.toString());
+            }
+            xml.writeEndElement();
+            xml.writeEndDocument();
+            xml.flush();
+            xml.close();
+        } catch (XMLStreamException e) {
+            //report why the xml could not be built...
+            logger.error("error writing xml stream for: " + resourceId, e);
+            //...without losing the throwable the caller wanted to have logged
+            if (t != null) {
+                logger.error("original throwable for: " + resourceId, t);
+            }
+        }
+
+        //whatever was written before a failure (possibly nothing) is still logged
+        logger.log(level, writer.toString());
+    }
+
+    /**
+     * Polls the queue, one second at a time, until a FileResource is available
+     * or there is a reason to stop.
+     * <p>
+     * Returns null (after trying to set the matching ended state where there is one) if
+     * the thread is interrupted, the state is no longer ACTIVELY_CONSUMING,
+     * a PoisonFileResource is taken from the queue, or the total wait exceeds
+     * maxConsecWaitInMillis (only checked if that value is greater than 0).
+     *
+     * @return the next FileResource or null if the consumer should stop
+     * @throws InterruptedException if interrupted while polling the queue
+     */
+    private FileResource getNextFileResource() throws InterruptedException {
+        final long waitStarted = new Date().getTime();
+        while (true) {
+            //check to see if thread is interrupted before polling
+            if (Thread.currentThread().isInterrupted()) {
+                setEndedState(STATE.THREAD_INTERRUPTED);
+                logger.debug("Consumer thread was interrupted.");
+                return null;
+            }
+
+            synchronized(lock) {
+                //need to lock here to prevent race condition with other threads setting state
+                if (currentState != STATE.ACTIVELY_CONSUMING) {
+                    logger.debug("Consumer already closed because of: "+ currentState.toString());
+                    return null;
+                }
+            }
+
+            FileResource polled = fileQueue.poll(1L, TimeUnit.SECONDS);
+            if (polled instanceof PoisonFileResource) {
+                //poison is never handed on for processing
+                setEndedState(STATE.SWALLOWED_POISON);
+                return null;
+            }
+            if (polled != null) {
+                return polled;
+            }
+            logger.debug(consumerId + " is waiting for file and the queue size is: " + fileQueue.size());
+
+            long waitedMillis = new Date().getTime() - waitStarted;
+            if (maxConsecWaitInMillis > 0 && waitedMillis > maxConsecWaitInMillis) {
+                setEndedState(STATE.EXCEEDED_MAX_CONSEC_WAIT_MILLIS);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Closes the closeable if it is not null.  An IOException thrown by close()
+     * is logged (with its stack trace) and not rethrown.
+     *
+     * @param closeable closeable to close; can be null
+     */
+    protected void close(Closeable closeable) {
+        if (closeable == null) {
+            return;
+        }
+        try {
+            closeable.close();
+        } catch (IOException e) {
+            //best effort: record the failure together with its cause and carry on
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Flushes the closeable if it is also Flushable and then closes it.
+     * An IOException thrown by flush() is logged (with its stack trace) and
+     * does not prevent the attempt to close.
+     *
+     * @param closeable closeable to flush and close; can be null
+     */
+    protected void flushAndClose(Closeable closeable) {
+        if (closeable == null) {
+            return;
+        }
+        if (closeable instanceof Flushable) {
+            try {
+                ((Flushable) closeable).flush();
+            } catch (IOException e) {
+                //best effort: record the failure together with its cause, then still close
+                logger.error(e.getMessage(), e);
+            }
+        }
+        close(closeable);
+    }
+
+    //do not overwrite a finished state except if
+    //not yet started, actively consuming or shutting down.  This should
+    //represent the initial cause; all subsequent calls
+    //to set will be ignored!!!
+    private void setEndedState(STATE cause) {
+        synchronized(lock) {
+            //a state that has already ended is never replaced
+            boolean notYetEnded = currentState == STATE.NOT_YET_STARTED
+                    || currentState == STATE.ACTIVELY_CONSUMING
+                    || currentState == STATE.ASKED_TO_SHUTDOWN;
+            if (notYetEnded) {
+                currentState = cause;
+            }
+        }
+    }
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,269 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * Base class for crawlers that feed {@link FileResource}s into a shared queue.
+ * <p>
+ * Subclasses implement {@link #start()} and hand resources to {@link #tryToAdd}.
+ * Once start() has returned or thrown, {@link #call()} tries to add one
+ * {@link PoisonFileResource} per consumer to the queue, unless
+ * {@link #shutDownNoPoison()} has been called.
+ */
+public abstract class FileResourceCrawler implements Callable<IFileProcessorFutureResult> {
+
+    protected final static int SKIPPED = 0;
+    protected final static int ADDED = 1;
+    protected final static int STOP_NOW = 2;
+
+    private volatile boolean hasCompletedCrawling = false;
+    private volatile boolean shutDownNoPoison = false;
+    private volatile boolean isActive = true;
+    private volatile boolean timedOut = false;
+
+    //how long to pause if can't add to queue
+    private static final long PAUSE_INCREMENT_MILLIS = 1000;
+
+    //named after the class itself; Class.toString() would prepend "class " to the logger name
+    protected static Logger logger = Logger.getLogger(FileResourceCrawler.class);
+
+    private int maxFilesToAdd = -1;
+    private int maxFilesToConsider = -1;
+
+    private final ArrayBlockingQueue<FileResource> queue;
+    private final int numConsumers;
+
+
+    private long maxConsecWaitInMillis = 300000;//300,000ms = 5 minutes
+    private DocumentSelector documentSelector = null;
+
+    //number of files added to queue
+    private int added = 0;
+    //number of files considered including those that were rejected by documentSelector
+    private int considered = 0;
+
+    /**
+     * @param queue        shared queue
+     * @param numConsumers number of consumers (needs to know how many poisons to add when done)
+     */
+    public FileResourceCrawler(ArrayBlockingQueue<FileResource> queue, int numConsumers) {
+        this.queue = queue;
+        this.numConsumers = numConsumers;
+    }
+
+    /**
+     * Implement this to control the addition of FileResources.  Call {@link #tryToAdd}
+     * to add FileResources to the queue.
+     *
+     * @throws InterruptedException
+     */
+    public abstract void start() throws InterruptedException;
+
+    /**
+     * Runs {@link #start()}, marks the crawler as inactive and then
+     * tries to add poison to the queue for the consumers.
+     * Exceptions from start() are logged, not rethrown.
+     *
+     * @return result holding the number of files considered and added
+     */
+    public FileResourceCrawlerFutureResult call() {
+        try {
+            start();
+        } catch (InterruptedException e) {
+            //this can be triggered by shutdownNow in BatchProcess
+            logger.info("InterruptedException in FileCrawler: " + e.getMessage());
+        } catch (Exception e) {
+            //keep the stack trace; the message alone is rarely enough to diagnose a crawler failure
+            logger.error("Exception in FileResourceCrawler: " + e.getMessage(), e);
+        } finally {
+            isActive = false;
+        }
+
+        try {
+            shutdown();
+        } catch (InterruptedException e) {
+            //nothing left to do here, but leave the interrupt visible to whoever runs this task
+            logger.debug("FileResourceCrawler interrupted while adding poison");
+            Thread.currentThread().interrupt();
+        }
+
+        return new FileResourceCrawlerFutureResult(considered, added);
+    }
+
+    /**
+     * Offers the resource to the queue if the crawler's limits have not been
+     * reached and the DocumentSelector (if any) selects it.  If the queue is full,
+     * this keeps trying until the resource is added, the thread is interrupted
+     * or the wait exceeds maxConsecWaitInMillis.
+     *
+     * @param fileResource resource to add
+     * @return int status of the attempt (SKIPPED, ADDED, STOP_NOW) to add the resource to the queue.
+     * @throws InterruptedException if the thread was interrupted or the crawler
+     *  had to wait longer than maxConsecWaitInMillis for room in the queue
+     */
+    protected int tryToAdd(FileResource fileResource) throws InterruptedException {
+
+        if (maxFilesToAdd > -1 && added >= maxFilesToAdd) {
+            return STOP_NOW;
+        }
+
+        //>= (as for maxFilesToAdd): never consider more than maxFilesToConsider files
+        if (maxFilesToConsider > -1 && considered >= maxFilesToConsider) {
+            return STOP_NOW;
+        }
+
+        boolean isAdded = false;
+        if (select(fileResource.getMetadata())) {
+            long waitStart = new Date().getTime();
+            while (!queue.offer(fileResource, 1L, TimeUnit.SECONDS)) {
+
+                logger.info("FileResourceCrawler is pausing.  Queue is full: " + queue.size());
+                Thread.sleep(PAUSE_INCREMENT_MILLIS);
+                //measure the real wait: each pass blocks in offer() as well as in sleep()
+                long totalConsecutiveWait = new Date().getTime() - waitStart;
+                if (maxConsecWaitInMillis > -1 && totalConsecutiveWait > maxConsecWaitInMillis) {
+                    timedOut = true;
+                    logger.error("Crawler had to wait longer than max consecutive wait time.");
+                    throw new InterruptedException("FileResourceCrawler had to wait longer than max consecutive wait time.");
+                }
+                if (Thread.currentThread().isInterrupted()) {
+                    logger.info("FileResourceCrawler shutting down because of interrupted thread.");
+                    throw new InterruptedException("FileResourceCrawler interrupted.");
+                }
+            }
+            isAdded = true;
+            added++;
+        } else {
+            logger.debug("crawler did not select: "+fileResource.getResourceId());
+        }
+        considered++;
+        return (isAdded)?ADDED:SKIPPED;
+    }
+
+    //Adds one PoisonFileResource per consumer, retrying while the queue is full.
+    //Warning! Depending on the value of maxConsecWaitInMillis
+    //this could try forever in vain to add poison to the queue.
+    private void shutdown() throws InterruptedException{
+        logger.debug("FileResourceCrawler entering shutdown");
+        if (hasCompletedCrawling || shutDownNoPoison) {
+            return;
+        }
+        int poisonsAdded = 0;
+        long start = new Date().getTime();
+        while (poisonsAdded < numConsumers) {
+            if (shutDownNoPoison) {
+                logger.debug("quitting the poison loop because shutDownNoPoison is now true");
+                return;
+            }
+            if (Thread.currentThread().isInterrupted()) {
+                logger.debug("thread interrupted while trying to add poison");
+                return;
+            }
+            long elapsed = new Date().getTime() - start;
+            if (maxConsecWaitInMillis > -1 && elapsed > maxConsecWaitInMillis) {
+                logger.error("Crawler timed out while trying to add poison");
+                return;
+            }
+            //a full queue is not a reason to give up: go round again and re-check the exit conditions
+            if (queue.offer(new PoisonFileResource(), 1L, TimeUnit.SECONDS)) {
+                poisonsAdded++;
+                logger.debug("added "+poisonsAdded+" number of PoisonFileResource(s)");
+            }
+        }
+        hasCompletedCrawling = true;
+    }
+
+    /**
+     * If the crawler stops for any reason, it is no longer active.
+     *
+     * @return whether crawler is active or not
+     */
+    public boolean isActive() {
+        return isActive;
+    }
+
+    /**
+     * @param maxConsecWaitInMillis maximum time in milliseconds to wait for room in
+     *                              the queue; a negative value turns the limit off
+     */
+    public void setMaxConsecWaitInMillis(long maxConsecWaitInMillis) {
+        this.maxConsecWaitInMillis = maxConsecWaitInMillis;
+    }
+
+    /**
+     * @param documentSelector selector that decides which resources are added to the queue
+     */
+    public void setDocumentSelector(DocumentSelector documentSelector) {
+        this.documentSelector = documentSelector;
+    }
+
+    /**
+     * @return number of files considered, including those that were not selected
+     */
+    public int getConsidered() {
+        return considered;
+    }
+
+    /**
+     * @param m metadata of the candidate resource
+     * @return whether the resource should be added to the queue; always true if
+     *  no DocumentSelector has been set
+     */
+    protected boolean select(Metadata m) {
+        //no selector configured: accept everything rather than fail with a NullPointerException
+        return documentSelector == null || documentSelector.select(m);
+    }
+
+    /**
+     * Maximum number of files to add.  If {@code maxFilesToAdd < 0} (default),
+     * then this crawler will add all documents.
+     *
+     * @param maxFilesToAdd maximum number of files to add to the queue
+     */
+    public void setMaxFilesToAdd(int maxFilesToAdd) {
+        this.maxFilesToAdd = maxFilesToAdd;
+    }
+
+
+    /**
+     * Maximum number of files to consider.  A file is considered
+     * whether or not the DocumentSelector selects a document.
+     * <p>
+     * If {@code maxFilesToConsider < 0} (default), then this crawler
+     * will add all documents.
+     *
+     * @param maxFilesToConsider maximum number of files to consider adding to the queue
+     */
+    public void setMaxFilesToConsider(int maxFilesToConsider) {
+        this.maxFilesToConsider = maxFilesToConsider;
+    }
+
+    /**
+     * Use sparingly.  This synchronizes on the queue!
+     * @return whether this queue contains no file resources other than poison
+     */
+    public boolean isQueueEmpty() {
+        synchronized(queue) {
+            for (FileResource aQueue : queue) {
+                //one real resource is enough to answer the question
+                if (!(aQueue instanceof PoisonFileResource)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns whether the crawler timed out while trying to add a resource
+     * to the queue.
+     * <p>
+     * If the crawler timed out while trying to add poison, this is not
+     * set to true.
+     *
+     * @return whether this was timed out or not
+     */
+    public boolean wasTimedOut() {
+        return timedOut;
+    }
+
+    /**
+     *
+     * @return number of files that this crawler added to the queue
+     */
+    public int getAdded() {
+        return added;
+    }
+
+    /**
+     * Call this to shut down the FileResourceCrawler without
+     * adding poison.  Do this only if you've already called another mechanism
+     * to request that consumers shut down.  This prevents a potential deadlock issue
+     * where the crawler is trying to add to the queue, but it is full.
+     */
+    public void shutDownNoPoison() {
+        this.shutDownNoPoison = true;
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,37 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+class FileResourceCrawlerFutureResult implements IFileProcessorFutureResult {
+
+  //counts handed over by the crawler; fixed at construction
+  private final int considered;
+  private final int added;
+
+  /**
+   * @param considered number of files the crawler considered
+   * @param added number of files the crawler added to the queue
+   */
+  protected FileResourceCrawlerFutureResult(int considered, int added) {
+    this.added = added;
+    this.considered = considered;
+  }
+
+  /**
+   * @return number of files the crawler considered
+   */
+  protected int getConsidered() {
+    return this.considered;
+  }
+
+  /**
+   * @return number of files the crawler added to the queue
+   */
+  protected int getAdded() {
+    return this.added;
+  }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,113 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Date;
+
+/**
+ * Simple class to record the time when a FileResource's processing started.
+ */
+class FileStarted {
+
+    private final String resourceId;
+    private final long started;
+
+    /**
+     * Creates a FileStarted for the resource whose start time is
+     * the moment of construction (new Date().getTime()).
+     *
+     * @param resourceId string for unique resource id
+     */
+    public FileStarted(String resourceId) {
+        this(resourceId, new Date().getTime());
+    }
+
+    /**
+     * @param resourceId string for unique resource id
+     * @param started start time in milliseconds
+     */
+    public FileStarted(String resourceId, long started) {
+        this.resourceId = resourceId;
+        this.started = started;
+    }
+
+    /**
+     * @return id of resource
+     */
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    /**
+     * @return time at which processing on this file started
+     */
+    public long getStarted() {
+        return started;
+    }
+
+    /**
+     * @return elapsed milliseconds since the start of processing of this
+     * file resource
+     */
+    public long getElapsedMillis() {
+        return new Date().getTime() - started;
+    }
+
+    @Override
+    public int hashCode() {
+        //same value as the usual prime-31 accumulation over resourceId and started
+        int idHash = (resourceId == null) ? 0 : resourceId.hashCode();
+        int startedHash = (int) (started ^ (started >>> 32));
+        return 31 * (31 + idHash) + startedHash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        //instanceof is false for null, so this also rejects null
+        if (!(obj instanceof FileStarted)) {
+            return false;
+        }
+        FileStarted that = (FileStarted) obj;
+        if (started != that.started) {
+            return false;
+        }
+        return (resourceId == null)
+                ? that.resourceId == null
+                : resourceId.equals(that.resourceId);
+    }
+
+    @Override
+    public String toString() {
+        return "FileStarted [resourceId=" + resourceId + ", started=" + started + "]";
+    }
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,26 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * stub interface to allow for different result types from different processors
+ *
+ */
+public interface IFileProcessorFutureResult {
+    //declares no members: it only serves as a common type for the results
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/Interrupter.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,57 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.Callable;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.io.IOUtils;
+
+
+/**
+ * Class that waits for input on System.in.  If the user enters a keystroke on 
+ * System.in, this will send a signal to the FileResourceRunner to shutdown gracefully.
+ *
+ * <p>
+ * In the future, this may implement a common IInterrupter interface for more flexibility.
+ */
+public class Interrupter implements Callable<IFileProcessorFutureResult> {
+
+    private Logger logger = Logger.getLogger(Interrupter.class);
+
+    /**
+     * Checks System.in once a second; as soon as input is ready, reads one line
+     * and returns.  Also returns if the thread is interrupted or if
+     * reading from System.in fails.
+     *
+     * @return a new InterrupterFutureResult in every case
+     */
+    public IFileProcessorFutureResult call() {
+        try {
+            //the reader is not closed here: closing it would close System.in as well
+            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, IOUtils.UTF_8));
+            while (true) {
+                if (reader.ready()) {
+                    reader.readLine();
+                    break;
+                } else {
+                    Thread.sleep(1000);
+                }
+            }
+        } catch (InterruptedException e) {
+            //canceller was interrupted; leave the interrupt visible to whoever runs this task
+            Thread.currentThread().interrupt();
+        } catch (IOException e) {
+            logger.error("IOException from STDIN in Interrupter.", e);
+        }
+        return new InterrupterFutureResult();
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,22 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Result type that carries no data; Interrupter's call() returns a new instance of it.
+ */
+public class InterrupterFutureResult implements IFileProcessorFutureResult {
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,29 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.metadata.Metadata;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface OutputStreamFactory {
+  
+  /**
+   * Supplies an OutputStream for the given metadata.
+   *
+   * @param metadata metadata passed to the factory; presumably that of the resource
+   *                 being processed -- TODO confirm against implementations
+   * @return the OutputStream to write to
+   * @throws IOException if the stream cannot be obtained
+   */
+  public OutputStream getOutputStream(Metadata metadata) throws IOException;
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,100 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Holder, with final fields, for the counts, timing, exit status and termination cause of a batch run. */
+public class ParallelFileProcessingResult {
+    private final int considered;
+    private final int added;
+    private final int consumed;
+    private final double secondsElapsed;
+    private final int exitStatus;
+    private final String causeForTermination;
+
+    /**
+     * Stores every argument as given; no validation is performed.
+     */
+    public ParallelFileProcessingResult(int considered, int added, int consumed, double secondsElapsed,
+                                        int exitStatus, String causeForTermination) {
+        this.causeForTermination = causeForTermination;
+        this.exitStatus = exitStatus;
+        this.secondsElapsed = secondsElapsed;
+        this.consumed = consumed;
+        this.added = added;
+        this.considered = considered;
+    }
+
+    /**
+     * Returns the number of file resources considered. If a filter causes the
+     * crawler to ignore some resources, this can exceed {@link #getConsumed()}.
+     *
+     * @return number of file resources considered
+     */
+    public int getConsidered() {
+        return this.considered;
+    }
+
+    /**
+     * @return number of resources added to the queue
+     */
+    public int getAdded() {
+        return this.added;
+    }
+
+    /**
+     * @return number of resources for which consumption was attempted;
+     * some of those attempts may have ended in an exception
+     */
+    public int getConsumed() {
+        return this.consumed;
+    }
+
+    /**
+     * @return the cause-for-termination string given to the constructor
+     * (returned as stored, so it is null if null was supplied)
+     */
+    public String getCauseForTermination() {
+        return this.causeForTermination;
+    }
+
+    /**
+     * @return seconds elapsed since the start of the batch processing
+     */
+    public double secondsElapsed() {
+        return this.secondsElapsed;
+    }
+
+    /**
+     * @return the intended exit status, as given to the constructor
+     */
+    public int getExitStatus() {
+        return this.exitStatus;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("ParallelFileProcessingResult{");
+        sb.append("considered=").append(considered);
+        sb.append(", added=").append(added);
+        sb.append(", consumed=").append(consumed);
+        sb.append(", secondsElapsed=").append(secondsElapsed);
+        sb.append(", exitStatus=").append(exitStatus);
+        sb.append(", causeForTermination='").append(causeForTermination).append('\'');
+        return sb.append('}').toString();
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParserFactory.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,27 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.parser.Parser;
+
+/** Supplies a {@link Parser} given a {@link TikaConfig}. */
+public interface ParserFactory {
+  /** Returns the parser for {@code config}; how it is chosen is up to the implementation. */
+  Parser getParser(TikaConfig config);
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,54 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.metadata.Metadata;
+
+import java.io.InputStream;
+
+/**
+ * Sentinel class for the crawler to add to the queue to let
+ * the consumers know that they should shut down.
+ */
+class PoisonFileResource implements FileResource {
+  
+  /**
+   * Always returns null; this sentinel supplies no metadata.
+   */
+  @Override
+  public Metadata getMetadata() {
+    return null;
+  }
+
+  /**
+   * Always returns null; this sentinel supplies no input stream.
+   */
+  @Override
+  public InputStream openInputStream() {
+    return null;
+  }
+
+  /**
+   * Always returns null; this sentinel supplies no resource id.
+   */
+  @Override
+  public String getResourceId() {
+    return null;
+  }
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,227 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.text.NumberFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tika.util.DurationFormatUtils;
+
+/**
+ * Basic class to use for reporting status from both the crawler and the consumers.
+ * This wakes up roughly every {@code sleepMillis} milliseconds (see
+ * {@link #setSleepMillis(long)}) and reports status through {@link #report(String)}.
+ */
+public class StatusReporter implements Callable<IFileProcessorFutureResult> {
+
+    private static final Log logger = LogFactory.getLog(StatusReporter.class);
+
+    //require references to these so that the
+    //StatusReporter can query them when it wakes up
+    private final ConsumersManager consumersManager;
+    private final FileResourceCrawler crawler;
+
+    //local time that the StatusReporter started
+    private final long start;
+    //how long to sleep between reporting intervals
+    private long sleepMillis = 1000;
+
+    //how long before considering a parse "stale" (potentially hung forever)
+    private long staleThresholdMillis = 100000;
+
+    private volatile boolean isShuttingDown = false;
+
+    /**
+     * Initialize with the crawler and consumers; the start time is recorded here.
+     *
+     * @param crawler   crawler to ping at intervals
+     * @param consumersManager consumers to ping at intervals
+     */
+    public StatusReporter(FileResourceCrawler crawler, ConsumersManager consumersManager) {
+        this.consumersManager = consumersManager;
+        this.crawler = crawler;
+        start = new Date().getTime();
+    }
+
+    /**
+     * Override for different behavior.
+     * <p>
+     * This reports the string at the info level to this class' logger.
+     *
+     * @param s string to report
+     */
+    protected void report(String s) {
+        logger.info(s);
+    }
+
+    /**
+     * Runs the reporting loop: sleeps for {@code sleepMillis}, reports the current
+     * counts, and repeats until the thread is interrupted.
+     *
+     * @return a new {@link StatusReporterFutureResult} once the loop has been interrupted
+     */
+    @Override
+    public IFileProcessorFutureResult call() {
+        NumberFormat numberFormat = NumberFormat.getNumberInstance(Locale.ROOT);
+        try {
+            while (true) {
+                Thread.sleep(sleepMillis);
+                int cnt = getRoughCountConsumed();
+                int exceptions = getRoughCountExceptions();
+                long elapsed = new Date().getTime() - start;
+                double elapsedSecs = (double) elapsed / (double) 1000;
+                //no rate is shown (avg == -1) until > 5 seconds have elapsed or > 100 docs were counted
+                int avg = (elapsedSecs > 5 || cnt > 100) ? (int) ((double) cnt / elapsedSecs) : -1;
+
+                //format the same elapsed value that the rate was computed from
+                String elapsedString = DurationFormatUtils.formatMillis(elapsed);
+                String docsPerSec = avg > -1 ? String.format(Locale.ROOT,
+                        " (%s docs per sec)", numberFormat.format(avg)) : "";
+                String msg = String.format(Locale.ROOT, "Processed %s documents in %s%s.",
+                        numberFormat.format(cnt), elapsedString, docsPerSec);
+                report(msg);
+                if (exceptions == 1) {
+                    msg = "There has been one handled exception.";
+                } else {
+                    msg = String.format(Locale.ROOT, "There have been %s handled exceptions.",
+                            numberFormat.format(exceptions));
+                }
+                report(msg);
+
+                reportStale();
+
+                int stillAlive = getStillAlive();
+                if (stillAlive == 1) {
+                    msg = "There is one file processor still active.";
+                } else {
+                    msg = "There are " + numberFormat.format(stillAlive) + " file processors still active.";
+                }
+                report(msg);
+
+                int crawled = crawler.getConsidered();
+                int added = crawler.getAdded();
+                if (crawled == 1) {
+                    //trailing space keeps this clause separated from the "and it has added" clause
+                    msg = "The directory crawler has considered 1 file, ";
+                } else {
+                    msg = "The directory crawler has considered " +
+                            numberFormat.format(crawled) + " files, ";
+                }
+                if (added == 1) {
+                    msg += "and it has added 1 file.";
+                } else {
+                    //use the value already read so the wording and the number cannot disagree
+                    msg += "and it has added " + numberFormat.format(added) + " files.";
+                }
+                msg += "\n";
+                report(msg);
+
+                if (! crawler.isActive()) {
+                    msg = "The directory crawler has completed its crawl.\n";
+                    report(msg);
+                }
+                if (isShuttingDown) {
+                    msg = "Process is shutting down now.";
+                    report(msg);
+                }
+            }
+        } catch (InterruptedException e) {
+            //interruption ends the loop; restore the flag so code further up the stack can observe it
+            Thread.currentThread().interrupt();
+        }
+        return new StatusReporterFutureResult();
+    }
+
+    /**
+     * Set the amount of time to sleep between reports.
+     * @param sleepMillis length to sleep btwn reports in milliseconds
+     */
+    public void setSleepMillis(long sleepMillis) {
+        this.sleepMillis = sleepMillis;
+    }
+
+    /**
+     * Set the threshold, in milliseconds, beyond which a parse is reported as stale.
+     * @param staleThresholdMillis elapsed time a file must exceed before it is reported
+     */
+    public void setStaleThresholdMillis(long staleThresholdMillis) {
+        this.staleThresholdMillis = staleThresholdMillis;
+    }
+
+    /**
+     * Reports each consumer whose current file's elapsed time exceeds {@code staleThresholdMillis}.
+     */
+    private void reportStale() {
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            FileStarted fs = consumer.getCurrentFile();
+            if (fs == null) {
+                continue;
+            }
+            long elapsed = fs.getElapsedMillis();
+            if (elapsed > staleThresholdMillis) {
+                String elapsedString = Double.toString((double) elapsed / (double) 1000);
+                report("A thread has been working on " + fs.getResourceId() +
+                        " for " + elapsedString + " seconds.");
+            }
+        }
+    }
+
+    /** This returns a rough (unsynchronized) count of resources consumed. */
+    private int getRoughCountConsumed() {
+        int ret = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            ret += consumer.getNumResourcesConsumed();
+        }
+        return ret;
+    }
+
+    /** Counts the consumers whose {@code isStillActive()} currently returns true. */
+    private int getStillAlive() {
+        int ret = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            if (consumer.isStillActive()) {
+                ret++;
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * This returns a rough (unsynchronized) count of caught/handled exceptions.
+     * @return rough count of exceptions
+     */
+    public int getRoughCountExceptions() {
+        int ret = 0;
+        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
+            ret += consumer.getNumHandledExceptions();
+        }
+        return ret;
+    }
+
+    /**
+     * Set whether the main process is in the process of shutting down.
+     * @param isShuttingDown true to have each subsequent report note the shutdown
+     */
+    public void setIsShuttingDown(boolean isShuttingDown){
+        this.isShuttingDown = isShuttingDown;
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,23 @@
+package org.apache.tika.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Empty marker class for what a {@link StatusReporter} returns when it finishes; it carries no data.
+ */
+public class StatusReporterFutureResult implements IFileProcessorFutureResult {
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,38 @@
+package org.apache.tika.batch.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResource;
+import org.w3c.dom.Node;
+
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/** Base type for builders that create a {@link ConsumersManager} from a DOM {@link Node}. */
+public abstract class AbstractConsumersBuilder {
+
+  /** @return one fewer than the JVM's available processors, but never less than 1 */
+  public static int getDefaultNumConsumers() {
+    return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
+  }
+
+  /** Builds the manager; the meaning of each argument is defined by the implementation. */
+  public abstract ConsumersManager build(Node node, Map<String, String> runtimeAttributes,
+    ArrayBlockingQueue<FileResource> queue);
+}