You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2015/03/23 17:09:12 UTC

svn commit: r1668673 [4/6] - in /tika/trunk: ./ tika-app/ tika-app/src/main/java/org/apache/tika/cli/ tika-app/src/main/resources/ tika-app/src/test/java/org/apache/tika/cli/ tika-batch/ tika-batch/src/ tika-batch/src/main/ tika-batch/src/main/examples...

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,163 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.log4j.Level;
+import org.apache.tika.batch.BatchNoRestartError;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceConsumer;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.batch.ParserFactory;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.io.IOUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.util.TikaExceptionFilter;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * Basic FileResourceConsumer that reads files from an input
+ * directory and writes content to the output directory.
+ * <p/>
+ * This tries to catch most of the common exceptions, log them and
+ * store them in the metadata list output.
+ */
+public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
+
+
+    private final ParserFactory parserFactory;
+    private final ContentHandlerFactory contentHandlerFactory;
+    private final OutputStreamFactory fsOSFactory;
+    private final TikaConfig tikaConfig;
+    private String outputEncoding = "UTF-8";
+    //TODO: parameterize this
+    private TikaExceptionFilter exceptionFilter = new TikaExceptionFilter();
+
+
+    public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
+                                            ParserFactory parserFactory,
+                                            ContentHandlerFactory contentHandlerFactory,
+                                            OutputStreamFactory fsOSFactory, TikaConfig tikaConfig) {
+        super(queue);
+        this.parserFactory = parserFactory;
+        this.contentHandlerFactory = contentHandlerFactory;
+        this.fsOSFactory = fsOSFactory;
+        this.tikaConfig = tikaConfig;
+    }
+
+    @Override
+    public boolean processFileResource(FileResource fileResource) {
+
+        Parser wrapped = parserFactory.getParser(tikaConfig);
+        RecursiveParserWrapper parser = new RecursiveParserWrapper(wrapped, contentHandlerFactory);
+        ParseContext context = new ParseContext();
+
+//        if (parseRecursively == true) {
+        context.set(Parser.class, parser);
+//        }
+
+        //try to open outputstream first
+        OutputStream os = getOutputStream(fsOSFactory, fileResource);
+
+        if (os == null) {
+            logger.debug("Skipping: " + fileResource.getMetadata().get(FSProperties.FS_REL_PATH));
+            return false;
+        }
+
+        //try to open the inputstream before the parse.
+        //if the parse hangs or throws a nasty exception, at least there will
+        //be a zero byte file there so that the batchrunner can skip that problematic
+        //file during the next run.
+        InputStream is = getInputStream(fileResource);
+        if (is == null) {
+            IOUtils.closeQuietly(os);
+            return false;
+        }
+
+        Throwable thrown = null;
+        List<Metadata> metadataList = null;
+        Metadata containerMetadata = fileResource.getMetadata();
+        try {
+            parse(fileResource.getResourceId(), parser, is, new DefaultHandler(),
+                    containerMetadata, context);
+            metadataList = parser.getMetadata();
+        } catch (Throwable t) {
+            thrown = t;
+            metadataList = parser.getMetadata();
+            if (metadataList == null) {
+                metadataList = new LinkedList<Metadata>();
+            }
+            Metadata m = null;
+            if (metadataList.size() == 0) {
+                m = containerMetadata;
+            } else {
+                //take the top metadata item
+                m = metadataList.remove(0);
+            }
+            String stackTrace = exceptionFilter.getStackTrace(t);
+            m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX+"runtime", stackTrace);
+            metadataList.add(0, m);
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+
+        Writer writer = null;
+
+        try {
+            writer = new OutputStreamWriter(os, getOutputEncoding());
+            JsonMetadataList.toJson(metadataList, writer);
+        } catch (Exception e) {
+            logWithResourceId(Level.ERROR, "json_ex",
+                    fileResource.getResourceId(), e);
+        } finally {
+            flushAndClose(writer);
+        }
+
+        if (thrown != null) {
+            if (thrown instanceof Error) {
+                throw (Error) thrown;
+            } else {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public String getOutputEncoding() {
+        return outputEncoding;
+    }
+
+    public void setOutputEncoding(String outputEncoding) {
+        this.outputEncoding = outputEncoding;
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.batch.fs.builders;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceConsumer;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.batch.ParserFactory;
+import org.apache.tika.batch.builders.AbstractConsumersBuilder;
+import org.apache.tika.batch.builders.BatchProcessBuilder;
+import org.apache.tika.batch.builders.IContentHandlerFactoryBuilder;
+import org.apache.tika.batch.fs.BasicTikaFSConsumer;
+import org.apache.tika.batch.fs.FSConsumersManager;
+import org.apache.tika.batch.fs.FSOutputStreamFactory;
+import org.apache.tika.batch.fs.FSUtil;
+import org.apache.tika.batch.fs.RecursiveParserWrapperFSConsumer;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.util.ClassLoaderUtil;
+import org.apache.tika.util.PropsUtil;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder {
+
+    @Override
+    public ConsumersManager build(Node node, Map<String, String> runtimeAttributes,
+                                            ArrayBlockingQueue<FileResource> queue) {
+
+        //figure out if we're building a recursiveParserWrapper
+        boolean recursiveParserWrapper = false;
+        String recursiveParserWrapperString = runtimeAttributes.get("recursiveParserWrapper");
+        if (recursiveParserWrapperString != null){
+            recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperString, recursiveParserWrapper);
+        } else {
+            Node recursiveParserWrapperNode = node.getAttributes().getNamedItem("recursiveParserWrapper");
+            if (recursiveParserWrapperNode != null) {
+                recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperNode.getNodeValue(), recursiveParserWrapper);
+            }
+        }
+
+        //how long to let the consumersManager run on init() and shutdown()
+        Long consumersManagerMaxMillis = null;
+        String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis");
+        if (consumersManagerMaxMillisString != null){
+            consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisString, null);
+        } else {
+            Node consumersManagerMaxMillisNode = node.getAttributes().getNamedItem("consumersManagerMaxMillis");
+            if (consumersManagerMaxMillis == null) {
+                consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisNode.getNodeValue(),
+                        null);
+            }
+        }
+
+        TikaConfig config = null;
+        String tikaConfigPath = runtimeAttributes.get("c");
+
+        if( tikaConfigPath == null) {
+            Node tikaConfigNode = node.getAttributes().getNamedItem("tikaConfig");
+            if (tikaConfigNode != null) {
+                tikaConfigPath = PropsUtil.getString(tikaConfigNode.getNodeValue(), null);
+            }
+        }
+        if (tikaConfigPath != null) {
+            try {
+                config = new TikaConfig(new File(tikaConfigPath));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            config = TikaConfig.getDefaultConfig();
+        }
+
+        List<FileResourceConsumer> consumers = new LinkedList<FileResourceConsumer>();
+        int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes);
+
+        NodeList nodeList = node.getChildNodes();
+        Node contentHandlerFactoryNode = null;
+        Node parserFactoryNode = null;
+        Node outputStreamFactoryNode = null;
+
+        for (int i = 0; i < nodeList.getLength(); i++){
+            Node child = nodeList.item(i);
+            String cn = child.getNodeName();
+            if (cn.equals("parser")){
+                parserFactoryNode = child;
+            } else if (cn.equals("contenthandler")) {
+                contentHandlerFactoryNode = child;
+            } else if (cn.equals("outputstream")) {
+                outputStreamFactoryNode = child;
+            }
+        }
+
+        if (contentHandlerFactoryNode == null || parserFactoryNode == null
+                || outputStreamFactoryNode == null) {
+            throw new RuntimeException("You must specify a ContentHandlerFactory, "+
+                    "a ParserFactory and an OutputStreamFactory");
+        }
+        ContentHandlerFactory contentHandlerFactory = getContentHandlerFactory(contentHandlerFactoryNode, runtimeAttributes);
+        ParserFactory parserFactory = getParserFactory(parserFactoryNode, runtimeAttributes);
+        OutputStreamFactory outputStreamFactory = getOutputStreamFactory(outputStreamFactoryNode, runtimeAttributes);
+
+        if (recursiveParserWrapper) {
+            for (int i = 0; i < numConsumers; i++) {
+                FileResourceConsumer c = new RecursiveParserWrapperFSConsumer(queue,
+                        parserFactory, contentHandlerFactory, outputStreamFactory, config);
+                consumers.add(c);
+            }
+        } else {
+            for (int i = 0; i < numConsumers; i++) {
+                FileResourceConsumer c = new BasicTikaFSConsumer(queue,
+                        parserFactory, contentHandlerFactory, outputStreamFactory, config);
+                consumers.add(c);
+            }
+        }
+        ConsumersManager manager = new FSConsumersManager(consumers);
+        if (consumersManagerMaxMillis != null) {
+            manager.setConsumersManagerMaxMillis(consumersManagerMaxMillis);
+        }
+        return manager;
+    }
+
+
+    private ContentHandlerFactory getContentHandlerFactory(Node node, Map<String, String> runtimeAttributes) {
+
+        Map<String, String> localAttrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+        String className = localAttrs.get("builderClass");
+        if (className == null) {
+            throw new RuntimeException("Must specify builderClass for contentHandler");
+        }
+        IContentHandlerFactoryBuilder builder = ClassLoaderUtil.buildClass(IContentHandlerFactoryBuilder.class, className);
+        return builder.build(node, runtimeAttributes);
+    }
+
+    private ParserFactory getParserFactory(Node node, Map<String, String> runtimeAttributes) {
+        //TODO: add ability to set TikaConfig file path
+        Map<String, String> localAttrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+        String className = localAttrs.get("class");
+        return ClassLoaderUtil.buildClass(ParserFactory.class, className);
+    }
+
+    private OutputStreamFactory getOutputStreamFactory(Node node, Map<String, String> runtimeAttributes) {
+        Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+
+        File outputDir = PropsUtil.getFile(attrs.get("outputDir"), null);
+/*        FSUtil.HANDLE_EXISTING handleExisting = null;
+        String handleExistingString = attrs.get("handleExisting");
+        if (handleExistingString == null) {
+            handleExistingException();
+        } else if (handleExistingString.equals("overwrite")){
+            handleExisting = FSUtil.HANDLE_EXISTING.OVERWRITE;
+        } else if (handleExistingString.equals("rename")) {
+            handleExisting = FSUtil.HANDLE_EXISTING.RENAME;
+        } else if (handleExistingString.equals("skip")) {
+            handleExisting = FSUtil.HANDLE_EXISTING.SKIP;
+        } else {
+            handleExistingException();
+        }
+*/
+        String compressionString = attrs.get("compression");
+        FSOutputStreamFactory.COMPRESSION compression = FSOutputStreamFactory.COMPRESSION.NONE;
+        if (compressionString == null) {
+            //do nothing
+        } else if (compressionString.contains("bz")) {
+            compression = FSOutputStreamFactory.COMPRESSION.BZIP2;
+        } else if (compressionString.contains("gz")) {
+            compression = FSOutputStreamFactory.COMPRESSION.GZIP;
+        } else if (compressionString.contains("zip")) {
+            compression = FSOutputStreamFactory.COMPRESSION.ZIP;
+        }
+        String suffix = attrs.get("outputSuffix");
+
+        //TODO: possibly open up the different handle existings in the future
+        //but for now, lock it down to require skip.  Too dangerous otherwise
+        //if the driver restarts and this is set to overwrite...
+        return new FSOutputStreamFactory(outputDir, FSUtil.HANDLE_EXISTING.SKIP,
+                compression, suffix);
+    }
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,129 @@
+package org.apache.tika.batch.fs.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceCrawler;
+import org.apache.tika.batch.builders.BatchProcessBuilder;
+import org.apache.tika.batch.builders.ICrawlerBuilder;
+import org.apache.tika.batch.fs.FSDirectoryCrawler;
+import org.apache.tika.batch.fs.FSDocumentSelector;
+import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.util.PropsUtil;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Node;
+
+/**
+ * Builds either an FSDirectoryCrawler or an FSListCrawler.
+ */
+public class FSCrawlerBuilder implements ICrawlerBuilder {
+
+    private final static String MAX_CONSEC_WAIT_MILLIS = "maxConsecWaitMillis";
+    private final static String MAX_FILES_TO_ADD_ATTR = "maxFilesToAdd";
+    private final static String MAX_FILES_TO_CONSIDER_ATTR = "maxFilesToConsider";
+
+
+    private final static String CRAWL_ORDER = "crawlOrder";
+    private final static String INPUT_DIR_ATTR = "inputDir";
+    private final static String INPUT_START_DIR_ATTR = "startDir";
+    private final static String MAX_FILE_SIZE_BYTES_ATTR = "maxFileSizeBytes";
+    private final static String MIN_FILE_SIZE_BYTES_ATTR = "minFileSizeBytes";
+
+
+    private final static String INCLUDE_FILE_PAT_ATTR = "includeFilePat";
+    private final static String EXCLUDE_FILE_PAT_ATTR = "excludeFilePat";
+
+    @Override
+    public FileResourceCrawler build(Node node, Map<String, String> runtimeAttributes,
+                                     ArrayBlockingQueue<FileResource> queue) {
+
+        Map<String, String> attributes = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+
+        int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes);
+        File inputDir = PropsUtil.getFile(attributes.get(INPUT_DIR_ATTR), new File("input"));
+        FileResourceCrawler crawler = null;
+        if (attributes.containsKey("fileList")) {
+            String randomCrawlString = attributes.get(CRAWL_ORDER);
+
+            if (randomCrawlString != null) {
+                //TODO: change to logger warn or throw RuntimeException?
+                System.err.println("randomCrawl attribute is ignored by FSListCrawler");
+            }
+            File fileList = PropsUtil.getFile(attributes.get("fileList"), null);
+            String encoding = PropsUtil.getString(attributes.get("fileListEncoding"), "UTF-8");
+            try {
+                crawler = new org.apache.tika.batch.fs.FSListCrawler(queue, numConsumers, inputDir, fileList, encoding);
+            } catch (java.io.FileNotFoundException e) {
+                throw new RuntimeException("fileList file not found for FSListCrawler: " + fileList.getAbsolutePath());
+            } catch (java.io.UnsupportedEncodingException e) {
+                throw new RuntimeException("fileList encoding not supported: "+encoding);
+            }
+        } else {
+            FSDirectoryCrawler.CRAWL_ORDER crawlOrder = getCrawlOrder(attributes.get(CRAWL_ORDER));
+            File startDir = PropsUtil.getFile(attributes.get(INPUT_START_DIR_ATTR), null);
+            if (startDir == null) {
+                crawler = new FSDirectoryCrawler(queue, numConsumers, inputDir, crawlOrder);
+            } else {
+                crawler = new FSDirectoryCrawler(queue, numConsumers, inputDir, startDir, crawlOrder);
+            }
+        }
+
+        crawler.setMaxFilesToConsider(PropsUtil.getInt(attributes.get(MAX_FILES_TO_CONSIDER_ATTR), -1));
+        crawler.setMaxFilesToAdd(PropsUtil.getInt(attributes.get(MAX_FILES_TO_ADD_ATTR), -1));
+
+        DocumentSelector selector = buildSelector(attributes);
+        if (selector != null) {
+            crawler.setDocumentSelector(selector);
+        }
+
+        crawler.setMaxConsecWaitInMillis(PropsUtil.getLong(attributes.get(MAX_CONSEC_WAIT_MILLIS), 300000L));//5 minutes
+        return crawler;
+    }
+
+    private FSDirectoryCrawler.CRAWL_ORDER getCrawlOrder(String s) {
+        if (s == null || s.trim().length() == 0 || s.equals("os")) {
+            return FSDirectoryCrawler.CRAWL_ORDER.OS_ORDER;
+        } else if (s.toLowerCase(Locale.ROOT).contains("rand")) {
+            return FSDirectoryCrawler.CRAWL_ORDER.RANDOM;
+        } else if (s.toLowerCase(Locale.ROOT).contains("sort")) {
+            return FSDirectoryCrawler.CRAWL_ORDER.SORTED;
+        } else {
+            return FSDirectoryCrawler.CRAWL_ORDER.OS_ORDER;
+        }
+    }
+
+    private DocumentSelector buildSelector(Map<String, String> attributes) {
+        String includeString = attributes.get(INCLUDE_FILE_PAT_ATTR);
+        String excludeString = attributes.get(EXCLUDE_FILE_PAT_ATTR);
+        long maxFileSize = PropsUtil.getLong(attributes.get(MAX_FILE_SIZE_BYTES_ATTR), -1L);
+        long minFileSize = PropsUtil.getLong(attributes.get(MIN_FILE_SIZE_BYTES_ATTR), -1L);
+        Pattern includePat = (includeString != null && includeString.length() > 0) ? Pattern.compile(includeString) : null;
+        Pattern excludePat = (excludeString != null && excludeString.length() > 0) ? Pattern.compile(excludeString) : null;
+
+        return new FSDocumentSelector(includePat, excludePat, minFileSize, maxFileSize);
+    }
+
+
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,249 @@
+package org.apache.tika.batch.fs.strawman;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.io.IOUtils;
+
+/**
+ * Simple single-threaded class that calls tika-app against every file in a directory.
+ *
+ * This is exceedingly robust.  One file per process.
+ *
+ * However, you can use this to compare performance against tika-batch fs code.
+ *
+ *
+ */
+public class StrawManTikaAppDriver implements Callable<Integer> {
+
+    private static AtomicInteger threadCount = new AtomicInteger(0);
+    private final int totalThreads;
+    private final int threadNum;
+    private int rootLen = -1;
+    private File inputDir = null;
+    private File outputDir = null;
+    private String[] args = null;
+    private Logger logger = Logger.getLogger(StrawManTikaAppDriver.class);
+
+
+    public StrawManTikaAppDriver(File inputDir, File outputDir, int totalThreads, String[] args) {
+        rootLen = inputDir.getAbsolutePath().length()+1;
+        this.inputDir = inputDir;
+        this.outputDir = outputDir;
+        this.args = args;
+        threadNum = threadCount.getAndIncrement();
+        this.totalThreads = totalThreads;
+    }
+
+
+    private int processDirectory(File inputDir) {
+        int processed = 0;
+        if (inputDir == null || inputDir.listFiles() == null) {
+            return processed;
+        }
+        for (File f : inputDir.listFiles()) {
+            List<File> childDirs = new ArrayList<File>();
+            if (f.isDirectory()) {
+                childDirs.add(f);
+            } else {
+                processed += processFile(f);
+            }
+            for (File dir : childDirs) {
+                processed += processDirectory(dir);
+
+            }
+        }
+        return processed;
+    }
+
+    private int processFile(File f) {
+        if (totalThreads > 1) {
+            int hashCode = f.getAbsolutePath().hashCode();
+            if (Math.abs(hashCode % totalThreads) != threadNum) {
+                return 0;
+            }
+        }
+        File outputFile = new File(outputDir, f.getAbsolutePath().substring(rootLen)+".txt");
+        outputFile.getAbsoluteFile().getParentFile().mkdirs();
+        if (! outputFile.getParentFile().exists()) {
+            logger.fatal("parent directory for "+ outputFile + " was not made!");
+            throw new RuntimeException("couldn't make parent file for " + outputFile);
+        }
+        List<String> commandLine = new ArrayList<String>();
+        for (String arg : args) {
+            commandLine.add(arg);
+        }
+        commandLine.add("-t");
+        commandLine.add("\""+f.getAbsolutePath()+"\"");
+        ProcessBuilder builder = new ProcessBuilder(commandLine.toArray(new String[commandLine.size()]));
+        logger.info("about to process: "+f.getAbsolutePath());
+        Process proc = null;
+        RedirectGobbler gobbler = null;
+        Thread gobblerThread = null;
+        try {
+            OutputStream os = new FileOutputStream(outputFile);
+            proc = builder.start();
+            gobbler = new RedirectGobbler(proc.getInputStream(), os);
+            gobblerThread = new Thread(gobbler);
+            gobblerThread.start();
+        } catch (IOException e) {
+            logger.error(e.getMessage());
+            return 0;
+        }
+
+        boolean finished = false;
+        long totalTime = 180000;//3 minutes
+        long pulse = 100;
+        for (int i = 0; i < totalTime; i += pulse) {
+            try {
+                Thread.currentThread().sleep(pulse);
+            } catch (InterruptedException e) {
+                //swallow
+            }
+            try {
+                int exit = proc.exitValue();
+                finished = true;
+                break;
+            } catch (IllegalThreadStateException e) {
+                //swallow
+            }
+        }
+        if (!finished) {
+            logger.warn("Had to kill process working on: " + f.getAbsolutePath());
+            proc.destroy();
+        }
+        gobbler.close();
+        gobblerThread.interrupt();
+        return 1;
+    }
+
+
+    @Override
+    public Integer call() throws Exception {
+        long start = new Date().getTime();
+
+        int processed = processDirectory(inputDir);
+        double elapsedSecs = ((double)new Date().getTime()-(double)start)/(double)1000;
+        logger.info("Finished processing " + processed + " files in " + elapsedSecs + " seconds.");
+        return processed;
+    }
+
+    private class RedirectGobbler implements Runnable {
+        private OutputStream redirectOs = null;
+        private InputStream redirectIs = null;
+
+        private RedirectGobbler(InputStream is, OutputStream os) {
+            this.redirectIs = is;
+            this.redirectOs = os;
+        }
+
+        private void close() {
+            if (redirectOs != null) {
+                try {
+                    redirectOs.flush();
+                } catch (IOException e) {
+                    logger.error("can't flush");
+                }
+                try {
+                    redirectIs.close();
+                } catch (IOException e) {
+                    logger.error("can't close input in redirect gobbler");
+                }
+                try {
+                    redirectOs.close();
+                } catch (IOException e) {
+                    logger.error("can't close output in redirect gobbler");
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                IOUtils.copy(redirectIs, redirectOs);
+            } catch (IOException e) {
+                logger.error("IOException while gobbling");
+            }
+        }
+    }
+
+    public static String usage() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Example usage:\n");
+        sb.append("java -cp <CP> org.apache.batch.fs.strawman.StrawManTikaAppDriver ");
+        sb.append("<inputDir> <outputDir> <numThreads> ");
+        sb.append("java -jar tika-app-X.Xjar <...commandline arguments for tika-app>\n\n");
+        return sb.toString();
+    }
+
+    public static void main(String[] args) {
+        long start = new Date().getTime();
+        if (args.length < 6) {
+            System.err.println(StrawManTikaAppDriver.usage());
+        }
+        File inputDir = new File(args[0]);
+        File outputDir = new File(args[1]);
+        int totalThreads = Integer.parseInt(args[2]);
+
+        List<String> commandLine = new ArrayList<String>();
+        commandLine.addAll(Arrays.asList(args).subList(3, args.length));
+        totalThreads = (totalThreads < 1) ? 1 : totalThreads;
+        ExecutorService ex = Executors.newFixedThreadPool(totalThreads);
+        ExecutorCompletionService<Integer> completionService =
+                new ExecutorCompletionService<Integer>(ex);
+
+        for (int i = 0; i < totalThreads; i++) {
+            StrawManTikaAppDriver driver =
+                    new StrawManTikaAppDriver(inputDir, outputDir, totalThreads, commandLine.toArray(new String[commandLine.size()]));
+            completionService.submit(driver);
+        }
+
+        int totalFilesProcessed = 0;
+        for (int i = 0; i < totalThreads; i++) {
+            try {
+                Future<Integer> future = completionService.take();
+                if (future != null) {
+                    totalFilesProcessed += future.get();
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (ExecutionException e) {
+                e.printStackTrace();
+            }
+        }
+        double elapsedSeconds = (double)(new Date().getTime()-start)/(double)1000;
+        System.out.println("Processed "+totalFilesProcessed + " in " + elapsedSeconds + " seconds");
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,41 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class ClassLoaderUtil {
+
+    @SuppressWarnings("unchecked")
+    public static <T> T buildClass(Class<T> iface, String className) {
+
+        ClassLoader loader = ClassLoader.getSystemClassLoader();
+        Class<?> clazz;
+        try {
+            clazz = loader.loadClass(className);
+            if (iface.isAssignableFrom(clazz)) {
+                return (T) clazz.newInstance();
+            }
+            throw new IllegalArgumentException(iface.toString() + " is not assignable from " + className);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,66 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Functionality and naming conventions (roughly) copied from org.apache.commons.lang3
+ * so that we didn't have to add another dependency.
+ */
+public class DurationFormatUtils {
+
+    public static String formatMillis(long duration) {
+        duration = Math.abs(duration);
+        StringBuilder sb = new StringBuilder();
+        int secs = (int) (duration / 1000) % 60;
+        int mins = (int) ((duration / (1000 * 60)) % 60);
+        int hrs = (int) ((duration / (1000 * 60 * 60)) % 24);
+        int days = (int) ((duration / (1000 * 60 * 60 * 24)) % 7);
+
+        //sb.append(millis + " milliseconds");
+        addUnitString(sb, days, "day");
+        addUnitString(sb, hrs, "hour");
+        addUnitString(sb, mins, "minute");
+        addUnitString(sb, secs, "second");
+        if (duration < 1000) {
+            addUnitString(sb, duration, "millisecond");
+        }
+
+        return sb.toString();
+    }
+
+    private static void addUnitString(StringBuilder sb, long unit, String unitString) {
+        //only add unit if >= 1
+        if (unit == 1) {
+            addComma(sb);
+            sb.append("1 ");
+            sb.append(unitString);
+        } else if (unit > 1) {
+            addComma(sb);
+            sb.append(unit);
+            sb.append(" ");
+            sb.append(unitString);
+            sb.append("s");
+        }
+    }
+
+    private static void addComma(StringBuilder sb) {
+        if (sb.length() > 0) {
+            sb.append(", ");
+        }
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,123 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.Locale;
+
+/**
+ * Utility class to handle properties.  If the value is null,
+ * or if there is a parser error, the defaultMissing value will be returned.
+ */
+public class PropsUtil {
+
+    /**
+     * Parses v.  If there is a problem, this returns defaultMissing.
+     *
+     * @param v string to parse
+     * @param defaultMissing value to return if value is null or unparseable
+     * @return parsed value
+     */
+    public static Boolean getBoolean(String v, Boolean defaultMissing) {
+        if (v == null || v.length() == 0) {
+            return defaultMissing;
+        }
+        if (v.toLowerCase(Locale.ROOT).equals("true")) {
+            return true;
+        }
+        if (v.toLowerCase(Locale.ROOT).equals("false")) {
+            return false;
+        }
+        return defaultMissing;
+    }
+
+    /**
+     * Parses v.  If there is a problem, this returns defaultMissing.
+     *
+     * @param v string to parse
+     * @param defaultMissing value to return if value is null or unparseable
+     * @return parsed value
+     */
+    public static Integer getInt(String v, Integer defaultMissing) {
+        if (v == null || v.length() == 0) {
+            return defaultMissing;
+        }
+        try {
+            return Integer.parseInt(v);
+        } catch (NumberFormatException e) {
+            //NO OP
+        }
+        return defaultMissing;
+    }
+
+    /**
+     * Parses v.  If there is a problem, this returns defaultMissing.
+     *
+     * @param v string to parse
+     * @param defaultMissing value to return if value is null or unparseable
+     * @return parsed value
+     */
+    public static Long getLong(String v, Long defaultMissing) {
+        if (v == null || v.length() == 0) {
+            return defaultMissing;
+        }
+        try {
+            return Long.parseLong(v);
+        } catch (NumberFormatException e) {
+            //swallow
+        }
+        return defaultMissing;
+    }
+
+
+    /**
+     * Parses v.  If there is a problem, this returns defaultMissing.
+     *
+     * @param v string to parse
+     * @param defaultMissing value to return if value is null or unparseable
+     * @return parsed value
+     */
+    public static File getFile(String v, File defaultMissing) {
+        if (v == null || v.length() == 0) {
+            return defaultMissing;
+        }
+        //trim initial and final " if they exist
+        if (v.startsWith("\"")) {
+            v = v.substring(1);
+        }
+        if (v.endsWith("\"")) {
+            v = v.substring(0, v.length()-1);
+        }
+
+        return new File(v);
+    }
+
+    /**
+     * Parses v.  If v is null, this returns defaultMissing.
+     *
+     * @param v string to parse
+     * @param defaultMissing value to return if value is null
+     * @return parsed value
+     */
+    public static String getString(String v, String defaultMissing) {
+        if (v == null) {
+            return defaultMissing;
+        }
+        return v;
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,63 @@
+package org.apache.tika.util;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.apache.tika.exception.TikaException;
+
+/**
+ * Unwrap TikaExceptions and other wrappers that we might not care about
+ * in downstream analysis.  This is similar to
+ * what tika-server does when returning stack traces.
+ */
+public class TikaExceptionFilter {
+
+    /**
+     * Unwrap TikaExceptions and other wrappers that users might not
+     * care about in downstream analysis.
+     *
+     * @param t throwable to filter
+     * @return filtered throwable
+     */
+    public Throwable filter(Throwable t) {
+        if (t instanceof TikaException) {
+            Throwable cause = t.getCause();
+            if (cause != null) {
+                return cause;
+            }
+        }
+        return t;
+    }
+
+    /**
+     * This calls {@link #filter} and then prints the filtered
+     * <code>Throwable</code>to a <code>String</code>.
+     *
+     * @param t throwable
+     * @return a filtered version of the StackTrace
+     */
+    public String getStackTrace(Throwable t) {
+        Throwable filtered = filter(t);
+        StringWriter stringWriter = new StringWriter();
+        PrintWriter w = new PrintWriter(stringWriter);
+        filtered.printStackTrace(w);
+        stringWriter.flush();
+        return stringWriter.toString();
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,109 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+
+public class XMLDOMUtil {
+
+    /**
+     * This grabs the attributes from a dom node and overwrites those values with those
+     * specified by the overwrite map.
+     *
+     * @param node node for building
+     * @param overwrite map of attributes to overwrite
+     * @return map of attributes
+     */
+    public static Map<String, String> mapifyAttrs(Node node, Map<String, String> overwrite) {
+        Map<String, String> map = new HashMap<String, String>();
+        NamedNodeMap nnMap = node.getAttributes();
+        for (int i = 0; i < nnMap.getLength(); i++) {
+            Node attr = nnMap.item(i);
+            map.put(attr.getNodeName(), attr.getNodeValue());
+        }
+        if (overwrite != null) {
+            for (Map.Entry<String, String> e : overwrite.entrySet()) {
+                map.put(e.getKey(), e.getValue());
+            }
+        }
+        return map;
+    }
+
+
+    /**
+     * Get an int value.  Try the runtime attributes first and then back off to
+     * the document element.  Throw a RuntimeException if the attribute is not
+     * found or if the value is not parseable as an int.
+     *
+     * @param attrName attribute name to find
+     * @param runtimeAttributes runtime attributes
+     * @param docElement correct element that should have specified attribute
+     * @return specified int value
+     */
+    public static int getInt(String attrName, Map<String, String> runtimeAttributes, Node docElement) {
+        String stringValue = getStringValue(attrName, runtimeAttributes, docElement);
+        if (stringValue != null) {
+            try {
+                return Integer.parseInt(stringValue);
+            } catch (NumberFormatException e) {
+                //swallow
+            }
+        }
+        throw new RuntimeException("Need to specify a parseable int value in -- "
+                +attrName+" -- in commandline or in config file!");
+    }
+
+
+    /**
+     * Get a long value.  Try the runtime attributes first and then back off to
+     * the document element.  Throw a RuntimeException if the attribute is not
+     * found or if the value is not parseable as a long.
+     *
+     * @param attrName attribute name to find
+     * @param runtimeAttributes runtime attributes
+     * @param docElement correct element that should have specified attribute
+     * @return specified long value
+     */
+    public static long getLong(String attrName, Map<String, String> runtimeAttributes, Node docElement) {
+        String stringValue = getStringValue(attrName, runtimeAttributes, docElement);
+        if (stringValue != null) {
+            try {
+                return Long.parseLong(stringValue);
+            } catch (NumberFormatException e) {
+                //swallow
+            }
+        }
+        throw new RuntimeException("Need to specify a \"long\" value in -- "
+                +attrName+" -- in commandline or in config file!");
+    }
+
+    private static String getStringValue(String attrName, Map<String, String> runtimeAttributes, Node docElement) {
+        String stringValue = runtimeAttributes.get(attrName);
+        if (stringValue == null) {
+            Node staleNode = docElement.getAttributes().getNamedItem(attrName);
+            if (staleNode != null) {
+                stringValue = staleNode.getNodeValue();
+            }
+        }
+        return stringValue;
+    }
+}

Added: tika/trunk/tika-batch/src/main/java/overview.html
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/overview.html?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/overview.html (added)
+++ tika/trunk/tika-batch/src/main/java/overview.html Mon Mar 23 16:09:10 2015
@@ -0,0 +1,41 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE html>
+<html>
+    <head lang="en">
+        <meta charset="UTF-8">
+        <title>Tika Batch Module</title>
+    </head>
+    <body>
+
+    <h1>The Batch Module for Apache Tika</h1>
+
+    <p>
+        The batch module is new to Tika in 1.8.  The goal is to enable robust
+        batch processing, with extensibility and logging.
+    </p>
+    <p>
+        This module currently enables file system directory to directory processing.
+        To build out other interfaces, follow the example of BasicTikaFSConsumer and
+        extend FileResourceConsumer.
+    </p>
+    <p>
+        <b>NOTE: This package is new and experimental and is subject to suddenly change in the next release.</b>
+    </p>
+
+    </body>
+</html>
\ No newline at end of file

Added: tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml (added)
+++ tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml Mon Mar 23 16:09:10 2015
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<!-- NOTE: tika-batch is still an experimental feature.
+    The configuration file will likely change and be backward incompatible
+    with new versions of Tika.  Please stay tuned.
+    -->
+
+<tika-batch-config
+        maxAliveTimeSeconds="-1"
+        pauseOnEarlyTerminationMillis="10000"
+        timeoutThresholdMillis="300000"
+        timeoutCheckPulseMillis="1000"
+        maxQueueSize="10000"
+        numConsumers="5">
+
+    <!-- options to allow on the commandline -->
+    <commandline>
+        <option opt="c" longOpt="tika-config" hasArg="true"
+                description="TikaConfig file"/>
+        <option opt="bc" longOpt="batch-config" hasArg="true"
+                description="xml batch config file"/>
+        <!-- We needed sorted for testing.  We added random for performance.
+             Where crawling a directory is slow, it might be beneficial to
+             go randomly so that the parsers are triggered earlier.  The
+             default is operating system's choice ("os") which means whatever order
+             the os returns files in .listFiles(). -->
+        <option opt="crawlOrder" hasArg="true"
+                description="how does the crawler sort the directories and files:
+                                (random|sorted|os)"/>
+        <option opt="numConsumers" hasArg="true"
+                description="number of fileConsumers threads"/>
+        <option opt="maxFileSizeBytes" hasArg="true"
+                description="maximum file size to process; do not process files larger than this"/>
+        <option opt="maxQueueSize" hasArg="true"
+                description="maximum queue size for FileResources"/>
+        <option opt="fileList" hasArg="true"
+                description="file that contains a list of files (relative to inputDir) to process"/>
+        <option opt="fileListEncoding" hasArg="true"
+                description="encoding for fileList"/>
+        <option opt="inputDir" hasArg="true"
+                description="root directory for the files to be processed"/>
+        <option opt="startDir" hasArg="true"
+                description="directory (under inputDir) at which to start crawling"/>
+        <option opt="outputDir" hasArg="true"
+                description="output directory for output"/> <!-- do we want to make this mandatory -->
+        <option opt="recursiveParserWrapper"
+                description="use the RecursiveParserWrapper or not (default = false)"/>
+        <option opt="handleExisting" hasArg="true"
+                description="if an output file already exists, do you want to: overwrite, rename or skip"/>
+        <option opt="basicHandlerType" hasArg="true"
+                description="what type of content handler: xml, text, html, body"/>
+        <option opt="outputSuffix" hasArg="true"
+                description="suffix to add to the end of the output file name"/>
+        <option opt="timeoutThresholdMillis" hasArg="true"
+                description="how long to wait before determining that a consumer is stale"/>
+        <option opt="includeFilePat" hasArg="true"
+                description="regex that specifies which files to process"/>
+        <option opt="excludeFilePat" hasArg="true"
+                description="regex that specifies which files to avoid processing"/>
+    </commandline>
+
+
+    <!-- can specify inputDir="input", but the default config should not include this -->
+    <!-- can also specify startDir="input/someDir" to specify which child directory
+         to start processing -->
+	<crawler builderClass="org.apache.tika.batch.fs.builders.FSCrawlerBuilder"
+        crawlOrder="random"
+		maxFilesToAdd="-1" 
+		maxFilesToConsider="-1" 
+		includeFilePat=""
+		excludeFilePat=""
+		maxFileSizeBytes="-1"
+        />
+<!--
+    This is an example of a crawler that reads a list of files to be processed from a
+    file.  This assumes that the files in the list are relative to inputDir.
+    <crawler class="org.apache.tika.batch.fs.builders.FSCrawlerBuilder"
+             fileList="files.txt"
+             fileListEncoding="UTF-8"
+             maxFilesToAdd="-1"
+             maxFilesToConsider="-1"
+             includeFilePat="(?i).pdf$"
+             excludeFilePat="(?i).msg$"
+             maxFileSizeBytes="-1"
+             inputDir="input"
+    />
+-->
+    <consumers builderClass="org.apache.tika.batch.fs.builders.BasicTikaFSConsumersBuilder"
+               recursiveParserWrapper="false">
+        <parser class="org.apache.tika.batch.AutoDetectParserFactory" parseRecursively="true"/>
+        <contenthandler builderClass="org.apache.tika.batch.builders.DefaultContentHandlerFactoryBuilder"
+                        basicHandlerType="xml" writeLimit="-1"/>
+
+        <!-- overwritePolicy: "skip" a file if output file exists, "rename" a output file, "overwrite" -->
+        <!-- can include e.g. outputDir="output", but we don't want to include this in the default! -->
+        <outputstream class="FSOutputStreamFactory" encoding="UTF-8" outputSuffix="xml"/>
+    </consumers>
+
+    <!-- reporter and interrupter are optional -->
+    <reporter builderClass="org.apache.tika.batch.builders.SimpleLogReporterBuilder" sleepMillis="1000"
+              reporterStaleThresholdMillis="60000"/>
+    <interrupter builderClass="org.apache.tika.batch.builders.InterrupterBuilder"/>
+</tika-batch-config>
\ No newline at end of file

Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java (added)
+++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,48 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import org.apache.commons.cli.Options;
+import org.apache.tika.batch.builders.CommandLineParserBuilder;
+import org.apache.tika.batch.fs.FSBatchTestBase;
+import org.apache.tika.io.IOUtils;
+import org.junit.Test;
+
+
+public class CommandLineParserBuilderTest extends FSBatchTestBase {
+
+    @Test
+    public void testBasic() throws Exception {
+        String configFile = this.getClass().getResource(
+                "/tika-batch-config-test.xml").getFile();
+        InputStream is = null;
+        try {
+            is = new FileInputStream(new File(configFile));
+            CommandLineParserBuilder builder = new CommandLineParserBuilder();
+            Options options = builder.build(is);
+            //TODO: insert actual tests :)
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+
+    }
+}

Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java (added)
+++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,217 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.io.IOUtils;
+import org.junit.Test;
+
+
+public class BatchDriverTest extends FSBatchTestBase {
+
+    //for debugging, turn logging off/on via resources/log4j.properties for the driver
+    //and log4j_process.properties for the process.
+
+    @Test(timeout = 15000)
+    public void oneHeavyHangTest() throws Exception {
+        //batch runner hits one heavy hang file, keep going
+        File outputDir = getNewOutputDir("daemon-");
+        assertNotNull(outputDir.listFiles());
+        //make sure output directory is empty!
+        assertEquals(0, outputDir.listFiles().length);
+
+        String[] args = getDefaultCommandLineArgsArr("one_heavy_hang", outputDir, null);
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args);
+        driver.execute();
+        assertEquals(0, driver.getNumRestarts());
+        assertFalse(driver.getUserInterrupted());
+        assertEquals(5, outputDir.listFiles().length);
+        assertContains("first test file",
+                FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+
+    }
+
+    @Test(timeout = 30000)
+    public void restartOnFullHangTest() throws Exception {
+        //batch runner hits more heavy hangs than threads; needs to restart
+        File outputDir = getNewOutputDir("daemon-");
+
+        //make sure output directory is empty!
+        assertEquals(0, outputDir.listFiles().length);
+
+        String[] args = getDefaultCommandLineArgsArr("heavy_heavy_hangs", outputDir, null);
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args);
+        driver.execute();
+        //could be one or two depending on timing
+        assertTrue(driver.getNumRestarts() > 0);
+        assertFalse(driver.getUserInterrupted());
+        assertContains("first test file",
+                FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+    }
+
+    @Test(timeout = 15000)
+    public void noRestartTest() throws Exception {
+        File outputDir = getNewOutputDir("daemon-");
+
+        //make sure output directory is empty!
+        assertEquals(0, outputDir.listFiles().length);
+
+        String[] args = getDefaultCommandLineArgsArr("no_restart", outputDir, null);
+        String[] mod = Arrays.copyOf(args, args.length + 2);
+        mod[args.length] = "-numConsumers";
+        mod[args.length+1] = "1";
+
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", mod);
+        driver.execute();
+        assertEquals(0, driver.getNumRestarts());
+        assertFalse(driver.getUserInterrupted());
+        File[] files = outputDir.listFiles();
+        assertEquals(2, files.length);
+        File test2 = new File(outputDir, "test2_norestart.xml.xml");
+        assertTrue("test2_norestart.xml", test2.exists());
+        File test3 = new File(outputDir, "test3_ok.xml.xml");
+        assertFalse("test3_ok.xml", test3.exists());
+        assertEquals(0, test3.length());
+    }
+
+    @Test(timeout = 15000)
+    public void restartOnOOMTest() throws Exception {
+        //batch runner hits more heavy hangs than threads; needs to restart
+        File outputDir = getNewOutputDir("daemon-");
+
+        //make sure output directory is empty!
+        assertEquals(0, outputDir.listFiles().length);
+
+        String[] args = getDefaultCommandLineArgsArr("oom", outputDir, null);
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args);
+        driver.execute();
+        assertEquals(1, driver.getNumRestarts());
+        assertFalse(driver.getUserInterrupted());
+        assertContains("first test file",
+                FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+    }
+
+    @Test(timeout = 30000)
+    public void allHeavyHangsTestWithStarvedCrawler() throws Exception {
+        //this tests that if all consumers are hung and the crawler is
+        //waiting to add to the queue, there isn't deadlock.  The BatchProcess should
+        //just shutdown, and the driver should restart
+        File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+        Map<String, String> args = new HashMap<String,String>();
+        args.put("-numConsumers", "2");
+        args.put("-maxQueueSize", "2");
+        String[] commandLine = getDefaultCommandLineArgsArr("heavy_heavy_hangs", outputDir, args);
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+        driver.execute();
+        assertEquals(3, driver.getNumRestarts());
+        assertFalse(driver.getUserInterrupted());
+        assertContains("first test file",
+                FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+    }
+
+    @Test(timeout = 30000)
+    public void maxRestarts() throws Exception {
+        //tests that maxRestarts works
+        //if -maxRestarts is not correctly removed from the commandline,
+        //FSBatchProcessCLI's cli parser will throw an Unrecognized option exception
+
+        File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+        Map<String, String> args = new HashMap<String,String>();
+        args.put("-numConsumers", "1");
+        args.put("-maxQueueSize", "10");
+        args.put("-maxRestarts", "2");
+
+        String[] commandLine = getDefaultCommandLineArgsArr("max_restarts", outputDir, args);
+
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+        driver.execute();
+        assertEquals(2, driver.getNumRestarts());
+        assertFalse(driver.getUserInterrupted());
+        assertEquals(3, outputDir.listFiles().length);
+    }
+
+    @Test(timeout = 30000)
+    public void maxRestartsBadParameter() throws Exception {
+        //tests that maxRestarts must be followed by an Integer
+        File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+        Map<String, String> args = new HashMap<String,String>();
+        args.put("-numConsumers", "1");
+        args.put("-maxQueueSize", "10");
+        args.put("-maxRestarts", "zebra");
+
+        String[] commandLine = getDefaultCommandLineArgsArr("max_restarts", outputDir, args);
+        boolean ex = false;
+        try {
+            BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+            driver.execute();
+        } catch (IllegalArgumentException e) {
+            ex = true;
+        }
+        assertTrue("IllegalArgumentException should have been thrown", ex);
+    }
+
+    @Test(timeout = 30000)
+    public void testNoRestartIfProcessFails() throws Exception {
+        //tests that if something goes horribly wrong with FSBatchProcessCLI
+        //the driver will not restart it again and again
+        //this calls a bad xml file which should trigger a no restart exit.
+        File outputDir = getNewOutputDir("nostart-norestart-");
+        Map<String, String> args = new HashMap<String,String>();
+        args.put("-numConsumers", "1");
+        args.put("-maxQueueSize", "10");
+
+        String[] commandLine = getDefaultCommandLineArgsArr("basic", outputDir, args);
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-broken.xml", commandLine);
+        driver.execute();
+        assertEquals(0, outputDir.listFiles().length);
+        assertEquals(0, driver.getNumRestarts());
+    }
+
+    @Test(timeout = 30000)
+    public void testNoRestartIfProcessFailsTake2() throws Exception {
+        File outputDir = getNewOutputDir("nostart-norestart-");
+        Map<String, String> args = new HashMap<String,String>();
+        args.put("-numConsumers", "1");
+        args.put("-maxQueueSize", "10");
+        args.put("-somethingOrOther", "I don't Know");
+
+        String[] commandLine = getDefaultCommandLineArgsArr("basic", outputDir, args);
+        BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+        driver.execute();
+        assertEquals(0, outputDir.listFiles().length);
+        assertEquals(0, driver.getNumRestarts());
+    }
+
+
+}

Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java (added)
+++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,343 @@
+package org.apache.tika.batch.fs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.batch.BatchProcess;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.io.IOUtils;
+import org.junit.Test;
+
+public class BatchProcessTest extends FSBatchTestBase {
+
+    @Test(timeout = 15000)
+    public void oneHeavyHangTest() throws Exception {
+
+        File outputDir = getNewOutputDir("one_heavy_hang-");
+
+        Map<String, String> args = getDefaultArgs("one_heavy_hang", outputDir);
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(5, outputDir.listFiles().length);
+        File hvyHang = new File(outputDir, "test0_heavy_hang.xml.xml");
+        assertTrue(hvyHang.exists());
+        assertEquals(0, hvyHang.length());
+        assertNotContained(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+
+    @Test(timeout = 15000)
+    public void allHeavyHangsTest() throws Exception {
+        //each of the three threads hits a heavy hang.  The BatchProcess runs into
+        //all timedouts and shuts down.
+        File outputDir = getNewOutputDir("allHeavyHangs-");
+        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+
+        assertEquals(3, outputDir.listFiles().length);
+        for (File hvyHang : outputDir.listFiles()){
+            assertTrue(hvyHang.exists());
+            assertEquals("file length for "+hvyHang.getName()+" should be 0, but is: " +hvyHang.length(),
+                    0, hvyHang.length());
+        }
+        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+    @Test(timeout = 30000)
+    public void allHeavyHangsTestWithCrazyNumberConsumersTest() throws Exception {
+        File outputDir = getNewOutputDir("allHeavyHangsCrazyNumberConsumers-");
+        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
+        args.put("numConsumers", "100");
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(7, outputDir.listFiles().length);
+
+        for (int i = 0; i < 6; i++){
+            File hvyHang = new File(outputDir, "test"+i+"_heavy_hang.xml.xml");
+            assertTrue(hvyHang.exists());
+            assertEquals(0, hvyHang.length());
+        }
+        assertContains("This is tika-batch's first test file",
+                FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+        //key that the process realize that there were no more processable files
+        //in the queue and does not ask for a restart!
+        assertNotContained(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+    @Test(timeout = 30000)
+    public void allHeavyHangsTestWithStarvedCrawler() throws Exception {
+        //this tests that if all consumers are hung and the crawler is
+        //waiting to add to the queue, there isn't deadlock.  The batchrunner should
+        //shutdown and ask to be restarted.
+        File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
+        args.put("numConsumers", "2");
+        args.put("maxQueueSize", "2");
+        args.put("timeoutThresholdMillis", "100000000");//make sure that the batch process doesn't time out
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(2, outputDir.listFiles().length);
+
+        for (int i = 0; i < 2; i++){
+            File hvyHang = new File(outputDir, "test"+i+"_heavy_hang.xml.xml");
+            assertTrue(hvyHang.exists());
+            assertEquals(0, hvyHang.length());
+        }
+        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+        assertContains("Crawler timed out", streamStrings.getErrString());
+    }
+
+    @Test(timeout = 15000)
+    public void outOfMemory() throws Exception {
+        //the first consumer should sleep for 10 seconds
+        //the second should be tied up in a heavy hang
+        //the third one should hit the oom after processing test2_ok.xml
+        //no consumers should process test2-4.txt!
+        //i.e. the first consumer will finish in 10 seconds and
+        //then otherwise would be looking for more, but the oom should prevent that
+        File outputDir = getNewOutputDir("oom-");
+
+        Map<String, String> args = getDefaultArgs("oom", outputDir);
+        args.put("numConsumers", "3");
+        args.put("timeoutThresholdMillis", "30000");
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+
+        assertEquals(4, outputDir.listFiles().length);
+        assertContains("This is tika-batch's first test file",
+                FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+
+
+    @Test(timeout = 15000)
+    public void noRestart() throws Exception {
+        File outputDir = getNewOutputDir("no_restart");
+
+        Map<String, String> args = getDefaultArgs("no_restart", outputDir);
+        args.put("numConsumers", "1");
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        File test2 = new File(outputDir, "test2_norestart.xml.xml");
+        assertTrue("test2_norestart.xml", test2.exists());
+        File test3 = new File(outputDir, "test3_ok.xml.xml");
+        assertFalse("test3_ok.xml", test3.exists());
+        assertEquals(0, test3.length());
+        assertContains("exitStatus="+ BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE,
+                streamStrings.getOutString());
+        assertContains("causeForTermination='MAIN_LOOP_EXCEPTION_NO_RESTART'",
+                streamStrings.getOutString());
+    }
+
+    /**
+     * This tests to make sure that BatchProcess waits the appropriate
+     * amount of time on an early termination before stopping.
+     *
+     * If this fails, then interruptible parsers (e.g. those with
+     * nio channels) will be interrupted and there will be corrupted data.
+     */
+    @Test(timeout = 60000)
+    public void testWaitAfterEarlyTermination() throws Exception {
+        File outputDir = getNewOutputDir("wait_after_early_termination");
+
+        Map<String, String> args = getDefaultArgs("wait_after_early_termination", outputDir);
+        args.put("numConsumers", "1");
+        args.put("maxAliveTimeSeconds", "5");//main process loop should stop after 5 seconds
+        args.put("timeoutThresholdMillis", "300000");//effectively never
+        args.put("pauseOnEarlyTerminationMillis", "20000");//let the parser have up to 20 seconds
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        assertEquals(1, files.length);
+        assertContains("<p>some content</p>",
+                FileUtils.readFileToString(new File(outputDir, "test0_sleep.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+        assertContains("exitStatus="+BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE, streamStrings.getOutString());
+        assertContains("causeForTermination='BATCH_PROCESS_ALIVE_TOO_LONG'",
+                streamStrings.getOutString());
+    }
+
+    @Test(timeout = 60000)
+    public void testTimeOutAfterBeingAskedToShutdown() throws Exception {
+        File outputDir = getNewOutputDir("timeout_after_early_termination");
+
+        Map<String, String> args = getDefaultArgs("timeout_after_early_termination", outputDir);
+        args.put("numConsumers", "1");
+        args.put("maxAliveTimeSeconds", "5");//main process loop should stop after 5 seconds
+        args.put("timeoutThresholdMillis", "10000");
+        args.put("pauseOnEarlyTerminationMillis", "20000");//let the parser have up to 20 seconds
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        assertEquals(1, files.length);
+        assertEquals(0, files[0].length());
+        assertContains("exitStatus="+BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE, streamStrings.getOutString());
+        assertContains("causeForTermination='BATCH_PROCESS_ALIVE_TOO_LONG'",
+                streamStrings.getOutString());
+    }
+
+    @Test(timeout = 10000)
+    public void testRedirectionOfStreams() throws Exception {
+        //test redirection of system.err to system.out
+        File outputDir = getNewOutputDir("noisy_parsers");
+
+        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
+        args.put("numConsumers", "1");
+        args.put("maxAliveTimeSeconds", "20");//main process loop should stop after 5 seconds
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        assertEquals(1, files.length);
+        assertContains("System.out", streamStrings.getOutString());
+        assertContains("System.err", streamStrings.getOutString());
+        assertEquals(0, streamStrings.getErrString().length());
+
+    }
+
+    @Test(timeout = 10000)
+    public void testConsumersManagerInitHang() throws Exception {
+        File outputDir = getNewOutputDir("init_hang");
+
+        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
+        args.put("numConsumers", "1");
+        args.put("hangOnInit", "true");
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args, "/tika-batch-config-MockConsumersBuilder.xml");
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE, ex.getExitValue());
+        assertContains("causeForTermination='CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART'", streamStrings.getOutString());
+    }
+
+    @Test(timeout = 10000)
+    public void testConsumersManagerShutdownHang() throws Exception {
+        File outputDir = getNewOutputDir("shutdown_hang");
+
+        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
+        args.put("numConsumers", "1");
+        args.put("hangOnShutdown", "true");
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args, "/tika-batch-config-MockConsumersBuilder.xml");
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE, ex.getExitValue());
+        assertContains("ConsumersManager did not shutdown within", streamStrings.getOutString());
+    }
+
+    private class BatchProcessTestExecutor {
+        private final Map<String, String> args;
+        private final String configPath;
+        private int exitValue = Integer.MIN_VALUE;
+
+        public BatchProcessTestExecutor(Map<String, String> args) {
+            this(args, "/tika-batch-config-test.xml");
+        }
+
+        public BatchProcessTestExecutor(Map<String, String> args, String configPath) {
+            this.args = args;
+            this.configPath = configPath;
+        }
+
+        private StreamStrings execute() {
+            Process p = null;
+            try {
+                ProcessBuilder b = getNewBatchRunnerProcess(configPath, args);
+                p = b.start();
+                StringStreamGobbler errorGobbler = new StringStreamGobbler(p.getErrorStream());
+                StringStreamGobbler outGobbler = new StringStreamGobbler(p.getInputStream());
+                Thread errorThread = new Thread(errorGobbler);
+                Thread outThread = new Thread(outGobbler);
+                errorThread.start();
+                outThread.start();
+                while (true) {
+                    try {
+                        exitValue = p.exitValue();
+                        break;
+                    } catch (IllegalThreadStateException e) {
+                        //still going;
+                    }
+                }
+                errorGobbler.stopGobblingAndDie();
+                outGobbler.stopGobblingAndDie();
+                errorThread.interrupt();
+                outThread.interrupt();
+                return new StreamStrings(outGobbler.toString(), errorGobbler.toString());
+            } catch (IOException e) {
+                fail();
+            } finally {
+                destroyProcess(p);
+            }
+            return null;
+        }
+
+        private int getExitValue() {
+            return exitValue;
+        }
+
+    }
+
+    private class StreamStrings {
+        private final String outString;
+        private final String errString;
+
+        private StreamStrings(String outString, String errString) {
+            this.outString = outString;
+            this.errString = errString;
+        }
+
+        private String getOutString() {
+            return outString;
+        }
+
+        private String getErrString() {
+            return errString;
+        }
+
+        @Override
+        public String toString() {
+            return "OUT>>"+outString+"<<\n"+
+                    "ERR>>"+errString+"<<\n";
+        }
+    }
+}