Posted to commits@tika.apache.org by ta...@apache.org on 2021/02/19 22:18:21 UTC

[tika] branch TIKA-3304 created (now 6c87de1)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git.


      at 6c87de1  TIKA-3304 -- experimental... WIP -- not to be merged

This branch includes the following new commits:

     new 2674c56  fix npe when OCR is available and fix incorrect mark offset exception
     new 4eae1e4  fix indentation
     new e5a6039  Avoid reporting of temporary ocr-based mime type in xhtml output
     new 6c87de1  TIKA-3304 -- experimental... WIP -- not to be merged

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[tika] 02/04: fix indentation

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 4eae1e45226a9f5c5cb0dab4e53f2d6b77f3575c
Author: tballison <ta...@apache.org>
AuthorDate: Thu Feb 18 17:39:01 2021 -0500

    fix indentation
---
 .../src/main/java/org/apache/tika/gui/TikaGUI.java | 183 +++++++++++----------
 1 file changed, 94 insertions(+), 89 deletions(-)

diff --git a/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java b/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java
index e0b755b..3f9587b 100644
--- a/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java
+++ b/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java
@@ -100,7 +100,7 @@ public class TikaGUI extends JFrame
         implements ActionListener, HyperlinkListener {
 
     //maximum length to allow for mark for reparse to get JSON
-    private static final int MAX_MARK = 20*1024*1024;//20MB
+    private static final int MAX_MARK = 20 * 1024 * 1024;//20MB
 
     /**
      * Serial version UID.
@@ -130,7 +130,7 @@ public class TikaGUI extends JFrame
                         new CommonsDigester(MAX_MARK,
                                 CommonsDigester.DigestAlgorithm.MD5,
                                 CommonsDigester.DigestAlgorithm.SHA256)
-                        )).setVisible(true);
+                )).setVisible(true);
             }
         });
     }
@@ -144,7 +144,7 @@ public class TikaGUI extends JFrame
      * Configured parser instance.
      */
     private final Parser parser;
-    
+
     /**
      * Captures requested embedded images
      */
@@ -174,7 +174,7 @@ public class TikaGUI extends JFrame
      * Main content output.
      */
     private final JEditorPane textMain;
-    
+
     /**
      * Raw XHTML source.
      */
@@ -347,8 +347,8 @@ public class TikaGUI extends JFrame
         if (input.markSupported()) {
             int mark = -1;
             if (input instanceof TikaInputStream) {
-                if (((TikaInputStream)input).hasFile()) {
-                    mark = (int)((TikaInputStream)input).getLength();
+                if (((TikaInputStream) input).hasFile()) {
+                    mark = (int) ((TikaInputStream) input).getLength();
                 }
             }
             if (mark == -1) {
@@ -391,8 +391,8 @@ public class TikaGUI extends JFrame
             input.reset();
             isReset = true;
         } catch (IOException e) {
-            setText(json, "Error during stream reset.\n"+
-                    "There's a limit of "+MAX_MARK + " bytes for this type of processing in the GUI.\n"+
+            setText(json, "Error during stream reset.\n" +
+                    "There's a limit of " + MAX_MARK + " bytes for this type of processing in the GUI.\n" +
                     "Try the app with command line argument of -J."
             );
         }
@@ -420,7 +420,7 @@ public class TikaGUI extends JFrame
         t.printStackTrace(new PrintWriter(writer));
 
         JEditorPane editor =
-            new JEditorPane("text/plain", writer.toString());
+                new JEditorPane("text/plain", writer.toString());
         editor.setEditable(false);
         editor.setBackground(Color.WHITE);
         editor.setCaretPosition(0);
@@ -435,7 +435,7 @@ public class TikaGUI extends JFrame
     private void addWelcomeCard(JPanel panel, String name) {
         try {
             JEditorPane editor =
-                new JEditorPane(TikaGUI.class.getResource("welcome.html"));
+                    new JEditorPane(TikaGUI.class.getResource("welcome.html"));
             editor.setContentType("text/html");
             editor.setEditable(false);
             editor.setBackground(Color.WHITE);
@@ -480,7 +480,7 @@ public class TikaGUI extends JFrame
                 URL url = e.getURL();
                 try (InputStream stream = url.openStream()) {
                     JEditorPane editor =
-                        new JEditorPane("text/plain", IOUtils.toString(stream, UTF_8));
+                            new JEditorPane("text/plain", IOUtils.toString(stream, UTF_8));
                     editor.setEditable(false);
                     editor.setBackground(Color.WHITE);
                     editor.setCaretPosition(0);
@@ -519,7 +519,7 @@ public class TikaGUI extends JFrame
      * {@link JEditorPane} fail thinking that the document character set
      * is inconsistent.
      * <p>
-     * Additionally, it will use ImageSavingParser to re-write embedded:(image) 
+     * Additionally, it will use ImageSavingParser to re-write embedded:(image)
      * image links to be file:///(temporary file) so that they can be loaded.
      *
      * @param writer output writer
@@ -529,7 +529,7 @@ public class TikaGUI extends JFrame
     private ContentHandler getHtmlHandler(Writer writer)
             throws TransformerConfigurationException {
         SAXTransformerFactory factory = (SAXTransformerFactory)
-            SAXTransformerFactory.newInstance();
+                SAXTransformerFactory.newInstance();
         TransformerHandler handler = factory.newTransformerHandler();
         handler.getTransformer().setOutputProperty(OutputKeys.METHOD, "html");
         handler.setResult(new StreamResult(writer));
@@ -542,36 +542,37 @@ public class TikaGUI extends JFrame
                     uri = null;
                 }
                 if (!"head".equals(localName)) {
-                    if("img".equals(localName)) {
-                       AttributesImpl newAttrs;
-                       if(atts instanceof AttributesImpl) {
-                          newAttrs = (AttributesImpl)atts;
-                       } else {
-                          newAttrs = new AttributesImpl(atts);
-                       }
-                       
-                       for(int i=0; i<newAttrs.getLength(); i++) {
-                          if("src".equals(newAttrs.getLocalName(i))) {
-                             String src = newAttrs.getValue(i);
-                             if(src.startsWith("embedded:")) {
-                                String filename = src.substring(src.indexOf(':')+1);
-                                try {
-                                   File img = imageParser.requestSave(filename);
-                                   String newSrc = img.toURI().toString();
-                                   newAttrs.setValue(i, newSrc);
-                                } catch(IOException e) {
-                                   System.err.println("Error creating temp image file " + filename);
-                                   // The html viewer will show a broken image too to alert them
+                    if ("img".equals(localName)) {
+                        AttributesImpl newAttrs;
+                        if (atts instanceof AttributesImpl) {
+                            newAttrs = (AttributesImpl) atts;
+                        } else {
+                            newAttrs = new AttributesImpl(atts);
+                        }
+
+                        for (int i = 0; i < newAttrs.getLength(); i++) {
+                            if ("src".equals(newAttrs.getLocalName(i))) {
+                                String src = newAttrs.getValue(i);
+                                if (src.startsWith("embedded:")) {
+                                    String filename = src.substring(src.indexOf(':') + 1);
+                                    try {
+                                        File img = imageParser.requestSave(filename);
+                                        String newSrc = img.toURI().toString();
+                                        newAttrs.setValue(i, newSrc);
+                                    } catch (IOException e) {
+                                        System.err.println("Error creating temp image file " + filename);
+                                        // The html viewer will show a broken image too to alert them
+                                    }
                                 }
-                             }
-                          }
-                       }
-                       super.startElement(uri, localName, name, newAttrs);
+                            }
+                        }
+                        super.startElement(uri, localName, name, newAttrs);
                     } else {
-                       super.startElement(uri, localName, name, atts);
+                        super.startElement(uri, localName, name, atts);
                     }
                 }
             }
+
             @Override
             public void endElement(String uri, String localName, String name)
                     throws SAXException {
@@ -582,9 +583,11 @@ public class TikaGUI extends JFrame
                     super.endElement(uri, localName, name);
                 }
             }
+
             @Override
             public void startPrefixMapping(String prefix, String uri) {
             }
+
             @Override
             public void endPrefixMapping(String prefix) {
             }
@@ -594,6 +597,7 @@ public class TikaGUI extends JFrame
     private ContentHandler getTextContentHandler(Writer writer) {
         return new BodyContentHandler(writer);
     }
+
     private ContentHandler getTextMainContentHandler(Writer writer) {
         return new BoilerpipeContentHandler(writer);
     }
@@ -601,7 +605,7 @@ public class TikaGUI extends JFrame
     private ContentHandler getXmlContentHandler(Writer writer)
             throws TransformerConfigurationException {
         SAXTransformerFactory factory = (SAXTransformerFactory)
-            SAXTransformerFactory.newInstance();
+                SAXTransformerFactory.newInstance();
         TransformerHandler handler = factory.newTransformerHandler();
         handler.getTransformer().setOutputProperty(OutputKeys.METHOD, "xml");
         handler.setResult(new StreamResult(writer));
@@ -612,62 +616,63 @@ public class TikaGUI extends JFrame
      * A {@link DocumentSelector} that accepts only images.
      */
     private static class ImageDocumentSelector implements DocumentSelector {
-      public boolean select(Metadata metadata) {
-         String type = metadata.get(Metadata.CONTENT_TYPE);
-         return type != null && type.startsWith("image/");
-      }
+        public boolean select(Metadata metadata) {
+            String type = metadata.get(Metadata.CONTENT_TYPE);
+            return type != null && type.startsWith("image/");
+        }
     }
-    
+
     /**
      * A recursive parser that saves certain images into the temporary
-     *  directory, and delegates everything else to another downstream
-     *  parser.
+     * directory, and delegates everything else to another downstream
+     * parser.
      */
     private static class ImageSavingParser extends AbstractParser {
-      private Map<String,File> wanted = new HashMap<String,File>();
-      private Parser downstreamParser;
-      private File tmpDir;
-      
-      private ImageSavingParser(Parser downstreamParser) {
-         this.downstreamParser = downstreamParser;
-         
-         try {
-            File t = File.createTempFile("tika", ".test");
-            tmpDir = t.getParentFile();
-         } catch(IOException e) {}
-      }
-      
-      public File requestSave(String embeddedName) throws IOException {
-         String suffix = ".tika";
-         
-         int splitAt = embeddedName.lastIndexOf('.');
-         if (splitAt > 0) {
-            embeddedName.substring(splitAt);
-         }
-         
-         File tmp = File.createTempFile("tika-embedded-", suffix);
-         wanted.put(embeddedName, tmp);
-         return tmp;
-      }
-
-      public Set<MediaType> getSupportedTypes(ParseContext context) {
-         return downstreamParser.getSupportedTypes(context);
-      }
-
-      public void parse(InputStream stream, ContentHandler handler,
-            Metadata metadata, ParseContext context) throws IOException,
-            SAXException, TikaException {
-         String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
-         if(name != null && wanted.containsKey(name)) {
-            FileOutputStream out = new FileOutputStream(wanted.get(name));
-            IOUtils.copy(stream, out);
-            out.close();
-         } else {
-            if(downstreamParser != null) {
-               downstreamParser.parse(stream, handler, metadata, context);
+        private Map<String, File> wanted = new HashMap<String, File>();
+        private Parser downstreamParser;
+        private File tmpDir;
+
+        private ImageSavingParser(Parser downstreamParser) {
+            this.downstreamParser = downstreamParser;
+
+            try {
+                File t = File.createTempFile("tika", ".test");
+                tmpDir = t.getParentFile();
+            } catch (IOException e) {
             }
-         }
-      }
+        }
+
+        public File requestSave(String embeddedName) throws IOException {
+            String suffix = ".tika";
+
+            int splitAt = embeddedName.lastIndexOf('.');
+            if (splitAt > 0) {
+                embeddedName.substring(splitAt);
+            }
+
+            File tmp = File.createTempFile("tika-embedded-", suffix);
+            wanted.put(embeddedName, tmp);
+            return tmp;
+        }
+
+        public Set<MediaType> getSupportedTypes(ParseContext context) {
+            return downstreamParser.getSupportedTypes(context);
+        }
+
+        public void parse(InputStream stream, ContentHandler handler,
+                          Metadata metadata, ParseContext context) throws IOException,
+                SAXException, TikaException {
+            String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY);
+            if (name != null && wanted.containsKey(name)) {
+                FileOutputStream out = new FileOutputStream(wanted.get(name));
+                IOUtils.copy(stream, out);
+                out.close();
+            } else {
+                if (downstreamParser != null) {
+                    downstreamParser.parse(stream, handler, metadata, context);
+                }
+            }
+        }
 
     }
 

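Aside: the reformatting makes a pre-existing quirk easier to spot. requestSave computes the embedded name's extension but discards the result of embeddedName.substring(splitAt), so every temporary file keeps the default ".tika" suffix. A hedged sketch of the apparent intent (illustrative, not part of this commit):

    import java.io.File;
    import java.io.IOException;

    class RequestSaveSketch {
        // Keep the original extension when the embedded name has one,
        // otherwise fall back to ".tika" (assumed intent)
        static File requestSave(String embeddedName) throws IOException {
            String suffix = ".tika";
            int splitAt = embeddedName.lastIndexOf('.');
            if (splitAt > 0) {
                suffix = embeddedName.substring(splitAt);
            }
            return File.createTempFile("tika-embedded-", suffix);
        }
    }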

[tika] 01/04: fix npe when OCR is available and fix incorrect mark offset exception

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 2674c560c6dc148506f4a7d0f53030b33a2e0ff3
Author: tballison <ta...@apache.org>
AuthorDate: Thu Feb 18 17:38:20 2021 -0500

    fix npe when OCR is available and fix incorrect mark offset exception
---
 tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java b/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java
index 50b3c22..e0b755b 100644
--- a/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java
+++ b/tika-app/src/main/java/org/apache/tika/gui/TikaGUI.java
@@ -72,6 +72,7 @@ import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.mime.MediaType;
 import org.apache.tika.parser.AbstractParser;
 import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.DelegatingParser;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
@@ -341,8 +342,7 @@ public class TikaGUI extends JFrame
 
         context.set(DocumentSelector.class, new ImageDocumentSelector());
 
-        input = TikaInputStream.get(new ProgressMonitorInputStream(
-                this, "Parsing stream", input));
+        input = TikaInputStream.get(input);
 
         if (input.markSupported()) {
             int mark = -1;
@@ -649,10 +649,9 @@ public class TikaGUI extends JFrame
          wanted.put(embeddedName, tmp);
          return tmp;
       }
-      
+
       public Set<MediaType> getSupportedTypes(ParseContext context) {
-         // Never used in an auto setup
-         return null;
+         return downstreamParser.getSupportedTypes(context);
       }
 
       public void parse(InputStream stream, ContentHandler handler,

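For context on the "incorrect mark offset" part of this fix: the GUI marks the stream so the same bytes can be re-read for the JSON view, and reset() fails once more than the marked limit has been consumed. A minimal sketch of that pattern (MAX_MARK mirrors the constant in TikaGUI; the rest is illustrative):

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class MarkResetSketch {
        private static final int MAX_MARK = 20 * 1024 * 1024; // 20MB, as in TikaGUI

        static void parseTwice(InputStream raw) throws IOException {
            InputStream input = new BufferedInputStream(raw); // guarantees markSupported()
            input.mark(MAX_MARK);  // remember the current position
            drain(input);          // first pass, e.g. XHTML extraction
            input.reset();         // throws IOException if more than MAX_MARK bytes were read
            drain(input);          // second pass, e.g. the JSON view
        }

        private static void drain(InputStream in) throws IOException {
            byte[] buf = new byte[8192];
            while (in.read(buf) != -1) {
                // a real pass would feed these bytes to a Parser
            }
        }
    }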

[tika] 04/04: TIKA-3304 -- experimental... WIP -- not to be merged

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 6c87de132289e2e2d9f65bae5a091203464c1c29
Author: tballison <ta...@apache.org>
AuthorDate: Fri Feb 19 17:17:59 2021 -0500

    TIKA-3304 -- experimental... WIP -- not to be merged
---
 .../java/org/apache/tika/config/ServiceLoader.java |   6 +-
 .../java/org/apache/tika/config/TikaConfig.java    |   2 +-
 .../tika/pipes/fetchiterator/FetchEmitTuple.java   |   5 +-
 .../tika/pipes/fetchiterator/FetchIterator.java    |   9 +-
 .../fetchiterator/FileSystemFetchIterator.java     |   1 -
 tika-pipes/pom.xml                                 |   1 +
 tika-pipes/tika-pipes-app/pom.xml                  |  82 ++++
 .../java/org/apache/tika/pipes/async/AsyncCli.java | 349 +++++++++++++++
 .../org/apache/tika/pipes/async/AsyncData.java     |  20 +
 .../org/apache/tika/pipes/async/AsyncEmitHook.java |   8 +
 .../org/apache/tika/pipes/async/AsyncEmitter.java  | 166 ++++++++
 .../tika/pipes/async/AsyncPipesEmitHook.java       |  45 ++
 .../org/apache/tika/pipes/async/AsyncTask.java     |  36 ++
 .../org/apache/tika/pipes/async/AsyncWorker.java   | 171 ++++++++
 .../tika/pipes/async/AsyncWorkerProcess.java       | 474 +++++++++++++++++++++
 .../src/main/resources/log4j.properties            |  24 ++
 .../org/apache/tika/pipes/driver/AsyncCliTest.java |  14 +
 .../apache/tika/pipes/driver/TestPipesDriver.java  | 118 +++++
 18 files changed, 1518 insertions(+), 13 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/config/ServiceLoader.java b/tika-core/src/main/java/org/apache/tika/config/ServiceLoader.java
index 6debe2e..c33f863 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ServiceLoader.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ServiceLoader.java
@@ -148,13 +148,11 @@ public class ServiceLoader {
     }
 
     public ServiceLoader(ClassLoader loader) {
-    	this(loader, Boolean.getBoolean("org.apache.tika.service.error.warn") 
-    			? LoadErrorHandler.WARN:LoadErrorHandler.IGNORE);
+    	this(loader, LoadErrorHandler.THROW);
     }
 
     public ServiceLoader() {
-    	this(getContextClassLoader(), Boolean.getBoolean("org.apache.tika.service.error.warn") 
-    			? LoadErrorHandler.WARN:LoadErrorHandler.IGNORE, true);
+    	this(getContextClassLoader(), LoadErrorHandler.THROW, true);
     }
     
     /**
diff --git a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
index 041a6a4..ed80773 100644
--- a/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
@@ -577,7 +577,7 @@ public class TikaConfig {
 
         if (serviceLoaderElement != null) {
             boolean dynamic = Boolean.parseBoolean(serviceLoaderElement.getAttribute("dynamic"));
-            LoadErrorHandler loadErrorHandler = LoadErrorHandler.IGNORE;
+            LoadErrorHandler loadErrorHandler = LoadErrorHandler.THROW;
             String loadErrorHandleConfig = serviceLoaderElement.getAttribute("loadErrorHandler");
             if (LoadErrorHandler.WARN.toString().equalsIgnoreCase(loadErrorHandleConfig)) {
                 loadErrorHandler = LoadErrorHandler.WARN;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
index efab236..f6e7c74 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
@@ -28,7 +28,7 @@ public class FetchEmitTuple {
     }
     public static final ON_PARSE_EXCEPTION DEFAULT_ON_PARSE_EXCEPTION = ON_PARSE_EXCEPTION.EMIT;
     private final FetchKey fetchKey;
-    private final EmitKey emitKey;
+    private EmitKey emitKey;
     private final Metadata metadata;
     private final ON_PARSE_EXCEPTION onParseException;
 
@@ -59,6 +59,9 @@ public class FetchEmitTuple {
         return onParseException;
     }
 
+    public void setEmitKey(EmitKey emitKey) {
+        this.emitKey = emitKey;
+    }
     @Override
     public String toString() {
         return "FetchEmitTuple{" +
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index 505d042..4794691 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -122,14 +122,11 @@ public abstract class FetchIterator implements Callable<Integer>, Initializable
         if (queue == null || numConsumers < 0) {
             throw new IllegalStateException("Must call 'init' before calling this object");
         }
-
+        System.out.println("fetch iterator");
         enqueue();
+        System.out.println("fetch iterator finshed enqueing");
         for (int i = 0; i < numConsumers; i++) {
-            try {
-                tryToAdd(COMPLETED_SEMAPHORE);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+            tryToAdd(COMPLETED_SEMAPHORE);
         }
         return added;
     }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 68a96a1..8a06769 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -40,7 +40,6 @@ public class FileSystemFetchIterator
     private Path basePath;
 
     public FileSystemFetchIterator() {
-
     }
 
     public FileSystemFetchIterator(String fetcherName, Path basePath) {
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index b90d3c3..dec23a1 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -35,6 +35,7 @@
         <module>tika-fetchers</module>
         <module>tika-emitters</module>
         <module>tika-fetch-iterators</module>
+        <module>tika-pipes-app</module>
         <module>tika-pipes-integration-tests</module>
     </modules>
 
diff --git a/tika-pipes/tika-pipes-app/pom.xml b/tika-pipes/tika-pipes-app/pom.xml
new file mode 100644
index 0000000..1042d42
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.tika</groupId>
+        <artifactId>tika-pipes</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>tika-pipes-app</artifactId>
+    <packaging>pom</packaging>
+    <name>Apache Tika emitters</name>
+    <url>http://tika.apache.org/</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-serialization</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons.io.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>${h2.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-emitter-fs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-parsers-classic</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
new file mode 100644
index 0000000..eea1fc3
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -0,0 +1,349 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class AsyncCli {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
+
+    public static void main(String[] args) throws Exception {
+        Path configPath = Paths.get(args[0]);
+        int maxConsumers = 20;
+        AsyncCli asyncCli = new AsyncCli();
+        Path dbDir = Paths.get("/Users/allison/Desktop/tmp-db");//Files.createTempDirectory("tika-async-db-");
+        try {
+            asyncCli.execute(dbDir, configPath, maxConsumers);
+        } finally {
+            FileUtils.deleteDirectory(dbDir.toFile());
+        }
+
+    }
+
+    private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
+        TikaConfig tikaConfig = new TikaConfig(configPath);
+
+        String connectionString = setupTables(dbDir);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(maxConsumers + 3);
+        ExecutorCompletionService<Integer> executorCompletionService =
+                new ExecutorCompletionService<>(executorService);
+
+        try (Connection connection = DriverManager.getConnection(connectionString)) {
+            FetchIterator fetchIterator = tikaConfig.getFetchIterator();
+            if (fetchIterator instanceof EmptyFetchIterator) {
+                throw new IllegalArgumentException("can't have empty fetch iterator");
+            }
+            ArrayBlockingQueue<FetchEmitTuple> q = fetchIterator.init(maxConsumers);
+            AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
+            executorCompletionService.submit(fetchIterator);
+            executorCompletionService.submit(enqueuer);
+            executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
+
+            for (int i = 0; i < maxConsumers; i++) {
+                executorCompletionService.submit(new AsyncWorker(connection,
+                        connectionString, i, configPath));
+            }
+            int completed = 0;
+            while (completed < maxConsumers+3) {
+                Future<Integer> future = executorCompletionService.take();
+                if (future != null) {
+                    int val = future.get();
+                    completed++;
+                    LOG.debug("finished " + val);
+                }
+            }
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
+    private String setupTables(Path dbDir) throws SQLException {
+        Path dbFile = dbDir.resolve("tika-async");
+        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
+                ";AUTO_SERVER=TRUE";
+        Connection connection = DriverManager.getConnection(url);
+
+        String sql = "create table parse_queue " +
+                "(id bigint auto_increment primary key," +
+                "status tinyint," +//byte
+                "worker_id integer," +
+                "retry smallint," + //short
+                "time_stamp timestamp," +
+                "json varchar(64000))";
+        connection.createStatement().execute(sql);
+        //no clear benefit to creating an index on timestamp
+//        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
+        sql = "create table workers (worker_id int primary key)";
+        connection.createStatement().execute(sql);
+
+        sql = "create table workers_shutdown (worker_id int primary key)";
+        connection.createStatement().execute(sql);
+
+        sql = "create table error_log (task_id bigint, " +
+                "fetch_key varchar(10000)," +
+                "time_stamp timestamp," +
+                "retry integer," +
+                "error_code tinyint)";
+        connection.createStatement().execute(sql);
+
+        return url;
+    }
+
+
+    //this reads fetchemittuples from the queue and inserts them in the db
+    //for the workers to read
+    private static class AsyncTaskEnqueuer implements Callable<Integer> {
+        private final PreparedStatement insert;
+
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final Connection connection;
+        private final Random random = new Random();
+
+        private volatile boolean isComplete = false;
+
+        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
+                          Connection connection) throws SQLException {
+            this.queue = queue;
+            this.connection = connection;
+            String sql = "insert into parse_queue (status, time_stamp, worker_id, retry, json) " +
+                    "values (?,CURRENT_TIMESTAMP(),?,?,?)";
+            insert = connection.prepareStatement(sql);
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            List<Integer> workers = new ArrayList<>();
+            while (true) {
+                FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
+                LOG.debug("enqueing to db "+t);
+                if (t == null) {
+                    //log.trace?
+                } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                    isComplete = true;
+                    return 1;
+                } else {
+                    long start = System.currentTimeMillis();
+                    long elapsed = System.currentTimeMillis() - start;
+                    //TODO -- fix this
+                    while (workers.size() == 0 && elapsed < 600000) {
+                        workers = getActiveWorkers(connection);
+                        Thread.sleep(100);
+                        elapsed = System.currentTimeMillis()-start;
+                    }
+                    insert(t, workers);
+                }
+            }
+        }
+
+        boolean isComplete() {
+            return isComplete;
+        }
+        private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+            int workerId = workers.size() == 1 ? workers.get(0) :
+                    workers.get(random.nextInt(workers.size()));
+            insert.clearParameters();
+            insert.setByte(1, (byte) AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+            insert.setInt(2, workerId);
+            insert.setShort(3, (short) 0);
+            insert.setString(4, JsonFetchEmitTuple.toJson(t));
+            insert.execute();
+        }
+    }
+
+    private static class AssignmentManager implements Callable {
+
+        private final Connection connection;
+        private final AsyncTaskEnqueuer enqueuer;
+        private final PreparedStatement getQueueDistribution;
+        private final PreparedStatement findMissingWorkers;
+        private final PreparedStatement allocateNonworkersToWorkers;
+        private final PreparedStatement reallocate;
+        private final PreparedStatement countAvailableTasks;
+        private final PreparedStatement insertWorkersShutdown;
+        private final Random random = new Random();
+
+
+        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+            this.connection = connection;
+            this.enqueuer = enqueuer;
+            //this gets workers and # of tasks in desc order of number of tasks
+            String sql = "select w.worker_id, p.cnt " +
+                    "from workers w " +
+                    "left join (select worker_id, count(1) as cnt from parse_queue " +
+                    "where status=0 group by worker_id)" +
+                    " p on p.worker_id=w.worker_id order by p.cnt desc";
+            getQueueDistribution = connection.prepareStatement(sql);
+            //find workers that have assigned tasks but are not in the
+            //workers table
+            sql = "select p.worker_id, count(1) as cnt from parse_queue p " +
+                    "left join workers w on p.worker_id=w.worker_id " +
+                    "where w.worker_id is null group by p.worker_id";
+            findMissingWorkers = connection.prepareStatement(sql);
+
+            sql = "update parse_queue set worker_id=? where worker_id=?";
+            allocateNonworkersToWorkers = connection.prepareStatement(sql);
+
+            //current strategy reallocate tasks from longest queue to shortest
+            //TODO: might consider randomly shuffling or other algorithms
+            sql = "update parse_queue set worker_id= ? where id in " +
+                    "(select id from parse_queue where " +
+                    "worker_id = ? and " +
+                    "rand() < 0.8 " +
+                    "and status=0 for update)";
+            reallocate = connection.prepareStatement(sql);
+
+            sql = "select count(1) from parse_queue where status="
+                    + AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal();
+            countAvailableTasks = connection.prepareStatement(sql);
+
+            sql = "insert into workers_shutdown (worker_id) values (?)";
+            insertWorkersShutdown = connection.prepareStatement(sql);
+        }
+
+        @Override
+        public Integer call() throws Exception {
+
+            while (true) {
+                List<Integer> missingWorkers = getMissingWorkers();
+                reallocateFromMissingWorkers(missingWorkers);
+                redistribute();
+                if (isComplete()) {
+                    notifyWorkers();
+                    return 1;
+                }
+                Thread.sleep(100);
+            }
+        }
+
+        private void notifyWorkers() throws SQLException {
+            for (int workerId : getActiveWorkers(connection)) {
+                insertWorkersShutdown.clearParameters();
+                insertWorkersShutdown.setInt(1, workerId);
+                insertWorkersShutdown.execute();
+            }
+        }
+
+        private boolean isComplete() throws SQLException {
+            if (! enqueuer.isComplete) {
+                return false;
+            }
+            try (ResultSet rs = countAvailableTasks.executeQuery()) {
+                while (rs.next()) {
+                    return rs.getInt(1) == 0;
+                }
+            }
+            return false;
+        }
+
+        private void redistribute() throws SQLException {
+            //parallel lists of workerid = task queue size
+            List<Integer> workerIds = new ArrayList<>();
+            List<Integer> queueSize = new ArrayList<>();
+            int totalTasks = 0;
+
+            try (ResultSet rs = getQueueDistribution.executeQuery()) {
+                while (rs.next()) {
+                    int workerId = rs.getInt(1);
+                    int numTasks = rs.getInt(2);
+                    workerIds.add(workerId);
+                    queueSize.add(numTasks);
+                    LOG.debug("workerId: ({}) numTasks: ({})", workerId, numTasks);
+                    totalTasks += numTasks;
+                }
+            }
+            if (workerIds.size() == 0) {
+                return;
+            }
+            int averagePerWorker = Math.round((float) totalTasks / (float) workerIds.size());
+            int midPoint = Math.round((float) queueSize.size() / 2) + 1;
+            for (int i = queueSize.size() - 1, j = 0; i > midPoint && j < midPoint; i--, j++) {
+                int shortestQueue = queueSize.get(i);
+                int longestQueue = queueSize.get(j);
+                if ((shortestQueue < 5 && longestQueue > 5) ||
+                        longestQueue > 5 && longestQueue > (int) (1.5 * averagePerWorker)) {
+                    int shortestQueueWorker = workerIds.get(i);
+                    int longestQueueWorker = workerIds.get(j);
+                    reallocate.clearParameters();
+                    reallocate.setLong(1, shortestQueueWorker);
+                    reallocate.setLong(2, longestQueueWorker);
+                    reallocate.execute();
+                }
+            }
+
+        }
+
+        private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+
+            if (missingWorkers.size() == 0) {
+                return;
+            }
+
+            List<Integer> activeWorkers = getActiveWorkers(connection);
+            if (activeWorkers.size() == 0) {
+                return;
+            }
+
+            for (int missing : missingWorkers) {
+                int active = activeWorkers.get(random.nextInt(activeWorkers.size()));
+                allocateNonworkersToWorkers.clearParameters();
+                allocateNonworkersToWorkers.setInt(1, active);
+                allocateNonworkersToWorkers.setInt(2, missing);
+                allocateNonworkersToWorkers.execute();
+                LOG.debug("allocating missing working ({}) to ({})",
+                        missing, active);
+            }
+        }
+
+        private List<Integer> getMissingWorkers() throws SQLException {
+            List<Integer> missingWorkers = new ArrayList<>();
+            try (ResultSet rs = findMissingWorkers.executeQuery()) {
+                while (rs.next()) {
+                    int workerId = rs.getInt(1);
+                    missingWorkers.add(workerId);
+                    LOG.debug("Worker ({}) no longer active", workerId);
+                }
+            }
+            return missingWorkers;
+        }
+    }
+
+    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+        PreparedStatement findActiveWorkers = connection.prepareStatement(
+                "select worker_id from workers");
+        List<Integer> workers = new ArrayList<>();
+        try (ResultSet rs = findActiveWorkers.executeQuery()) {
+            while (rs.next()) {
+                workers.add(rs.getInt(1));
+            }
+        }
+        return workers;
+    }
+}
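
The parse_queue table created in setupTables above is the handoff point between the enqueuer and the workers: rows are inserted as AVAILABLE with a worker_id assignment, and each worker claims its own rows. A hedged sketch of what a claim step could look like against that schema (the query and the status ordinals are assumptions, not code from this commit):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    class ClaimSketch {
        // 0 = AVAILABLE, 1 = IN_PROCESS (assumed STATUS_CODES ordinals)
        static String claimNext(Connection connection, int workerId) throws SQLException {
            String select = "select id, json from parse_queue " +
                    "where status=0 and worker_id=? order by time_stamp limit 1 for update";
            try (PreparedStatement ps = connection.prepareStatement(select)) {
                ps.setInt(1, workerId);
                try (ResultSet rs = ps.executeQuery()) {
                    if (!rs.next()) {
                        return null; // nothing assigned to this worker yet
                    }
                    long id = rs.getLong(1);
                    String json = rs.getString(2); // serialized FetchEmitTuple
                    try (PreparedStatement mark = connection.prepareStatement(
                            "update parse_queue set status=1 where id=?")) {
                        mark.setLong(1, id);
                        mark.execute();
                    }
                    return json;
                }
            }
        }
    }
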
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java
new file mode 100644
index 0000000..d7e058d
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncData.java
@@ -0,0 +1,20 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+
+import java.util.List;
+
+public class AsyncData extends EmitData {
+
+    private final AsyncTask asyncTask;
+
+    public AsyncData(AsyncTask asyncTask, List<Metadata> metadataList) {
+        super(asyncTask.getEmitKey(), metadataList);
+        this.asyncTask = asyncTask;
+    }
+
+    public AsyncTask getAsyncTask() {
+        return asyncTask;
+    }
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
new file mode 100644
index 0000000..02d7fec
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
@@ -0,0 +1,8 @@
+package org.apache.tika.pipes.async;
+
+public interface AsyncEmitHook {
+
+    void onSuccess(AsyncTask task);
+
+    void onFail(AsyncTask task);
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
new file mode 100644
index 0000000..22ede76
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -0,0 +1,166 @@
+package org.apache.tika.pipes.async;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Worker thread that takes EmitData off the queue, batches it
+ * and tries to emit it as a batch
+ */
+public class AsyncEmitter implements Callable<Integer> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
+
+    private final EmitterManager emitterManager;
+    private final AsyncEmitHook asyncEmitHook;
+    private final long emitWithinMs;
+    private final long emitMaxBytes;
+    private final EmitDataCache cache;
+    ArrayBlockingQueue<AsyncData> dataQueue = new ArrayBlockingQueue<>(1000);
+
+    Instant lastEmitted = Instant.now();
+
+    public AsyncEmitter(EmitterManager emitterManager, AsyncEmitHook asyncEmitHook,
+                        long emitWithinMs, long emitMaxBytes) {
+        this.emitterManager = emitterManager;
+        this.asyncEmitHook = asyncEmitHook;
+        this.emitWithinMs = emitWithinMs;
+        this.emitMaxBytes = emitMaxBytes;
+        this.cache = new EmitDataCache();
+    }
+
+    public boolean emit(AsyncData asyncData, long pollMs)
+            throws InterruptedException {
+        if (asyncData == null) {
+            return true;
+        }
+        return dataQueue.offer(asyncData, pollMs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Integer call() throws Exception {
+        while (true) {
+            AsyncData asyncData = dataQueue.poll(100, TimeUnit.MILLISECONDS);
+            if (asyncData != null) {
+                cache.add(asyncData);
+            }
+            long elapsed = ChronoUnit.MILLIS.between(lastEmitted, Instant.now());
+            if (elapsed > emitWithinMs) {
+                LOG.debug("{} elapsed > {}, going to emitAll",
+                        elapsed, emitWithinMs);
+                //this can block for a bit
+                emitAll();
+            }
+        }
+    }
+
+    public void emitAll() {
+        cache.emitAll();
+    }
+
+    private class EmitDataCache {
+
+        long estimatedSize = 0;
+        int size = 0;
+        Map<String, List<AsyncData>> map = new HashMap<>();
+
+
+        void updateEstimatedSize(long newBytes) {
+            estimatedSize += newBytes;
+        }
+
+        synchronized void add(AsyncData data) {
+            size++;
+            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(), data.getMetadataList());
+            if (estimatedSize + sz > emitMaxBytes) {
+                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
+                        (estimatedSize + sz), emitMaxBytes);
+                emitAll();
+            }
+            List<AsyncData> cached = map.get(data.getEmitKey().getEmitterName());
+            if (cached == null) {
+                cached = new ArrayList<>();
+                map.put(data.getEmitKey().getEmitterName(), cached);
+            }
+            updateEstimatedSize(sz);
+            cached.add(data);
+        }
+
+        private synchronized void emitAll() {
+            int emitted = 0;
+            LOG.debug("about to emit all {}", size);
+            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
+                Emitter emitter = emitterManager.getEmitter(e.getKey());
+                tryToEmit(emitter, e.getKey(), e.getValue());
+                emitted += e.getValue().size();
+            }
+            LOG.debug("emitted: {}", emitted);
+            estimatedSize = 0;
+            size = 0;
+            map.clear();
+            lastEmitted = Instant.now();
+        }
+
+        private void tryToEmit(Emitter emitter, String emitterName, List<AsyncData> cachedEmitData) {
+            if (emitter == null) {
+                LOG.error("Can't find emitter '{}' in TikaConfig!", emitterName);
+            }
+            List<EmitData> emitData = new ArrayList<>();
+            Set<AsyncTask> asyncTasks = new HashSet<>();
+            for (AsyncData d : cachedEmitData) {
+                emitData.add(new EmitData(d.getAsyncTask().getEmitKey(), d.getMetadataList()));
+                asyncTasks.add(d.getAsyncTask());
+            }
+            try {
+                emitter.emit(emitData);
+                for (AsyncData d : cachedEmitData) {
+                    asyncEmitHook.onSuccess(d.getAsyncTask());
+                }
+            } catch (IOException | TikaEmitterException e) {
+                e.printStackTrace();
+                for (AsyncData d : cachedEmitData) {
+                    asyncEmitHook.onFail(d.getAsyncTask());
+                }
+                e.printStackTrace();
+                LOG.warn("emitter class ({}): {}", emitter.getClass(),
+                        ExceptionUtils.getStackTrace(e));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
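
AsyncEmitter above flushes its cache on whichever trigger fires first: the estimated batch size passing emitMaxBytes, or emitWithinMs elapsing since the last emit. The policy in isolation, as a minimal sketch (the thresholds here are placeholders, not values from the commit):

    import java.time.Instant;
    import java.time.temporal.ChronoUnit;

    class FlushPolicySketch {
        static final long EMIT_WITHIN_MS = 10_000;           // placeholder threshold
        static final long EMIT_MAX_BYTES = 10 * 1024 * 1024; // placeholder threshold

        // Mirrors the two triggers in AsyncEmitter: age of the batch
        // and estimated size of the cached emit data
        static boolean shouldFlush(Instant lastEmitted, long estimatedBytes) {
            long elapsed = ChronoUnit.MILLIS.between(lastEmitted, Instant.now());
            return elapsed > EMIT_WITHIN_MS || estimatedBytes > EMIT_MAX_BYTES;
        }
    }

Batching by size and age keeps emit calls large enough to amortize per-request overhead while bounding how stale any cached result can get.
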
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
new file mode 100644
index 0000000..f8665d7
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
@@ -0,0 +1,45 @@
+package org.apache.tika.pipes.async;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class AsyncPipesEmitHook implements AsyncEmitHook {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncPipesEmitHook.class);
+
+    private final PreparedStatement markSuccess;
+    private final PreparedStatement markFailure;
+
+    public AsyncPipesEmitHook(Connection connection) throws SQLException  {
+        String sql = "delete from parse_queue where id=?";
+        markSuccess = connection.prepareStatement(sql);
+        //TODO --fix this
+        markFailure = connection.prepareStatement(sql);
+    }
+
+    @Override
+    public void onSuccess(AsyncTask task) {
+        try {
+            markSuccess.clearParameters();
+            markSuccess.setLong(1, task.getTaskId());
+            markSuccess.execute();
+        } catch (SQLException e) {
+            LOG.warn("problem with on success: "+task.getTaskId(), e);
+        }
+    }
+
+    @Override
+    public void onFail(AsyncTask task) {
+        try {
+            markFailure.clearParameters();
+            markFailure.setLong(1, task.getTaskId());
+            markFailure.execute();
+        } catch (SQLException e) {
+            LOG.warn("problem with on fail: "+task.getTaskId(), e);
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
new file mode 100644
index 0000000..2770abb
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -0,0 +1,36 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+
+public class AsyncTask extends FetchEmitTuple {
+
+    public static final AsyncTask SHUTDOWN_SEMAPHORE
+            = new AsyncTask(-1, (short)-1, new FetchEmitTuple(null, null, null));
+
+    private final long taskId;
+    private final short retry;
+
+    public AsyncTask(long taskId, short retry,
+                     FetchEmitTuple fetchEmitTuple) {
+        super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(), fetchEmitTuple.getMetadata());
+        this.taskId = taskId;
+        this.retry = retry;
+    }
+
+    public long getTaskId() {
+        return taskId;
+    }
+
+    public short getRetry() {
+        return retry;
+    }
+
+    @Override
+    public String toString() {
+        return "AsyncTask{" +
+                "taskId=" + taskId +
+                ", retry=" + retry +
+                '}';
+    }
+}
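
FetchIterator.COMPLETED_SEMAPHORE and the SHUTDOWN_SEMAPHORE above are both poison-pill sentinels: a single distinguished instance placed on a queue so consumers know when to stop, compared by reference rather than equals(). The pattern in isolation, as a hedged sketch (names illustrative):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class PoisonPillSketch {
        // A distinguished instance; consumers compare by reference (==)
        static final String SHUTDOWN = new String("SHUTDOWN");

        static void consume(ArrayBlockingQueue<String> queue) throws InterruptedException {
            while (true) {
                String item = queue.poll(1, TimeUnit.SECONDS);
                if (item == null) {
                    continue;      // nothing yet; keep polling
                }
                if (item == SHUTDOWN) {
                    return;        // sentinel observed: stop cleanly
                }
                // process a real item here
            }
        }
    }
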
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
new file mode 100644
index 0000000..4f51e7a
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -0,0 +1,171 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.ProcessUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This controls monitoring of the AsyncWorkerProcess
+ * and updates to the db on crashes etc.
+ */
+public class AsyncWorker implements Callable<Integer> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncWorker.class);
+
+
+    private final String connectionString;
+    private final int workerId;
+    private final Path tikaConfigPath;
+    private final Connection connection;
+    private final PreparedStatement delete;
+    private final PreparedStatement selectActiveTasks;
+    private final PreparedStatement insertErrorLog;
+    private final PreparedStatement resetStatus;
+
+    public AsyncWorker(Connection connection,
+                       String connectionString, int workerId,
+                       Path tikaConfigPath) throws SQLException {
+        this.connectionString = connectionString;
+        this.workerId = workerId;
+        this.tikaConfigPath = tikaConfigPath;
+        this.connection = connection;
+        String sql = "delete from workers where worker_id = (" + workerId + ")";
+        delete = connection.prepareStatement(sql);
+
+        //this checks if the process was able to reset the status
+        sql = "select id, retry, json from parse_queue where worker_id="
+                + workerId +
+                " and status=" + AsyncWorkerProcess.STATUS_CODES.IN_PROCESS.ordinal();
+        selectActiveTasks = connection.prepareStatement(sql);
+
+        //if not, this is called to insert into the error log
+        insertErrorLog = prepareInsertErrorLog(connection);
+
+        //and this is called to reset the status on error
+        resetStatus = prepareReset(connection);
+    }
+
+    @Override
+    public Integer call() throws Exception {
+        Process p = null;
+        try {
+            p = start();
+            int restarts = 0;
+            //wait for the child to finish; only an abnormal exit counts
+            //against the restart limit, so a healthy, long-running child
+            //is left alone
+            while (true) {
+                boolean finished = p.waitFor(60, TimeUnit.SECONDS);
+                if (finished) {
+                    int exitValue = p.exitValue();
+                    if (exitValue == 0) {
+                        LOG.info("child process finished with exitValue=0");
+                        return 1;
+                    }
+                    reportCrash(exitValue);
+                    if (++restarts > 2) {
+                        LOG.warn("giving up on worker id={} after {} restarts",
+                                workerId, restarts - 1);
+                        break;
+                    }
+                    p = start();
+                }
+            }
+        } finally {
+            if (p != null) {
+                p.destroyForcibly();
+            }
+            delete.execute();
+        }
+        return -1 * workerId;
+    }
+
+    private Process start() throws IOException {
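+        //fork a child JVM with this process's classpath; the child connects
+        //back to the db via the shared connection string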
+        String[] args = new String[]{
+                "java", "-Djava.awt.headless=true",
+                "-cp", System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncWorkerProcess",
+                connectionString, Integer.toString(workerId),
+                ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString())
+        };
+        ProcessBuilder pb = new ProcessBuilder(args);
+        pb.inheritIO();
+        return pb.start();
+    }
+
+    private void reportCrash(int exitValue) throws SQLException, IOException {
+        LOG.warn("worker id={} terminated, exitValue={}",
+                workerId, exitValue);
+        delete.execute();
+        List<AsyncTask> activeTasks = new ArrayList<>();
+        try (ResultSet rs = selectActiveTasks.executeQuery()) {
+            //must advance the cursor before reading any columns
+            while (rs.next()) {
+                long taskId = rs.getLong(1);
+                short retry = rs.getShort(2);
+                String json = rs.getString(3);
+                FetchEmitTuple tuple = JsonFetchEmitTuple.fromJson(new StringReader(json));
+                activeTasks.add(new AsyncTask(taskId, retry, tuple));
+            }
+        }
+        if (activeTasks.isEmpty()) {
+            LOG.info("worker reset active tasks, nothing extra to report");
+            return;
+        }
+        if (activeTasks.size() > 1) {
+            LOG.warn("more than one active task? this should never happen!");
+        }
+
+        for (AsyncTask t : activeTasks) {
+            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN,
+                    insertErrorLog, resetStatus, LOG);
+        }
+    }
+
+    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
+                             PreparedStatement insertErrorLog, PreparedStatement resetStatus,
+                             Logger logger) {
+        try {
+            insertErrorLog.clearParameters();
+            insertErrorLog.setLong(1, task.getTaskId());
+            insertErrorLog.setString(2, task.getFetchKey().getKey());
+            insertErrorLog.setInt(3, task.getRetry());
+            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
+            insertErrorLog.execute();
+        } catch (SQLException e) {
+            logger.error("Can't update error log", e);
+        }
+
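+        //requeue the task as AVAILABLE with an incremented retry count so
+        //another (or a restarted) worker can pick it up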
+        try {
+            resetStatus.clearParameters();
+            resetStatus.setByte(1, (byte) AsyncWorkerProcess.STATUS_CODES.AVAILABLE.ordinal());
+            resetStatus.setShort(2, (short) (task.getRetry() + 1));
+            resetStatus.setLong(3, task.getTaskId());
+            resetStatus.execute();
+        } catch (SQLException e) {
+            logger.error("Can't reset try status", e);
+        }
+    }
+
+    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
+        //if not, this is called to insert into the error log
+        return connection.prepareStatement(
+                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
+                " values (?,?,CURRENT_TIMESTAMP(),?,?)"
+        );
+    }
+
+    static PreparedStatement prepareReset(Connection connection) throws SQLException {
+        //and this is called to reset the status on error
+        return connection.prepareStatement(
+                "update parse_queue set " +
+                        "status=?, " +
+                        "time_stamp=CURRENT_TIMESTAMP(), " +
+                        "retry=? " +
+                        "where id=?");
+    }
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
new file mode 100644
index 0000000..8273c5c
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -0,0 +1,474 @@
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
+import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
+
+public class AsyncWorkerProcess {
+
+    enum STATUS_CODES {
+        AVAILABLE,
+        SELECTED,
+        IN_PROCESS,
+    }
+
+    enum ERROR_CODES {
+        TIMEOUT,
+        SECURITY_EXCEPTION,
+        OTHER_EXCEPTION,
+        OOM,
+        OTHER_ERROR,
+        UNKNOWN
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerProcess.class);
+
+    //make these all configurable
+    private static final long SHUTDOWN_AFTER_MS = 120000;
+    private static long PULSE_MS = 1000;
+    private long emitWithinMs = 1000;
+    private long emitMaxBytes = 10_000_000;
+    private long parseTimeoutMs = 60000;
+
+    public static void main(String[] args) throws Exception {
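+        //args: jdbc connection string, worker id, path to tika-config.xml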
+        String db = args[0];
+        int workerId = Integer.parseInt(args[1]);
+        TikaConfig tikaConfig = new TikaConfig(Paths.get(args[2]));
+        LOG.debug("trying to get connection {} >{}<", workerId, db);
+        try (Connection connection = DriverManager.getConnection(db)) {
+            AsyncWorkerProcess asyncWorker = new AsyncWorkerProcess();
+            asyncWorker.execute(connection, workerId, tikaConfig);
+        }
+        System.exit(0);
+    }
+
+    private void execute(Connection connection,
+                         int workerId, TikaConfig tikaConfig) throws SQLException {
+
+        AsyncEmitHook asyncEmitHook = new AsyncPipesEmitHook(connection);
+        AsyncEmitter asyncEmitter = new AsyncEmitter(tikaConfig.getEmitterManager(),
+                asyncEmitHook,
+                emitWithinMs, emitMaxBytes);
+
+        ExecutorService service = Executors.newFixedThreadPool(3);
+        ExecutorCompletionService<Integer> executorCompletionService =
+                new ExecutorCompletionService<>(service);
+
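+        //three long-lived tasks: the parse worker, the emitter that batches
+        //emits, and a watcher that exits when the forking process dies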
+        executorCompletionService.submit(new Worker(connection, workerId, asyncEmitter,
+                tikaConfig, parseTimeoutMs));
+        executorCompletionService.submit(asyncEmitter);
+        executorCompletionService.submit(new ForkWatcher(System.in));
+
+        int completed = 0;
+
+        //if any of the three tasks stops, this process needs to stop
+        try {
+            while (completed < 1) {
+                Future<Integer> future = executorCompletionService.poll(60, TimeUnit.SECONDS);
+                if (future != null) {
+                    completed++;
+                    future.get();
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("worker " + workerId + " had a mainloop exception", e);
+        } finally {
+            service.shutdownNow();
+            asyncEmitter.emitAll();
+        }
+    }
+
+    private static class TaskQueue {
+        private final Connection connection;
+        private final int workerId;
+
+        private final PreparedStatement markForSelecting;
+        private final PreparedStatement selectForProcessing;
+        private final PreparedStatement markForProcessing;
+        private final PreparedStatement checkForShutdown;
+
+        TaskQueue(Connection connection, int workerId) throws SQLException {
+            this.connection = connection;
+            this.workerId = workerId;
+
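+            //atomically claim the oldest AVAILABLE task for this worker by
+            //flipping its status to SELECTED; "for update" locks the row so
+            //concurrent workers can't claim the same task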
+            String sql = "update parse_queue set status=" +
+                    STATUS_CODES.SELECTED.ordinal() +
+                    " where id = " +
+                    " (select id from parse_queue where worker_id = " + workerId +
+                    " and status="+STATUS_CODES.AVAILABLE.ordinal()+
+                    " order by time_stamp asc limit 1 for update)";
+            markForSelecting = connection.prepareStatement(sql);
+            sql = "select id, retry, json from parse_queue where status=" +
+                    STATUS_CODES.SELECTED.ordinal() +
+                    " and " +
+                    " worker_id=" + workerId +
+                    " order by time_stamp asc limit 1";
+            selectForProcessing = connection.prepareStatement(sql);
+            sql = "update parse_queue set status="+
+                    STATUS_CODES.IN_PROCESS.ordinal()+
+                    " where id=?";
+            markForProcessing = connection.prepareStatement(sql);
+
+            sql = "select count(1) from workers_shutdown where worker_id=" + workerId;
+            checkForShutdown = connection.prepareStatement(sql);
+        }
+
+        AsyncTask poll(long pollMs) throws InterruptedException, IOException, SQLException {
+            long start = System.currentTimeMillis();
+            long elapsed = System.currentTimeMillis() - start;
+            while (elapsed < pollMs) {
+
+                if (shouldShutdown()) {
+                    return SHUTDOWN_SEMAPHORE;
+                }
+
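+                //try to claim a task; 0 updated rows means nothing is
+                //queued for this worker yet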
+                int i = markForSelecting.executeUpdate();
+                if (i == 0) {
+                    //debugQueue();
+                    Thread.sleep(PULSE_MS);
+                } else {
+                    long taskId = -1;
+                    short retry = -1;
+                    String json = "";
+                    try (ResultSet rs = selectForProcessing.executeQuery()) {
+                        while (rs.next()) {
+                            taskId = rs.getLong(1);
+                            retry = rs.getShort(2);
+                            json = rs.getString(3);
+                        }
+                    }
+                    markForProcessing.clearParameters();
+                    markForProcessing.setLong(1, taskId);
+                    markForProcessing.execute();
+
+                    FetchEmitTuple t = null;
+                    try (Reader reader = new StringReader(json)) {
+                        t = JsonFetchEmitTuple.fromJson(reader);
+                    }
+                    return new AsyncTask(taskId, retry, t);
+                }
+                elapsed = System.currentTimeMillis() - start;
+            }
+            return null;
+        }
+
+        private void debugQueue() throws SQLException {
+            try (ResultSet rs = connection.createStatement().executeQuery(
+                    "select * from parse_queue limit 10")) {
+                while (rs.next()) {
+                    for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
+                        System.out.print(rs.getString(i) + " ");
+                    }
+                    System.out.println();
+                }
+            }
+        }
+
+        boolean shouldShutdown() throws SQLException {
+            try (ResultSet rs = checkForShutdown.executeQuery()) {
+                if (rs.next()) {
+                    int val = rs.getInt(1);
+                    return val > 0;
+                }
+            }
+            return false;
+        }
+    }
+
+
+    private static class Worker implements Callable<Integer> {
+
+        private final Connection connection;
+        private final int workerId;
+        private final AsyncEmitter asyncEmitter;
+        private final RecursiveParserWrapper parser;
+        private final TikaConfig tikaConfig;
+        private final long parseTimeoutMs;
+        private ExecutorService executorService;
+        private ExecutorCompletionService<AsyncData> executorCompletionService;
+        private final PreparedStatement insertErrorLog;
+        private final PreparedStatement resetStatus;
+
+
+        public Worker(Connection connection,
+                      int workerId,
+                      AsyncEmitter asyncEmitter,
+                      TikaConfig tikaConfig, long parseTimeoutMs) throws SQLException {
+            this.connection = connection;
+            this.workerId = workerId;
+            this.asyncEmitter = asyncEmitter;
+            this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
+            this.tikaConfig = tikaConfig;
+            this.executorService = Executors.newFixedThreadPool(1);
+            this.executorCompletionService = new ExecutorCompletionService<>(executorService);
+            this.parseTimeoutMs = parseTimeoutMs;
+            String sql = "insert into workers (worker_id) values (" + workerId + ")";
+            connection.createStatement().execute(sql);
+            insertErrorLog = prepareInsertErrorLog(connection);
+            resetStatus = prepareReset(connection);
+        }
+
+
+        public Integer call() throws Exception {
+            AsyncTask task = null;
+            try {
+
+                TaskQueue queue = new TaskQueue(connection, workerId);
+
+                long lastProcessed = System.currentTimeMillis();
+
+                while (true) {
+
+                    task = queue.poll(1000);
+                    if (task == null) {
+                        long elapsed = System.currentTimeMillis() - lastProcessed;
+                        if (elapsed > SHUTDOWN_AFTER_MS) {
+                            LOG.debug("shutting down after no assignments in {}ms", elapsed);
+                            return 1;
+                        }
+                    } else if (task == SHUTDOWN_SEMAPHORE) {
+                        break;
+                    } else {
+                        processTask(task);
+                        lastProcessed = System.currentTimeMillis();
+                    }
+                }
+            } catch (TimeoutException e) {
+                LOG.warn(task.getFetchKey().getKey(), e);
+                reportAndReset(task, ERROR_CODES.TIMEOUT,
+                        insertErrorLog, resetStatus, LOG);
+            } catch (SecurityException e) {
+                LOG.warn(task.getFetchKey().getKey(), e);
+                reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION,
+                        insertErrorLog, resetStatus, LOG);
+            } catch (Exception e) {
+                //task is still null if the failure happened in the poll itself
+                LOG.warn(task == null ? "main loop" : task.getFetchKey().getKey(), e);
+                if (task != null) {
+                    reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION,
+                            insertErrorLog, resetStatus, LOG);
+                }
+            } catch (OutOfMemoryError e) {
+                LOG.warn(task == null ? "main loop" : task.getFetchKey().getKey(), e);
+                if (task != null) {
+                    reportAndReset(task, ERROR_CODES.OOM,
+                            insertErrorLog, resetStatus, LOG);
+                }
+            } catch (Error e) {
+                LOG.warn(task == null ? "main loop" : task.getFetchKey().getKey(), e);
+                if (task != null) {
+                    reportAndReset(task, ERROR_CODES.OTHER_ERROR,
+                            insertErrorLog, resetStatus, LOG);
+                }
+            } finally {
+                asyncEmitter.emitAll();
+                executorService.shutdownNow();
+            }
+            return 1;
+        }
+
+        private void processTask(AsyncTask task) throws Exception {
+
+            if (task == SHUTDOWN_SEMAPHORE) {
+                LOG.debug("received shutdown notification");
+                return;
+            } else {
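+                //the parse runs on a single-thread executor; polling with a
+                //timeout acts as a watchdog so a hung parse can't stall this loop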
+                executorCompletionService.submit(new TaskProcessor(task, tikaConfig, parser,
+                        workerId));
+                Future<AsyncData> future = executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
+                if (future == null) {
+                    handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+                } else {
+                    AsyncData asyncData = future.get(1000, TimeUnit.MILLISECONDS);
+                    if (asyncData == null) {
+                        handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+                    }
+                    boolean shouldEmit = checkForParseException(asyncData);
+                    if (shouldEmit) {
+                        boolean offered = asyncEmitter.emit(asyncData, 600000);//parameterize this
+                        if (!offered) {
+                            //TODO: deal with this
+                            LOG.warn("Failed to add ({}) " +
+                                            "to emit queue after 10 minutes.",
+                                    task.getFetchKey().getKey());
+                        }
+                    }
+                }
+            }
+        }
+
+        private void handleTimeout(long taskId, String key) throws TimeoutException {
+            LOG.warn("timeout taskid:{} fetchKey:{}", taskId, key);
+            throw new TimeoutException(key);
+        }
+
+
+        private boolean checkForParseException(AsyncData asyncData) {
+            if (asyncData == null || asyncData.getMetadataList() == null ||
+                    asyncData.getMetadataList().size() == 0) {
+                LOG.warn("empty or null emit data ({})", asyncData.getAsyncTask()
+                        .getFetchKey().getKey());
+                return false;
+            }
+            boolean shouldEmit = true;
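+            //a container-level parse exception suppresses the emit only if
+            //the tuple was configured with ON_PARSE_EXCEPTION.SKIP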
+            Metadata container = asyncData.getMetadataList().get(0);
+            String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
+            if (stack != null) {
+                LOG.warn("fetchKey ({}) container parse exception ({})",
+                        asyncData.getAsyncTask().getFetchKey().getKey(), stack);
+                if (asyncData.getAsyncTask().getOnParseException()
+                        == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+                    shouldEmit = false;
+                }
+            }
+
+            for (int i = 1; i < asyncData.getMetadataList().size(); i++) {
+                Metadata m = asyncData.getMetadataList().get(i);
+                String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
+                if (embeddedStack != null) {
+                    LOG.warn("fetchKey ({}) embedded parse exception ({})",
+                            asyncData.getAsyncTask().getFetchKey().getKey(), embeddedStack);
+                }
+            }
+            return shouldEmit;
+        }
+    }
+
+    private static class TaskProcessor implements Callable<AsyncData> {
+
+        private final AsyncTask task;
+        private final Parser parser;
+        private final TikaConfig tikaConfig;
+        private final int workerId;
+
+        public TaskProcessor(AsyncTask task,
+                             TikaConfig tikaConfig,
+                             Parser parser, int workerId) {
+            this.task = task;
+            this.parser = parser;
+            this.tikaConfig = tikaConfig;
+            this.workerId = workerId;
+        }
+
+        public AsyncData call() throws Exception {
+            Metadata userMetadata = task.getMetadata();
+            Metadata metadata = new Metadata();
+            String fetcherName = task.getFetchKey().getFetcherName();
+            String fetchKey = task.getFetchKey().getKey();
+            List<Metadata> metadataList = null;
+            try (InputStream stream = tikaConfig.getFetcherManager()
+                    .getFetcher(fetcherName)
+                    .fetch(fetchKey, metadata)) {
+                metadataList = parseMetadata(task.getFetchKey(),
+                        stream,
+                        metadata);
+            } catch (SecurityException e) {
+                throw e;
+            }
+
+            injectUserMetadata(userMetadata, metadataList);
+            EmitKey emitKey = task.getEmitKey();
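+            //default the emit key to the fetch key when none was supplied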
+            if (StringUtils.isBlank(emitKey.getKey())) {
+                emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+                task.setEmitKey(emitKey);
+            }
+            return new AsyncData(task, metadataList);
+        }
+
+        private List<Metadata> parseMetadata(FetchKey fetchKey,
+                                             InputStream stream, Metadata metadata) {
+            //make these configurable
+            BasicContentHandlerFactory.HANDLER_TYPE type =
+                    BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+            int writeLimit = -1;
+            int maxEmbeddedResources = 1000;
+
+            RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
+                    new BasicContentHandlerFactory(type, writeLimit), maxEmbeddedResources,
+                    tikaConfig.getMetadataFilter());
+            ParseContext parseContext = new ParseContext();
+            try {
+                parser.parse(stream, handler, metadata, parseContext);
+            } catch (SAXException e) {
+                LOG.warn("problem:" + fetchKey.getKey(), e);
+            } catch (EncryptedDocumentException e) {
+                LOG.warn("encrypted:" + fetchKey.getKey(), e);
+            } catch (SecurityException e) {
+                LOG.warn("security exception: " + fetchKey.getKey(), e);
+                throw e;
+            } catch (Exception e) {
+                LOG.warn("exception: " + fetchKey.getKey(), e);
+            } catch (OutOfMemoryError e) {
+                LOG.error("oom: " + fetchKey.getKey(), e);
+                throw e;
+            }
+            return handler.getMetadataList();
+        }
+
+        private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
+            for (String n : userMetadata.names()) {
+                //overwrite whatever was there
+                metadataList.get(0).set(n, null);
+                for (String val : userMetadata.getValues(n)) {
+                    metadataList.get(0).add(n, val);
+                }
+            }
+        }
+    }
+
+    private static class ForkWatcher implements Callable<Integer> {
+        private final InputStream in;
+        public ForkWatcher(InputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            //this should block forever
+            //if the forking process dies,
+            // this will either throw an IOException or read -1.
+            int i = in.read();
+            LOG.info("forking process notified forked to shutdown");
+            return 1;
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-app/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-app/src/main/resources/log4j.properties
new file mode 100644
index 0000000..585b03b
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log levels: debug, info, warn, error, fatal
+log4j.rootLogger=debug,stderr
+
+#console
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.Target=System.err
+
+log4j.appender.stderr.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
new file mode 100644
index 0000000..efb5d3b
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/AsyncCliTest.java
@@ -0,0 +1,14 @@
+package org.apache.tika.pipes.driver;
+
+import org.apache.tika.pipes.async.AsyncCli;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class AsyncCliTest {
+    @Ignore("depends on a developer-local tika-config.xml")
+    @Test
+    public void testBasic() throws Exception {
+        String[] args = {
+                "/Users/allison/Desktop/tika-tmp/tika-config.xml"
+        };
+        AsyncCli.main(args);
+    }
+}
diff --git a/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
new file mode 100644
index 0000000..2fb38df
--- /dev/null
+++ b/tika-pipes/tika-pipes-app/src/test/java/org/apache/tika/pipes/driver/TestPipesDriver.java
@@ -0,0 +1,118 @@
+package org.apache.tika.pipes.driver;
+
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestPipesDriver {
+
+    static Path TMP_DIR;
+    static Path DB;
+
+    static AtomicInteger PROCESSED = new AtomicInteger(0);
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TMP_DIR = Files.createTempDirectory("pipes-driver-");
+        DB = Files.createTempFile(TMP_DIR, "", "");
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        FileUtils.deleteDirectory(TMP_DIR.toFile());
+    }
+
+
+    @Test
+    public void testQueue() throws Exception {
+        int numThreads = 20;
+        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10000 + numThreads);
+        for (int i = 0; i < 10000; i++) {
+            queue.add(1);
+        }
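+        //one poison pill (-1) per worker so every thread eventually exits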
+        for (int i = 0; i < numThreads; i++) {
+            queue.offer(-1);
+        }
+        ExecutorService service = Executors.newFixedThreadPool(numThreads);
+        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(service);
+
+        long start = System.currentTimeMillis();
+        executorCompletionService.submit(new Watcher(queue));
+        for (int i = 0; i < numThreads; i++) {
+            executorCompletionService.submit(new QueueWorker(queue));
+        }
+        int finished = 0;
+        while (finished++ < numThreads) {
+            executorCompletionService.take();
+        }
+        long elapsed = System.currentTimeMillis() - start;
+        System.out.println("elapsed: " + elapsed);
+        service.shutdownNow();
+    }
+
+    private static class Watcher implements Callable<Integer> {
+        private final ArrayBlockingQueue<Integer> queue;
+
+        Watcher(ArrayBlockingQueue<Integer> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            //block until interrupted by shutdownNow at the end of the test
+            while (true) {
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    private static class QueueWorker implements Callable<Integer> {
+        static AtomicInteger counter = new AtomicInteger(0);
+
+
+        private final int id;
+        private final ArrayBlockingQueue<Integer> queue;
+
+        QueueWorker(ArrayBlockingQueue<Integer> queue) {
+            id = counter.incrementAndGet();
+            this.queue = queue;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            while (true) {
+                Integer val = queue.poll(1, TimeUnit.SECONDS);
+                if (val != null) {
+                    if (val < 0) {
+                        return 1;
+                    } else {
+                        long sleep = id * 100;
+                        Thread.sleep(sleep);
+                    }
+                }
+            }
+        }
+    }
+}


[tika] 03/04: Avoid reporting of temporary ocr-based mime type in xhtml output

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit e5a6039f0285dc92a879f7a4c72b5bc4bdfc236b
Author: tballison <ta...@apache.org>
AuthorDate: Thu Feb 18 17:41:37 2021 -0500

    Avoid reporting of temporary ocr-based mime type in xhtml output
---
 .../java/org/apache/tika/parser/ocr/TesseractOCRParser.java  | 12 ++++++++++++
 .../org/apache/tika/parser/ocr/TesseractOCRParserTest.java   |  3 ++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java b/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
index 3705f99..9d59d2b 100644
--- a/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
+++ b/tika-parsers/tika-parsers-classic/tika-parsers-classic-modules/tika-parser-ocr-module/src/main/java/org/apache/tika/parser/ocr/TesseractOCRParser.java
@@ -26,6 +26,7 @@ import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.HttpHeaders;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.Property;
 import org.apache.tika.mime.MediaType;
@@ -222,6 +223,17 @@ public class TesseractOCRParser extends AbstractParser implements Initializable
     public void parse(InputStream stream, ContentHandler handler, Metadata metadata, ParseContext parseContext)
             throws IOException, SAXException, TikaException {
 
+        //we have to do this so that the temporary ocr-* content type
+        //doesn't show up in the xhtml. We're correctly resetting
+        //it in the AbstractImageParser, but it gets written to xhtml
+        //in the content.
+        String mediaType = metadata.get(Metadata.CONTENT_TYPE);
+        if (mediaType != null) {
+            MediaType mt = MediaType.parse(mediaType);
+            MediaType nonOcr = new MediaType(mt.getType(),
+                    mt.getSubtype().replace("ocr-", ""));
+            metadata.set(Metadata.CONTENT_TYPE, nonOcr.toString());
+        }
         TesseractOCRConfig userConfig = parseContext.get(TesseractOCRConfig.class);
         TesseractOCRConfig config = defaultConfig;
         if (userConfig != null) {
diff --git a/tika-parsers/tika-parsers-classic/tika-parsers-classic-package/src/test/java/org/apache/tika/parser/ocr/TesseractOCRParserTest.java b/tika-parsers/tika-parsers-classic/tika-parsers-classic-package/src/test/java/org/apache/tika/parser/ocr/TesseractOCRParserTest.java
index e719e7a..a398a9e 100644
--- a/tika-parsers/tika-parsers-classic/tika-parsers-classic-package/src/test/java/org/apache/tika/parser/ocr/TesseractOCRParserTest.java
+++ b/tika-parsers/tika-parsers-classic/tika-parsers-classic-package/src/test/java/org/apache/tika/parser/ocr/TesseractOCRParserTest.java
@@ -163,7 +163,6 @@ public class TesseractOCRParserTest extends TikaTest {
     @Test
     public void testSingleImage() throws Exception {
         Assume.assumeTrue("can run OCR", canRun());
-
         String xml = getXML("testOCR.jpg").xml;
         assertContains("OCR Testing", xml);
         //test metadata extraction
@@ -176,6 +175,8 @@ public class TesseractOCRParserTest extends TikaTest {
         assertContainsCount("<body", xml, 1);
         assertContainsCount("</body", xml, 1);
         assertContainsCount("</html", xml, 1);
+
+        assertNotContained("<meta name=\"Content-Type\" content=\"image/ocr-jpeg\" />", xml);
     }