You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2015/03/23 17:09:12 UTC
svn commit: r1668673 [4/6] - in /tika/trunk: ./ tika-app/
tika-app/src/main/java/org/apache/tika/cli/ tika-app/src/main/resources/
tika-app/src/test/java/org/apache/tika/cli/ tika-batch/ tika-batch/src/
tika-batch/src/main/ tika-batch/src/main/examples...
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/RecursiveParserWrapperFSConsumer.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,163 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.log4j.Level;
+import org.apache.tika.batch.BatchNoRestartError;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceConsumer;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.batch.ParserFactory;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.io.IOUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.util.TikaExceptionFilter;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * FileResourceConsumer that parses each file with a {@link RecursiveParserWrapper}
+ * and writes the wrapper's list of {@link Metadata} objects as JSON
+ * (via {@link JsonMetadataList}) to the output stream created by the
+ * {@link OutputStreamFactory}.
+ * <p/>
+ * This tries to catch most of the common exceptions, log them and
+ * store them in the metadata list output.
+ */
+public class RecursiveParserWrapperFSConsumer extends AbstractFSConsumer {
+
+    private final ParserFactory parserFactory;
+    private final ContentHandlerFactory contentHandlerFactory;
+    private final OutputStreamFactory fsOSFactory;
+    private final TikaConfig tikaConfig;
+    //charset name used for the JSON output
+    private String outputEncoding = "UTF-8";
+    //TODO: parameterize this
+    private final TikaExceptionFilter exceptionFilter = new TikaExceptionFilter();
+
+    public RecursiveParserWrapperFSConsumer(ArrayBlockingQueue<FileResource> queue,
+                                            ParserFactory parserFactory,
+                                            ContentHandlerFactory contentHandlerFactory,
+                                            OutputStreamFactory fsOSFactory, TikaConfig tikaConfig) {
+        super(queue);
+        this.parserFactory = parserFactory;
+        this.contentHandlerFactory = contentHandlerFactory;
+        this.fsOSFactory = fsOSFactory;
+        this.tikaConfig = tikaConfig;
+    }
+
+    /**
+     * Parses one file resource and writes the resulting metadata list as JSON.
+     * <p/>
+     * The output stream is opened before the input stream so that, if the parse
+     * hangs or throws a nasty exception, a zero byte output file is left behind
+     * and the batch runner can skip that problematic file on the next run.
+     * <p/>
+     * If the parse throws, the stack trace is added to the top metadata item
+     * (or to the container metadata if nothing was collected) before writing.
+     *
+     * @param fileResource resource to parse
+     * @return true if the parse completed without throwing; false if the output
+     * or input stream could not be opened or the parse threw a non-Error
+     * @throws Error rethrown, after the metadata has been written, if the parse
+     * threw an Error
+     */
+    @Override
+    public boolean processFileResource(FileResource fileResource) {
+
+        Parser wrapped = parserFactory.getParser(tikaConfig);
+        RecursiveParserWrapper parser = new RecursiveParserWrapper(wrapped, contentHandlerFactory);
+        ParseContext context = new ParseContext();
+        //register the wrapper itself as the context's Parser; presumably this is
+        //what routes embedded documents back through it (TODO confirm)
+        context.set(Parser.class, parser);
+
+        //try to open outputstream first
+        OutputStream os = getOutputStream(fsOSFactory, fileResource);
+        if (os == null) {
+            logger.debug("Skipping: " + fileResource.getMetadata().get(FSProperties.FS_REL_PATH));
+            return false;
+        }
+
+        //try to open the inputstream before the parse.
+        //if the parse hangs or throws a nasty exception, at least there will
+        //be a zero byte file there so that the batchrunner can skip that problematic
+        //file during the next run.
+        InputStream is = getInputStream(fileResource);
+        if (is == null) {
+            IOUtils.closeQuietly(os);
+            return false;
+        }
+
+        Throwable thrown = null;
+        List<Metadata> metadataList;
+        Metadata containerMetadata = fileResource.getMetadata();
+        try {
+            parse(fileResource.getResourceId(), parser, is, new DefaultHandler(),
+                    containerMetadata, context);
+            metadataList = parser.getMetadata();
+        } catch (Throwable t) {
+            thrown = t;
+            metadataList = parser.getMetadata();
+            if (metadataList == null) {
+                metadataList = new LinkedList<Metadata>();
+            }
+            //attach the stack trace to the top metadata item; if nothing was
+            //collected, fall back to the container's own metadata
+            Metadata m = metadataList.isEmpty() ? containerMetadata : metadataList.remove(0);
+            m.add(TikaCoreProperties.TIKA_META_EXCEPTION_PREFIX + "runtime",
+                    exceptionFilter.getStackTrace(t));
+            metadataList.add(0, m);
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+
+        Writer writer = null;
+        try {
+            writer = new OutputStreamWriter(os, getOutputEncoding());
+            JsonMetadataList.toJson(metadataList, writer);
+        } catch (Exception e) {
+            logWithResourceId(Level.ERROR, "json_ex",
+                    fileResource.getResourceId(), e);
+        } finally {
+            if (writer == null) {
+                //the writer could not be created (e.g. unsupported encoding);
+                //close the underlying stream directly so it is not leaked
+                IOUtils.closeQuietly(os);
+            } else {
+                flushAndClose(writer);
+            }
+        }
+
+        if (thrown instanceof Error) {
+            //the metadata (with the stack trace) has been written; now let the Error propagate
+            throw (Error) thrown;
+        }
+        return thrown == null;
+    }
+
+    /**
+     * @return charset name used when writing the JSON output (default "UTF-8")
+     */
+    public String getOutputEncoding() {
+        return outputEncoding;
+    }
+
+    /**
+     * @param outputEncoding charset name used when writing the JSON output
+     */
+    public void setOutputEncoding(String outputEncoding) {
+        this.outputEncoding = outputEncoding;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.batch.fs.builders;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.batch.ConsumersManager;
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceConsumer;
+import org.apache.tika.batch.OutputStreamFactory;
+import org.apache.tika.batch.ParserFactory;
+import org.apache.tika.batch.builders.AbstractConsumersBuilder;
+import org.apache.tika.batch.builders.BatchProcessBuilder;
+import org.apache.tika.batch.builders.IContentHandlerFactoryBuilder;
+import org.apache.tika.batch.fs.BasicTikaFSConsumer;
+import org.apache.tika.batch.fs.FSConsumersManager;
+import org.apache.tika.batch.fs.FSOutputStreamFactory;
+import org.apache.tika.batch.fs.FSUtil;
+import org.apache.tika.batch.fs.RecursiveParserWrapperFSConsumer;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.sax.ContentHandlerFactory;
+import org.apache.tika.util.ClassLoaderUtil;
+import org.apache.tika.util.PropsUtil;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class BasicTikaFSConsumersBuilder extends AbstractConsumersBuilder {
+
+ @Override
+ public ConsumersManager build(Node node, Map<String, String> runtimeAttributes,
+ ArrayBlockingQueue<FileResource> queue) {
+
+ //figure out if we're building a recursiveParserWrapper
+ boolean recursiveParserWrapper = false;
+ String recursiveParserWrapperString = runtimeAttributes.get("recursiveParserWrapper");
+ if (recursiveParserWrapperString != null){
+ recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperString, recursiveParserWrapper);
+ } else {
+ Node recursiveParserWrapperNode = node.getAttributes().getNamedItem("recursiveParserWrapper");
+ if (recursiveParserWrapperNode != null) {
+ recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperNode.getNodeValue(), recursiveParserWrapper);
+ }
+ }
+
+ //how long to let the consumersManager run on init() and shutdown()
+ Long consumersManagerMaxMillis = null;
+ String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis");
+ if (consumersManagerMaxMillisString != null){
+ consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisString, null);
+ } else {
+ Node consumersManagerMaxMillisNode = node.getAttributes().getNamedItem("consumersManagerMaxMillis");
+ if (consumersManagerMaxMillis == null) {
+ consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisNode.getNodeValue(),
+ null);
+ }
+ }
+
+ TikaConfig config = null;
+ String tikaConfigPath = runtimeAttributes.get("c");
+
+ if( tikaConfigPath == null) {
+ Node tikaConfigNode = node.getAttributes().getNamedItem("tikaConfig");
+ if (tikaConfigNode != null) {
+ tikaConfigPath = PropsUtil.getString(tikaConfigNode.getNodeValue(), null);
+ }
+ }
+ if (tikaConfigPath != null) {
+ try {
+ config = new TikaConfig(new File(tikaConfigPath));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ config = TikaConfig.getDefaultConfig();
+ }
+
+ List<FileResourceConsumer> consumers = new LinkedList<FileResourceConsumer>();
+ int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes);
+
+ NodeList nodeList = node.getChildNodes();
+ Node contentHandlerFactoryNode = null;
+ Node parserFactoryNode = null;
+ Node outputStreamFactoryNode = null;
+
+ for (int i = 0; i < nodeList.getLength(); i++){
+ Node child = nodeList.item(i);
+ String cn = child.getNodeName();
+ if (cn.equals("parser")){
+ parserFactoryNode = child;
+ } else if (cn.equals("contenthandler")) {
+ contentHandlerFactoryNode = child;
+ } else if (cn.equals("outputstream")) {
+ outputStreamFactoryNode = child;
+ }
+ }
+
+ if (contentHandlerFactoryNode == null || parserFactoryNode == null
+ || outputStreamFactoryNode == null) {
+ throw new RuntimeException("You must specify a ContentHandlerFactory, "+
+ "a ParserFactory and an OutputStreamFactory");
+ }
+ ContentHandlerFactory contentHandlerFactory = getContentHandlerFactory(contentHandlerFactoryNode, runtimeAttributes);
+ ParserFactory parserFactory = getParserFactory(parserFactoryNode, runtimeAttributes);
+ OutputStreamFactory outputStreamFactory = getOutputStreamFactory(outputStreamFactoryNode, runtimeAttributes);
+
+ if (recursiveParserWrapper) {
+ for (int i = 0; i < numConsumers; i++) {
+ FileResourceConsumer c = new RecursiveParserWrapperFSConsumer(queue,
+ parserFactory, contentHandlerFactory, outputStreamFactory, config);
+ consumers.add(c);
+ }
+ } else {
+ for (int i = 0; i < numConsumers; i++) {
+ FileResourceConsumer c = new BasicTikaFSConsumer(queue,
+ parserFactory, contentHandlerFactory, outputStreamFactory, config);
+ consumers.add(c);
+ }
+ }
+ ConsumersManager manager = new FSConsumersManager(consumers);
+ if (consumersManagerMaxMillis != null) {
+ manager.setConsumersManagerMaxMillis(consumersManagerMaxMillis);
+ }
+ return manager;
+ }
+
+
+ private ContentHandlerFactory getContentHandlerFactory(Node node, Map<String, String> runtimeAttributes) {
+
+ Map<String, String> localAttrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+ String className = localAttrs.get("builderClass");
+ if (className == null) {
+ throw new RuntimeException("Must specify builderClass for contentHandler");
+ }
+ IContentHandlerFactoryBuilder builder = ClassLoaderUtil.buildClass(IContentHandlerFactoryBuilder.class, className);
+ return builder.build(node, runtimeAttributes);
+ }
+
+ private ParserFactory getParserFactory(Node node, Map<String, String> runtimeAttributes) {
+ //TODO: add ability to set TikaConfig file path
+ Map<String, String> localAttrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+ String className = localAttrs.get("class");
+ return ClassLoaderUtil.buildClass(ParserFactory.class, className);
+ }
+
+ private OutputStreamFactory getOutputStreamFactory(Node node, Map<String, String> runtimeAttributes) {
+ Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+
+ File outputDir = PropsUtil.getFile(attrs.get("outputDir"), null);
+/* FSUtil.HANDLE_EXISTING handleExisting = null;
+ String handleExistingString = attrs.get("handleExisting");
+ if (handleExistingString == null) {
+ handleExistingException();
+ } else if (handleExistingString.equals("overwrite")){
+ handleExisting = FSUtil.HANDLE_EXISTING.OVERWRITE;
+ } else if (handleExistingString.equals("rename")) {
+ handleExisting = FSUtil.HANDLE_EXISTING.RENAME;
+ } else if (handleExistingString.equals("skip")) {
+ handleExisting = FSUtil.HANDLE_EXISTING.SKIP;
+ } else {
+ handleExistingException();
+ }
+*/
+ String compressionString = attrs.get("compression");
+ FSOutputStreamFactory.COMPRESSION compression = FSOutputStreamFactory.COMPRESSION.NONE;
+ if (compressionString == null) {
+ //do nothing
+ } else if (compressionString.contains("bz")) {
+ compression = FSOutputStreamFactory.COMPRESSION.BZIP2;
+ } else if (compressionString.contains("gz")) {
+ compression = FSOutputStreamFactory.COMPRESSION.GZIP;
+ } else if (compressionString.contains("zip")) {
+ compression = FSOutputStreamFactory.COMPRESSION.ZIP;
+ }
+ String suffix = attrs.get("outputSuffix");
+
+ //TODO: possibly open up the different handle existings in the future
+ //but for now, lock it down to require skip. Too dangerous otherwise
+ //if the driver restarts and this is set to overwrite...
+ return new FSOutputStreamFactory(outputDir, FSUtil.HANDLE_EXISTING.SKIP,
+ compression, suffix);
+ }
+
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/builders/FSCrawlerBuilder.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,129 @@
+package org.apache.tika.batch.fs.builders;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import java.io.File;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import org.apache.tika.batch.FileResource;
+import org.apache.tika.batch.FileResourceCrawler;
+import org.apache.tika.batch.builders.BatchProcessBuilder;
+import org.apache.tika.batch.builders.ICrawlerBuilder;
+import org.apache.tika.batch.fs.FSDirectoryCrawler;
+import org.apache.tika.batch.fs.FSDocumentSelector;
+import org.apache.tika.extractor.DocumentSelector;
+import org.apache.tika.util.PropsUtil;
+import org.apache.tika.util.XMLDOMUtil;
+import org.w3c.dom.Node;
+
+/**
+ * Builds either an FSDirectoryCrawler or an FSListCrawler.
+ * <p/>
+ * An FSListCrawler is built when a {@code fileList} attribute is present;
+ * otherwise an FSDirectoryCrawler walks {@code inputDir} (optionally starting
+ * at {@code startDir}) in the configured {@code crawlOrder}.
+ */
+public class FSCrawlerBuilder implements ICrawlerBuilder {
+
+    private final static String MAX_CONSEC_WAIT_MILLIS = "maxConsecWaitMillis";
+    private final static String MAX_FILES_TO_ADD_ATTR = "maxFilesToAdd";
+    private final static String MAX_FILES_TO_CONSIDER_ATTR = "maxFilesToConsider";
+
+
+    private final static String CRAWL_ORDER = "crawlOrder";
+    private final static String INPUT_DIR_ATTR = "inputDir";
+    private final static String INPUT_START_DIR_ATTR = "startDir";
+    private final static String MAX_FILE_SIZE_BYTES_ATTR = "maxFileSizeBytes";
+    private final static String MIN_FILE_SIZE_BYTES_ATTR = "minFileSizeBytes";
+
+
+    private final static String INCLUDE_FILE_PAT_ATTR = "includeFilePat";
+    private final static String EXCLUDE_FILE_PAT_ATTR = "excludeFilePat";
+
+    private final static String FILE_LIST_ATTR = "fileList";
+    private final static String FILE_LIST_ENCODING_ATTR = "fileListEncoding";
+
+    //default for maxConsecWaitMillis: 5 minutes
+    private final static long DEFAULT_MAX_CONSEC_WAIT_MILLIS = 300000L;
+
+    @Override
+    public FileResourceCrawler build(Node node, Map<String, String> runtimeAttributes,
+                                     ArrayBlockingQueue<FileResource> queue) {
+
+        Map<String, String> attributes = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
+
+        int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes);
+        File inputDir = PropsUtil.getFile(attributes.get(INPUT_DIR_ATTR), new File("input"));
+        FileResourceCrawler crawler;
+        if (attributes.containsKey(FILE_LIST_ATTR)) {
+            crawler = buildListCrawler(attributes, queue, numConsumers, inputDir);
+        } else {
+            crawler = buildDirectoryCrawler(attributes, queue, numConsumers, inputDir);
+        }
+
+        crawler.setMaxFilesToConsider(PropsUtil.getInt(attributes.get(MAX_FILES_TO_CONSIDER_ATTR), -1));
+        crawler.setMaxFilesToAdd(PropsUtil.getInt(attributes.get(MAX_FILES_TO_ADD_ATTR), -1));
+        //buildSelector never returns null
+        crawler.setDocumentSelector(buildSelector(attributes));
+        crawler.setMaxConsecWaitInMillis(
+                PropsUtil.getLong(attributes.get(MAX_CONSEC_WAIT_MILLIS), DEFAULT_MAX_CONSEC_WAIT_MILLIS));
+        return crawler;
+    }
+
+    /**
+     * Builds an FSListCrawler over the files listed in the {@code fileList} file.
+     *
+     * @throws RuntimeException if the file list is missing, not found, or its
+     *                          encoding is not supported
+     */
+    private FileResourceCrawler buildListCrawler(Map<String, String> attributes,
+                                                 ArrayBlockingQueue<FileResource> queue,
+                                                 int numConsumers, File inputDir) {
+        if (attributes.get(CRAWL_ORDER) != null) {
+            //TODO: change to logger warn or throw RuntimeException?
+            System.err.println("crawlOrder attribute is ignored by FSListCrawler");
+        }
+        File fileList = PropsUtil.getFile(attributes.get(FILE_LIST_ATTR), null);
+        if (fileList == null) {
+            throw new RuntimeException("Must specify a valid fileList for FSListCrawler");
+        }
+        String encoding = PropsUtil.getString(attributes.get(FILE_LIST_ENCODING_ATTR), "UTF-8");
+        try {
+            return new org.apache.tika.batch.fs.FSListCrawler(queue, numConsumers, inputDir, fileList, encoding);
+        } catch (java.io.FileNotFoundException e) {
+            throw new RuntimeException("fileList file not found for FSListCrawler: " + fileList.getAbsolutePath(), e);
+        } catch (java.io.UnsupportedEncodingException e) {
+            throw new RuntimeException("fileList encoding not supported: " + encoding, e);
+        }
+    }
+
+    /**
+     * Builds an FSDirectoryCrawler over {@code inputDir}, starting at
+     * {@code startDir} when that attribute is set.
+     */
+    private FileResourceCrawler buildDirectoryCrawler(Map<String, String> attributes,
+                                                      ArrayBlockingQueue<FileResource> queue,
+                                                      int numConsumers, File inputDir) {
+        FSDirectoryCrawler.CRAWL_ORDER crawlOrder = getCrawlOrder(attributes.get(CRAWL_ORDER));
+        File startDir = PropsUtil.getFile(attributes.get(INPUT_START_DIR_ATTR), null);
+        if (startDir == null) {
+            return new FSDirectoryCrawler(queue, numConsumers, inputDir, crawlOrder);
+        }
+        return new FSDirectoryCrawler(queue, numConsumers, inputDir, startDir, crawlOrder);
+    }
+
+    /**
+     * Maps the crawlOrder attribute to a CRAWL_ORDER: values containing "rand" give
+     * RANDOM, values containing "sort" give SORTED (case-insensitive); null, blank,
+     * "os" and anything else give OS_ORDER.
+     */
+    private FSDirectoryCrawler.CRAWL_ORDER getCrawlOrder(String s) {
+        if (s == null) {
+            return FSDirectoryCrawler.CRAWL_ORDER.OS_ORDER;
+        }
+        String lc = s.toLowerCase(Locale.ROOT);
+        if (lc.contains("rand")) {
+            return FSDirectoryCrawler.CRAWL_ORDER.RANDOM;
+        } else if (lc.contains("sort")) {
+            return FSDirectoryCrawler.CRAWL_ORDER.SORTED;
+        }
+        return FSDirectoryCrawler.CRAWL_ORDER.OS_ORDER;
+    }
+
+    /**
+     * Builds an FSDocumentSelector from the include/exclude file-name patterns
+     * (empty or missing means no pattern) and the min/max file size attributes
+     * (default -1).
+     */
+    private DocumentSelector buildSelector(Map<String, String> attributes) {
+        String includeString = attributes.get(INCLUDE_FILE_PAT_ATTR);
+        String excludeString = attributes.get(EXCLUDE_FILE_PAT_ATTR);
+        long maxFileSize = PropsUtil.getLong(attributes.get(MAX_FILE_SIZE_BYTES_ATTR), -1L);
+        long minFileSize = PropsUtil.getLong(attributes.get(MIN_FILE_SIZE_BYTES_ATTR), -1L);
+        Pattern includePat = (includeString != null && includeString.length() > 0) ? Pattern.compile(includeString) : null;
+        Pattern excludePat = (excludeString != null && excludeString.length() > 0) ? Pattern.compile(excludeString) : null;
+
+        return new FSDocumentSelector(includePat, excludePat, minFileSize, maxFileSize);
+    }
+
+
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/fs/strawman/StrawManTikaAppDriver.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,249 @@
+package org.apache.tika.batch.fs.strawman;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.tika.io.IOUtils;
+
+/**
+ * Simple single-threaded class that calls tika-app against every file in a directory.
+ *
+ * This is exceedingly robust. One file per process.
+ *
+ * However, you can use this to compare performance against tika-batch fs code.
+ *
+ *
+ */
+public class StrawManTikaAppDriver implements Callable<Integer> {
+
+ private static AtomicInteger threadCount = new AtomicInteger(0);
+ private final int totalThreads;
+ private final int threadNum;
+ private int rootLen = -1;
+ private File inputDir = null;
+ private File outputDir = null;
+ private String[] args = null;
+ private Logger logger = Logger.getLogger(StrawManTikaAppDriver.class);
+
+
+ public StrawManTikaAppDriver(File inputDir, File outputDir, int totalThreads, String[] args) {
+ rootLen = inputDir.getAbsolutePath().length()+1;
+ this.inputDir = inputDir;
+ this.outputDir = outputDir;
+ this.args = args;
+ threadNum = threadCount.getAndIncrement();
+ this.totalThreads = totalThreads;
+ }
+
+
+ private int processDirectory(File inputDir) {
+ int processed = 0;
+ if (inputDir == null || inputDir.listFiles() == null) {
+ return processed;
+ }
+ for (File f : inputDir.listFiles()) {
+ List<File> childDirs = new ArrayList<File>();
+ if (f.isDirectory()) {
+ childDirs.add(f);
+ } else {
+ processed += processFile(f);
+ }
+ for (File dir : childDirs) {
+ processed += processDirectory(dir);
+
+ }
+ }
+ return processed;
+ }
+
+ private int processFile(File f) {
+ if (totalThreads > 1) {
+ int hashCode = f.getAbsolutePath().hashCode();
+ if (Math.abs(hashCode % totalThreads) != threadNum) {
+ return 0;
+ }
+ }
+ File outputFile = new File(outputDir, f.getAbsolutePath().substring(rootLen)+".txt");
+ outputFile.getAbsoluteFile().getParentFile().mkdirs();
+ if (! outputFile.getParentFile().exists()) {
+ logger.fatal("parent directory for "+ outputFile + " was not made!");
+ throw new RuntimeException("couldn't make parent file for " + outputFile);
+ }
+ List<String> commandLine = new ArrayList<String>();
+ for (String arg : args) {
+ commandLine.add(arg);
+ }
+ commandLine.add("-t");
+ commandLine.add("\""+f.getAbsolutePath()+"\"");
+ ProcessBuilder builder = new ProcessBuilder(commandLine.toArray(new String[commandLine.size()]));
+ logger.info("about to process: "+f.getAbsolutePath());
+ Process proc = null;
+ RedirectGobbler gobbler = null;
+ Thread gobblerThread = null;
+ try {
+ OutputStream os = new FileOutputStream(outputFile);
+ proc = builder.start();
+ gobbler = new RedirectGobbler(proc.getInputStream(), os);
+ gobblerThread = new Thread(gobbler);
+ gobblerThread.start();
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ return 0;
+ }
+
+ boolean finished = false;
+ long totalTime = 180000;//3 minutes
+ long pulse = 100;
+ for (int i = 0; i < totalTime; i += pulse) {
+ try {
+ Thread.currentThread().sleep(pulse);
+ } catch (InterruptedException e) {
+ //swallow
+ }
+ try {
+ int exit = proc.exitValue();
+ finished = true;
+ break;
+ } catch (IllegalThreadStateException e) {
+ //swallow
+ }
+ }
+ if (!finished) {
+ logger.warn("Had to kill process working on: " + f.getAbsolutePath());
+ proc.destroy();
+ }
+ gobbler.close();
+ gobblerThread.interrupt();
+ return 1;
+ }
+
+
+ @Override
+ public Integer call() throws Exception {
+ long start = new Date().getTime();
+
+ int processed = processDirectory(inputDir);
+ double elapsedSecs = ((double)new Date().getTime()-(double)start)/(double)1000;
+ logger.info("Finished processing " + processed + " files in " + elapsedSecs + " seconds.");
+ return processed;
+ }
+
+ private class RedirectGobbler implements Runnable {
+ private OutputStream redirectOs = null;
+ private InputStream redirectIs = null;
+
+ private RedirectGobbler(InputStream is, OutputStream os) {
+ this.redirectIs = is;
+ this.redirectOs = os;
+ }
+
+ private void close() {
+ if (redirectOs != null) {
+ try {
+ redirectOs.flush();
+ } catch (IOException e) {
+ logger.error("can't flush");
+ }
+ try {
+ redirectIs.close();
+ } catch (IOException e) {
+ logger.error("can't close input in redirect gobbler");
+ }
+ try {
+ redirectOs.close();
+ } catch (IOException e) {
+ logger.error("can't close output in redirect gobbler");
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ IOUtils.copy(redirectIs, redirectOs);
+ } catch (IOException e) {
+ logger.error("IOException while gobbling");
+ }
+ }
+ }
+
+ public static String usage() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Example usage:\n");
+ sb.append("java -cp <CP> org.apache.batch.fs.strawman.StrawManTikaAppDriver ");
+ sb.append("<inputDir> <outputDir> <numThreads> ");
+ sb.append("java -jar tika-app-X.Xjar <...commandline arguments for tika-app>\n\n");
+ return sb.toString();
+ }
+
+ public static void main(String[] args) {
+ long start = new Date().getTime();
+ if (args.length < 6) {
+ System.err.println(StrawManTikaAppDriver.usage());
+ }
+ File inputDir = new File(args[0]);
+ File outputDir = new File(args[1]);
+ int totalThreads = Integer.parseInt(args[2]);
+
+ List<String> commandLine = new ArrayList<String>();
+ commandLine.addAll(Arrays.asList(args).subList(3, args.length));
+ totalThreads = (totalThreads < 1) ? 1 : totalThreads;
+ ExecutorService ex = Executors.newFixedThreadPool(totalThreads);
+ ExecutorCompletionService<Integer> completionService =
+ new ExecutorCompletionService<Integer>(ex);
+
+ for (int i = 0; i < totalThreads; i++) {
+ StrawManTikaAppDriver driver =
+ new StrawManTikaAppDriver(inputDir, outputDir, totalThreads, commandLine.toArray(new String[commandLine.size()]));
+ completionService.submit(driver);
+ }
+
+ int totalFilesProcessed = 0;
+ for (int i = 0; i < totalThreads; i++) {
+ try {
+ Future<Integer> future = completionService.take();
+ if (future != null) {
+ totalFilesProcessed += future.get();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ double elapsedSeconds = (double)(new Date().getTime()-start)/(double)1000;
+ System.out.println("Processed "+totalFilesProcessed + " in " + elapsedSeconds + " seconds");
+ }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/ClassLoaderUtil.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,41 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Reflection helper that instantiates a class by name and checks that it is
+ * assignable to an expected type.
+ */
+public class ClassLoaderUtil {
+
+    /**
+     * Loads {@code className} and creates an instance with its no-argument constructor.
+     * <p/>
+     * The thread context class loader is used when set, falling back to this class's
+     * own loader and then the system class loader. This lets classes that are only
+     * visible to an application or container class loader be found.
+     *
+     * @param iface     type the instantiated class must be assignable to
+     * @param className fully qualified name of the class to instantiate
+     * @param <T>       expected type
+     * @return a new instance of {@code className}
+     * @throws IllegalArgumentException if {@code className} is null or the class is
+     *                                  not assignable to {@code iface}
+     * @throws RuntimeException         wrapping the reflective exception if the class
+     *                                  cannot be found or instantiated
+     */
+    public static <T> T buildClass(Class<T> iface, String className) {
+        if (className == null) {
+            throw new IllegalArgumentException("className must not be null");
+        }
+        ClassLoader loader = getClassLoader();
+        Class<?> clazz;
+        try {
+            clazz = loader.loadClass(className);
+            if (iface.isAssignableFrom(clazz)) {
+                return iface.cast(clazz.newInstance());
+            }
+            throw new IllegalArgumentException(iface.toString() + " is not assignable from " + className);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * @return the thread context class loader if set, else this class's loader,
+     * else the system class loader
+     */
+    private static ClassLoader getClassLoader() {
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        if (loader == null) {
+            loader = ClassLoaderUtil.class.getClassLoader();
+        }
+        if (loader == null) {
+            loader = ClassLoader.getSystemClassLoader();
+        }
+        return loader;
+    }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/DurationFormatUtils.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,66 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
/**
 * Functionality and naming conventions (roughly) copied from org.apache.commons.lang3
 * so that we didn't have to add another dependency.
 */
public class DurationFormatUtils {

    private static final long MILLIS_PER_SECOND = 1000L;
    private static final long MILLIS_PER_MINUTE = 60L * MILLIS_PER_SECOND;
    private static final long MILLIS_PER_HOUR = 60L * MILLIS_PER_MINUTE;
    private static final long MILLIS_PER_DAY = 24L * MILLIS_PER_HOUR;

    /**
     * Formats a duration as comma-separated English words, e.g.
     * {@code 90061000} becomes {@code "1 day, 1 hour, 1 minute, 1 second"}.
     * <p>
     * The sign of {@code duration} is ignored. Units whose value is zero are
     * omitted. Milliseconds are only reported for durations shorter than one
     * second, and a zero duration is rendered as {@code "0 milliseconds"}.
     *
     * @param duration duration in milliseconds
     * @return human readable description of the duration
     */
    public static String formatMillis(long duration) {
        // Math.abs(Long.MIN_VALUE) overflows and stays negative, so clamp it.
        long abs = (duration == Long.MIN_VALUE) ? Long.MAX_VALUE : Math.abs(duration);

        // Stay in long arithmetic: casting the quotients to int overflows for
        // long durations. Days are not wrapped (there is no larger unit to carry
        // into), so 8 days is reported as "8 days" rather than "1 day".
        long days = abs / MILLIS_PER_DAY;
        long hrs = (abs / MILLIS_PER_HOUR) % 24;
        long mins = (abs / MILLIS_PER_MINUTE) % 60;
        long secs = (abs / MILLIS_PER_SECOND) % 60;

        StringBuilder sb = new StringBuilder();
        addUnitString(sb, days, "day");
        addUnitString(sb, hrs, "hour");
        addUnitString(sb, mins, "minute");
        addUnitString(sb, secs, "second");
        if (abs < MILLIS_PER_SECOND) {
            addUnitString(sb, abs, "millisecond");
        }
        if (sb.length() == 0) {
            // only reachable when the duration is exactly zero
            sb.append("0 milliseconds");
        }
        return sb.toString();
    }

    /**
     * Appends {@code "<unit> <unitString>"} (pluralized when unit > 1) to sb,
     * preceded by a comma separator if sb is not empty. Values below 1 are skipped.
     */
    private static void addUnitString(StringBuilder sb, long unit, String unitString) {
        //only add unit if >= 1
        if (unit == 1) {
            addComma(sb);
            sb.append("1 ");
            sb.append(unitString);
        } else if (unit > 1) {
            addComma(sb);
            sb.append(unit);
            sb.append(" ");
            sb.append(unitString);
            sb.append("s");
        }
    }

    /** Appends ", " when sb already holds an earlier unit. */
    private static void addComma(StringBuilder sb) {
        if (sb.length() > 0) {
            sb.append(", ");
        }
    }
}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/PropsUtil.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,123 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.Locale;
+
/**
 * Utility class to handle properties. If the value is null,
 * or if there is a parser error, the defaultMissing value will be returned.
 */
public class PropsUtil {

    /**
     * Parses v as a case-insensitive {@code "true"} or {@code "false"}.
     * If there is a problem, this returns defaultMissing.
     *
     * @param v string to parse
     * @param defaultMissing value to return if value is null, empty or unparseable
     * @return parsed value
     */
    public static Boolean getBoolean(String v, Boolean defaultMissing) {
        if (v == null || v.length() == 0) {
            return defaultMissing;
        }
        String lower = v.toLowerCase(Locale.ROOT);
        if (lower.equals("true")) {
            return true;
        }
        if (lower.equals("false")) {
            return false;
        }
        return defaultMissing;
    }

    /**
     * Parses v as a decimal int. If there is a problem, this returns defaultMissing.
     *
     * @param v string to parse
     * @param defaultMissing value to return if value is null, empty or unparseable
     * @return parsed value
     */
    public static Integer getInt(String v, Integer defaultMissing) {
        if (v == null || v.length() == 0) {
            return defaultMissing;
        }
        try {
            return Integer.parseInt(v);
        } catch (NumberFormatException ignored) {
            // unparseable: fall back to the default below
        }
        return defaultMissing;
    }

    /**
     * Parses v as a decimal long. If there is a problem, this returns defaultMissing.
     *
     * @param v string to parse
     * @param defaultMissing value to return if value is null, empty or unparseable
     * @return parsed value
     */
    public static Long getLong(String v, Long defaultMissing) {
        if (v == null || v.length() == 0) {
            return defaultMissing;
        }
        try {
            return Long.parseLong(v);
        } catch (NumberFormatException ignored) {
            // unparseable: fall back to the default below
        }
        return defaultMissing;
    }

    /**
     * Converts v to a File, first stripping one leading and one trailing
     * double quote if present (e.g. a quoted path from a command line).
     * If there is a problem, this returns defaultMissing.
     *
     * @param v string to parse
     * @param defaultMissing value to return if value is null, empty, or empty
     *                       once the surrounding quotes are removed
     * @return parsed value
     */
    public static File getFile(String v, File defaultMissing) {
        if (v == null || v.length() == 0) {
            return defaultMissing;
        }
        //trim initial and final " if they exist
        if (v.startsWith("\"")) {
            v = v.substring(1);
        }
        if (v.endsWith("\"")) {
            v = v.substring(0, v.length() - 1);
        }
        // "\"\"" or a lone "\"" would otherwise become new File(""),
        // which does not name any file
        if (v.length() == 0) {
            return defaultMissing;
        }
        return new File(v);
    }

    /**
     * Returns v unchanged, or defaultMissing if v is null.
     * An empty string is returned as-is.
     *
     * @param v string to parse
     * @param defaultMissing value to return if value is null
     * @return parsed value
     */
    public static String getString(String v, String defaultMissing) {
        if (v == null) {
            return defaultMissing;
        }
        return v;
    }
}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/TikaExceptionFilter.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,63 @@
+package org.apache.tika.util;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.apache.tika.exception.TikaException;
+
+/**
+ * Unwrap TikaExceptions and other wrappers that we might not care about
+ * in downstream analysis. This is similar to
+ * what tika-server does when returning stack traces.
+ */
+public class TikaExceptionFilter {
+
+ /**
+ * Unwrap TikaExceptions and other wrappers that users might not
+ * care about in downstream analysis.
+ *
+ * @param t throwable to filter
+ * @return filtered throwable
+ */
+ public Throwable filter(Throwable t) {
+ if (t instanceof TikaException) {
+ Throwable cause = t.getCause();
+ if (cause != null) {
+ return cause;
+ }
+ }
+ return t;
+ }
+
+ /**
+ * This calls {@link #filter} and then prints the filtered
+ * <code>Throwable</code>to a <code>String</code>.
+ *
+ * @param t throwable
+ * @return a filtered version of the StackTrace
+ */
+ public String getStackTrace(Throwable t) {
+ Throwable filtered = filter(t);
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter w = new PrintWriter(stringWriter);
+ filtered.printStackTrace(w);
+ stringWriter.flush();
+ return stringWriter.toString();
+ }
+}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java (added)
+++ tika/trunk/tika-batch/src/main/java/org/apache/tika/util/XMLDOMUtil.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,109 @@
+package org.apache.tika.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+
/**
 * Helpers for reading attribute values from DOM nodes. Runtime values
 * (e.g. from the command line) take precedence over values in the config document.
 */
public class XMLDOMUtil {

    /**
     * This grabs the attributes from a dom node and overwrites those values with those
     * specified by the overwrite map.
     *
     * @param node node for building; a node without attributes (anything that
     *             is not an Element) contributes nothing
     * @param overwrite map of attributes to overwrite, may be null
     * @return new mutable map of attributes
     */
    public static Map<String, String> mapifyAttrs(Node node, Map<String, String> overwrite) {
        Map<String, String> map = new HashMap<String, String>();
        // Node.getAttributes() returns null for nodes that are not Elements
        NamedNodeMap nnMap = node.getAttributes();
        if (nnMap != null) {
            for (int i = 0; i < nnMap.getLength(); i++) {
                Node attr = nnMap.item(i);
                map.put(attr.getNodeName(), attr.getNodeValue());
            }
        }
        if (overwrite != null) {
            map.putAll(overwrite);
        }
        return map;
    }


    /**
     * Get an int value. Try the runtime attributes first and then back off to
     * the document element. Throw a RuntimeException if the attribute is not
     * found or if the value is not parseable as an int.
     *
     * @param attrName attribute name to find
     * @param runtimeAttributes runtime attributes, may be null
     * @param docElement correct element that should have specified attribute
     * @return specified int value
     * @throws RuntimeException if the value is missing or unparseable; a parse
     *         failure is attached as the cause
     */
    public static int getInt(String attrName, Map<String, String> runtimeAttributes, Node docElement) {
        String stringValue = getStringValue(attrName, runtimeAttributes, docElement);
        NumberFormatException parseFailure = null;
        if (stringValue != null) {
            try {
                return Integer.parseInt(stringValue);
            } catch (NumberFormatException e) {
                parseFailure = e;
            }
        }
        throw new RuntimeException("Need to specify a parseable int value in -- "
                + attrName + " -- in commandline or in config file!", parseFailure);
    }


    /**
     * Get a long value. Try the runtime attributes first and then back off to
     * the document element. Throw a RuntimeException if the attribute is not
     * found or if the value is not parseable as a long.
     *
     * @param attrName attribute name to find
     * @param runtimeAttributes runtime attributes, may be null
     * @param docElement correct element that should have specified attribute
     * @return specified long value
     * @throws RuntimeException if the value is missing or unparseable; a parse
     *         failure is attached as the cause
     */
    public static long getLong(String attrName, Map<String, String> runtimeAttributes, Node docElement) {
        String stringValue = getStringValue(attrName, runtimeAttributes, docElement);
        NumberFormatException parseFailure = null;
        if (stringValue != null) {
            try {
                return Long.parseLong(stringValue);
            } catch (NumberFormatException e) {
                parseFailure = e;
            }
        }
        throw new RuntimeException("Need to specify a \"long\" value in -- "
                + attrName + " -- in commandline or in config file!", parseFailure);
    }

    /**
     * Looks up attrName in runtimeAttributes, then in docElement's attributes.
     * Returns null if neither has it. Both sources may be null.
     */
    private static String getStringValue(String attrName, Map<String, String> runtimeAttributes, Node docElement) {
        String stringValue = (runtimeAttributes == null) ? null : runtimeAttributes.get(attrName);
        if (stringValue == null && docElement != null) {
            NamedNodeMap attrs = docElement.getAttributes();
            Node staleNode = (attrs == null) ? null : attrs.getNamedItem(attrName);
            if (staleNode != null) {
                stringValue = staleNode.getNodeValue();
            }
        }
        return stringValue;
    }
}
Added: tika/trunk/tika-batch/src/main/java/overview.html
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/overview.html?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/java/overview.html (added)
+++ tika/trunk/tika-batch/src/main/java/overview.html Mon Mar 23 16:09:10 2015
@@ -0,0 +1,41 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE html>
+<html>
+ <head lang="en">
+ <meta charset="UTF-8">
+ <title>Tika Batch Module</title>
+ </head>
+ <body>
+
+ <h1>The Batch Module for Apache Tika</h1>
+
+ <p>
+ The batch module is new to Tika in 1.8. The goal is to enable robust
+ batch processing, with extensibility and logging.
+ </p>
+ <p>
+ This module currently supports directory-to-directory processing on the file system.
+ To build other interfaces, follow the example of BasicTikaFSConsumer and
+ extend FileResourceConsumer.
+ </p>
+ <p>
+ <b>NOTE: This package is new and experimental and may change without notice in the next release.</b>
+ </p>
+
+ </body>
+</html>
\ No newline at end of file
Added: tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml (added)
+++ tika/trunk/tika-batch/src/main/resources/org/apache/tika/batch/fs/default-tika-batch-config.xml Mon Mar 23 16:09:10 2015
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!-- NOTE: tika-batch is still an experimental feature.
+ The configuration file will likely change and be backward incompatible
+ with new versions of Tika. Please stay tuned.
+ -->
+
+<tika-batch-config
+ maxAliveTimeSeconds="-1"
+ pauseOnEarlyTerminationMillis="10000"
+ timeoutThresholdMillis="300000"
+ timeoutCheckPulseMillis="1000"
+ maxQueueSize="10000"
+ numConsumers="5">
+
+ <!-- options to allow on the commandline -->
+ <commandline>
+ <option opt="c" longOpt="tika-config" hasArg="true"
+ description="TikaConfig file"/>
+ <option opt="bc" longOpt="batch-config" hasArg="true"
+ description="xml batch config file"/>
+ <!-- We need "sorted" for testing; "random" was added for performance.
+ When crawling a directory is slow, it can help to visit files in random
+ order so that the parsers are triggered earlier. The
+ default is the operating system's choice ("os"), which means whatever order
+ the OS returns files in from .listFiles(). -->
+ <option opt="crawlOrder" hasArg="true"
+ description="how does the crawler sort the directories and files:
+ (random|sorted|os)"/>
+ <option opt="numConsumers" hasArg="true"
+ description="number of fileConsumers threads"/>
+ <option opt="maxFileSizeBytes" hasArg="true"
+ description="maximum file size to process; do not process files larger than this"/>
+ <option opt="maxQueueSize" hasArg="true"
+ description="maximum queue size for FileResources"/>
+ <option opt="fileList" hasArg="true"
+ description="file that contains a list of files (relative to inputDir) to process"/>
+ <option opt="fileListEncoding" hasArg="true"
+ description="encoding for fileList"/>
+ <option opt="inputDir" hasArg="true"
+ description="root directory for the files to be processed"/>
+ <option opt="startDir" hasArg="true"
+ description="directory (under inputDir) at which to start crawling"/>
+ <option opt="outputDir" hasArg="true"
+ description="output directory for output"/> <!-- do we want to make this mandatory -->
+ <option opt="recursiveParserWrapper"
+ description="use the RecursiveParserWrapper or not (default = false)"/>
+ <option opt="handleExisting" hasArg="true"
+ description="if an output file already exists, do you want to: overwrite, rename or skip"/>
+ <option opt="basicHandlerType" hasArg="true"
+ description="what type of content handler: xml, text, html, body"/>
+ <option opt="outputSuffix" hasArg="true"
+ description="suffix to add to the end of the output file name"/>
+ <option opt="timeoutThresholdMillis" hasArg="true"
+ description="how long to wait before determining that a consumer is stale"/>
+ <option opt="includeFilePat" hasArg="true"
+ description="regex that specifies which files to process"/>
+ <option opt="excludeFilePat" hasArg="true"
+ description="regex that specifies which files to avoid processing"/>
+ </commandline>
+
+
+ <!-- can specify inputDir="input", but the default config should not include this -->
+ <!-- can also specify startDir="input/someDir" to specify which child directory
+ to start processing -->
+ <crawler builderClass="org.apache.tika.batch.fs.builders.FSCrawlerBuilder"
+ crawlOrder="random"
+ maxFilesToAdd="-1"
+ maxFilesToConsider="-1"
+ includeFilePat=""
+ excludeFilePat=""
+ maxFileSizeBytes="-1"
+ />
+<!--
+ This is an example of a crawler that reads a list of files to be processed from a
+ file. This assumes that the file paths in the list are relative to inputDir.
+ <crawler builderClass="org.apache.tika.batch.fs.builders.FSCrawlerBuilder"
+ fileList="files.txt"
+ fileListEncoding="UTF-8"
+ maxFilesToAdd="-1"
+ maxFilesToConsider="-1"
+ includeFilePat="(?i)\.pdf$"
+ excludeFilePat="(?i)\.msg$"
+ maxFileSizeBytes="-1"
+ inputDir="input"
+ />
+-->
+ <consumers builderClass="org.apache.tika.batch.fs.builders.BasicTikaFSConsumersBuilder"
+ recursiveParserWrapper="false">
+ <parser class="org.apache.tika.batch.AutoDetectParserFactory" parseRecursively="true"/>
+ <contenthandler builderClass="org.apache.tika.batch.builders.DefaultContentHandlerFactoryBuilder"
+ basicHandlerType="xml" writeLimit="-1"/>
+
+ <!-- overwritePolicy: when an output file already exists, "skip" the input file, "rename" the output file, or "overwrite" it -->
+ <!-- can include e.g. outputDir="output", but we don't want to include this in the default! -->
+ <outputstream class="FSOutputStreamFactory" encoding="UTF-8" outputSuffix="xml"/>
+ </consumers>
+
+ <!-- reporter and interrupter are optional -->
+ <reporter builderClass="org.apache.tika.batch.builders.SimpleLogReporterBuilder" sleepMillis="1000"
+ reporterStaleThresholdMillis="60000"/>
+ <interrupter builderClass="org.apache.tika.batch.builders.InterrupterBuilder"/>
+</tika-batch-config>
\ No newline at end of file
Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java (added)
+++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/CommandLineParserBuilderTest.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,48 @@
+package org.apache.tika.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+
+import org.apache.commons.cli.Options;
+import org.apache.tika.batch.builders.CommandLineParserBuilder;
+import org.apache.tika.batch.fs.FSBatchTestBase;
+import org.apache.tika.io.IOUtils;
+import org.junit.Test;
+
+
+public class CommandLineParserBuilderTest extends FSBatchTestBase {
+
+ @Test
+ public void testBasic() throws Exception {
+ String configFile = this.getClass().getResource(
+ "/tika-batch-config-test.xml").getFile();
+ InputStream is = null;
+ try {
+ is = new FileInputStream(new File(configFile));
+ CommandLineParserBuilder builder = new CommandLineParserBuilder();
+ Options options = builder.build(is);
+ //TODO: insert actual tests :)
+ } finally {
+ IOUtils.closeQuietly(is);
+ }
+
+ }
+}
Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java (added)
+++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchDriverTest.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,217 @@
+package org.apache.tika.batch.fs;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.io.IOUtils;
+import org.junit.Test;
+
+
+public class BatchDriverTest extends FSBatchTestBase {
+
+ //for debugging, turn logging off/on via resources/log4j.properties for the driver
+ //and log4j_process.properties for the process.
+
+ @Test(timeout = 15000)
+ public void oneHeavyHangTest() throws Exception {
+ //batch runner hits one heavy hang file, keep going
+ File outputDir = getNewOutputDir("daemon-");
+ assertNotNull(outputDir.listFiles());
+ //make sure output directory is empty!
+ assertEquals(0, outputDir.listFiles().length);
+
+ String[] args = getDefaultCommandLineArgsArr("one_heavy_hang", outputDir, null);
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args);
+ driver.execute();
+ assertEquals(0, driver.getNumRestarts());
+ assertFalse(driver.getUserInterrupted());
+ assertEquals(5, outputDir.listFiles().length);
+ assertContains("first test file",
+ FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
+ IOUtils.UTF_8.toString()));
+
+
+ }
+
+ @Test(timeout = 30000)
+ public void restartOnFullHangTest() throws Exception {
+ //batch runner hits more heavy hangs than threads; needs to restart
+ File outputDir = getNewOutputDir("daemon-");
+
+ //make sure output directory is empty!
+ assertEquals(0, outputDir.listFiles().length);
+
+ String[] args = getDefaultCommandLineArgsArr("heavy_heavy_hangs", outputDir, null);
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args);
+ driver.execute();
+ //could be one or two depending on timing
+ assertTrue(driver.getNumRestarts() > 0);
+ assertFalse(driver.getUserInterrupted());
+ assertContains("first test file",
+ FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
+ IOUtils.UTF_8.toString()));
+ }
+
+ @Test(timeout = 15000)
+ public void noRestartTest() throws Exception {
+ File outputDir = getNewOutputDir("daemon-");
+
+ //make sure output directory is empty!
+ assertEquals(0, outputDir.listFiles().length);
+
+ String[] args = getDefaultCommandLineArgsArr("no_restart", outputDir, null);
+ String[] mod = Arrays.copyOf(args, args.length + 2);
+ mod[args.length] = "-numConsumers";
+ mod[args.length+1] = "1";
+
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", mod);
+ driver.execute();
+ assertEquals(0, driver.getNumRestarts());
+ assertFalse(driver.getUserInterrupted());
+ File[] files = outputDir.listFiles();
+ assertEquals(2, files.length);
+ File test2 = new File(outputDir, "test2_norestart.xml.xml");
+ assertTrue("test2_norestart.xml", test2.exists());
+ File test3 = new File(outputDir, "test3_ok.xml.xml");
+ assertFalse("test3_ok.xml", test3.exists());
+ assertEquals(0, test3.length());
+ }
+
+ @Test(timeout = 15000)
+ public void restartOnOOMTest() throws Exception {
+ //batch runner hits more heavy hangs than threads; needs to restart
+ File outputDir = getNewOutputDir("daemon-");
+
+ //make sure output directory is empty!
+ assertEquals(0, outputDir.listFiles().length);
+
+ String[] args = getDefaultCommandLineArgsArr("oom", outputDir, null);
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", args);
+ driver.execute();
+ assertEquals(1, driver.getNumRestarts());
+ assertFalse(driver.getUserInterrupted());
+ assertContains("first test file",
+ FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
+ IOUtils.UTF_8.toString()));
+ }
+
+ @Test(timeout = 30000)
+ public void allHeavyHangsTestWithStarvedCrawler() throws Exception {
+ //this tests that if all consumers are hung and the crawler is
+ //waiting to add to the queue, there isn't deadlock. The BatchProcess should
+ //just shutdown, and the driver should restart
+ File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+ Map<String, String> args = new HashMap<String,String>();
+ args.put("-numConsumers", "2");
+ args.put("-maxQueueSize", "2");
+ String[] commandLine = getDefaultCommandLineArgsArr("heavy_heavy_hangs", outputDir, args);
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+ driver.execute();
+ assertEquals(3, driver.getNumRestarts());
+ assertFalse(driver.getUserInterrupted());
+ assertContains("first test file",
+ FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
+ IOUtils.UTF_8.toString()));
+ }
+
+ @Test(timeout = 30000)
+ public void maxRestarts() throws Exception {
+ //tests that maxRestarts works
+ //if -maxRestarts is not correctly removed from the commandline,
+ //FSBatchProcessCLI's cli parser will throw an Unrecognized option exception
+
+ File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+ Map<String, String> args = new HashMap<String,String>();
+ args.put("-numConsumers", "1");
+ args.put("-maxQueueSize", "10");
+ args.put("-maxRestarts", "2");
+
+ String[] commandLine = getDefaultCommandLineArgsArr("max_restarts", outputDir, args);
+
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+ driver.execute();
+ assertEquals(2, driver.getNumRestarts());
+ assertFalse(driver.getUserInterrupted());
+ assertEquals(3, outputDir.listFiles().length);
+ }
+
+ @Test(timeout = 30000)
+ public void maxRestartsBadParameter() throws Exception {
+ //tests that maxRestarts must be followed by an Integer
+ File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+ Map<String, String> args = new HashMap<String,String>();
+ args.put("-numConsumers", "1");
+ args.put("-maxQueueSize", "10");
+ args.put("-maxRestarts", "zebra");
+
+ String[] commandLine = getDefaultCommandLineArgsArr("max_restarts", outputDir, args);
+ boolean ex = false;
+ try {
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+ driver.execute();
+ } catch (IllegalArgumentException e) {
+ ex = true;
+ }
+ assertTrue("IllegalArgumentException should have been thrown", ex);
+ }
+
+ @Test(timeout = 30000)
+ public void testNoRestartIfProcessFails() throws Exception {
+ //tests that if something goes horribly wrong with FSBatchProcessCLI
+ //the driver will not restart it again and again
+ //this calls a bad xml file which should trigger a no restart exit.
+ File outputDir = getNewOutputDir("nostart-norestart-");
+ Map<String, String> args = new HashMap<String,String>();
+ args.put("-numConsumers", "1");
+ args.put("-maxQueueSize", "10");
+
+ String[] commandLine = getDefaultCommandLineArgsArr("basic", outputDir, args);
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-broken.xml", commandLine);
+ driver.execute();
+ assertEquals(0, outputDir.listFiles().length);
+ assertEquals(0, driver.getNumRestarts());
+ }
+
+ @Test(timeout = 30000)
+ public void testNoRestartIfProcessFailsTake2() throws Exception {
+ File outputDir = getNewOutputDir("nostart-norestart-");
+ Map<String, String> args = new HashMap<String,String>();
+ args.put("-numConsumers", "1");
+ args.put("-maxQueueSize", "10");
+ args.put("-somethingOrOther", "I don't Know");
+
+ String[] commandLine = getDefaultCommandLineArgsArr("basic", outputDir, args);
+ BatchProcessDriverCLI driver = getNewDriver("/tika-batch-config-test.xml", commandLine);
+ driver.execute();
+ assertEquals(0, outputDir.listFiles().length);
+ assertEquals(0, driver.getNumRestarts());
+ }
+
+
+}
Added: tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java
URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java?rev=1668673&view=auto
==============================================================================
--- tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java (added)
+++ tika/trunk/tika-batch/src/test/java/org/apache/tika/batch/fs/BatchProcessTest.java Mon Mar 23 16:09:10 2015
@@ -0,0 +1,343 @@
+package org.apache.tika.batch.fs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.batch.BatchProcess;
+import org.apache.tika.batch.BatchProcessDriverCLI;
+import org.apache.tika.io.IOUtils;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BatchProcess} that launch the batch runner as a child
+ * process (see {@code getNewBatchRunnerProcess}) and then inspect the output
+ * directory, the child's exit value and its captured stdout/stderr.
+ */
+public class BatchProcessTest extends FSBatchTestBase {
+
+    /**
+     * Upper bound on how long to wait for each stream-gobbler thread to finish
+     * draining a child process's output after the child has exited.
+     */
+    private static final long GOBBLER_DRAIN_MILLIS = 500;
+
+    @Test(timeout = 15000)
+    public void oneHeavyHangTest() throws Exception {
+
+        File outputDir = getNewOutputDir("one_heavy_hang-");
+
+        Map<String, String> args = getDefaultArgs("one_heavy_hang", outputDir);
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(5, outputDir.listFiles().length);
+        File hvyHang = new File(outputDir, "test0_heavy_hang.xml.xml");
+        assertTrue(hvyHang.exists());
+        assertEquals(0, hvyHang.length());
+        assertNotContained(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+
+    @Test(timeout = 15000)
+    public void allHeavyHangsTest() throws Exception {
+        //each of the three threads hits a heavy hang. The BatchProcess runs into
+        //all timeouts and shuts down.
+        File outputDir = getNewOutputDir("allHeavyHangs-");
+        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+
+        assertEquals(3, outputDir.listFiles().length);
+        for (File hvyHang : outputDir.listFiles()) {
+            assertTrue(hvyHang.exists());
+            assertEquals("file length for " + hvyHang.getName() + " should be 0, but is: " + hvyHang.length(),
+                    0, hvyHang.length());
+        }
+        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+    @Test(timeout = 30000)
+    public void allHeavyHangsTestWithCrazyNumberConsumersTest() throws Exception {
+        File outputDir = getNewOutputDir("allHeavyHangsCrazyNumberConsumers-");
+        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
+        args.put("numConsumers", "100");
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(7, outputDir.listFiles().length);
+
+        for (int i = 0; i < 6; i++) {
+            File hvyHang = new File(outputDir, "test" + i + "_heavy_hang.xml.xml");
+            assertTrue(hvyHang.exists());
+            assertEquals(0, hvyHang.length());
+        }
+        assertContains("This is tika-batch's first test file",
+                FileUtils.readFileToString(new File(outputDir, "test6_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+        //key is that the process realizes that there were no more processable files
+        //in the queue and does not ask for a restart!
+        assertNotContained(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+    @Test(timeout = 30000)
+    public void allHeavyHangsTestWithStarvedCrawler() throws Exception {
+        //this tests that if all consumers are hung and the crawler is
+        //waiting to add to the queue, there isn't deadlock. The batchrunner should
+        //shutdown and ask to be restarted.
+        File outputDir = getNewOutputDir("allHeavyHangsStarvedCrawler-");
+        Map<String, String> args = getDefaultArgs("heavy_heavy_hangs", outputDir);
+        args.put("numConsumers", "2");
+        args.put("maxQueueSize", "2");
+        args.put("timeoutThresholdMillis", "100000000");//make sure that the batch process doesn't time out
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(2, outputDir.listFiles().length);
+
+        for (int i = 0; i < 2; i++) {
+            File hvyHang = new File(outputDir, "test" + i + "_heavy_hang.xml.xml");
+            assertTrue(hvyHang.exists());
+            assertEquals(0, hvyHang.length());
+        }
+        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+        assertContains("Crawler timed out", streamStrings.getErrString());
+    }
+
+    @Test(timeout = 15000)
+    public void outOfMemory() throws Exception {
+        //the first consumer should sleep for 10 seconds
+        //the second should be tied up in a heavy hang
+        //the third one should hit the oom after processing test2_ok.xml
+        //no consumers should process test2-4.txt!
+        //i.e. the first consumer will finish in 10 seconds and
+        //then otherwise would be looking for more, but the oom should prevent that
+        File outputDir = getNewOutputDir("oom-");
+
+        Map<String, String> args = getDefaultArgs("oom", outputDir);
+        args.put("numConsumers", "3");
+        args.put("timeoutThresholdMillis", "30000");
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+
+        assertEquals(4, outputDir.listFiles().length);
+        assertContains("This is tika-batch's first test file",
+                FileUtils.readFileToString(new File(outputDir, "test2_ok.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+        assertContains(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString(),
+                streamStrings.getErrString());
+    }
+
+
+
+    @Test(timeout = 15000)
+    public void noRestart() throws Exception {
+        File outputDir = getNewOutputDir("no_restart");
+
+        Map<String, String> args = getDefaultArgs("no_restart", outputDir);
+        args.put("numConsumers", "1");
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+
+        StreamStrings streamStrings = ex.execute();
+        File test2 = new File(outputDir, "test2_norestart.xml.xml");
+        assertTrue("test2_norestart.xml", test2.exists());
+        //processing must stop at test2; test3 must never be written
+        File test3 = new File(outputDir, "test3_ok.xml.xml");
+        assertFalse("test3_ok.xml", test3.exists());
+        assertEquals(0, test3.length());
+        assertContains("exitStatus=" + BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE,
+                streamStrings.getOutString());
+        assertContains("causeForTermination='MAIN_LOOP_EXCEPTION_NO_RESTART'",
+                streamStrings.getOutString());
+    }
+
+    /**
+     * This tests to make sure that BatchProcess waits the appropriate
+     * amount of time on an early termination before stopping.
+     *
+     * If this fails, then interruptible parsers (e.g. those with
+     * nio channels) will be interrupted and there will be corrupted data.
+     */
+    @Test(timeout = 60000)
+    public void testWaitAfterEarlyTermination() throws Exception {
+        File outputDir = getNewOutputDir("wait_after_early_termination");
+
+        Map<String, String> args = getDefaultArgs("wait_after_early_termination", outputDir);
+        args.put("numConsumers", "1");
+        args.put("maxAliveTimeSeconds", "5");//main process loop should stop after 5 seconds
+        args.put("timeoutThresholdMillis", "300000");//effectively never
+        args.put("pauseOnEarlyTerminationMillis", "20000");//let the parser have up to 20 seconds
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        assertEquals(1, files.length);
+        assertContains("<p>some content</p>",
+                FileUtils.readFileToString(new File(outputDir, "test0_sleep.xml.xml"),
+                        IOUtils.UTF_8.toString()));
+
+        assertContains("exitStatus=" + BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE, streamStrings.getOutString());
+        assertContains("causeForTermination='BATCH_PROCESS_ALIVE_TOO_LONG'",
+                streamStrings.getOutString());
+    }
+
+    /**
+     * Like {@link #testWaitAfterEarlyTermination()}, but the per-file timeout
+     * (10 seconds) is shorter than the early-termination pause (20 seconds),
+     * so the single output file is expected to be left empty.
+     */
+    @Test(timeout = 60000)
+    public void testTimeOutAfterBeingAskedToShutdown() throws Exception {
+        File outputDir = getNewOutputDir("timeout_after_early_termination");
+
+        Map<String, String> args = getDefaultArgs("timeout_after_early_termination", outputDir);
+        args.put("numConsumers", "1");
+        args.put("maxAliveTimeSeconds", "5");//main process loop should stop after 5 seconds
+        args.put("timeoutThresholdMillis", "10000");
+        args.put("pauseOnEarlyTerminationMillis", "20000");//let the parser have up to 20 seconds
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        assertEquals(1, files.length);
+        assertEquals(0, files[0].length());
+        assertContains("exitStatus=" + BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE, streamStrings.getOutString());
+        assertContains("causeForTermination='BATCH_PROCESS_ALIVE_TOO_LONG'",
+                streamStrings.getOutString());
+    }
+
+    @Test(timeout = 10000)
+    public void testRedirectionOfStreams() throws Exception {
+        //test redirection of system.err to system.out
+        File outputDir = getNewOutputDir("noisy_parsers");
+
+        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
+        args.put("numConsumers", "1");
+        args.put("maxAliveTimeSeconds", "20");//main process loop should stop after at most 20 seconds
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args);
+        StreamStrings streamStrings = ex.execute();
+        File[] files = outputDir.listFiles();
+        assertEquals(1, files.length);
+        assertContains("System.out", streamStrings.getOutString());
+        assertContains("System.err", streamStrings.getOutString());
+        //everything the parsers wrote to stderr must have ended up on stdout
+        assertEquals(0, streamStrings.getErrString().length());
+
+    }
+
+    @Test(timeout = 10000)
+    public void testConsumersManagerInitHang() throws Exception {
+        File outputDir = getNewOutputDir("init_hang");
+
+        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
+        args.put("numConsumers", "1");
+        args.put("hangOnInit", "true");
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args, "/tika-batch-config-MockConsumersBuilder.xml");
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE, ex.getExitValue());
+        assertContains("causeForTermination='CONSUMERS_MANAGER_DIDNT_INIT_IN_TIME_NO_RESTART'", streamStrings.getOutString());
+    }
+
+    @Test(timeout = 10000)
+    public void testConsumersManagerShutdownHang() throws Exception {
+        File outputDir = getNewOutputDir("shutdown_hang");
+
+        Map<String, String> args = getDefaultArgs("noisy_parsers", outputDir);
+        args.put("numConsumers", "1");
+        args.put("hangOnShutdown", "true");
+
+        BatchProcessTestExecutor ex = new BatchProcessTestExecutor(args, "/tika-batch-config-MockConsumersBuilder.xml");
+        StreamStrings streamStrings = ex.execute();
+        assertEquals(BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE, ex.getExitValue());
+        assertContains("ConsumersManager did not shutdown within", streamStrings.getOutString());
+    }
+
+    /**
+     * Runs the batch runner in a child process with a given set of arguments
+     * and config resource, and records its exit value. Kept as an inner
+     * (non-static) class because it uses helpers inherited from the enclosing
+     * test base ({@code getNewBatchRunnerProcess}, {@code destroyProcess},
+     * {@code StringStreamGobbler}).
+     */
+    private final class BatchProcessTestExecutor {
+        private final Map<String, String> args;
+        private final String configPath;
+        //Integer.MIN_VALUE until execute() has observed the child's exit
+        private int exitValue = Integer.MIN_VALUE;
+
+        public BatchProcessTestExecutor(Map<String, String> args) {
+            this(args, "/tika-batch-config-test.xml");
+        }
+
+        public BatchProcessTestExecutor(Map<String, String> args, String configPath) {
+            this.args = args;
+            this.configPath = configPath;
+        }
+
+        /**
+         * Starts the batch runner, blocks until it exits and returns what it
+         * wrote to stdout and stderr. The child process is always destroyed
+         * before returning.
+         *
+         * @return captured stdout/stderr of the child process
+         * @throws IOException if the child process could not be started
+         * @throws InterruptedException if this thread is interrupted (e.g. by a
+         *         JUnit timeout) while waiting for the child
+         */
+        private StreamStrings execute() throws IOException, InterruptedException {
+            Process p = null;
+            try {
+                ProcessBuilder b = getNewBatchRunnerProcess(configPath, args);
+                p = b.start();
+                StringStreamGobbler errorGobbler = new StringStreamGobbler(p.getErrorStream());
+                StringStreamGobbler outGobbler = new StringStreamGobbler(p.getInputStream());
+                Thread errorThread = new Thread(errorGobbler);
+                Thread outThread = new Thread(outGobbler);
+                //never let a stuck gobbler keep the test JVM alive
+                errorThread.setDaemon(true);
+                outThread.setDaemon(true);
+                errorThread.start();
+                outThread.start();
+
+                //block instead of busy-spinning on exitValue(); unlike a spin loop
+                //this also responds to interruption from the JUnit timeout
+                exitValue = p.waitFor();
+
+                //give the gobblers a bounded chance to drain output still sitting in
+                //the pipes when the child exited (presumably they stop on end-of-stream;
+                //the timeout bounds the wait if they do not)
+                errorThread.join(GOBBLER_DRAIN_MILLIS);
+                outThread.join(GOBBLER_DRAIN_MILLIS);
+
+                errorGobbler.stopGobblingAndDie();
+                outGobbler.stopGobblingAndDie();
+                errorThread.interrupt();
+                outThread.interrupt();
+                return new StreamStrings(outGobbler.toString(), errorGobbler.toString());
+            } finally {
+                destroyProcess(p);
+            }
+        }
+
+        /** @return the child's exit value, or Integer.MIN_VALUE if execute() has not completed */
+        private int getExitValue() {
+            return exitValue;
+        }
+
+    }
+
+    /** Immutable pair of the stdout and stderr text captured from a child process. */
+    private static final class StreamStrings {
+        private final String outString;
+        private final String errString;
+
+        private StreamStrings(String outString, String errString) {
+            this.outString = outString;
+            this.errString = errString;
+        }
+
+        private String getOutString() {
+            return outString;
+        }
+
+        private String getErrString() {
+            return errString;
+        }
+
+        @Override
+        public String toString() {
+            return "OUT>>" + outString + "<<\n" +
+                    "ERR>>" + errString + "<<\n";
+        }
+    }
+}