Posted to commits@tika.apache.org by ta...@apache.org on 2021/03/26 21:49:06 UTC

[tika] branch TIKA-3304 updated (5dbffcd -> 9dbbdd3)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git.


    from 5dbffcd  merge from main and required updates/conflict resolution
     new c3ca99a  TIKA-3304 -- basically works...lots more remains
     new 9dbbdd3  checkstyle

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |   2 +-
 .../org/apache/tika/pipes/fetcher/FetchKey.java    |   2 +-
 .../fetchiterator/FileSystemFetchIteratorTest.java |   2 +-
 tika-pipes/pom.xml                                 |  37 ++-
 tika-pipes/tika-emitters/tika-emitter-fs/pom.xml   |  13 +-
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   |  44 ++-
 tika-pipes/tika-emitters/tika-emitter-s3/pom.xml   |  13 +-
 .../apache/tika/pipes/emitter/s3/S3Emitter.java    | 103 +++---
 tika-pipes/tika-emitters/tika-emitter-solr/pom.xml |  13 +-
 .../tika/pipes/emitter/solr/SolrEmitter.java       | 129 ++++----
 .../apache/tika/pipes/emitter/solr/TestBasic.java  |  34 +-
 .../src/test/resources/log4j.properties            |   6 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |  82 ++---
 .../src/test/java/TestCSVFetchIterator.java        |  23 +-
 .../tika-fetch-iterator-jdbc/pom.xml               |  17 +-
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |  82 ++---
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |  66 ++--
 .../src/test/resources/log4j.properties            |   4 +-
 .../tika-fetch-iterator-s3/pom.xml                 |  13 +-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  39 ++-
 .../fetchiterator/s3/TestS3FetchIterator.java      |  17 +-
 .../src/test/resources/log4j.properties            |   4 +-
 tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml |  13 +-
 .../tika/pipes/fetcher/http/HttpFetcher.java       |  33 +-
 .../tika/pipes/fetcher/http/HttpFetcherTest.java   |  50 +--
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    |  70 ++--
 .../tika/pipes/fetcher/s3/TestS3Fetcher.java       |  23 +-
 .../org/apache/tika/client/HttpClientFactory.java  | 147 ++++-----
 .../org/apache/tika/client/HttpClientUtil.java     |  26 +-
 tika-pipes/tika-pipes-async/pom.xml                |   4 +-
 .../java/org/apache/tika/pipes/async/AsyncCli.java | 128 ++++----
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  44 ++-
 .../org/apache/tika/pipes/async/AsyncData.java     |  53 ++-
 .../org/apache/tika/pipes/async/AsyncEmitHook.java |  16 +
 .../org/apache/tika/pipes/async/AsyncEmitter.java  | 130 ++------
 .../tika/pipes/async/AsyncEmitterProcess.java      | 309 +++++++++++------
 .../tika/pipes/async/AsyncPipesEmitHook.java       |  28 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    | 364 ++++++++++++++-------
 .../org/apache/tika/pipes/async/AsyncTask.java     |  40 ++-
 .../org/apache/tika/pipes/async/AsyncWorker.java   | 157 +++++----
 .../tika/pipes/async/AsyncWorkerProcess.java       | 298 ++++++++---------
 .../src/main/resources/log4j.properties            |   4 +-
 .../org/apache/tika/pipes/async/AsyncCliTest.java  |  48 ---
 .../tika/pipes/async/AsyncProcessorTest.java       |  89 +++--
 .../org/apache/tika/pipes/async/MockEmitter.java   |  59 +++-
 .../org/apache/tika/pipes/async/MockFetcher.java   |  33 +-
 .../apache/tika/pipes/async/SerializationTest.java |  49 +++
 .../apache/tika/pipes/async/TestPipesDriver.java   | 109 ------
 .../apache/tika/pipes/PipeIntegrationTests.java    |  81 +++--
 .../src/test/resources/log4j.properties            |   4 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |   2 +-
 .../tika/metadata/serialization/JsonMetadata.java  |  14 +-
 .../serialization/JsonMetadataDeserializer.java    |  28 +-
 .../serialization/JsonMetadataSerializer.java      |  28 +-
 .../tika/server/core/resource/AsyncParser.java     |  11 +-
 .../tika/server/core/resource/AsyncResource.java   |   2 +-
 .../tika/server/core/resource/EmitterResource.java |   8 +-
 57 files changed, 1740 insertions(+), 1507 deletions(-)
 delete mode 100644 tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
 create mode 100644 tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
 delete mode 100644 tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
 copy tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java => tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java (54%)
 copy tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java => tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java (56%)

[tika] 01/02: TIKA-3304 -- basically works...lots more remains

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit c3ca99a8d1032c3011ed8d2fc83dc6f0d3d5c0d9
Author: tballison <ta...@apache.org>
AuthorDate: Fri Mar 26 17:35:30 2021 -0400

    TIKA-3304 -- basically works...lots more remains
---
 pom.xml                                            |   2 +-
 .../org/apache/tika/pipes/fetcher/FetchKey.java    |   2 +-
 .../fetchiterator/FileSystemFetchIteratorTest.java |   2 +-
 tika-pipes/pom.xml                                 |  37 ++-
 tika-pipes/tika-emitters/tika-emitter-fs/pom.xml   |  13 +-
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   |  44 ++-
 tika-pipes/tika-emitters/tika-emitter-s3/pom.xml   |  13 +-
 .../apache/tika/pipes/emitter/s3/S3Emitter.java    | 103 +++---
 tika-pipes/tika-emitters/tika-emitter-solr/pom.xml |  13 +-
 .../tika/pipes/emitter/solr/SolrEmitter.java       | 129 ++++----
 .../apache/tika/pipes/emitter/solr/TestBasic.java  |  34 +-
 .../src/test/resources/log4j.properties            |   6 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |  82 ++---
 .../src/test/java/TestCSVFetchIterator.java        |  23 +-
 .../tika-fetch-iterator-jdbc/pom.xml               |  17 +-
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |  82 ++---
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |  66 ++--
 .../src/test/resources/log4j.properties            |   4 +-
 .../tika-fetch-iterator-s3/pom.xml                 |  13 +-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  39 ++-
 .../fetchiterator/s3/TestS3FetchIterator.java      |  17 +-
 .../src/test/resources/log4j.properties            |   4 +-
 tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml |  13 +-
 .../tika/pipes/fetcher/http/HttpFetcher.java       |  33 +-
 .../tika/pipes/fetcher/http/HttpFetcherTest.java   |  50 +--
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    |  70 ++--
 .../tika/pipes/fetcher/s3/TestS3Fetcher.java       |  23 +-
 .../org/apache/tika/client/HttpClientFactory.java  | 147 ++++-----
 .../org/apache/tika/client/HttpClientUtil.java     |  26 +-
 tika-pipes/tika-pipes-async/pom.xml                |   4 +-
 .../java/org/apache/tika/pipes/async/AsyncCli.java | 128 ++++----
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  44 ++-
 .../org/apache/tika/pipes/async/AsyncData.java     |  53 ++-
 .../org/apache/tika/pipes/async/AsyncEmitHook.java |  16 +
 .../org/apache/tika/pipes/async/AsyncEmitter.java  | 130 ++------
 .../tika/pipes/async/AsyncEmitterProcess.java      | 309 +++++++++++------
 .../tika/pipes/async/AsyncPipesEmitHook.java       |  28 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    | 364 ++++++++++++++-------
 .../org/apache/tika/pipes/async/AsyncTask.java     |  40 ++-
 .../org/apache/tika/pipes/async/AsyncWorker.java   | 157 +++++----
 .../tika/pipes/async/AsyncWorkerProcess.java       | 298 ++++++++---------
 .../src/main/resources/log4j.properties            |   4 +-
 .../org/apache/tika/pipes/async/AsyncCliTest.java  |  48 ---
 .../tika/pipes/async/AsyncProcessorTest.java       |  89 +++--
 .../org/apache/tika/pipes/async/MockEmitter.java   |  59 +++-
 .../org/apache/tika/pipes/async/MockFetcher.java   |  33 +-
 .../apache/tika/pipes/async/SerializationTest.java |  49 +++
 .../apache/tika/pipes/async/TestPipesDriver.java   | 109 ------
 .../apache/tika/pipes/PipeIntegrationTests.java    |  81 +++--
 .../src/test/resources/log4j.properties            |   4 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |   2 +-
 .../tika/metadata/serialization/JsonMetadata.java  |  14 +-
 .../serialization/JsonMetadataDeserializer.java    |  43 +++
 .../serialization/JsonMetadataSerializer.java      |  41 +++
 .../tika/server/core/resource/AsyncParser.java     |  10 +-
 .../tika/server/core/resource/AsyncResource.java   |   2 +-
 .../tika/server/core/resource/EmitterResource.java |   8 +-
 57 files changed, 1794 insertions(+), 1480 deletions(-)

diff --git a/pom.xml b/pom.xml
index 17bc12d..1ad3c85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,11 +37,11 @@
   <modules>
     <module>tika-parent</module>
     <module>tika-core</module>
+    <module>tika-serialization</module>
     <module>tika-pipes</module>
     <module>tika-parsers</module>
     <module>tika-bundles</module>
     <module>tika-xmp</module>
-    <module>tika-serialization</module>
     <module>tika-batch</module>
     <module>tika-langdetect</module>
     <module>tika-app</module>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index deb4232..2c0ea64 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -38,7 +38,7 @@ public class FetchKey {
         return fetcherName;
     }
 
-    public String getKey() {
+    public String getFetchKey() {
         return fetchKey;
     }
 
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index 7d99828..4e314bc 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -61,7 +61,7 @@ public class FileSystemFetchIteratorTest {
 
         Set<String> iteratorSet = new HashSet<>();
         for (FetchEmitTuple p : it) {
-            iteratorSet.add(p.getFetchKey().getKey());
+            iteratorSet.add(p.getFetchKey().getFetchKey());
         }
 
         assertEquals(truthSet, iteratorSet);
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 560ed2d..21a5a49 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -14,7 +14,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <packaging>pom</packaging>
 
@@ -39,4 +40,38 @@
         <module>tika-pipes-integration-tests</module>
     </modules>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${checkstyle.plugin.version}</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>com.puppycrawl.tools</groupId>
+                        <artifactId>checkstyle</artifactId>
+                        <version>8.41</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <configuration>
+                            <configLocation>checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>false</consoleOutput>
+                            <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                            <testSourceDirectories>${project.basedir}/src/test/java</testSourceDirectories>
+                            <violationSeverity>error</violationSeverity>
+                            <failOnViolation>true</failOnViolation>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml b/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
index cf214da..df61b96 100644
--- a/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-emitters</artifactId>
@@ -90,15 +90,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index d4a0448..285142b 100644
--- a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -16,16 +16,6 @@
  */
 package org.apache.tika.pipes.emitter.fs;
 
-import org.apache.tika.config.Field;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.StreamEmitter;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Writer;
@@ -35,9 +25,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
 
 /**
  * Emitter to write to a file system.
@@ -56,7 +52,8 @@ import java.util.Set;
  *                  &lt;param name="basePath" type="string"&gt;/path/to/output&lt;/param&gt;
  *                  &lt;!-- optional; default is 'json' --&gt;
  *                  &lt;param name="fileExtension" type="string"&gt;json&lt;/param&gt;
- *                  &lt;!-- optional; if the file already exists, options ('skip', 'replace', 'exception')
+ *                  &lt;!-- optional; if the file already exists,
+ *                       options ('skip', 'replace', 'exception')
  *                  default is 'exception' --&gt;
  *                  &lt;param name="onExists" type="string"&gt;skip&lt;/param&gt;
  *              &lt;/params&gt;
@@ -66,17 +63,13 @@ import java.util.Set;
  */
 public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter {
 
-    enum ON_EXISTS {
-        SKIP, EXCEPTION, REPLACE
-    }
-
     private Path basePath = null;
     private String fileExtension = "json";
     private ON_EXISTS onExists = ON_EXISTS.EXCEPTION;
 
-
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
         Path output;
         if (metadataList == null || metadataList.size() == 0) {
             throw new TikaEmitterException("metadata list must not be null or of size 0");
@@ -124,15 +117,14 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter
         } else if (onExists.equals("exception")) {
             this.onExists = ON_EXISTS.EXCEPTION;
         } else {
-            throw new IllegalArgumentException(
-                    "Don't understand '" + onExists +
-                            "'; must be one of: 'skip', 'replace', 'exception'");
+            throw new IllegalArgumentException("Don't understand '" + onExists +
+                    "'; must be one of: 'skip', 'replace', 'exception'");
         }
     }
 
     @Override
-    public void emit(String path, InputStream inputStream, Metadata userMetadata) throws IOException,
-            TikaEmitterException {
+    public void emit(String path, InputStream inputStream, Metadata userMetadata)
+            throws IOException, TikaEmitterException {
         Path target = basePath.resolve(path);
 
         if (!Files.isDirectory(target.getParent())) {
@@ -152,4 +144,8 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter
             }
         }
     }
+
+    enum ON_EXISTS {
+        SKIP, EXCEPTION, REPLACE
+    }
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index 7b7def3..773865c 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-emitters</artifactId>
@@ -137,15 +137,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index 68fc1ee..6f9c745 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -16,15 +16,31 @@
  */
 package org.apache.tika.pipes.emitter.s3;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.http.client.CredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -33,31 +49,13 @@ import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.StreamEmitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.metadata.Metadata;
 import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 /**
  * Emits to existing s3 bucket
@@ -71,17 +69,21 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *                  &lt;!-- required --&gt;
  *                  &lt;param name="region" type="string"&gt;us-east-1&lt;/param&gt;
  *                  &lt;!-- required --&gt;
- *                  &lt;param name="credentialsProvider" type="string"&gt;(profile|instance)&lt;/param&gt;
+ *                  &lt;param name="credentialsProvider"
+ *                       type="string"&gt;(profile|instance)&lt;/param&gt;
  *                  &lt;!-- required if credentialsProvider=profile--&gt;
  *                  &lt;param name="profile" type="string"&gt;my-profile&lt;/param&gt;
  *                  &lt;!-- required --&gt;
  *                  &lt;param name="bucket" type="string"&gt;my-bucket&lt;/param&gt;
- *                  &lt;!-- optional; prefix to add to the path before emitting; default is no prefix --&gt;
+ *                  &lt;!-- optional; prefix to add to the path before emitting;
+ *                       default is no prefix --&gt;
  *                  &lt;param name="prefix" type="string"&gt;my-prefix&lt;/param&gt;
  *                  &lt;!-- optional; default is 'json' this will be added to the SOURCE_PATH
- *                                    if no emitter key is specified. Do not add a "." before the extension --&gt;
+ *                                    if no emitter key is specified. Do not add a "."
+ *                                     before the extension --&gt;
  *                  &lt;param name="fileExtension" type="string"&gt;json&lt;/param&gt;
- *                  &lt;!-- optional; default is 'true'-- whether to copy the json to a local file before putting to s3 --&gt;
+ *                  &lt;!-- optional; default is 'true'-- whether to copy the
+ *                     json to a local file before putting to s3 --&gt;
  *                  &lt;param name="spoolToTemp" type="bool"&gt;true&lt;/param&gt;
  *              &lt;/params&gt;
  *          &lt;/emitter&gt;
@@ -108,15 +110,16 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
      * @throws TikaException
      */
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
         if (metadataList == null || metadataList.size() == 0) {
             throw new TikaEmitterException("metadata list must not be null or of size 0");
         }
 
-        if (! spoolToTemp) {
+        if (!spoolToTemp) {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            try (Writer writer =
-                         new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
+            try (Writer writer = new BufferedWriter(
+                    new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
                 JsonMetadataList.toJson(metadataList, writer);
             } catch (IOException e) {
                 throw new TikaEmitterException("can't jsonify", e);
@@ -129,8 +132,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
             TemporaryResources tmp = new TemporaryResources();
             try {
                 Path tmpPath = tmp.createTempFile();
-                try (Writer writer = Files.newBufferedWriter(tmpPath,
-                        StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
+                try (Writer writer = Files.newBufferedWriter(tmpPath, StandardCharsets.UTF_8,
+                        StandardOpenOption.CREATE)) {
                     JsonMetadataList.toJson(metadataList, writer);
                 } catch (IOException e) {
                     throw new TikaEmitterException("can't jsonify", e);
@@ -145,28 +148,27 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     }
 
     /**
-     *
-     * @param path -- object path, not including the bucket
-     * @param is inputStream to copy
+     * @param path         -- object path, not including the bucket
+     * @param is           inputStream to copy
      * @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
      * @throws TikaEmitterException or IOexception if there is a Runtime s3 client exception
      */
     @Override
-    public void emit(String path, InputStream is, Metadata userMetadata) throws IOException, TikaEmitterException {
+    public void emit(String path, InputStream is, Metadata userMetadata)
+            throws IOException, TikaEmitterException {
 
         if (!StringUtils.isBlank(prefix)) {
             path = prefix + "/" + path;
         }
 
-        if (! StringUtils.isBlank(fileExtension)) {
+        if (!StringUtils.isBlank(fileExtension)) {
             path += "." + fileExtension;
         }
 
-        LOGGER.debug("about to emit to target bucket: ({}) path:({})",
-                bucket, path);
+        LOGGER.debug("about to emit to target bucket: ({}) path:({})", bucket, path);
         long length = -1;
         if (is instanceof TikaInputStream) {
-            if (((TikaInputStream)is).hasFile()) {
+            if (((TikaInputStream) is).hasFile()) {
                 try {
                     length = ((TikaInputStream) is).getLength();
                 } catch (IOException e) {
@@ -183,7 +185,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
                 String[] vals = userMetadata.getValues(n);
                 if (vals.length > 1) {
                     LOGGER.warn("Can only write the first value for key {}. I see {} values.",
-                            n, vals.length);
+                            n,
+                            vals.length);
                 }
                 objectMetadata.addUserMetadata(n, vals[0]);
             }
@@ -197,6 +200,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
      * Whether or not to spool the metadatalist to a tmp file before putting object.
      * Default: <code>true</code>.  If this is set to <code>false</code>,
      * this emitter writes the json object to memory and then puts that into s3.
+     *
      * @param spoolToTemp
      */
     @Field
@@ -223,7 +227,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     public void setPrefix(String prefix) {
         //strip final "/" if it exists
         if (prefix.endsWith("/")) {
-            this.prefix = prefix.substring(0, prefix.length()-1);
+            this.prefix = prefix.substring(0, prefix.length() - 1);
         } else {
             this.prefix = prefix;
         }
@@ -231,8 +235,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
 
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
-        if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
-            throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+        if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+            throw new IllegalArgumentException(
+                    "credentialsProvider must be either 'profile' or instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -240,6 +245,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     /**
      * If you want to customize the output file's file extension.
      * Do not include the "."
+     *
      * @param fileExtension
      */
     @Field
@@ -248,10 +254,10 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     }
 
 
-
     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
      * e.g. AmazonClientException in a TikaConfigException.
+     *
      * @param params params to use for initialization
      * @throws TikaConfigException
      */
@@ -261,7 +267,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
         AWSCredentialsProvider provider = null;
         if ("instance".equals(credentialsProvider)) {
             provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)){
+        } else if ("profile".equals(credentialsProvider)) {
             provider = new ProfileCredentialsProvider(profile);
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
@@ -269,9 +275,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
         }
 
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withRegion(region)
-                    .withCredentials(provider)
+            s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 emitter", e);
@@ -279,7 +283,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
         mustNotBeEmpty("bucket", this.bucket);
         mustNotBeEmpty("region", this.region);
     }
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index e36d1c8..a831fa9 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-emitters</artifactId>
@@ -102,15 +102,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 7fb7f1b..f43963e 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,9 +16,23 @@
  */
 package org.apache.tika.pipes.emitter.solr;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.client.HttpClientFactory;
 import org.apache.tika.client.HttpClientUtil;
 import org.apache.tika.client.TikaClientException;
@@ -26,39 +40,19 @@ import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.zip.GZIPOutputStream;
 
 public class SolrEmitter extends AbstractEmitter implements Initializable {
 
-    enum AttachmentStrategy {
-        SKIP,
-        CONCATENATE_CONTENT,
-        PARENT_CHILD,
-        //anything else?
-    }
     private static final String ATTACHMENTS = "attachments";
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class);
     //one day this will be allowed or can be configured?
     private final boolean gzipJson = false;
-
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
     private String url;
     private String contentField = "content";
@@ -66,55 +60,48 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     private int commitWithin = 100;
     private HttpClientFactory httpClientFactory;
     private HttpClient httpClient;
-
     public SolrEmitter() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
     }
+
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException,
-            TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
 
         if (metadataList == null || metadataList.size() == 0) {
             LOG.warn("metadataList is null or empty");
             return;
         }
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Writer writer = gzipJson ?
-                new BufferedWriter(
-                        new OutputStreamWriter(
-                                new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
-                new BufferedWriter(
-                        new OutputStreamWriter(bos, StandardCharsets.UTF_8));
-        try (
-            JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+        Writer writer = gzipJson ? new BufferedWriter(
+                new OutputStreamWriter(new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+                new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartArray();
             jsonify(jsonGenerator, emitKey, metadataList);
             jsonGenerator.writeEndArray();
         }
-        LOG.debug("emitting json ({})",
-                new String(bos.toByteArray(), StandardCharsets.UTF_8));
+        LOG.debug("emitting json ({})", new String(bos.toByteArray(), StandardCharsets.UTF_8));
         try {
-            HttpClientUtil.postJson(httpClient,
-                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), bos.toByteArray(), gzipJson);
+            HttpClientUtil
+                    .postJson(httpClient, url + UPDATE_PATH +
+                                    "?commitWithin=" + getCommitWithin(),
+                            bos.toByteArray(), gzipJson);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("can't post", e);
         }
     }
 
     @Override
-    public void emit(List<? extends EmitData> batch) throws IOException,
-            TikaEmitterException {
+    public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
         if (batch == null || batch.size() == 0) {
             LOG.warn("batch is null or empty");
             return;
         }
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Writer writer = gzipJson ?
-                new BufferedWriter(
-                        new OutputStreamWriter(
-                                new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
-            new BufferedWriter(
-                new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+        Writer writer = gzipJson ? new BufferedWriter(
+                new OutputStreamWriter(new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+                new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
         try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartArray();
             for (EmitData d : batch) {
@@ -122,21 +109,22 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             }
             jsonGenerator.writeEndArray();
         }
-        LOG.debug("emitting json ({})",
-                new String(bos.toByteArray(), StandardCharsets.UTF_8));
+        LOG.debug("emitting json ({})", new String(bos.toByteArray(), StandardCharsets.UTF_8));
         try {
-            HttpClientUtil.postJson(httpClient,
-                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(),
-                    bos.toByteArray(), gzipJson);
+            HttpClientUtil
+                    .postJson(httpClient, url + UPDATE_PATH +
+                                    "?commitWithin=" + getCommitWithin(),
+                            bos.toByteArray(), gzipJson);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("can't post", e);
         }
     }
 
-    private void jsonify(JsonGenerator jsonGenerator, String emitKey, List<Metadata> metadataList) throws IOException {
+    private void jsonify(JsonGenerator jsonGenerator, String emitKey,
+                         List<Metadata> metadataList)
+            throws IOException {
         metadataList.get(0).set(idField, emitKey);
-        if (attachmentStrategy == AttachmentStrategy.SKIP ||
-            metadataList.size() == 1) {
+        if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
             jsonify(metadataList.get(0), jsonGenerator);
         } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
             //this only handles text for now, not xhtml
@@ -162,12 +150,13 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             jsonGenerator.writeEndArray();
             jsonGenerator.writeEndObject();
         } else {
-            throw new IllegalArgumentException("I don't yet support this attachment strategy: "
-                    + attachmentStrategy);
+            throw new IllegalArgumentException(
+                    "I don't yet support this attachment strategy: " + attachmentStrategy);
         }
     }
 
-    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject) throws IOException {
+    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject)
+            throws IOException {
         jsonGenerator.writeStartObject();
         for (String n : metadata.names()) {
             String[] vals = metadata.getValues(n);
@@ -213,29 +202,35 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         } else if (attachmentStrategy.equals("parent-child")) {
             this.attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
         } else {
-            throw new IllegalArgumentException("Expected 'skip', 'concatenate-content' or "+
+            throw new IllegalArgumentException("Expected 'skip', 'concatenate-content' or " +
                     "'parent-child'. I regret I do not recognize: " + attachmentStrategy);
         }
     }
 
     /**
      * Specify the url for Solr
+     *
      * @param url
      */
     @Field
     public void setUrl(String url) {
         if (url.endsWith("/")) {
-            url = url.substring(0, url.length()-1);
+            url = url.substring(0, url.length() - 1);
         }
         this.url = url;
     }
 
+    public String getContentField() {
+        return contentField;
+    }
+
     /**
      * This is the field _after_ metadata mappings have been applied
      * that contains the "content" for each metadata object.
-     *
+     * <p>
      * This is the field that is used if {@link #attachmentStrategy}
      * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
+     *
      * @param contentField
      */
     @Field
@@ -243,8 +238,8 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.contentField = contentField;
     }
 
-    public String getContentField() {
-        return contentField;
+    public int getCommitWithin() {
+        return commitWithin;
     }
 
     @Field
@@ -252,10 +247,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.commitWithin = commitWithin;
     }
 
-    public int getCommitWithin() {
-        return commitWithin;
-    }
-
     /**
      * Specify the field in the first Metadata that should be
      * used as the id field for the document.
@@ -300,8 +291,14 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
 
     }
 
+    enum AttachmentStrategy {
+        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+        //anything else?
+    }
+
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 069e7a2..049fe96 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -17,19 +17,20 @@
 package org.apache.tika.pipes.emitter.solr;
 
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.filter.MetadataFilter;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
 
 @Ignore("requires solr to be up and running")
 public class TestBasic {
@@ -39,8 +40,7 @@ public class TestBasic {
         TikaConfig tikaConfig = new TikaConfig(
                 TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
         Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr1");
-        List<Metadata> metadataList = getParentChild(tikaConfig,
-                "id1", 2);
+        List<Metadata> metadataList = getParentChild(tikaConfig, "id1", 2);
 
         emitter.emit("1", metadataList);
     }
@@ -52,17 +52,15 @@ public class TestBasic {
         Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr2");
         List<EmitData> emitData = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
-            List<Metadata> metadataList = getParentChild(tikaConfig,
-                    "batch_"+i, 4);
-            emitData.add(new EmitData(
-                    new EmitKey(emitter.getName(),  "batch_"+i),
-                    metadataList));
+            List<Metadata> metadataList = getParentChild(tikaConfig, "batch_" + i, 4);
+            emitData.add(new EmitData(new EmitKey(emitter.getName(),
+                    "batch_" + i), metadataList));
         }
         emitter.emit(emitData);
     }
 
-    private List<Metadata> getParentChild(TikaConfig tikaConfig,
-                                          String id, int numChildren) throws TikaException {
+    private List<Metadata> getParentChild(TikaConfig tikaConfig, String id, int numChildren)
+            throws TikaException {
         List<Metadata> metadataList = new ArrayList<>();
         MetadataFilter filter = tikaConfig.getMetadataFilter();
 
@@ -75,7 +73,7 @@ public class TestBasic {
         m1.add(TikaCoreProperties.CREATOR, "secondAuthor");
         filter.filter(m1);
         metadataList.add(m1);
-        for (int i = 1; i < numChildren; i++ ) {
+        for (int i = 1; i < numChildren; i++) {
             Metadata m2 = new Metadata();
             m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH, "/path_to_this.txt");
             m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy " + i);
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
index 92b6d56..11e5887 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=debug,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
-log4j.appender.stderr.layout.ConversionPattern= %-5p %m%n
+log4j.appender.stderr.layout.ConversionPattern=%-5p %m%n
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 7cb0c02..3063124 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -16,8 +16,24 @@
  */
 package org.apache.tika.pipes.fetchiterator.csv;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -29,21 +45,6 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 /**
  * Iterates through a UTF-8 CSV file. This adds all columns
@@ -51,17 +52,20 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  * to the metadata object.
  * <p>
  *  <ul>
- *      <li>If a 'fetchKeyColumn' is specified, this will use that column's value as the fetchKey.</li>
- *      <li>If no 'fetchKeyColumn' is specified, this will send the metadata from the other columns.</li>
+ *      <li>If a 'fetchKeyColumn' is specified, this will use that
+ *      column's value as the fetchKey.</li>
+ *      <li>If no 'fetchKeyColumn' is specified, this will send the
+ *      metadata from the other columns.</li>
  *      <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
  *  </ul>
  * <p>
  *  <ul>
- *      <li>If an 'emitKeyColumn' is specified, this will use that column's value as the emit key.</li>
- *      <li>If an 'emitKeyColumn' is not specified, this will use the value from the 'fetchKeyColumn'.</li>
+ *      <li>If an 'emitKeyColumn' is specified, this will use that
+ *      column's value as the emit key.</li>
+ *      <li>If an 'emitKeyColumn' is not specified, this will use
+ *      the value from the 'fetchKeyColumn'.</li>
  *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
  *  </ul>
- *
  */
 public class CSVFetchIterator extends FetchIterator implements Initializable {
 
@@ -110,24 +114,21 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             for (CSVRecord record : records) {
                 String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
                 String emitKey = getEmitKey(fetchEmitKeyIndices, record);
-                if (StringUtils.isBlank(fetchKey) && ! StringUtils.isBlank(fetcherName)) {
+                if (StringUtils.isBlank(fetchKey) && !StringUtils.isBlank(fetcherName)) {
                     LOGGER.debug("Fetcher specified ({}), but no fetchkey was found in ({})",
                             fetcherName, record);
                 }
                 if (StringUtils.isBlank(emitKey)) {
-                    throw new IOException("emitKey must not be blank in :"+record);
+                    throw new IOException("emitKey must not be blank in :" + record);
                 }
                 Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, record);
-                tryToAdd(new FetchEmitTuple(
-                        new FetchKey(fetcherName, fetchKey),
-                        new EmitKey(emitterName, emitKey), metadata,
-                        getOnParseException()));
+                tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+                        new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
             }
         }
     }
 
-    private void checkFetchEmitValidity(String fetcherName,
-                                        String emitterName,
+    private void checkFetchEmitValidity(String fetcherName, String emitterName,
                                         FetchEmitKeyIndices fetchEmitKeyIndices,
                                         List<String> headers) throws IOException {
 
@@ -135,7 +136,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             throw new IOException(new TikaConfigException("must specify at least an emitterName"));
         }
 
-        if (StringUtils.isBlank(fetcherName) && ! StringUtils.isBlank(fetchKeyColumn)) {
+        if (StringUtils.isBlank(fetcherName) && !StringUtils.isBlank(fetchKeyColumn)) {
             throw new IOException(new TikaConfigException("If specifying a 'fetchKeyColumn', " +
                     "you must also specify a 'fetcherName'"));
         }
@@ -145,17 +146,17 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
         }
 
         //if a fetchkeycolumn is specified, make sure that it was found
-        if (! StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find fetchKeyColumn ("+
-                    fetchKeyColumn+" in header.\n" +
-                    "These are the headers I see: " + headers));
+        if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
+            throw new IOException(new TikaConfigException(
+                    "Couldn't find fetchKeyColumn (" + fetchKeyColumn + ") in header.\n" +
+                            "These are the headers I see: " + headers));
         }
 
         //if an emitkeycolumn is specified, make sure that it was found
-        if (! StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find emitKeyColumn ("+
-                    emitKeyColumn+" in header.\n" +
-                    "These are the headers I see: " + headers));
+        if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
+            throw new IOException(new TikaConfigException(
+                    "Couldn't find emitKeyColumn (" + emitKeyColumn + ") in header.\n" +
+                            "These are the headers I see: " + headers));
         }
 
         if (StringUtils.isBlank(emitKeyColumn)) {
@@ -179,7 +180,8 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
         return getFetchKey(fetchEmitKeyIndices, record);
     }
 
-    private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers, CSVRecord record) {
+    private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers,
+                                  CSVRecord record) {
         Metadata metadata = new Metadata();
         for (int i = 0; i < record.size(); i++) {
             if (fetchEmitKeyIndices.shouldSkip(i)) {
@@ -192,7 +194,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
 
 
     private FetchEmitKeyIndices loadHeaders(CSVRecord record, List<String> headers)
-        throws IOException {
+            throws IOException {
         int fetchKeyColumnIndex = -1;
         int emitKeyColumnIndex = -1;
 
@@ -200,7 +202,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             String header = record.get(col);
             if (StringUtils.isBlank(header)) {
                 throw new IOException(
-                        new TikaException("Header in column (" +col +") must not be empty"));
+                        new TikaException("Header in column (" + col + ") must not be empty"));
             }
             headers.add(header);
             if (header.equals(fetchKeyColumn)) {
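
For readers following the fetchKeyColumn/emitKeyColumn rules in the Javadoc above, here is a
minimal sketch (not part of the patch; the column names, values and fetcher/emitter names are
illustrative assumptions) of what a single CSV row turns into. It uses only constructors and
getters that appear in this patch (FetchKey, EmitKey, Metadata):

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.emitter.EmitKey;
    import org.apache.tika.pipes.fetcher.FetchKey;

    public class CsvRowMappingSketch {
        public static void main(String[] args) {
            // header: id,path,out,project   (fetchKeyColumn=path, emitKeyColumn=out)
            // row:    42,docs/a.pdf,out/a.json,alpha
            FetchKey fetchKey = new FetchKey("my-fetcher", "docs/a.pdf"); // value of 'path'
            EmitKey emitKey = new EmitKey("my-emitter", "out/a.json");    // value of 'out'

            Metadata metadata = new Metadata();
            metadata.set("id", "42");         // non-key columns become metadata
            metadata.set("project", "alpha"); // 'path' and 'out' are not added

            System.out.println(fetchKey.getFetchKey() + " -> " + emitKey);
        }
    }
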
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 55ae655..fd86a05 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
-import org.junit.Test;
+
+import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
+import static org.junit.Assert.assertEquals;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -25,15 +24,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
 
 public class TestCSVFetchIterator {
 
@@ -76,10 +76,8 @@ public class TestCSVFetchIterator {
         for (MockFetcher f : fetchers) {
             for (FetchEmitTuple t : f.pairs) {
                 String id = t.getMetadata().get("id");
-                assertEquals("path/to/my/file"+id,
-                        t.getFetchKey().getKey());
-                assertEquals("project"+
-                                (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
+                assertEquals("path/to/my/file" + id, t.getFetchKey().getFetchKey());
+                assertEquals("project" + (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
                         t.getMetadata().get("project"));
             }
         }
@@ -98,12 +96,13 @@ public class TestCSVFetchIterator {
     }
 
     private Path get(String testFileName) throws Exception {
-        return Paths.get(TestCSVFetchIterator.class.getResource("/"+testFileName).toURI());
+        return Paths.get(TestCSVFetchIterator.class.getResource("/" + testFileName).toURI());
     }
 
     private static class MockFetcher implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
         private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
index 2813ed4..702554a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.tika</groupId>
@@ -29,10 +29,10 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>tika-fetch-iterator-jdbc</artifactId>
-    
+
     <name>Apache Tika Fetch Iterator - jdbc</name>
     <url>http://tika.apache.org/</url>
-    
+
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
@@ -97,15 +97,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 112f52a..f1ca52c 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -16,19 +16,7 @@
  */
 package org.apache.tika.pipes.fetchiterator.jdbc;
 
-import org.apache.tika.config.Field;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -42,7 +30,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
 
 /**
  * Iterates through the results from a SQL call via JDBC. This adds all columns
@@ -50,8 +51,10 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  * to the metadata object.
  * <p>
  *  <ul>
- *      <li>If a 'fetchKeyColumn' is specified, this will use that column's value as the fetchKey.</li>
- *      <li>If no 'fetchKeyColumn' is specified, this will send the metadata from the other columns.</li>
+ *      <li>If a 'fetchKeyColumn' is specified, this will use that
+ *      column's value as the fetchKey.</li>
+ *      <li>If no 'fetchKeyColumn' is specified, this will send the
+ *      metadata from the other columns.</li>
  *      <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
  *  </ul>
  * <p>
@@ -59,7 +62,6 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *      <li>An 'emitKeyColumn' must be specified</li>
  *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
  *  </ul>
- *
  */
 public class JDBCFetchIterator extends FetchIterator implements Initializable {
 
@@ -88,15 +90,15 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         this.connection = connection;
     }
 
+    public String getSelect() {
+        return select;
+    }
+
     @Field
     public void setSelect(String select) {
         this.select = select;
     }
 
-    public String getSelect() {
-        return select;
-    }
-
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherName = getFetcherName();
@@ -110,16 +112,17 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
                 while (rs.next()) {
                     if (headers.size() == 0) {
                         fetchEmitKeyIndices = loadHeaders(rs.getMetaData(), headers);
-                        checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices, headers);
+                        checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices,
+                                headers);
                     }
                     try {
                         processRow(fetcherName, emitterName, headers, fetchEmitKeyIndices, rs);
                     } catch (SQLException e) {
-                        LOGGER.warn("Failed to insert: "+rs, e);
+                        LOGGER.warn("Failed to insert: " + rs, e);
                     }
                     rowCount++;
                     if (rowCount % 1000 == 0) {
-                        LOGGER.info("added "+rowCount + " rows to the queue");
+                        LOGGER.info("added " + rowCount + " rows to the queue");
                     }
                 }
             }
@@ -129,21 +132,23 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         } finally {
             try {
                 db.close();
-            } catch (SQLException e){
+            } catch (SQLException e) {
                 LOGGER.warn("failed to close connection", e);
             }
         }
     }
-    private void checkFetchEmitValidity(String fetcherName,
-                                        String emitterName,
+
+    private void checkFetchEmitValidity(String fetcherName, String emitterName,
                                         FetchEmitKeyIndices fetchEmitKeyIndices,
                                         List<String> headers) throws IOException {
 
-        if (! StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find column: "+fetchKeyColumn));
+        if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
+            throw new IOException(
+                    new TikaConfigException("Couldn't find column: " + fetchKeyColumn));
         }
-        if (! StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find column: "+emitKeyColumn));
+        if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
+            throw new IOException(
+                    new TikaConfigException("Couldn't find column: " + emitKeyColumn));
         }
     }
 
@@ -165,7 +170,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
             if (i == fetchEmitKeyIndices.emitKeyIndex) {
                 emitKey = getString(i, rs);
                 if (emitKey == null) {
-                    LOGGER.debug("emitKey is empty for record "+toString(rs));
+                    LOGGER.debug("emitKey is empty for record " + toString(rs));
                 }
                 emitKey = (emitKey == null) ? "" : emitKey;
                 continue;
@@ -176,10 +181,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
             }
         }
 
-        tryToAdd(new FetchEmitTuple(
-                new FetchKey(fetcherName, fetchKey),
-                new EmitKey(emitterName, emitKey),
-                metadata, getOnParseException()));
+        tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+                new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
     }
 
     private String toString(ResultSet rs) throws SQLException {
@@ -188,7 +191,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
             String val = rs.getString(i);
             val = (val == null) ? "" : val;
             val = (val.length() > 100) ? val.substring(0, 100) : val;
-            sb.append(rs.getMetaData().getColumnLabel(i)+":"+val+"\n");
+            sb.append(rs.getMetaData().getColumnLabel(i) + ":" + val + "\n");
         }
         return sb.toString();
     }
@@ -203,7 +206,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
     }
 
 
-    private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, List<String> headers) throws SQLException {
+    private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, List<String> headers)
+            throws SQLException {
         int fetchKeyIndex = -1;
         int emitKeyIndex = -1;
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -236,7 +240,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         mustNotBeEmpty("emitterName", this.getEmitterName());
         mustNotBeEmpty("emitKeyColumn", this.emitKeyColumn);
 
-        if (StringUtils.isBlank(getFetcherName()) && ! StringUtils.isBlank(fetchKeyColumn)) {
+        if (StringUtils.isBlank(getFetcherName()) && !StringUtils.isBlank(fetchKeyColumn)) {
             throw new TikaConfigException(
                     "If you specify a 'fetchKeyColumn', you must specify a 'fetcherName'");
         }
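
A small sketch (assumptions flagged in the comments) of how one result-set row is mapped under
the configuration used in TestJDBCFetchIterator below, where fetchKeyColumn and emitKeyColumn
both point at my_fetchkey. H2 reports unquoted column labels in upper case, which is why the
test looks up MY_ID and MY_PROJECT:

    import org.apache.tika.metadata.Metadata;

    public class JdbcRowMappingSketch {
        public static void main(String[] args) {
            // select id as my_id, project as my_project, fetchKey as my_fetchKey from fetchkeys
            // sample row (assumed): ('id7', 'projectb', 'fk7')
            Metadata metadata = new Metadata();
            metadata.set("MY_ID", "id7");           // column labels become metadata names
            metadata.set("MY_PROJECT", "projectb");
            // 'my_fetchkey' ("fk7") is used as both the fetch key and the emit key,
            // so it is not copied into the metadata
            System.out.println(metadata);
        }
    }
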
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index 9a92fc5..7eebe31 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -16,13 +16,9 @@
  */
 package org.apache.tika.pipes.fetchiterator.jdbc;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -44,35 +40,41 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class TestJDBCFetchIterator {
 
     static final String TABLE = "fetchkeys";
-    static Connection CONNECTION;
-    static Path DB_DIR;
     static final String db = "mydb";
     private static final int NUM_ROWS = 1000;
+    static Connection CONNECTION;
+    static Path DB_DIR;
 
     @BeforeClass
     public static void setUp() throws Exception {
         DB_DIR = Files.createTempDirectory("tika-jdbc-fetchiterator-test-");
 
-        CONNECTION = DriverManager.getConnection("jdbc:h2:file:"+DB_DIR.toAbsolutePath()+"/"+db);
-        String sql = "create table "+TABLE +
-                " (id varchar(128), " +
+        CONNECTION =
+                DriverManager.getConnection("jdbc:h2:file:" +
+                        DB_DIR.toAbsolutePath() + "/" + db);
+        String sql = "create table " + TABLE + " (id varchar(128), " +
                 "project varchar(128), " +
                 "fetchKey varchar(128))";
         CONNECTION.createStatement().execute(sql);
 
         for (int i = 0; i < NUM_ROWS; i++) {
-            sql = "insert into "+TABLE + " (id, project, fetchKey) values ('id"+i+"','project"+
-                    (i%2 == 0 ? "a" : "b") +"','fk"+i+"')";
+            sql = "insert into " + TABLE + " (id, project, fetchKey) values ('id" + i +
+                    "','project" + (i % 2 == 0 ? "a" : "b") + "','fk" + i + "')";
             CONNECTION.createStatement().execute(sql);
         }
-        sql = "select count(1) from "+ TABLE;
+        sql = "select count(1) from " + TABLE;
         ResultSet rs = CONNECTION.createStatement().executeQuery(sql);
         while (rs.next()) {
             int cnt = rs.getInt(1);
@@ -90,7 +92,7 @@ public class TestJDBCFetchIterator {
         TikaConfig tk = getConfig();
         int numConsumers = 5;
         FetchIterator fetchIterator = tk.getFetchIterator();
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService =
                 new ExecutorCompletionService<>(es);
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
@@ -118,16 +120,16 @@ public class TestJDBCFetchIterator {
         Matcher m = Pattern.compile("fk(\\d+)").matcher("");
         for (MockFetcher f : fetchers) {
             for (FetchEmitTuple p : f.pairs) {
-                String k = p.getFetchKey().getKey();
+                String k = p.getFetchKey().getFetchKey();
                 String num = "";
                 if (m.reset(k).find()) {
                     num = m.group(1);
                 } else {
-                    fail("failed to find key pattern: "+k);
+                    fail("failed to find key pattern: " + k);
                 }
                 String aOrB = Integer.parseInt(num) % 2 == 0 ? "a" : "b";
-                assertEquals("id"+num, p.getMetadata().get("MY_ID"));
-                assertEquals("project"+aOrB, p.getMetadata().get("MY_PROJECT"));
+                assertEquals("id" + num, p.getMetadata().get("MY_ID"));
+                assertEquals("project" + aOrB, p.getMetadata().get("MY_PROJECT"));
                 assertNull(p.getMetadata().get("fetchKey"));
                 assertNull(p.getMetadata().get("MY_FETCHKEY"));
                 cnt++;
@@ -139,17 +141,22 @@ public class TestJDBCFetchIterator {
     private TikaConfig getConfig() throws Exception {
         String config = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?><properties>\n" +
                 "    <fetchIterators>\n" +
-                "        <fetchIterator class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
+                "        <fetchIterator " +
+                "       class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
                 "            <params>\n" +
                 "                <param name=\"fetcherName\" type=\"string\">s3f</param>\n" +
                 "                <param name=\"emitterName\" type=\"string\">s3e</param>\n" +
                 "                <param name=\"queueSize\" type=\"int\">57</param>\n" +
-                "                <param name=\"fetchKeyColumn\" type=\"string\">my_fetchkey</param>\n" +
-                "                <param name=\"emitKeyColumn\" type=\"string\">my_fetchkey</param>\n" +
+                "                <param name=\"fetchKeyColumn\" " +
+                "                     type=\"string\">my_fetchkey</param>\n" +
+                "                <param name=\"emitKeyColumn\" " +
+                "                    type=\"string\">my_fetchkey</param>\n" +
                 "                <param name=\"select\" type=\"string\">" +
-                "select id as my_id, project as my_project, fetchKey as my_fetchKey from fetchkeys</param>\n" +
-                "                <param name=\"connection\" type=\"string\">jdbc:h2:file:"+
-                DB_DIR.toAbsolutePath()+"/"+db +"</param>\n" +
+                "select id as my_id, project as my_project, fetchKey as my_fetchKey " +
+                "from fetchkeys</param>\n" +
+                "                <param name=\"connection\" " +
+                "                type=\"string\">jdbc:h2:file:" + DB_DIR.toAbsolutePath() + "/" +
+                    db + "</param>\n" +
                 "            </params>\n" +
                 "        </fetchIterator>\n" +
                 "    </fetchIterators>\n" +
@@ -160,6 +167,7 @@ public class TestJDBCFetchIterator {
     private static class MockFetcher implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
         private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=info,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
index ef8854d..24be277 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.tika</groupId>
@@ -133,15 +133,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index b8897a8..988f149 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -16,6 +16,12 @@
  */
 package org.apache.tika.pipes.fetchiterator.s3;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -24,6 +30,9 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.iterable.S3Objects;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -34,14 +43,6 @@ import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 public class S3FetchIterator extends FetchIterator implements Initializable {
 
@@ -76,8 +77,9 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
 
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
-        if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
-            throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+        if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+            throw new IllegalArgumentException(
+                    "credentialsProvider must be either 'profile' or 'instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -85,6 +87,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
      * e.g. AmazonClientException in a TikaConfigException.
+     *
      * @param params params to use for initialization
      * @throws TikaConfigException
      */
@@ -94,7 +97,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         AWSCredentialsProvider provider = null;
         if ("instance".equals(credentialsProvider)) {
             provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)){
+        } else if ("profile".equals(credentialsProvider)) {
             provider = new ProfileCredentialsProvider(profile);
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
@@ -102,9 +105,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         }
 
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withRegion(region)
-                    .withCredentials(provider)
+            s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 fetchiterator");
@@ -128,12 +129,10 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
 
             long elapsed = System.currentTimeMillis() - start;
-            LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
-                    elapsed);
-            tryToAdd(new FetchEmitTuple(
-                    new FetchKey(fetcherName, summary.getKey()),
-                    new EmitKey(emitterName, summary.getKey()),
-                    new Metadata(), getOnParseException()));
+            LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(), elapsed);
+            tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, summary.getKey()),
+                    new EmitKey(emitterName, summary.getKey()), new Metadata(),
+                    getOnParseException()));
             count++;
         }
         long elapsed = System.currentTimeMillis() - start;
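
For context, a hypothetical configuration for this iterator, assembled as a Java string in the
style of TestJDBCFetchIterator.getConfig(). Only 'credentialsProvider' is confirmed by an @Field
setter visible in this patch; the remaining param names are assumptions inferred from the field
names used in initialize() and enqueue(), and the profile/region/bucket/prefix values are
placeholders:

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.tika.config.TikaConfig;
    import org.apache.tika.pipes.fetchiterator.FetchIterator;

    public class S3FetchIteratorConfigSketch {
        public static void main(String[] args) throws Exception {
            String config = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?><properties>\n" +
                    "  <fetchIterators>\n" +
                    "    <fetchIterator class=\"org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator\">\n" +
                    "      <params>\n" +
                    "        <param name=\"fetcherName\" type=\"string\">s3f</param>\n" +
                    "        <param name=\"emitterName\" type=\"string\">s3e</param>\n" +
                    "        <param name=\"credentialsProvider\" type=\"string\">profile</param>\n" +
                    "        <param name=\"profile\" type=\"string\">my-profile</param>\n" +
                    "        <param name=\"region\" type=\"string\">us-east-1</param>\n" +
                    "        <param name=\"bucket\" type=\"string\">my-bucket</param>\n" +
                    "        <param name=\"prefix\" type=\"string\">docs/</param>\n" +
                    "      </params>\n" +
                    "    </fetchIterator>\n" +
                    "  </fetchIterators>\n" +
                    "</properties>";
            TikaConfig tk = new TikaConfig(
                    new ByteArrayInputStream(config.getBytes(StandardCharsets.UTF_8)));
            FetchIterator fetchIterator = tk.getFetchIterator(); // as in TestJDBCFetchIterator
        }
    }
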
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 567920d..50cb819 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 package org.apache.tika.pipes.fetchiterator.s3;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Ignore;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,7 +29,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 @Ignore("turn into an actual unit test")
 public class TestS3FetchIterator {
@@ -48,7 +50,7 @@ public class TestS3FetchIterator {
         int numConsumers = 6;
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
 
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService c = new ExecutorCompletionService(es);
         List<MockFetcher> fetchers = new ArrayList<>();
         for (int i = 0; i < numConsumers; i++) {
@@ -65,7 +67,7 @@ public class TestS3FetchIterator {
         int finished = 0;
         int completed = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers + 1) {
                 Future<Integer> f = c.take();
                 completed += f.get();
             }
@@ -79,6 +81,7 @@ public class TestS3FetchIterator {
     private static class MockFetcher implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
         private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=info,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
index 09dcb97..ae76a52 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-fetchers</artifactId>
@@ -97,15 +97,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index a6d2325..be39765 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -17,11 +17,19 @@
 package org.apache.tika.pipes.fetcher.http;
 
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.client.HttpClientFactory;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
@@ -32,13 +40,6 @@ import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
 
 /**
  * Based on Apache httpclient
@@ -52,9 +53,9 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
     public HttpFetcher() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
     }
+
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata)
-            throws IOException, TikaException {
+    public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
         HttpGet get = new HttpGet(fetchKey);
         return get(get);
     }
@@ -62,7 +63,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
     public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
             throws IOException, TikaException {
         HttpGet get = new HttpGet(fetchKey);
-        get.setHeader("Range", "bytes="+startRange+"-"+endRange);
+        get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
         return get(get);
     }
 
@@ -70,21 +71,18 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
         HttpResponse response = httpClient.execute(get);
         int code = response.getStatusLine().getStatusCode();
         if (code < 200 || code > 299) {
-            throw new IOException("bad status code: "+
-                    code
-                    + " :: " +
+            throw new IOException("bad status code: " + code + " :: " +
                     responseToString(response.getEntity().getContent()));
         }
 
         //spool to local
         long start = System.currentTimeMillis();
-        TikaInputStream tis = TikaInputStream.get(
-                response.getEntity().getContent());
+        TikaInputStream tis = TikaInputStream.get(response.getEntity().getContent());
         tis.getPath();
         if (response instanceof CloseableHttpResponse) {
             ((CloseableHttpResponse) response).close();
         }
-        long elapsed = System.currentTimeMillis()-start;
+        long elapsed = System.currentTimeMillis() - start;
         LOG.debug("took {} ms to copy to local tmp file", elapsed);
         return tis;
     }
@@ -149,7 +147,8 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
 
     }
 }
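
The "- 1" arithmetic around the Range header is easy to miss: HTTP byte ranges are inclusive of
both ends, so fetching N bytes starting at 'start' means end = start + N - 1. A condensed usage
sketch (the config resource, fetcher name, offsets and URL are assumptions; HttpFetcherTest
below exercises the same path against a real WARC file):

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    import org.apache.tika.config.TikaConfig;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.fetcher.http.HttpFetcher;

    public class HttpRangeFetchSketch {
        public static void main(String[] args) throws Exception {
            HttpFetcher fetcher = (HttpFetcher) new TikaConfig(
                    HttpRangeFetchSketch.class.getResourceAsStream("/tika-config-http.xml"))
                    .getFetcherManager().getFetcher("http");
            long start = 1000L;
            long nBytes = 1408L;
            long end = start + nBytes - 1; // inclusive range: exactly nBytes bytes
            try (InputStream is = fetcher.fetch("https://example.org/big-file.warc.gz",
                    start, end, new Metadata())) {
                Files.copy(is, Paths.get("range.bin"), StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
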
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
index 8a2d8a1..3a2e127 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
@@ -16,13 +16,7 @@
  */
 package org.apache.tika.pipes.fetcher.http;
 
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TemporaryResources;
-import org.apache.tika.metadata.Metadata;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.xml.sax.SAXException;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,32 +25,40 @@ import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.zip.GZIPInputStream;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.Metadata;
 
 @Ignore("requires network connectivity")
 public class HttpFetcherTest {
 
-        @Test
-        public void testRange() throws Exception {
-            String url =
-                    "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
-            long start = 969596307;
-            long end = start + 1408 - 1;
-            Metadata metadata = new Metadata();
-            HttpFetcher httpFetcher = (HttpFetcher) getConfig("tika-config-http.xml")
-                    .getFetcherManager().getFetcher("http");
-            try (TemporaryResources tmp = new TemporaryResources()) {
-                Path tmpPath = tmp.createTempFile();
-                try (InputStream is = httpFetcher.fetch(url, start, end, metadata)) {
-                    Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
-                }
-                assertEquals(2461, Files.size(tmpPath));
+    @Test
+    public void testRange() throws Exception {
+        String url =
+                "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
+        long start = 969596307;
+        long end = start + 1408 - 1;
+        Metadata metadata = new Metadata();
+        HttpFetcher httpFetcher =
+                (HttpFetcher) getConfig("tika-config-http.xml").getFetcherManager()
+                        .getFetcher("http");
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            Path tmpPath = tmp.createTempFile();
+            try (InputStream is = httpFetcher.fetch(url, start, end, metadata)) {
+                Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
             }
+            assertEquals(2461, Files.size(tmpPath));
         }
+    }
 
 
     TikaConfig getConfig(String path) throws TikaException, IOException, SAXException {
-            return new TikaConfig(HttpFetcherTest.class.getResourceAsStream("/"+path));
+        return new TikaConfig(HttpFetcherTest.class.getResourceAsStream("/" + path));
     }
 
 
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 506a040..a6b34dd 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -16,6 +16,13 @@
  */
 package org.apache.tika.pipes.fetcher.s3;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.regex.Pattern;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -24,24 +31,18 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.fetcher.AbstractFetcher;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
 
 /**
  * Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
@@ -61,27 +62,23 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     private boolean spoolToTemp = true;
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata)
-            throws TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
 
-        LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
-                fetchKey, bucket);
+        LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);
 
         try {
             S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
             if (extractUserMetadata) {
-                for (Map.Entry<String, String> e :
-                        s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
+                        .entrySet()) {
                     metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
                 }
             }
             if (!spoolToTemp) {
-                return TikaInputStream.get(
-                        s3Object.getObjectContent());
+                return TikaInputStream.get(s3Object.getObjectContent());
             } else {
                 long start = System.currentTimeMillis();
-                TikaInputStream tis = TikaInputStream.get(
-                        s3Object.getObjectContent());
+                TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
                 tis.getPath();
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
@@ -95,26 +92,24 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
             throws TikaException, IOException {
         //TODO -- figure out how to integrate this
-        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
-                fetchKey, startRange, endRange, bucket);
+        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})", fetchKey,
+                startRange, endRange, bucket);
 
         try {
-            S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
-                    .withRange(startRange, endRange));
+            S3Object s3Object = s3Client.getObject(
+                    new GetObjectRequest(bucket, fetchKey).withRange(startRange, endRange));
 
             if (extractUserMetadata) {
-                for (Map.Entry<String, String> e :
-                        s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
+                        .entrySet()) {
                     metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
                 }
             }
             if (!spoolToTemp) {
-                return TikaInputStream.get(
-                        s3Object.getObjectContent());
+                return TikaInputStream.get(s3Object.getObjectContent());
             } else {
                 long start = System.currentTimeMillis();
-                TikaInputStream tis = TikaInputStream.get(
-                        s3Object.getObjectContent());
+                TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
                 tis.getPath();
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
@@ -157,8 +152,9 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
 
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
-        if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
-            throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+        if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+            throw new IllegalArgumentException(
+                    "credentialsProvider must be either 'profile' or 'instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -166,6 +162,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
      * e.g. AmazonClientException in a TikaConfigException.
+     *
      * @param params params to use for initialization
      * @throws TikaConfigException
      */
@@ -175,7 +172,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         AWSCredentialsProvider provider = null;
         if ("instance".equals(credentialsProvider)) {
             provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)){
+        } else if ("profile".equals(credentialsProvider)) {
             provider = new ProfileCredentialsProvider(profile);
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
@@ -183,9 +180,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         }
 
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withRegion(region)
-                    .withCredentials(provider)
+            s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 fetcher", e);
@@ -193,7 +188,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
         mustNotBeEmpty("bucket", this.bucket);
         mustNotBeEmpty("region", this.region);
     }
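
For readers skimming the hunks above: the ranged fetch and the spoolToTemp branch follow the stock AWS SDK v1 pattern. A minimal standalone sketch of that pattern, with placeholder bucket/key/range/profile values and plain Files.copy standing in for TikaInputStream's spooling:

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.GetObjectRequest;
    import com.amazonaws.services.s3.model.S3Object;

    public class RangedGetSketch {
        public static void main(String[] args) throws Exception {
            AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                    .withRegion("us-east-1")
                    .withCredentials(new ProfileCredentialsProvider("default"))
                    .build();
            // fetch only bytes 0-99, as fetch(fetchKey, startRange, endRange, metadata) does
            S3Object s3Object = s3.getObject(
                    new GetObjectRequest("my-bucket", "docs/report.pdf").withRange(0, 99));
            // spool to a local temp file, the same effect as the spoolToTemp branch
            Path tmp = Files.createTempFile("s3-fetch-", ".bin");
            try (InputStream is = s3Object.getObjectContent()) {
                Files.copy(is, tmp, StandardCopyOption.REPLACE_EXISTING);
            }
            System.out.println("spooled " + Files.size(tmp) + " bytes to " + tmp);
        }
    }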
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
index 450d933..9be1ab4 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
@@ -15,11 +15,6 @@
  * limitations under the License.
  */
 package org.apache.tika.pipes.fetcher.s3;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.metadata.Metadata;
-import org.junit.Ignore;
-import org.junit.Test;
 
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -28,12 +23,19 @@ import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collections;
 
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
 @Ignore("write actual unit tests")
 public class TestS3Fetcher {
     private static final String FETCH_STRING = "";
-    private Path outputFile = Paths.get("");
-    private String region = "us-east-1";
-    private String profile = "";
+    private final Path outputFile = Paths.get("");
+    private final String region = "us-east-1";
+    private final String profile = "";
 
     @Test
     public void testBasic() throws Exception {
@@ -50,9 +52,8 @@ public class TestS3Fetcher {
 
     @Test
     public void testConfig() throws Exception {
-        TikaConfig config = new TikaConfig(
-                this.getClass().getResourceAsStream("/tika-config-s3.xml")
-        );
+        TikaConfig config =
+                new TikaConfig(this.getClass().getResourceAsStream("/tika-config-s3.xml"));
         Fetcher fetcher = config.getFetcherManager().getFetcher("s3");
         Metadata metadata = new Metadata();
         try (InputStream is = fetcher.fetch(FETCH_STRING, metadata)) {
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
index 8cbc26b..77619f9 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
@@ -16,6 +16,27 @@
  */
 package org.apache.tika.client;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.SecretKeySpec;
+import javax.net.ssl.SSLContext;
+
 import org.apache.http.Header;
 import org.apache.http.HeaderElement;
 import org.apache.http.HeaderElementIterator;
@@ -52,35 +73,15 @@ import org.apache.http.message.BasicHeaderElementIterator;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.ssl.SSLContexts;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.crypto.BadPaddingException;
-import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
-import javax.crypto.NoSuchPaddingException;
-import javax.crypto.spec.SecretKeySpec;
-import javax.net.ssl.SSLContext;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.security.InvalidKeyException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.utils.StringUtils;
 
 /**
  * This holds quite a bit of state and is not thread safe.  Beware!
- *
+ * <p>
  * Also, we're currently ignoring the SSL checks.  Please open a ticket/PR
  * if you need robust SSL.
  */
@@ -104,7 +105,7 @@ public class HttpClientFactory {
     private String password;
     private String ntDomain;//if using nt credentials
     private String authScheme = "basic"; //ntlm or basic
-    private boolean credentialsAESEncrypted = false;
+    private final boolean credentialsAESEncrypted = false;
 
 
     public HttpClientFactory() throws TikaConfigException {
@@ -116,6 +117,7 @@ public class HttpClientFactory {
             aes = new AES();
         }
     }
+
     public String getProxyHost() {
         return proxyHost;
     }
@@ -218,6 +220,7 @@ public class HttpClientFactory {
 
     /**
      * only basic and ntlm are supported
+     *
      * @param authScheme
      */
     public void setAuthScheme(String authScheme) {
@@ -230,19 +233,18 @@ public class HttpClientFactory {
         TrustStrategy acceptingTrustStrategy = (cert, authType) -> true;
         SSLContext sslContext = null;
         try {
-            sslContext = SSLContexts.custom().loadTrustMaterial(null,
-                    acceptingTrustStrategy).build();
+            sslContext =
+                    SSLContexts.custom().loadTrustMaterial(
+                            null, acceptingTrustStrategy).build();
         } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
             throw new TikaConfigException("", e);
         }
-        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
-                NoopHostnameVerifier.INSTANCE);
+        SSLConnectionSocketFactory sslsf =
+                new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
 
         Registry<ConnectionSocketFactory> socketFactoryRegistry =
-                RegistryBuilder.<ConnectionSocketFactory>create()
-                        .register("https", sslsf)
-                        .register("http", new PlainConnectionSocketFactory())
-                        .build();
+                RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf)
+                        .register("http", new PlainConnectionSocketFactory()).build();
 
         PoolingHttpClientConnectionManager manager =
                 new PoolingHttpClientConnectionManager(socketFactoryRegistry);
@@ -253,19 +255,13 @@ public class HttpClientFactory {
         addCredentialsProvider(builder);
         addProxy(builder);
         return builder.setConnectionManager(manager)
-                .setRedirectStrategy(
-                        new CustomRedirectStrategy(allowedHostsForRedirect))
-                .setDefaultRequestConfig(RequestConfig.custom()
-                        .setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC,
-                                AuthSchemes.NTLM))
-                        .setConnectionRequestTimeout((int) requestTimeout)
-                        .setConnectionRequestTimeout(connectTimeout)
-                        .setSocketTimeout(socketTimeout)
-                        .build()
-                )
-                .setKeepAliveStrategy(getKeepAliveStrategy())
-                .setSSLSocketFactory(sslsf)
-                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+                .setRedirectStrategy(new CustomRedirectStrategy(allowedHostsForRedirect))
+                .setDefaultRequestConfig(RequestConfig.custom().setTargetPreferredAuthSchemes(
+                        Arrays.asList(AuthSchemes.BASIC, AuthSchemes.NTLM))
+                        .setConnectionRequestTimeout(requestTimeout)
+                        .setConnectionRequestTimeout(connectTimeout).setSocketTimeout(socketTimeout)
+                        .build()).setKeepAliveStrategy(getKeepAliveStrategy())
+                .setSSLSocketFactory(sslsf).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                 .build();
     }
 
@@ -285,33 +281,32 @@ public class HttpClientFactory {
 
         if ((StringUtils.isBlank(userName) && StringUtils.isBlank(password)) ||
                 (StringUtils.isBlank(password) && StringUtils.isBlank(userName))) {
-            throw new IllegalArgumentException("can't have one of 'username', " +
-                    "'password' null and the other not");
+            throw new IllegalArgumentException(
+                    "can't have one of 'username', " + "'password' null and the other not");
         }
 
         String finalUserName = decrypt(userName);
         String finalPassword = decrypt(password);
         String finalDomain = decrypt(ntDomain);
-            CredentialsProvider provider = new BasicCredentialsProvider();
-            Credentials credentials = null;
-            Registry<AuthSchemeProvider> authSchemeRegistry = null;
-            if (authScheme.equals("basic")) {
-                credentials = new UsernamePasswordCredentials(finalUserName, finalPassword);
-                authSchemeRegistry = RegistryBuilder
-                        .<AuthSchemeProvider>create()
-                        .register("basic", new BasicSchemeFactory())
-                        .build();
-            } else if (authScheme.equals("ntlm")) {
-                if (StringUtils.isBlank(ntDomain)) {
-                    throw new IllegalArgumentException("must specify 'ntDomain'");
-                }
-                credentials = new NTCredentials(finalUserName, finalPassword, null, finalDomain);
-                authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
-                        .register("ntlm", new NTLMSchemeFactory()).build();
+        CredentialsProvider provider = new BasicCredentialsProvider();
+        Credentials credentials = null;
+        Registry<AuthSchemeProvider> authSchemeRegistry = null;
+        if (authScheme.equals("basic")) {
+            credentials = new UsernamePasswordCredentials(finalUserName, finalPassword);
+            authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
+                    .register("basic", new BasicSchemeFactory()).build();
+        } else if (authScheme.equals("ntlm")) {
+            if (StringUtils.isBlank(ntDomain)) {
+                throw new IllegalArgumentException("must specify 'ntDomain'");
             }
-            provider.setCredentials(AuthScope.ANY, credentials);
-            builder.setDefaultCredentialsProvider(provider);
-            builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
+            credentials = new NTCredentials(finalUserName, finalPassword,
+                    null, finalDomain);
+            authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
+                    .register("ntlm", new NTLMSchemeFactory()).build();
+        }
+        provider.setCredentials(AuthScope.ANY, credentials);
+        builder.setDefaultCredentialsProvider(provider);
+        builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
 
     }
 
@@ -350,7 +345,7 @@ public class HttpClientFactory {
     private static class CustomRedirectStrategy extends LaxRedirectStrategy {
 
         private static final Logger LOG = LoggerFactory.getLogger(CustomRedirectStrategy.class);
-        private Set<String> allowedHosts;
+        private final Set<String> allowedHosts;
 
         public CustomRedirectStrategy(Set<String> allowedHosts) {
             this.allowedHosts = allowedHosts;
@@ -373,7 +368,9 @@ public class HttpClientFactory {
         }
 
         @Override
-        public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException {
+        public boolean isRedirected(HttpRequest request, HttpResponse response,
+                                    HttpContext context)
+                throws ProtocolException {
             boolean isRedirectedSuper = super.isRedirected(request, response, context);
             if (isRedirectedSuper) {
                 Header locationHeader = response.getFirstHeader("Location");
@@ -422,9 +419,10 @@ public class HttpClientFactory {
             try {
                 Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
-                return Base64.getEncoder()
-                        .encodeToString(cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8)));
-            } catch (NoSuchAlgorithmException|InvalidKeyException|NoSuchPaddingException|BadPaddingException|IllegalBlockSizeException e) {
+                return Base64.getEncoder().encodeToString(
+                        cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8)));
+            } catch (NoSuchAlgorithmException | InvalidKeyException |
+                    NoSuchPaddingException | BadPaddingException | IllegalBlockSizeException e) {
                 throw new TikaConfigException("bad encryption info", e);
             }
         }
@@ -435,11 +433,8 @@ public class HttpClientFactory {
                 cipher.init(Cipher.DECRYPT_MODE, secretKey);
                 return new String(cipher.doFinal(Base64.getDecoder().decode(strToDecrypt)),
                         StandardCharsets.UTF_8);
-            } catch (NoSuchAlgorithmException|
-                    InvalidKeyException|
-                    NoSuchPaddingException|
-                    BadPaddingException|
-                    IllegalBlockSizeException e) {
+            } catch (NoSuchAlgorithmException | InvalidKeyException |
+                    NoSuchPaddingException | BadPaddingException | IllegalBlockSizeException e) {
                 throw new TikaConfigException("bad encryption info", e);
             }
         }
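
The builder chain in the hunks above is easier to follow in isolation. A self-contained sketch of the same trust-everything wiring, with placeholder timeout values (note that the hunk above carries over the pre-existing duplicate setConnectionRequestTimeout call; the sketch uses setConnectTimeout for the connect timeout):

    import javax.net.ssl.SSLContext;

    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.config.Registry;
    import org.apache.http.config.RegistryBuilder;
    import org.apache.http.conn.socket.ConnectionSocketFactory;
    import org.apache.http.conn.socket.PlainConnectionSocketFactory;
    import org.apache.http.conn.ssl.NoopHostnameVerifier;
    import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
    import org.apache.http.ssl.SSLContexts;

    public class TrustAllClientSketch {
        public static CloseableHttpClient build() throws Exception {
            // accept every certificate -- mirrors the factory's current (insecure) behavior
            SSLContext sslContext = SSLContexts.custom()
                    .loadTrustMaterial(null, (cert, authType) -> true).build();
            SSLConnectionSocketFactory sslsf =
                    new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
            Registry<ConnectionSocketFactory> registry =
                    RegistryBuilder.<ConnectionSocketFactory>create()
                            .register("https", sslsf)
                            .register("http", new PlainConnectionSocketFactory())
                            .build();
            PoolingHttpClientConnectionManager manager =
                    new PoolingHttpClientConnectionManager(registry);
            return HttpClients.custom()
                    .setConnectionManager(manager)
                    .setDefaultRequestConfig(RequestConfig.custom()
                            .setConnectTimeout(30_000)      // placeholder connect timeout
                            .setSocketTimeout(120_000)      // placeholder socket timeout
                            .build())
                    .setSSLSocketFactory(sslsf)
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                    .build();
        }
    }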
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
index 11ebb3c..3de32da 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
@@ -16,20 +16,19 @@
  */
 package org.apache.tika.client;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
 public class HttpClientUtil {
 
-    public static boolean postJson(HttpClient client, String url, String json) throws IOException,
-            TikaClientException {
+    public static boolean postJson(HttpClient client, String url, String json)
+            throws IOException, TikaClientException {
         HttpPost post = new HttpPost(url);
         post.setHeader("Content-Encoding", "gzip");
         ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
@@ -41,9 +40,8 @@ public class HttpClientUtil {
 
         if (response.getStatusLine().getStatusCode() != 200) {
             String msg = EntityUtils.toString(response.getEntity());
-            throw new TikaClientException("Bad status: " +
-                    response.getStatusLine().getStatusCode() + " : "+
-                    msg);
+            throw new TikaClientException(
+                    "Bad status: " + response.getStatusLine().getStatusCode() + " : " + msg);
         } else {
             String msg = EntityUtils.toString(response.getEntity());
             System.out.println("httputil: " + msg);
@@ -51,9 +49,8 @@ public class HttpClientUtil {
         return true;
     }
 
-    public static boolean postJson(HttpClient client, String url,
-                                   byte[] bytes, boolean gzipped) throws IOException,
-            TikaClientException {
+    public static boolean postJson(HttpClient client, String url, byte[] bytes, boolean gzipped)
+            throws IOException, TikaClientException {
         HttpPost post = new HttpPost(url);
         if (gzipped) {
             post.setHeader("Content-Encoding", "gzip");
@@ -67,9 +64,8 @@ public class HttpClientUtil {
 
         if (response.getStatusLine().getStatusCode() != 200) {
             String msg = EntityUtils.toString(response.getEntity());
-            throw new TikaClientException("Bad status: " +
-                    response.getStatusLine().getStatusCode() + " : "+
-                    msg);
+            throw new TikaClientException(
+                    "Bad status: " + response.getStatusLine().getStatusCode() + " : " + msg);
         } else {
             String msg = EntityUtils.toString(response.getEntity());
             System.out.println("httputil: " + msg);
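
For context, postJson() boils down to the standard HttpClient POST-and-check-status idiom. A standalone sketch with placeholder URL and payload:

    import java.nio.charset.StandardCharsets;

    import org.apache.http.HttpResponse;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.ByteArrayEntity;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;

    public class PostJsonSketch {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClients.createDefault();
            HttpPost post = new HttpPost("http://localhost:9998/emit");   // placeholder endpoint
            post.setHeader("Content-Type", "application/json");
            post.setEntity(new ByteArrayEntity(
                    "{\"id\":\"doc1\"}".getBytes(StandardCharsets.UTF_8)));
            HttpResponse response = client.execute(post);
            String body = EntityUtils.toString(response.getEntity());
            if (response.getStatusLine().getStatusCode() != 200) {
                throw new RuntimeException(
                        "Bad status: " + response.getStatusLine().getStatusCode() + " : " + body);
            }
            System.out.println("response: " + body);
        }
    }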
diff --git a/tika-pipes/tika-pipes-async/pom.xml b/tika-pipes/tika-pipes-async/pom.xml
index ded8a0a..5f4e418 100644
--- a/tika-pipes/tika-pipes-async/pom.xml
+++ b/tika-pipes/tika-pipes-async/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.tika</groupId>
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index 4860299..cb8347f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -1,16 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -29,6 +36,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
 public class AsyncCli {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
@@ -37,7 +54,7 @@ public class AsyncCli {
         Path configPath = Paths.get(args[0]);
         int maxConsumers = 20;
         AsyncCli asyncCli = new AsyncCli();
-        Path dbDir = Paths.get("/Users/allison/Desktop/tmp-db");//Files.createTempDirectory("tika-async-db-");
+        Path dbDir = Files.createTempDirectory("tika-async-db-");
         try {
             asyncCli.execute(dbDir, configPath, maxConsumers);
         } finally {
@@ -46,6 +63,18 @@ public class AsyncCli {
 
     }
 
+    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+        PreparedStatement findActiveWorkers =
+                connection.prepareStatement("select worker_id from workers");
+        List<Integer> workers = new ArrayList<>();
+        try (ResultSet rs = findActiveWorkers.executeQuery()) {
+            while (rs.next()) {
+                workers.add(rs.getInt(1));
+            }
+        }
+        return workers;
+    }
+
     private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
         TikaConfig tikaConfig = new TikaConfig(configPath);
 
@@ -60,18 +89,19 @@ public class AsyncCli {
             if (fetchIterator instanceof EmptyFetchIterator) {
                 throw new IllegalArgumentException("can't have empty fetch iterator");
             }
-            ArrayBlockingQueue<FetchEmitTuple> q = new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
+            ArrayBlockingQueue<FetchEmitTuple> q =
+                    new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
             AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
             executorCompletionService.submit(fetchIterator);
             executorCompletionService.submit(enqueuer);
             executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
 
             for (int i = 0; i < maxConsumers; i++) {
-                executorCompletionService.submit(new AsyncWorker(connection,
-                        connectionString, i, configPath));
+                executorCompletionService
+                        .submit(new AsyncWorker(connection, connectionString, i, configPath));
             }
             int completed = 0;
-            while (completed < maxConsumers+3) {
+            while (completed < maxConsumers + 3) {
                 Future<Integer> future = executorCompletionService.take();
                 if (future != null) {
                     int val = future.get();
@@ -86,17 +116,13 @@ public class AsyncCli {
 
     private String setupTables(Path dbDir) throws SQLException {
         Path dbFile = dbDir.resolve("tika-async");
-        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
-                ";AUTO_SERVER=TRUE";
+        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
         Connection connection = DriverManager.getConnection(url);
 
-        String sql = "create table task_queue " +
-                "(id bigint auto_increment primary key," +
-                "status tinyint," +//byte
-                "worker_id integer," +
-                "retry smallint," + //short
-                "time_stamp timestamp," +
-                "json varchar(64000))";
+        String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
+                "status tinyint," + //byte
+                "worker_id integer," + "retry smallint," + //short
+                "time_stamp timestamp," + "json varchar(64000))";
         connection.createStatement().execute(sql);
         //no clear benefit to creating an index on timestamp
 //        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
@@ -106,17 +132,13 @@ public class AsyncCli {
         sql = "create table workers_shutdown (worker_id int primary key)";
         connection.createStatement().execute(sql);
 
-        sql = "create table error_log (task_id bigint, " +
-                "fetch_key varchar(10000)," +
-                "time_stamp timestamp," +
-                "retry integer," +
-                "error_code tinyint)";
+        sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
+                "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
         connection.createStatement().execute(sql);
 
         return url;
     }
 
-
     //this reads fetchemittuples from the queue and inserts them in the db
     //for the workers to read
     private static class AsyncTaskEnqueuer implements Callable<Integer> {
@@ -128,8 +150,8 @@ public class AsyncCli {
 
         private volatile boolean isComplete = false;
 
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
-                          Connection connection) throws SQLException {
+        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
+                throws SQLException {
             this.queue = queue;
             this.connection = connection;
             String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
@@ -142,7 +164,7 @@ public class AsyncCli {
             List<Integer> workers = new ArrayList<>();
             while (true) {
                 FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
-                LOG.debug("enqueing to db "+t);
+                LOG.debug("enqueuing to db " + t);
                 if (t == null) {
                     //log.trace?
                 } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
@@ -155,7 +177,7 @@ public class AsyncCli {
                     while (workers.size() == 0 && elapsed < 600000) {
                         workers = getActiveWorkers(connection);
                         Thread.sleep(100);
-                        elapsed = System.currentTimeMillis()-start;
+                        elapsed = System.currentTimeMillis() - start;
                     }
                     insert(t, workers);
                 }
@@ -165,7 +187,9 @@ public class AsyncCli {
         boolean isComplete() {
             return isComplete;
         }
-        private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+
+        private void insert(FetchEmitTuple t, List<Integer> workers)
+                throws IOException, SQLException {
             int workerId = workers.size() == 1 ? workers.get(0) :
                     workers.get(random.nextInt(workers.size()));
             insert.clearParameters();
@@ -190,12 +214,12 @@ public class AsyncCli {
         private final Random random = new Random();
 
 
-        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer)
+                throws SQLException {
             this.connection = connection;
             this.enqueuer = enqueuer;
             //this gets workers and # of tasks in desc order of number of tasks
-            String sql = "select w.worker_id, p.cnt " +
-                    "from workers w " +
+            String sql = "select w.worker_id, p.cnt " + "from workers w " +
                     "left join (select worker_id, count(1) as cnt from task_queue " +
                     "where status=0 group by worker_id)" +
                     " p on p.worker_id=w.worker_id order by p.cnt desc";
@@ -213,14 +237,12 @@ public class AsyncCli {
             //current strategy reallocate tasks from longest queue to shortest
             //TODO: might consider randomly shuffling or other algorithms
             sql = "update task_queue set worker_id= ? where id in " +
-                    "(select id from task_queue where " +
-                    "worker_id = ? and " +
-                    "rand() < 0.8 " +
+                    "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
                     "and status=0 for update)";
             reallocate = connection.prepareStatement(sql);
 
-            sql = "select count(1) from task_queue where status="
-                    + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
+            sql = "select count(1) from task_queue where status=" +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
             countAvailableTasks = connection.prepareStatement(sql);
 
             sql = "insert into workers_shutdown (worker_id) values (?)";
@@ -251,7 +273,7 @@ public class AsyncCli {
         }
 
         private boolean isComplete() throws SQLException {
-            if (! enqueuer.isComplete) {
+            if (!enqueuer.isComplete) {
                 return false;
             }
             try (ResultSet rs = countAvailableTasks.executeQuery()) {
@@ -299,7 +321,8 @@ public class AsyncCli {
 
         }
 
-        private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+        private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
+                throws SQLException {
 
             if (missingWorkers.size() == 0) {
                 return;
@@ -316,8 +339,7 @@ public class AsyncCli {
                 allocateNonworkersToWorkers.setInt(1, active);
                 allocateNonworkersToWorkers.setInt(2, missing);
                 allocateNonworkersToWorkers.execute();
-                LOG.debug("allocating missing working ({}) to ({})",
-                        missing, active);
+                LOG.debug("allocating missing worker ({}) to ({})", missing, active);
             }
         }
 
@@ -333,16 +355,4 @@ public class AsyncCli {
             return missingWorkers;
         }
     }
-
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        PreparedStatement findActiveWorkers = connection.prepareStatement(
-                "select worker_id from workers");
-        List<Integer> workers = new ArrayList<>();
-        try (ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
 }
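
The queue handling in setupTables()/AsyncTaskEnqueuer is plain JDBC against an embedded H2 file database. A minimal sketch of the same round trip, with a placeholder db path and JSON payload; the column layout copies the DDL above, and status 0 matches the status=0 filters used for not-yet-claimed tasks:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class TaskQueueSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:h2:file:/tmp/tika-async/tika-async;AUTO_SERVER=TRUE";
            try (Connection connection = DriverManager.getConnection(url)) {
                connection.createStatement().execute(
                        "create table if not exists task_queue " +
                                "(id bigint auto_increment primary key, status tinyint, " +
                                "worker_id integer, retry smallint, time_stamp timestamp, " +
                                "json varchar(64000))");
                // enqueue one task for worker 0 with status 0 (available)
                PreparedStatement insert = connection.prepareStatement(
                        "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
                                "values (0, CURRENT_TIMESTAMP(), ?, 0, ?)");
                insert.setInt(1, 0);
                insert.setString(2, "{\"fetchKey\":\"doc1\"}");
                insert.execute();
                // a worker would then read back its unclaimed tasks
                try (ResultSet rs = connection.createStatement().executeQuery(
                        "select id, json from task_queue where worker_id=0 and status=0")) {
                    while (rs.next()) {
                        System.out.println(rs.getLong(1) + " -> " + rs.getString(2));
                    }
                }
            }
        }
    }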
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index a86f563..33964b3 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -1,38 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.tika.utils.StringUtils;
-
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
+import org.apache.tika.utils.StringUtils;
+
 public class AsyncConfig {
 
+    private final int queueSize = 1000;
+    private final int numWorkers = 10;
+    private final int numEmitters = 1;
+    private String jdbcString;
+    private Path dbDir;
+
     public static AsyncConfig load(Path p) throws IOException {
         AsyncConfig asyncConfig = new AsyncConfig();
 
         if (StringUtils.isBlank(asyncConfig.getJdbcString())) {
             asyncConfig.dbDir = Files.createTempDirectory("tika-async-");
             Path dbFile = asyncConfig.dbDir.resolve("tika-async");
-            asyncConfig.setJdbcString("jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
-                    ";AUTO_SERVER=TRUE");
+            asyncConfig.setJdbcString(
+                    "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE");
         } else {
             asyncConfig.dbDir = null;
         }
         return asyncConfig;
     }
 
-    private int queueSize = 1000;
-    private int maxConsumers = 10;
-    private String jdbcString;
-    private Path dbDir;
-
     public int getQueueSize() {
         return queueSize;
     }
 
-    public int getMaxConsumers() {
-        return maxConsumers;
+    public int getNumWorkers() {
+        return numWorkers;
+    }
+
+    public int getNumEmitters() {
+        return numEmitters;
     }
 
     public String getJdbcString() {
@@ -46,6 +67,7 @@ public class AsyncConfig {
     /**
      * If no jdbc connection was specified, this
      * dir contains the h2 database.  Otherwise, null.
+     *
      * @return
      */
     public Path getTempDBDir() {
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
index d7e058d..df80929 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
@@ -1,20 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitData;
-
-import java.util.List;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncData extends EmitData {
 
-    private final AsyncTask asyncTask;
+    private final long taskId;
+    private final FetchKey fetchKey;
+    private final FetchEmitTuple.ON_PARSE_EXCEPTION onParseException;
+
+    public AsyncData(@JsonProperty("taskId") long taskId,
+                     @JsonProperty("fetchKey") FetchKey fetchKey,
+                     @JsonProperty("emitKey") EmitKey emitKey, @JsonProperty("onParseException")
+                             FetchEmitTuple.ON_PARSE_EXCEPTION onParseException,
+                     @JsonProperty("metadataList") List<Metadata> metadataList) {
+        super(emitKey, metadataList);
+        this.taskId = taskId;
+        this.fetchKey = fetchKey;
+        this.onParseException = onParseException;
+    }
+
+    public FetchKey getFetchKey() {
+        return fetchKey;
+    }
 
-    public AsyncData(AsyncTask asyncTask, List<Metadata> metadataList) {
-        super(asyncTask.getEmitKey(), metadataList);
-        this.asyncTask = asyncTask;
+    public long getTaskId() {
+        return taskId;
     }
 
-    public AsyncTask getAsyncTask() {
-        return asyncTask;
+    public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
+        return onParseException;
     }
 }
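
The @JsonProperty annotations on the new AsyncData constructor are what allow Jackson to rebuild the immutable object from JSON by matching field names to constructor arguments. A minimal, self-contained illustration of that pattern (the Point class here is hypothetical, not part of Tika):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonCtorSketch {
        static class Point {
            private final long x;
            private final long y;

            @JsonCreator
            Point(@JsonProperty("x") long x, @JsonProperty("y") long y) {
                this.x = x;
                this.y = y;
            }

            public long getX() { return x; }

            public long getY() { return y; }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new Point(3, 4));
            Point back = mapper.readValue(json, Point.class);
            System.out.println(json + " -> (" + back.getX() + "," + back.getY() + ")");
        }
    }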
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
index 02d7fec..502c8b4 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
 public interface AsyncEmitHook {
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 071a1fa..4c44f9c 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -17,38 +17,19 @@
 package org.apache.tika.pipes.async;
 
 
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AsyncEmitter implements Callable<Integer> {
 
@@ -56,42 +37,27 @@ public class AsyncEmitter implements Callable<Integer> {
 
 
     private final String connectionString;
-    private final int workerId;
+    private final int emitterId;
     private final Path tikaConfigPath;
     private final Connection connection;
     private final PreparedStatement finished;
     private final PreparedStatement restarting;
-    private final PreparedStatement selectActiveTasks;
-    private final PreparedStatement insertErrorLog;
-    private final PreparedStatement resetStatus;
 
-    public AsyncEmitter(Connection connection,
-                       String connectionString, int workerId,
-                       Path tikaConfigPath) throws SQLException {
+    public AsyncEmitter(Connection connection, String connectionString, int emitterId,
+                        Path tikaConfigPath) throws SQLException {
         this.connectionString = connectionString;
-        this.workerId = workerId;
+        this.emitterId = emitterId;
         this.tikaConfigPath = tikaConfigPath;
         this.connection = connection;
-        String sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
-                " where worker_id = (" + workerId + ")";
+        String sql = "update emitters set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
+                " where emitter_id = (" + emitterId + ")";
         finished = connection.prepareStatement(sql);
 
-        sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
-                " where worker_id = (" + workerId + ")";
+        sql = "update emitters set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
+                " where emitter_id = (" + emitterId + ")";
         restarting = connection.prepareStatement(sql);
-        //this checks if the process was able to reset the status
-        sql = "select id, retry, json from task_queue where worker_id="
-                + workerId +
-                " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
-        selectActiveTasks = connection.prepareStatement(sql);
-
-        //if not, this is called to insert into the error log
-        insertErrorLog = prepareInsertErrorLog(connection);
-
-        //and this is called to reset the status on error
-        resetStatus = prepareReset(connection);
     }
 
     @Override
@@ -105,7 +71,7 @@ public class AsyncEmitter implements Callable<Integer> {
                 if (finished) {
                     int exitValue = p.exitValue();
                     if (exitValue == 0) {
-                        LOG.info("forked emitter process finished with exitValue=0");
+                        LOG.debug("forked emitter process finished with exitValue=0");
                         return 1;
                     }
                     reportCrash(++restarts, exitValue);
@@ -121,72 +87,24 @@ public class AsyncEmitter implements Callable<Integer> {
     }
 
     private Process start() throws IOException {
-        String[] args = new String[]{
-                "java", "-Djava.awt.headless=true",
-                "-cp", System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncEmitterProcess",
-                Integer.toString(workerId)
-        };
+        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
+                System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncEmitterProcess", Integer.toString(emitterId)};
         ProcessBuilder pb = new ProcessBuilder(args);
         pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
-                tikaConfigPath.toAbsolutePath().toString());
+        pb.environment()
+                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
         pb.inheritIO();
         return pb.start();
     }
 
     private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
-        LOG.warn("worker id={} terminated, exitValue={}",
-                workerId, exitValue);
+        LOG.warn("emitter id={} terminated, exitValue={}", emitterId, exitValue);
         restarting.execute();
-        List<AsyncTask> activeTasks = new ArrayList<>();
-        try (ResultSet rs = selectActiveTasks.executeQuery()) {
-            long taskId = rs.getLong(1);
-            short retry = rs.getShort(2);
-            String json = rs.getString(3);
-            FetchEmitTuple tuple = JsonFetchEmitTuple.fromJson(new StringReader(json));
-            activeTasks.add(new AsyncTask(taskId, retry, tuple));
-        }
-        if (activeTasks.size() == 0) {
-            LOG.info("worker reset active tasks, nothing extra to report");
-            return;
-        }
-        if (activeTasks.size() > 1) {
-            LOG.warn("more than one active task? this should never happen!");
-        }
-
-        for (AsyncTask t : activeTasks) {
-            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
-                    insertErrorLog, resetStatus, LOG);
-        }
-
-    }
-
-    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
-                               PreparedStatement insertErrorLog, PreparedStatement resetStatus,
-                               Logger logger) {
-        try {
-            insertErrorLog.clearParameters();
-            insertErrorLog.setLong(1, task.getTaskId());
-            insertErrorLog.setString(2, task.getFetchKey().getKey());
-            insertErrorLog.setInt(3, task.getRetry());
-            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
-            insertErrorLog.execute();
-        } catch (SQLException e) {
-            logger.error("Can't update error log", e);
-        }
-
-        try {
-            resetStatus.clearParameters();
-            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            resetStatus.setShort(2, (short)(task.getRetry()+1));
-            resetStatus.setLong(3, task.getTaskId());
-            resetStatus.execute();
-        } catch (SQLException e) {
-            logger.error("Can't reset try status", e);
-        }
+        //should we unassign emit tasks here?
     }
 
+ /*
     static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
         //if not, this is called to insert into the error log
         return connection.prepareStatement(
@@ -203,5 +121,5 @@ public class AsyncEmitter implements Callable<Integer> {
                         "time_stamp=CURRENT_TIMESTAMP(), " +
                         "retry=? " +
                         "where id=?");
-    }
+    }*/
 }
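
AsyncEmitter forks the emitter into its own JVM and watches the child Process. A stripped-down sketch of that fork-and-monitor pattern; the environment variable names and the 60 second wait are placeholders (the real code passes the values of AsyncProcessor.TIKA_ASYNC_JDBC_KEY and TIKA_ASYNC_CONFIG_FILE_KEY):

    import java.util.concurrent.TimeUnit;

    public class ForkSketch {
        public static void main(String[] args) throws Exception {
            ProcessBuilder pb = new ProcessBuilder(
                    "java", "-Djava.awt.headless=true",
                    "-cp", System.getProperty("java.class.path"),
                    "org.apache.tika.pipes.async.AsyncEmitterProcess", "0");
            // placeholder env keys; the parent hands the child its jdbc url and config path
            pb.environment().put("TIKA_ASYNC_JDBC", "jdbc:h2:file:/tmp/tika-async/tika-async");
            pb.environment().put("TIKA_ASYNC_CONFIG", "/tmp/tika-config.xml");
            pb.inheritIO();
            Process p = pb.start();
            boolean finished = p.waitFor(60, TimeUnit.SECONDS);
            if (finished) {
                System.out.println("child exited with " + p.exitValue());
            } else {
                p.destroyForcibly();   // the real code restarts or reports the crash instead
            }
        }
    }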
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
index 37c5f3a..03f5b93 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -16,21 +16,9 @@
  */
 package org.apache.tika.pipes.async;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.commons.io.IOUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Paths;
 import java.sql.Blob;
 import java.sql.Connection;
@@ -42,95 +30,184 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
 
 public class AsyncEmitterProcess {
 
-    //TODO -- parameterize these
-    private long emitWithinMs = 10000;
-    private long emitMaxBytes = 10_000_000;
     private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitterProcess.class);
-
-    private final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+    private final LZ4FastDecompressor decompressor =
+            LZ4Factory.fastestInstance().fastDecompressor();
     private final ObjectMapper objectMapper = new ObjectMapper();
+    private final FutureTask<Integer> stdinWatcher;
     int recordsPerPulse = 10;
+    //TODO -- parameterize these
+    private final long emitWithinMs = 10000;
+    private final long emitMaxBytes = 10_000_000;
     private PreparedStatement markForSelecting;
     private PreparedStatement selectForProcessing;
     private PreparedStatement emitStatusUpdate;
-    private PreparedStatement checkForShutdown;
+    private PreparedStatement checkForCanShutdown;
+    private PreparedStatement checkForShouldShutdown;
+    private PreparedStatement updateEmitterStatus;
+
+    private AsyncEmitterProcess(InputStream stdin) {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
+        objectMapper.registerModule(module);
+        stdinWatcher = new FutureTask<>(new ForkWatcher(stdin));
+        new Thread(stdinWatcher).start();
+    }
 
     public static void main(String[] args) throws Exception {
         String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
-        TikaConfig tikaConfig = new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
+        TikaConfig tikaConfig =
+                new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
         int workerId = Integer.parseInt(args[0]);
         LOG.debug("trying to get connection {} >{}<", workerId, db);
         try (Connection connection = DriverManager.getConnection(db)) {
-            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
+            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess(System.in);
             asyncEmitter.execute(connection, workerId, tikaConfig);
         }
         System.exit(0);
     }
 
-    private void execute(Connection connection, int workerId,
-                         TikaConfig tikaConfig) throws SQLException,
-            InterruptedException {
+    private static void reportEmitStatus(List<Long> ids,
+                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
+                                         PreparedStatement emitStatusUpdate) throws SQLException {
+        for (long id : ids) {
+            emitStatusUpdate.clearParameters();
+            emitStatusUpdate.setByte(1, (byte) emitted.ordinal());
+            emitStatusUpdate.setLong(2, id);
+            emitStatusUpdate.addBatch();
+        }
+        emitStatusUpdate.executeBatch();
+    }
+
+    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
+            throws SQLException, InterruptedException {
         prepareStatements(connection, workerId);
+        updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.ACTIVE.ordinal());
         EmitterManager emitterManager = tikaConfig.getEmitterManager();
-        EmitDataCache emitDataCache = new EmitDataCache(emitterManager, emitMaxBytes,
-                emitStatusUpdate);
-        int shouldShutdown = 0;
+        EmitDataCache emitDataCache =
+                new EmitDataCache(emitterManager, emitMaxBytes, emitStatusUpdate);
+        try {
+            mainLoop(emitDataCache);
+        } finally {
+            emitDataCache.emitAll();
+            updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal());
+        }
+    }
+
+    private void mainLoop(EmitDataCache emitDataCache) throws InterruptedException, SQLException {
+
         while (true) {
+            if (shouldShutdown()) {
+                LOG.debug("received should shutdown signal");
+                return;
+            }
             int toEmit = markForSelecting.executeUpdate();
+            if (toEmit == 0 && canShutdown()) {
+                //avoid race condition; double check there's nothing
+                //left to emit
+                toEmit = markForSelecting.executeUpdate();
+                if (toEmit == 0) {
+                    LOG.debug("received can shutdown and didn't update any for selecting");
+                    return;
+                }
+            }
             if (toEmit > 0) {
-                try (ResultSet rs = selectForProcessing.executeQuery()) {
-                    while (rs.next()) {
-                        long id = rs.getLong(1);
-                        Timestamp ts = rs.getTimestamp(2);
-                        int uncompressedSize = rs.getInt(3);
-                        Blob blob = rs.getBlob(4);
-                        try {
-                            tryToEmit(id, ts, uncompressedSize, blob,
-                                    emitDataCache);
-                        } catch (SQLException|IOException e) {
-                            reportEmitStatus(
-                                    Collections.singletonList(id),
-                                    AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
-                                    emitStatusUpdate
-                            );
-                        }
-                    }
+                try {
+                    tryToEmitNextBatch(emitDataCache);
+                } catch (IOException e) {
+                    LOG.warn("IOException trying to emit", e);
                 }
             }
             if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
                 emitDataCache.emitAll();
             }
             Thread.sleep(500);
-            if (shouldShutdown()) {
-                shouldShutdown++;
+        }
+    }
+
+    private void tryToEmitNextBatch(EmitDataCache emitDataCache) throws IOException, SQLException {
+        List<AsyncEmitTuple> toEmitList = new ArrayList<>();
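+        //materialize each claimed row (id, timestamp, uncompressed size, compressed bytes)
+        //so the result set can be closed before deserializing and emitting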
+        try (ResultSet rs = selectForProcessing.executeQuery()) {
+            while (rs.next()) {
+                long id = rs.getLong(1);
+                Timestamp ts = rs.getTimestamp(2);
+                int uncompressedSize = rs.getInt(3);
+                Blob blob = rs.getBlob(4);
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                IOUtils.copyLarge(blob.getBinaryStream(), bos);
+                byte[] bytes = bos.toByteArray();
+                toEmitList.add(new AsyncEmitTuple(id, ts, uncompressedSize, bytes));
             }
-            //make sure to test twice
-            if (shouldShutdown > 1) {
-                emitDataCache.emitAll();
-                return;
+        }
+        List<Long> successes = new ArrayList<>();
+        List<Long> exceptions = new ArrayList<>();
+        for (AsyncEmitTuple tuple : toEmitList) {
+            try {
+                tryToEmit(tuple, emitDataCache);
+                successes.add(tuple.id);
+            } catch (IOException | SQLException e) {
+                exceptions.add(tuple.id);
             }
         }
+        reportEmitStatus(successes, AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS_EMIT,
+                emitStatusUpdate);
+        reportEmitStatus(exceptions, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+                emitStatusUpdate);
+
+    }
+
+    private void updateStatus(byte status) throws SQLException {
+        updateEmitterStatus.clearParameters();
+        updateEmitterStatus.setByte(1, status);
+        updateEmitterStatus.executeUpdate();
     }
 
-    private void tryToEmit(long id, Timestamp ts,
-                           int decompressedLength,
-                           Blob blob, EmitDataCache emitDataCache)
+    private void tryToEmit(AsyncEmitTuple asyncEmitTuple, EmitDataCache emitDataCache)
             throws SQLException, IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        IOUtils.copyLarge(blob.getBinaryStream(), bos);
-        AsyncData asyncData = deserialize(bos.toByteArray(), decompressedLength);
+        AsyncData asyncData = deserialize(asyncEmitTuple.bytes, asyncEmitTuple.uncompressedSize);
         emitDataCache.add(asyncData);
     }
 
     boolean shouldShutdown() throws SQLException {
-        try (ResultSet rs = checkForShutdown.executeQuery()) {
+        if (stdinWatcher.isDone()) {
+            LOG.info("parent input stream closed; shutting down now");
+            return true;
+        }
+        try (ResultSet rs = checkForShouldShutdown.executeQuery()) {
+            if (rs.next()) {
+                int val = rs.getInt(1);
+                return val > 0;
+            }
+        }
+        return false;
+    }
+
+    boolean canShutdown() throws SQLException {
+        try (ResultSet rs = checkForCanShutdown.executeQuery()) {
             if (rs.next()) {
                 int val = rs.getInt(1);
                 return val > 0;
@@ -141,54 +218,57 @@ public class AsyncEmitterProcess {
 
     private void prepareStatements(Connection connection, int workerId) throws SQLException {
         String sql = "update task_queue set status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal()+
-                ", worker_id="+workerId+", time_stamp=CURRENT_TIMESTAMP()"+
-                " where id in " +
-                " (select id from task_queue "+//where worker_id = " + workerId +
-                " where status="+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
-                " order by time_stamp asc limit "+recordsPerPulse+" for update)";
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + ", worker_id=" +
+                workerId + ", time_stamp=CURRENT_TIMESTAMP()" + " where id in " +
+                " (select id from task_queue " + //where worker_id = " + workerId +
+                " where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+                " order by time_stamp asc limit " + recordsPerPulse + " for update)";
         markForSelecting = connection.prepareStatement(sql);
 
-        sql = "select q.id, q.time_stamp, uncompressed_size, bytes from emits e " +
-                "join task_queue q " +
-                "where q.status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
-                " and worker_id=" + workerId +
-                " order by time_stamp asc";
+        sql = "select q.id, q.time_stamp, uncompressed_size, bytes " + "from emits e " +
+                "join task_queue q on e.id=q.id " + "where q.status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + " and worker_id=" +
+                workerId + " order by time_stamp asc";
         selectForProcessing = connection.prepareStatement(sql);
 
-        sql = "update task_queue set status=?"+
-                ", time_stamp=CURRENT_TIMESTAMP()"+
-                " where id=?";
+        //only update the status if it is not already emitted or failed emit
+        sql = "update task_queue set status=?" + ", time_stamp=CURRENT_TIMESTAMP()" +
+                " where id=? and status not in (" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED.ordinal() + ", " +
+                AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT.ordinal() + ")";
+
         emitStatusUpdate = connection.prepareStatement(sql);
 
-        sql = "select count(1) from workers where worker_id=" + workerId +
-                " and status="+ AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
-        checkForShutdown = connection.prepareStatement(sql);
-    }
+        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
+        checkForCanShutdown = connection.prepareStatement(sql);
 
+        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+        checkForShouldShutdown = connection.prepareStatement(sql);
 
-    private AsyncData deserialize(byte[] compressed, int decompressedLength)
-            throws IOException {
-        byte[] restored = new byte[decompressedLength];
-        int compressedLength2 = decompressor.decompress(compressed, 0, restored,
-                0, decompressedLength);
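+        //MERGE ... KEY acts as an upsert: insert or update this emitter's status row;
+        //the status byte is bound when the statement is executed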
+        sql = "merge into emitters key (emitter_id) " + "values (" + workerId + ", ? )";
+        updateEmitterStatus = connection.prepareStatement(sql);
+    }
 
-        return objectMapper.readerFor(AsyncTask.class).readValue(restored);
+    private AsyncData deserialize(byte[] compressed, int decompressedLength) throws IOException {
+        byte[] restored = new byte[decompressedLength];
+        int compressedLength2 =
+                decompressor.decompress(compressed, 0, restored, 0, decompressedLength);
+        return objectMapper.readerFor(AsyncData.class).readValue(restored);
     }
 
     private static class EmitDataCache {
         private final EmitterManager emitterManager;
         private final long maxBytes;
         private final PreparedStatement emitStatusUpdate;
-        private Instant lastAdded = Instant.now();
-
         long estimatedSize = 0;
         int size = 0;
         Map<String, List<AsyncData>> map = new HashMap<>();
+        private Instant lastAdded = Instant.now();
 
-        public EmitDataCache(EmitterManager emitterManager,
-                             long maxBytes, PreparedStatement emitStatusUpdate) {
+        public EmitDataCache(EmitterManager emitterManager, long maxBytes,
+                             PreparedStatement emitStatusUpdate) {
             this.emitterManager = emitterManager;
             this.maxBytes = maxBytes;
             this.emitStatusUpdate = emitStatusUpdate;
@@ -201,10 +281,11 @@ public class AsyncEmitterProcess {
         void add(AsyncData data) {
 
             size++;
-            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
+            long sz = AbstractEmitter
+                    .estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
             if (estimatedSize + sz > maxBytes) {
                 LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
-                        (estimatedSize+sz), maxBytes);
+                        (estimatedSize + sz), maxBytes);
                 emitAll();
             }
             List<AsyncData> cached = map.get(data.getEmitKey().getEmitterName());
@@ -240,7 +321,7 @@ public class AsyncEmitterProcess {
                 throws SQLException {
             List<Long> ids = new ArrayList<>();
             for (AsyncData d : cachedEmitData) {
-                ids.add(d.getAsyncTask().getTaskId());
+                ids.add(d.getTaskId());
             }
             try {
                 emitter.emit(cachedEmitData);
@@ -250,27 +331,49 @@ public class AsyncEmitterProcess {
                 reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
                         emitStatusUpdate);
             }
-            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED,
-                    emitStatusUpdate);
+            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED, emitStatusUpdate);
             return 1;
         }
 
 
         public boolean exceedsEmitWithin(long emitWithinMs) {
-            return ChronoUnit.MILLIS.between(lastAdded, Instant.now())
-                    > emitWithinMs;
+            return ChronoUnit.MILLIS.between(lastAdded, Instant.now()) > emitWithinMs;
         }
     }
 
-    private static void reportEmitStatus(List<Long> ids,
-                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
-                                         PreparedStatement emitStatusUpdate)
-            throws SQLException {
-        for (long id : ids) {
-            emitStatusUpdate.clearParameters();
-            emitStatusUpdate.setByte(1, (byte)emitted.ordinal());
-            emitStatusUpdate.setLong(2, id);
-            emitStatusUpdate.executeUpdate();
+    private static class ForkWatcher implements Callable<Integer> {
+        private final InputStream in;
+
+        public ForkWatcher(InputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            //this read should block forever; if the forking process dies,
+            //it will either throw an IOException or return -1
+            try {
+                in.read();
+            } finally {
+                LOG.warn("forking process has shut down; exiting now");
+                System.exit(0);
+            }
+            return 1;
+        }
+    }
+
+    private static class AsyncEmitTuple {
+        final long id;
+        final Timestamp timestamp;
+        final int uncompressedSize;
+        final byte[] bytes;
+
+        public AsyncEmitTuple(long id, Timestamp timestamp, int uncompressedSize, byte[] bytes) {
+            this.id = id;
+            this.timestamp = timestamp;
+            this.uncompressedSize = uncompressedSize;
+            this.bytes = bytes;
         }
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
index 3828b24..79ed80d 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
@@ -1,12 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class AsyncPipesEmitHook implements AsyncEmitHook {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncPipesEmitHook.class);
@@ -14,7 +30,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
     private final PreparedStatement markSuccess;
     private final PreparedStatement markFailure;
 
-    public AsyncPipesEmitHook(Connection connection) throws SQLException  {
+    public AsyncPipesEmitHook(Connection connection) throws SQLException {
         String sql = "delete from task_queue where id=?";
         markSuccess = connection.prepareStatement(sql);
         //TODO --fix this
@@ -28,7 +44,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
             markSuccess.setLong(1, task.getTaskId());
             markSuccess.execute();
         } catch (SQLException e) {
-            LOG.warn("problem with on success: "+task.getTaskId(), e);
+            LOG.warn("problem with on success: " + task.getTaskId(), e);
         }
     }
 
@@ -39,7 +55,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
             markFailure.setLong(1, task.getTaskId());
             markFailure.execute();
         } catch (SQLException e) {
-            LOG.warn("problem with on fail: "+task.getTaskId(), e);
+            LOG.warn("problem with on fail: " + task.getTaskId(), e);
         }
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index b4d8a8a..efd6fec 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -1,14 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
 import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -17,8 +28,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -28,44 +41,69 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
 public class AsyncProcessor implements Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
     protected static String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYC_JDBC_KEY";
     protected static String TIKA_ASYNC_CONFIG_FILE_KEY = "TIKA_ASYNC_CONFIG_FILE_KEY";
     private final Path tikaConfigPath;
-    private AsyncConfig asyncConfig;
     private final ArrayBlockingQueue<FetchEmitTuple> queue;
     private final Connection connection;
-    private int finishedThreads = 0;
     private final int totalThreads;
+    private final AsyncConfig asyncConfig;
+    private PreparedStatement emittersCanShutdown;
+    private volatile boolean isShuttingDown = false;
+    private AssignmentManager assignmentManager;
+    private int finishedThreads = 0;
     private ExecutorService executorService;
     private ExecutorCompletionService<Integer> executorCompletionService;
 
+    private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
+        this.tikaConfigPath = tikaConfigPath;
+        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+        this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
+        this.totalThreads = asyncConfig.getNumWorkers() + asyncConfig.getNumEmitters() +
+                2;//assignment manager and enqueuer threads
+    }
+
     public static AsyncProcessor build(Path tikaConfigPath) throws AsyncRuntimeException {
         try {
             AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
-
             processor.init();
             return processor;
-        } catch (SQLException|IOException e) {
+        } catch (SQLException | IOException e) {
             throw new AsyncRuntimeException(e);
         }
     }
 
-    private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
-        this.tikaConfigPath = tikaConfigPath;
-        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
-        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
-        this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
-        this.totalThreads = asyncConfig.getMaxConsumers() + 2 + 1;
+    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+        PreparedStatement findActiveWorkers =
+                connection.prepareStatement("select worker_id from workers");
+        List<Integer> workers = new ArrayList<>();
+        try (ResultSet rs = findActiveWorkers.executeQuery()) {
+            while (rs.next()) {
+                workers.add(rs.getInt(1));
+            }
+        }
+        return workers;
     }
 
-    public synchronized boolean offer (
-            List<FetchEmitTuple> fetchEmitTuples, long offerMs) throws
-            AsyncRuntimeException, InterruptedException {
+    public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
         if (queue == null) {
             throw new IllegalStateException("queue hasn't been initialized yet.");
+        } else if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
         }
         long start = System.currentTimeMillis();
         long elapsed = System.currentTimeMillis() - start;
@@ -87,6 +125,12 @@ public class AsyncProcessor implements Closeable {
 
     public synchronized boolean offer(FetchEmitTuple t, long offerMs)
             throws AsyncRuntimeException, InterruptedException {
+        if (queue == null) {
+            throw new IllegalStateException("queue hasn't been initialized yet.");
+        } else if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
+        }
         checkActive();
         return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
     }
@@ -99,8 +143,7 @@ public class AsyncProcessor implements Closeable {
      * @throws AsyncRuntimeException
      * @throws InterruptedException
      */
-    public synchronized boolean checkActive()
-            throws AsyncRuntimeException, InterruptedException {
+    public synchronized boolean checkActive() throws AsyncRuntimeException, InterruptedException {
         Future<Integer> future = executorCompletionService.poll();
         if (future != null) {
             try {
@@ -110,74 +153,87 @@ public class AsyncProcessor implements Closeable {
             }
             finishedThreads++;
         }
-        if (finishedThreads == totalThreads) {
-            return false;
-        }
-        return true;
+        return finishedThreads != totalThreads;
     }
 
     private void init() throws SQLException {
 
         setupTables();
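+        //prepared here, executed from close() once all parse tasks have completed;
+        //it flips every emitter row to CAN_SHUTDOWN so the emitters can drain and exit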
+        String sql = "update emitters set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
+        this.emittersCanShutdown = connection.prepareStatement(sql);
+        executorService = Executors.newFixedThreadPool(totalThreads);
+        executorCompletionService = new ExecutorCompletionService<>(executorService);
 
-        executorService = Executors.newFixedThreadPool(
-                totalThreads);
-        executorCompletionService =
-                new ExecutorCompletionService<>(executorService);
+        AsyncTaskEnqueuer taskEnqueuer = new AsyncTaskEnqueuer(queue, connection);
 
-        AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
+        executorCompletionService.submit(taskEnqueuer);
 
-        executorCompletionService.submit(enqueuer);
-        executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
-        //executorCompletionService.submit(new )
-        for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
-            executorCompletionService.submit(new AsyncWorker(connection,
-                    asyncConfig.getJdbcString(), i, tikaConfigPath));
+        List<AsyncWorker> workers = buildWorkers(connection, asyncConfig, tikaConfigPath);
+        int maxRetries = 0;
+        for (AsyncWorker worker : workers) {
+            if (worker.getMaxRetries() > maxRetries) {
+                maxRetries = worker.getMaxRetries();
+            }
+            executorCompletionService.submit(worker);
+        }
+        assignmentManager = new AssignmentManager(connection, taskEnqueuer, maxRetries);
+        executorCompletionService.submit(assignmentManager);
+        for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
+            executorCompletionService
+                    .submit(new AsyncEmitter(connection, asyncConfig.getJdbcString(),
+                            asyncConfig.getNumWorkers() + i, tikaConfigPath));
         }
-        executorCompletionService.submit(new AsyncEmitter(connection,
-                asyncConfig.getJdbcString(), asyncConfig.getMaxConsumers(),
-                tikaConfigPath));
+    }
+
+    private List<AsyncWorker> buildWorkers(Connection connection, AsyncConfig asyncConfig,
+                                           Path tikaConfigPath) throws SQLException {
+        //TODO -- make these workers configurable via the tika config, e.g. max retries
+        //and jvm args, etc.
+        List<AsyncWorker> workers = new ArrayList<>();
+        for (int i = 0; i < asyncConfig.getNumWorkers(); i++) {
+            workers.add(
+                    new AsyncWorker(connection, asyncConfig.getJdbcString(), i, tikaConfigPath));
+        }
+        return workers;
     }
 
     private void setupTables() throws SQLException {
 
-        String sql = "create table task_queue " +
-                "(id bigint auto_increment primary key," +
-                "status tinyint," +//byte
-                "worker_id integer," +
-                "retry smallint," + //short
+        String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
+                "status tinyint," + //byte
+                "worker_id integer," + "retry smallint," + //short
                 "time_stamp timestamp," +
                 "json varchar(64000))";//this is the AsyncTask ... not the emit data!
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
-        //no clear benefit to creating an index on timestamp
-//        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
         sql = "create table workers (worker_id int primary key, status tinyint)";
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
 
-        sql = "create table error_log (task_id bigint, " +
-                "fetch_key varchar(10000)," +
-                "time_stamp timestamp," +
-                "retry integer," +
-                "error_code tinyint)";
+        sql = "create table emitters (emitter_id int primary key, status tinyint)";
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
+
+        sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
+                "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
+
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
 
-        sql = "create table emits (" +
-                "id bigint primary key, " +
-                "time_stamp timestamp, "+
-                "uncompressed_size bigint, " +
-                "bytes blob)";
+        sql = "create table emits (" + "id bigint primary key, " + "time_stamp timestamp, " +
+                "uncompressed_size bigint, " + "bytes blob)";
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
     }
 
     public void shutdownNow() throws IOException, AsyncRuntimeException {
+        isShuttingDown = true;
         try {
             executorService.shutdownNow();
         } finally {
@@ -189,50 +245,65 @@ public class AsyncProcessor implements Closeable {
     }
 
     /**
-     * This is a blocking close.  If you need to shutdown immediately,
-     * try {@link #shutdownNow()}.
+     * This is a blocking close.  It will wait for all tasks that were successfully submitted
+     * before this call to complete, and only then shut down.  If you need to shut down
+     * immediately, use {@link #shutdownNow()}.
+     *
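+     * <p>
+     * A minimal usage sketch (the config path and the tuple list are hypothetical):
+     * <pre>
+     * try (AsyncProcessor processor = AsyncProcessor.build(Paths.get("my-tika-config.xml"))) {
+     *     processor.offer(fetchEmitTuples, 30000);
+     * }
+     * </pre>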
      * @throws IOException
      */
     @Override
     public void close() throws IOException {
+        isShuttingDown = true;
         try {
-            for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
-                try {
-                    //blocking
-                    queue.put(FetchIterator.COMPLETED_SEMAPHORE);
-                } catch (InterruptedException e) {
-                    //swallow
-                }
-            }
-            //TODO: clean this up
-            String sql = "update workers set status="+
-                    AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
-                    " where worker_id = (" + asyncConfig.getMaxConsumers() + ")";
-            try {
-                connection.prepareStatement(sql).execute();
-            } catch (SQLException throwables) {
-                throwables.printStackTrace();
-            }
-            long start = System.currentTimeMillis();
-            long elapsed = System.currentTimeMillis() - start;
-            try {
-                boolean isActive = checkActive();
-                while (isActive) {
-                    isActive = checkActive();
-                    elapsed = System.currentTimeMillis();
-                }
-            } catch (InterruptedException e) {
-                return;
-            }
+            completeAndShutdown();
+        } catch (SQLException | InterruptedException e) {
+            throw new IOException(e);
         } finally {
             executorService.shutdownNow();
+            SQLException ex = null;
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                ex = e;
+            }
             //close down processes and db
             if (asyncConfig.getTempDBDir() != null) {
                 FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
             }
+            if (ex != null) {
+                throw new IOException(ex);
+            }
         }
     }
 
+    //this will block until everything finishes
+    private void completeAndShutdown() throws SQLException, InterruptedException {
+
+        //blocking...notify taskEnqueuer
+        queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+
+        //wait for assignmentManager to finish
+        //it will only complete after the task enqueuer has completed
+        //and there are no more parse tasks available, selected or in process
+        while (!assignmentManager.hasCompletedTasks()) {
+            Thread.sleep(100);
+        }
+
+        emittersCanShutdown.executeUpdate();
+
+        //wait for emitters to finish
+        long start = System.currentTimeMillis();
+        long elapsed = System.currentTimeMillis() - start;
+        try {
+            boolean isActive = checkActive();
+            while (isActive) {
+                isActive = checkActive();
+                elapsed = System.currentTimeMillis() - start;
+            }
+        } catch (InterruptedException e) {
+            return;
+        }
+    }
 
     //this reads fetchemittuples from the queue and inserts them in the db
     //for the workers to read
@@ -245,8 +316,8 @@ public class AsyncProcessor implements Closeable {
 
         private volatile boolean isComplete = false;
 
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
-                          Connection connection) throws SQLException {
+        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
+                throws SQLException {
             this.queue = queue;
             this.connection = connection;
             String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
@@ -268,7 +339,7 @@ public class AsyncProcessor implements Closeable {
                 } else {
                     long start = System.currentTimeMillis();
                     long elapsed = System.currentTimeMillis() - start;
-                    //TODO -- fix this
+                    //TODO -- fix this -- this loop waits for workers to register
                     while (workers.size() == 0 && elapsed < 600000) {
                         workers = getActiveWorkers(connection);
                         Thread.sleep(100);
@@ -283,7 +354,8 @@ public class AsyncProcessor implements Closeable {
             return isComplete;
         }
 
-        private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+        private void insert(FetchEmitTuple t, List<Integer> workers)
+                throws IOException, SQLException {
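+            //assign the tuple to a randomly chosen registered worker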
             int workerId = workers.size() == 1 ? workers.get(0) :
                     workers.get(random.nextInt(workers.size()));
             insert.clearParameters();
@@ -305,18 +377,24 @@ public class AsyncProcessor implements Closeable {
         private final PreparedStatement reallocate;
         private final PreparedStatement countAvailableTasks;
         private final PreparedStatement shutdownWorker;
+        private final PreparedStatement findMaxRetrieds;
+        private final PreparedStatement logMaxRetrieds;
+        private final PreparedStatement removeMaxRetrieds;
         private final Random random = new Random();
+        private final int maxRetries;
+        private volatile boolean hasCompleted = false;
 
 
-        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer, int maxRetries)
+                throws SQLException {
             this.connection = connection;
             this.enqueuer = enqueuer;
+            this.maxRetries = maxRetries;
             //this gets workers and # of tasks in desc order of number of tasks
-            String sql = "select w.worker_id, p.cnt " +
-                    "from workers w " +
+            String sql = "select w.worker_id, p.cnt " + "from workers w " +
                     "left join (select worker_id, count(1) as cnt from task_queue " +
-                    "where status=0 group by worker_id)" +
-                    " p on p.worker_id=w.worker_id order by p.cnt desc";
+                    "where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() +
+                    " group by worker_id)" + " p on p.worker_id=w.worker_id order by p.cnt desc";
             getQueueDistribution = connection.prepareStatement(sql);
             //find workers that have assigned tasks but are not in the
             //workers table
@@ -331,26 +409,46 @@ public class AsyncProcessor implements Closeable {
             //current strategy reallocate tasks from longest queue to shortest
             //TODO: might consider randomly shuffling or other algorithms
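+            //rand() < 0.8 moves roughly 80% of the still-available tasks off the source worker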
             sql = "update task_queue set worker_id= ? where id in " +
-                    "(select id from task_queue where " +
-                    "worker_id = ? and " +
-                    "rand() < 0.8 " +
+                    "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
                     "and status=0 for update)";
             reallocate = connection.prepareStatement(sql);
 
-            sql = "select count(1) from task_queue where status="
-                    + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
+            //get those tasks that are in the parse phase
+            //if they are selected or in process, it is possible that
+            //they'll need to be retried.  So, include all statuses
+            //meaning that the parse has not completed.
+            sql = "select count(1) from task_queue where status in (" +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() + ", " +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED.ordinal() + ", " +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal() + ")";
             countAvailableTasks = connection.prepareStatement(sql);
 
-            sql = "update workers set status="+
+            sql = "update workers set status=" +
                     AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
-                " where worker_id = ?";
+                    " where worker_id = ?";
             shutdownWorker = connection.prepareStatement(sql);
+
+            sql = "select id, retry, json from task_queue where retry >=" + maxRetries;
+            findMaxRetrieds = connection.prepareStatement(sql);
+
+            sql = "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code)" +
+                    "values (?,?,CURRENT_TIMESTAMP(), ?," +
+                    AsyncWorkerProcess.ERROR_CODES.MAX_RETRIES.ordinal() + ")";
+            logMaxRetrieds = connection.prepareStatement(sql);
+
+            sql = "delete from task_queue where id=?";
+            removeMaxRetrieds = connection.prepareStatement(sql);
+        }
+
+        protected boolean hasCompletedTasks() {
+            return hasCompleted;
         }
 
         @Override
         public Integer call() throws Exception {
 
             while (true) {
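+                //each pass: drop tasks that have exceeded the retry limit, then reassign
+                //tasks belonging to workers that are no longer registered and rebalance queues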
+                removeMaxRetrieds();
                 List<Integer> missingWorkers = getMissingWorkers();
                 reallocateFromMissingWorkers(missingWorkers);
                 redistribute();
@@ -362,6 +460,39 @@ public class AsyncProcessor implements Closeable {
             }
         }
 
+        private void removeMaxRetrieds() throws SQLException {
+            Set<Long> toRemove = new HashSet<>();
+            try (ResultSet rs = findMaxRetrieds.executeQuery()) {
+                while (rs.next()) {
+                    long id = rs.getLong(1);
+                    //columns: id, retry, json (must match findMaxRetrieds)
+                    int retries = rs.getInt(2);
+                    String json = rs.getString(3);
+                    toRemove.add(id);
+                    FetchEmitTuple t;
+                    try (Reader reader = new StringReader(json)) {
+                        t = JsonFetchEmitTuple.fromJson(reader);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        //need to log this in the error_logs table
+                        continue;
+                    }
+                    logMaxRetrieds.clearParameters();
+                    logMaxRetrieds.setLong(1, id);
+                    logMaxRetrieds.setString(2, t.getFetchKey().getFetchKey());
+                    logMaxRetrieds.setInt(3, retries);
+                    logMaxRetrieds.addBatch();
+                }
+            }
+            logMaxRetrieds.executeBatch();
+
+            for (Long id : toRemove) {
+                removeMaxRetrieds.clearParameters();
+                removeMaxRetrieds.setLong(1, id);
+                removeMaxRetrieds.addBatch();
+            }
+            removeMaxRetrieds.executeBatch();
+        }
+
         private void notifyWorkers() throws SQLException {
             for (int workerId : getActiveWorkers(connection)) {
                 shutdownWorker.clearParameters();
@@ -371,12 +502,19 @@ public class AsyncProcessor implements Closeable {
         }
 
         private boolean isComplete() throws SQLException {
+            if (hasCompleted) {
+                return hasCompleted;
+            }
             if (!enqueuer.isComplete) {
                 return false;
             }
             try (ResultSet rs = countAvailableTasks.executeQuery()) {
                 while (rs.next()) {
-                    return rs.getInt(1) == 0;
+                    int availTasks = rs.getInt(1);
+                    if (availTasks == 0) {
+                        hasCompleted = true;
+                        return true;
+                    }
                 }
             }
             return false;
@@ -419,7 +557,8 @@ public class AsyncProcessor implements Closeable {
 
         }
 
-        private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+        private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
+                throws SQLException {
 
             if (missingWorkers.size() == 0) {
                 return;
@@ -436,8 +575,7 @@ public class AsyncProcessor implements Closeable {
                 allocateNonworkersToWorkers.setInt(1, active);
                 allocateNonworkersToWorkers.setInt(2, missing);
                 allocateNonworkersToWorkers.execute();
-                LOG.debug("allocating missing working ({}) to ({})",
-                        missing, active);
+                LOG.debug("allocating missing working ({}) to ({})", missing, active);
             }
         }
 
@@ -453,16 +591,4 @@ public class AsyncProcessor implements Closeable {
             return missingWorkers;
         }
     }
-
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        PreparedStatement findActiveWorkers = connection.prepareStatement(
-                "select worker_id from workers");
-        List<Integer> workers = new ArrayList<>();
-        try (ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index 24acdf8..e0c214a 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -1,20 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.tika.pipes.emitter.EmitKey;
+
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncTask extends FetchEmitTuple {
 
-    public static final AsyncTask SHUTDOWN_SEMAPHORE
-            = new AsyncTask(-1, (short)-1, new FetchEmitTuple(null, null, null));
-
-    private long taskId;
+    public static final AsyncTask SHUTDOWN_SEMAPHORE =
+            new AsyncTask(-1, (short) -1, new FetchEmitTuple(null, null, null));
     private final short retry;
+    private long taskId;
 
     public AsyncTask(@JsonProperty("taskId") long taskId, @JsonProperty("retry") short retry,
                      @JsonProperty("fetchEmitTuple") FetchEmitTuple fetchEmitTuple) {
-        super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(), fetchEmitTuple.getMetadata());
+        super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(),
+                fetchEmitTuple.getMetadata());
         this.taskId = taskId;
         this.retry = retry;
     }
@@ -23,18 +39,16 @@ public class AsyncTask extends FetchEmitTuple {
         return taskId;
     }
 
+    public void setTaskId(long taskId) {
+        this.taskId = taskId;
+    }
+
     public short getRetry() {
         return retry;
     }
 
-    public void setTaskId(long taskId) {
-        this.taskId = taskId;
-    }
     @Override
     public String toString() {
-        return "AsyncTask{" +
-                "taskId=" + taskId +
-                ", retry=" + retry +
-                '}';
+        return "AsyncTask{" + "taskId=" + taskId + ", retry=" + retry + '}';
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index 0b685a2..c26ee0f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ProcessUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
 
 import java.io.IOException;
 import java.io.StringReader;
@@ -18,8 +31,11 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 /**
  * This controls monitoring of the AsyncWorkerProcess
@@ -39,26 +55,26 @@ public class AsyncWorker implements Callable<Integer> {
     private final PreparedStatement selectActiveTasks;
     private final PreparedStatement insertErrorLog;
     private final PreparedStatement resetStatus;
+    //TODO: make this configurable
+    private final int maxRetries = 2;
 
-    public AsyncWorker(Connection connection,
-                       String connectionString, int workerId,
+    public AsyncWorker(Connection connection, String connectionString, int workerId,
                        Path tikaConfigPath) throws SQLException {
         this.connectionString = connectionString;
         this.workerId = workerId;
         this.tikaConfigPath = tikaConfigPath;
         this.connection = connection;
-        String sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+        String sql = "update workers set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
                 " where worker_id = (" + workerId + ")";
         finished = connection.prepareStatement(sql);
 
-        sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
+        sql = "update workers set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
                 " where worker_id = (" + workerId + ")";
         restarting = connection.prepareStatement(sql);
         //this checks if the process was able to reset the status
-        sql = "select id, retry, json from task_queue where worker_id="
-                + workerId +
+        sql = "select id, retry, json from task_queue where worker_id=" + workerId +
                 " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
         selectActiveTasks = connection.prepareStatement(sql);
 
@@ -69,6 +85,45 @@ public class AsyncWorker implements Callable<Integer> {
         resetStatus = prepareReset(connection);
     }
 
+    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
+                               PreparedStatement insertErrorLog, PreparedStatement resetStatus,
+                               Logger logger) {
+        try {
+            insertErrorLog.clearParameters();
+            insertErrorLog.setLong(1, task.getTaskId());
+            insertErrorLog.setString(2, task.getFetchKey().getFetchKey());
+            insertErrorLog.setInt(3, task.getRetry());
+            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
+            insertErrorLog.execute();
+        } catch (SQLException e) {
+            logger.error("Can't update error log", e);
+        }
+
+        try {
+            resetStatus.clearParameters();
+            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
+            resetStatus.setShort(2, (short) (task.getRetry() + 1));
+            resetStatus.setLong(3, task.getTaskId());
+            resetStatus.execute();
+        } catch (SQLException e) {
+            logger.error("Can't reset try status", e);
+        }
+    }
+
+    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
+        //if not, this is called to insert into the error log
+        return connection.prepareStatement(
+                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
+                        " values (?,?,CURRENT_TIMESTAMP(),?,?)");
+    }
+
+    static PreparedStatement prepareReset(Connection connection) throws SQLException {
+        //and this is called to reset the status on error
+        return connection.prepareStatement(
+                "update task_queue set " + "status=?, " + "time_stamp=CURRENT_TIMESTAMP(), " +
+                        "retry=? " + "where id=?");
+    }
+
     @Override
     public Integer call() throws Exception {
         Process p = null;
@@ -80,7 +135,7 @@ public class AsyncWorker implements Callable<Integer> {
                 if (finished) {
                     int exitValue = p.exitValue();
                     if (exitValue == 0) {
-                        LOG.info("forked worker process finished with exitValue=0");
+                        LOG.debug("forked worker process finished with exitValue=0");
                         return 1;
                     }
                     reportCrash(++restarts, exitValue);
@@ -95,24 +150,24 @@ public class AsyncWorker implements Callable<Integer> {
         }
     }
 
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
     private Process start() throws IOException {
-        String[] args = new String[]{
-                "java", "-Djava.awt.headless=true",
-                "-cp", System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncWorkerProcess",
-                Integer.toString(workerId)
-        };
+        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
+                System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncWorkerProcess", Integer.toString(workerId)};
         ProcessBuilder pb = new ProcessBuilder(args);
         pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
-                tikaConfigPath.toAbsolutePath().toString());
+        pb.environment()
+                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
         pb.inheritIO();
         return pb.start();
     }
 
     private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
-        LOG.warn("worker id={} terminated, exitValue={}",
-                workerId, exitValue);
+        LOG.warn("worker id={} terminated, exitValue={}", workerId, exitValue);
         restarting.execute();
         List<AsyncTask> activeTasks = new ArrayList<>();
         try (ResultSet rs = selectActiveTasks.executeQuery()) {
@@ -123,60 +178,18 @@ public class AsyncWorker implements Callable<Integer> {
             activeTasks.add(new AsyncTask(taskId, retry, tuple));
         }
         if (activeTasks.size() == 0) {
-            LOG.info("worker reset active tasks, nothing extra to report");
+            LOG.debug("worker reset active tasks, nothing extra to report");
             return;
         }
+
         if (activeTasks.size() > 1) {
             LOG.warn("more than one active task? this should never happen!");
         }
 
         for (AsyncTask t : activeTasks) {
-            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
-                    insertErrorLog, resetStatus, LOG);
-        }
-
-    }
-
-    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
-                             PreparedStatement insertErrorLog, PreparedStatement resetStatus,
-                             Logger logger) {
-        try {
-            insertErrorLog.clearParameters();
-            insertErrorLog.setLong(1, task.getTaskId());
-            insertErrorLog.setString(2, task.getFetchKey().getKey());
-            insertErrorLog.setInt(3, task.getRetry());
-            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
-            insertErrorLog.execute();
-        } catch (SQLException e) {
-            logger.error("Can't update error log", e);
-        }
-
-        try {
-            resetStatus.clearParameters();
-            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            resetStatus.setShort(2, (short)(task.getRetry()+1));
-            resetStatus.setLong(3, task.getTaskId());
-            resetStatus.execute();
-        } catch (SQLException e) {
-            logger.error("Can't reset try status", e);
+            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE, insertErrorLog,
+                    resetStatus, LOG);
         }
-    }
-
-    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
-        //if not, this is called to insert into the error log
-        return connection.prepareStatement(
-                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
-                " values (?,?,CURRENT_TIMESTAMP(),?,?)"
-        );
-    }
 
-    static PreparedStatement prepareReset(Connection connection) throws SQLException {
-        //and this is called to reset the status on error
-        return connection.prepareStatement(
-                "update task_queue set " +
-                        "status=?, " +
-                        "time_stamp=CURRENT_TIMESTAMP(), " +
-                        "retry=? " +
-                        "where id=?");
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index 8675f22..988748f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -1,27 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.EncryptedDocumentException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.sax.RecursiveParserWrapperHandler;
-import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
+import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
+import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -44,52 +42,37 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
-import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
-
-public class AsyncWorkerProcess {
-
-    enum TASK_STATUS_CODES {
-        AVAILABLE,
-        SELECTED,
-        IN_PROCESS,
-        AVAILABLE_EMIT,
-        SELECTED_EMIT,
-        IN_PROCESS_EMIT,
-        FAILED_EMIT,
-        EMITTED
-    }
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.jpountz.lz4.LZ4Factory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
 
-    public enum WORKER_STATUS_CODES {
-        ACTIVE,
-        RESTARTING,
-        HIBERNATING,
-        SHOULD_SHUTDOWN,
-        SHUTDOWN
-    }
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.StringUtils;
 
-    enum ERROR_CODES {
-        TIMEOUT,
-        SECURITY_EXCEPTION,
-        OTHER_EXCEPTION,
-        OOM,
-        OTHER_ERROR,
-        UNKNOWN_PARSE,
-        EMIT_SERIALIZATION,
-        EMIT_SQL_INSERT_EXCEPTION,
-        EMIT_SQL_SELECT_EXCEPTION,
-        EMIT_DESERIALIZATION,
-        EMIT_EXCEPTION
-    }
+public class AsyncWorkerProcess {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerProcess.class);
-
     //make these all configurable
     private static final long SHUTDOWN_AFTER_MS = 120000;
-    private static long PULSE_MS = 1000;
-    private long parseTimeoutMs = 60000;
+    private static final long PULSE_MS = 1000;
+    private final long parseTimeoutMs = 60000;
 
     public static void main(String[] args) throws Exception {
         Path tikaConfigPath = Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
@@ -104,15 +87,15 @@ public class AsyncWorkerProcess {
         System.exit(0);
     }
 
-    private void execute(Connection connection,
-                         int workerId, TikaConfig tikaConfig) throws SQLException {
-
+    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
+            throws SQLException {
+        //3 = worker + forkwatcher + active task
         ExecutorService service = Executors.newFixedThreadPool(3);
         ExecutorCompletionService<Integer> executorCompletionService =
                 new ExecutorCompletionService<>(service);
 
-        executorCompletionService.submit(new Worker(connection, workerId,
-                tikaConfig, parseTimeoutMs));
+        executorCompletionService
+                .submit(new Worker(connection, workerId, tikaConfig, parseTimeoutMs));
         executorCompletionService.submit(new ForkWatcher(System.in));
 
         int completed = 0;
@@ -135,6 +118,23 @@ public class AsyncWorkerProcess {
         return;
     }
 
+    enum TASK_STATUS_CODES {
+        AVAILABLE, SELECTED, IN_PROCESS, AVAILABLE_EMIT, SELECTED_EMIT, IN_PROCESS_EMIT,
+        FAILED_EMIT, EMITTED
+    }
+
+    public enum WORKER_STATUS_CODES {
+        ACTIVE, RESTARTING, HIBERNATING, CAN_SHUTDOWN, //if there's nothing else to process, shutdown
+        SHOULD_SHUTDOWN, //shutdown now whether or not there's anything else to process
+        HAS_SHUTDOWN
+    }
+
+    enum ERROR_CODES {
+        TIMEOUT, SECURITY_EXCEPTION, OTHER_EXCEPTION, OOM, OTHER_ERROR, UNKNOWN_PARSE, MAX_RETRIES,
+        EMIT_SERIALIZATION, EMIT_SQL_INSERT_EXCEPTION, EMIT_SQL_SELECT_EXCEPTION,
+        EMIT_DESERIALIZATION, EMIT_EXCEPTION
+    }
+
     private static class TaskQueue {
         private final Connection connection;
         private final int workerId;
@@ -149,28 +149,22 @@ public class AsyncWorkerProcess {
             this.connection = connection;
             this.workerId = workerId;
             //TODO -- need to update timestamp
-            String sql = "update task_queue set status=" +
-                    TASK_STATUS_CODES.SELECTED.ordinal()+
-                    ", time_stamp=CURRENT_TIMESTAMP()"+
-                    " where id = " +
-                    " (select id from task_queue where worker_id = " + workerId +
-                    " and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
+            String sql = "update task_queue set status=" + TASK_STATUS_CODES.SELECTED.ordinal() +
+                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id = " +
+                    " (select id from task_queue where worker_id = " + workerId + " and status=" +
+                    TASK_STATUS_CODES.AVAILABLE.ordinal() +
                     " order by time_stamp asc limit 1 for update)";
             markForSelecting = connection.prepareStatement(sql);
             sql = "select id, retry, json from task_queue where status=" +
-                    TASK_STATUS_CODES.SELECTED.ordinal() +
-                    " and " +
-                    " worker_id=" + workerId +
+                    TASK_STATUS_CODES.SELECTED.ordinal() + " and " + " worker_id=" + workerId +
                     " order by time_stamp asc limit 1";
             selectForProcessing = connection.prepareStatement(sql);
-            sql = "update task_queue set status="+
-                    TASK_STATUS_CODES.IN_PROCESS.ordinal()+
-                    ", time_stamp=CURRENT_TIMESTAMP()"+
-                    " where id=?";
+            sql = "update task_queue set status=" + TASK_STATUS_CODES.IN_PROCESS.ordinal() +
+                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
             markForProcessing = connection.prepareStatement(sql);
 
-            sql = "select count(1) from workers where worker_id=" + workerId +
-            " and status="+WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+            sql = "select count(1) from workers where worker_id=" + workerId + " and status=" +
+                    WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
             checkForShutdown = connection.prepareStatement(sql);
         }
 
@@ -183,7 +177,7 @@ public class AsyncWorkerProcess {
                 }
                 int i = markForSelecting.executeUpdate();
                 if (i == 0) {
-                    debugQueue();
+//                   debugQueue();
                     Thread.sleep(PULSE_MS);
                 } else {
                     long taskId = -1;
@@ -213,13 +207,12 @@ public class AsyncWorkerProcess {
         }
 
         private void debugQueue() throws SQLException {
-            try (ResultSet rs = connection.createStatement().executeQuery(
-                    "select * from task_queue limit 10")) {
+            try (ResultSet rs = connection.createStatement()
+                    .executeQuery("select id, status, worker_id from task_queue limit 10")) {
                 while (rs.next()) {
-                    for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
-                        System.out.print(rs.getString(i)+ " ");
-                    }
-                    System.out.println("");
+                    System.out.println(
+                            "id: " + rs.getInt(1) + " status: " + rs.getInt(2) + " worker_id: " +
+                                    rs.getInt(3));
                 }
             }
         }
@@ -243,18 +236,17 @@ public class AsyncWorkerProcess {
         private final RecursiveParserWrapper parser;
         private final TikaConfig tikaConfig;
         private final long parseTimeoutMs;
-        private ExecutorService executorService;
-        private ExecutorCompletionService<AsyncData> executorCompletionService;
         private final PreparedStatement insertErrorLog;
         private final PreparedStatement resetStatus;
         private final PreparedStatement insertEmitData;
         private final PreparedStatement updateStatusForEmit;
         private final ObjectMapper objectMapper = new ObjectMapper();
         LZ4Factory factory = LZ4Factory.fastestInstance();
+        private final ExecutorService executorService;
+        private final ExecutorCompletionService<AsyncData> executorCompletionService;
 
-        public Worker(Connection connection,
-                      int workerId,
-                      TikaConfig tikaConfig, long parseTimeoutMs) throws SQLException {
+        public Worker(Connection connection, int workerId, TikaConfig tikaConfig,
+                      long parseTimeoutMs) throws SQLException {
             this.connection = connection;
             this.workerId = workerId;
             this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
@@ -264,22 +256,24 @@ public class AsyncWorkerProcess {
             this.parseTimeoutMs = parseTimeoutMs;
 
             SimpleModule module = new SimpleModule();
-            module.addSerializer(Metadata.class, new JsonMetadata());
+            module.addSerializer(Metadata.class, new JsonMetadataSerializer());
             objectMapper.registerModule(module);
-            String sql = "insert into workers (worker_id, status) " +
-                    "values (" + workerId + ", "+
-                    WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
+            String sql = "merge into workers key (worker_id) " + "values (" + workerId + ", " +
+                    WORKER_STATUS_CODES.ACTIVE.ordinal() + ")";
             connection.createStatement().execute(sql);
             insertErrorLog = prepareInsertErrorLog(connection);
             resetStatus = prepareReset(connection);
             insertEmitData = prepareInsertEmitData(connection);
-            sql = "update task_queue set status="+
-                    TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
-                    ", time_stamp=CURRENT_TIMESTAMP()" +
-                    " where id=?";
+            sql = "update task_queue set status=" + TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
             updateStatusForEmit = connection.prepareStatement(sql);
         }
 
+        static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
+            return connection.prepareStatement(
+                    "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
+                            " values (?,CURRENT_TIMESTAMP(),?,?)");
+        }
 
         public Integer call() throws Exception {
             AsyncTask task = null;
@@ -306,26 +300,22 @@ public class AsyncWorkerProcess {
                     }
                 }
             } catch (TimeoutException e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.TIMEOUT,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.TIMEOUT, insertErrorLog, resetStatus, LOG);
             } catch (SecurityException e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION, insertErrorLog, resetStatus,
+                        LOG);
             } catch (Exception e) {
                 e.printStackTrace();
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION, insertErrorLog, resetStatus, LOG);
             } catch (OutOfMemoryError e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.OOM,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.OOM, insertErrorLog, resetStatus, LOG);
             } catch (Error e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.OTHER_ERROR,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.OTHER_ERROR, insertErrorLog, resetStatus, LOG);
             } finally {
                 executorService.shutdownNow();
                 return 1;
@@ -338,15 +328,16 @@ public class AsyncWorkerProcess {
                 LOG.debug("received shutdown notification");
                 return;
             } else {
-                executorCompletionService.submit(new TaskProcessor(task, tikaConfig, parser,
-                        workerId));
-                Future<AsyncData> future = executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
+                executorCompletionService
+                        .submit(new TaskProcessor(task, tikaConfig, parser, workerId));
+                Future<AsyncData> future =
+                        executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
                 if (future == null) {
-                    handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+                    handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
                 } else {
                     AsyncData asyncData = future.get(1000, TimeUnit.MILLISECONDS);
                     if (asyncData == null) {
-                        handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+                        handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
                     }
                     boolean shouldEmit = checkForParseException(asyncData);
                     if (shouldEmit) {
@@ -354,13 +345,11 @@ public class AsyncWorkerProcess {
                             emit(asyncData);
                         } catch (JsonProcessingException e) {
                             e.printStackTrace();
-                            recordBadEmit(task.getTaskId(),
-                                    task.getFetchKey().getKey(),
+                            recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
                                     ERROR_CODES.EMIT_SERIALIZATION.ordinal());
                         } catch (SQLException e) {
                             e.printStackTrace();
-                            recordBadEmit(task.getTaskId(),
-                                    task.getFetchKey().getKey(),
+                            recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
                                     ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
                         }
                     }
@@ -372,18 +361,16 @@ public class AsyncWorkerProcess {
             //stub
         }
 
-        private void emit(AsyncData asyncData) throws SQLException,
-                JsonProcessingException {
+        private void emit(AsyncData asyncData) throws SQLException, JsonProcessingException {
             insertEmitData.clearParameters();
-            insertEmitData.setLong(1, asyncData.getAsyncTask().getTaskId());
+            insertEmitData.setLong(1, asyncData.getTaskId());
             byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
             byte[] compressed = factory.fastCompressor().compress(bytes);
             insertEmitData.setLong(2, bytes.length);
             insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
             insertEmitData.execute();
             updateStatusForEmit.clearParameters();
-            updateStatusForEmit.setLong(1,
-                    asyncData.getAsyncTask().getTaskId());
+            updateStatusForEmit.setLong(1, asyncData.getTaskId());
             updateStatusForEmit.execute();
         }
 
@@ -392,12 +379,10 @@ public class AsyncWorkerProcess {
             throw new TimeoutException(key);
         }
 
-
         private boolean checkForParseException(AsyncData asyncData) {
             if (asyncData == null || asyncData.getMetadataList() == null ||
                     asyncData.getMetadataList().size() == 0) {
-                LOG.warn("empty or null emit data ({})", asyncData.getAsyncTask()
-                        .getFetchKey().getKey());
+                LOG.warn("empty or null emit data ({})", asyncData.getFetchKey().getFetchKey());
                 return false;
             }
             boolean shouldEmit = true;
@@ -405,9 +390,8 @@ public class AsyncWorkerProcess {
             String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
             if (stack != null) {
                 LOG.warn("fetchKey ({}) container parse exception ({})",
-                        asyncData.getAsyncTask().getFetchKey().getKey(), stack);
-                if (asyncData.getAsyncTask().getOnParseException()
-                        == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+                        asyncData.getFetchKey().getFetchKey(), stack);
+                if (asyncData.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
                     shouldEmit = false;
                 }
             }
@@ -417,18 +401,11 @@ public class AsyncWorkerProcess {
                 String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
                 if (embeddedStack != null) {
                     LOG.warn("fetchKey ({}) embedded parse exception ({})",
-                            asyncData.getAsyncTask().getFetchKey().getKey(), embeddedStack);
+                            asyncData.getFetchKey().getFetchKey(), embeddedStack);
                 }
             }
             return shouldEmit;
         }
-
-        static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
-            return connection.prepareStatement(
-                    "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
-                            " values (?,CURRENT_TIMESTAMP(),?,?)"
-            );
-        }
     }
 
     private static class TaskProcessor implements Callable<AsyncData> {
@@ -438,9 +415,7 @@ public class AsyncWorkerProcess {
         private final TikaConfig tikaConfig;
         private final int workerId;
 
-        public TaskProcessor(AsyncTask task,
-                             TikaConfig tikaConfig,
-                             Parser parser, int workerId) {
+        public TaskProcessor(AsyncTask task, TikaConfig tikaConfig, Parser parser, int workerId) {
             this.task = task;
             this.parser = parser;
             this.tikaConfig = tikaConfig;
@@ -451,14 +426,11 @@ public class AsyncWorkerProcess {
             Metadata userMetadata = task.getMetadata();
             Metadata metadata = new Metadata();
             String fetcherName = task.getFetchKey().getFetcherName();
-            String fetchKey = task.getFetchKey().getKey();
+            String fetchKey = task.getFetchKey().getFetchKey();
             List<Metadata> metadataList = null;
-            try (InputStream stream = tikaConfig.getFetcherManager()
-                    .getFetcher(fetcherName)
+            try (InputStream stream = tikaConfig.getFetcherManager().getFetcher(fetcherName)
                     .fetch(fetchKey, metadata)) {
-                metadataList = parseMetadata(task.getFetchKey(),
-                        stream,
-                        metadata);
+                metadataList = parseMetadata(task.getFetchKey(), stream, metadata);
             } catch (SecurityException e) {
                 throw e;
             }
@@ -468,11 +440,12 @@ public class AsyncWorkerProcess {
                 emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
                 task.setEmitKey(emitKey);
             }
-            return new AsyncData(task, metadataList);
+            return new AsyncData(task.getTaskId(), task.getFetchKey(), task.getEmitKey(),
+                    task.getOnParseException(), metadataList);
         }
 
-        private List<Metadata> parseMetadata(FetchKey fetchKey,
-                                             InputStream stream, Metadata metadata) {
+        private List<Metadata> parseMetadata(FetchKey fetchKey, InputStream stream,
+                                             Metadata metadata) {
             //make these configurable
             BasicContentHandlerFactory.HANDLER_TYPE type =
                     BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
@@ -486,16 +459,16 @@ public class AsyncWorkerProcess {
             try {
                 parser.parse(stream, handler, metadata, parseContext);
             } catch (SAXException e) {
-                LOG.warn("problem:" + fetchKey.getKey(), e);
+                LOG.warn("problem:" + fetchKey.getFetchKey(), e);
             } catch (EncryptedDocumentException e) {
-                LOG.warn("encrypted:" + fetchKey.getKey(), e);
+                LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
             } catch (SecurityException e) {
-                LOG.warn("security exception: " + fetchKey.getKey());
+                LOG.warn("security exception: " + fetchKey.getFetchKey());
                 throw e;
             } catch (Exception e) {
-                LOG.warn("exception: " + fetchKey.getKey());
+                LOG.warn("exception: " + fetchKey.getFetchKey());
             } catch (OutOfMemoryError e) {
-                LOG.error("oom: " + fetchKey.getKey());
+                LOG.error("oom: " + fetchKey.getFetchKey());
                 throw e;
             }
             return handler.getMetadataList();
@@ -514,6 +487,7 @@ public class AsyncWorkerProcess {
 
     private static class ForkWatcher implements Callable<Integer> {
         private final InputStream in;
+
         public ForkWatcher(InputStream in) {
             this.in = in;
         }
diff --git a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
index 585b03b..43553a4 100644
--- a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
+++ b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=debug,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
deleted file mode 100644
index c653267..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.tika.pipes.async;
-
-import org.junit.Test;
-
-public class AsyncCliTest {
-    @Test
-    public void testBasic() throws Exception {
-        String[] args = {
-                "/Users/allison/Desktop/tika-tmp/tika-config.xml"
-        };
-        AsyncCli.main(args);
-    }
-
-    @Test
-    public void testUnhandled() throws InterruptedException {
-        Thread t = new Thread(new Task());
-
-        t.start();
-        t.join();
-        for (StackTraceElement el : t.getStackTrace()) {
-            System.out.println(el);
-        }
-    }
-
-    private static class Task implements Runnable {
-
-        @Override
-        public void run() {
-            Thread.currentThread().setUncaughtExceptionHandler(new MyUncaught());
-            for (int i = 0; i < 5; i++) {
-                System.out.println(i);
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException();
-                }
-            }
-            throw new RuntimeException("kaboom");
-        }
-    }
-
-    private static class MyUncaught implements Thread.UncaughtExceptionHandler {
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-            throw new RuntimeException("bad");
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 920634e..04890b3 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -1,13 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -18,6 +28,18 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncProcessorTest {
 
@@ -30,29 +52,21 @@ public class AsyncProcessorTest {
     public void setUp() throws SQLException, IOException {
         dbDir = Files.createTempDirectory("async-db");
         dbFile = dbDir.resolve("emitted-db");
-        String jdbc = "jdbc:h2:file:"+dbFile.toAbsolutePath().toString()+";AUTO_SERVER=TRUE";
-        String sql = "create table emitted (id int auto_increment primary key, json varchar(20000))";
+        String jdbc = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
+        String sql = "create table emitted (id int auto_increment primary key, " +
+                "emitkey varchar(2000), json varchar(20000))";
 
         connection = DriverManager.getConnection(jdbc);
         connection.createStatement().execute(sql);
         tikaConfigPath = dbDir.resolve("tika-config.xml");
-        String xml = "" +
-                "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
-                "<properties>" +
-                "  <emitters>"+
-                "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
-                "    <params>\n" +
-                "      <param name=\"name\" type=\"string\">mock</param>\n"+
-                "      <param name=\"jdbc\" type=\"string\">"+jdbc+"</param>\n"+
-                "    </params>" +
-                "  </emitter>" +
-                "  </emitters>"+
-                "  <fetchers>" +
+        String xml = "" + "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" +
+                "  <emitters>" + "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
+                "    <params>\n" + "      <param name=\"name\" type=\"string\">mock</param>\n" +
+                "      <param name=\"jdbc\" type=\"string\">" + jdbc + "</param>\n" +
+                "    </params>" + "  </emitter>" + "  </emitters>" + "  <fetchers>" +
                 "    <fetcher class=\"org.apache.tika.pipes.async.MockFetcher\">" +
-                "      <param name=\"name\" type=\"string\">mock</param>\n"+
-                "    </fetcher>" +
-                "  </fetchers>"+
-                "</properties>";
+                "      <param name=\"name\" type=\"string\">mock</param>\n" + "    </fetcher>" +
+                "  </fetchers>" + "</properties>";
         Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
     }
 
@@ -68,21 +82,24 @@ public class AsyncProcessorTest {
 
 
         AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
-        for (int i = 0 ; i < 100; i++) {
-            FetchEmitTuple t = new FetchEmitTuple(
-                    new FetchKey("mock", "key-"+i),
-                    new EmitKey("mock", "emit-"+i),
-                    new Metadata()
-            );
+        int max = 100;
+        for (int i = 0; i < max; i++) {
+            FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", "key-" + i),
+                    new EmitKey("mock", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
         }
         processor.close();
-        String sql = "select * from emitted";
-        try (Statement st = connection.createStatement();
-             ResultSet rs = st.executeQuery(sql)) {
+        String sql = "select emitkey from emitted";
+        Set<String> emitKeys = new HashSet<>();
+        try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
             while (rs.next()) {
-                System.out.println(rs.getInt(1) + " : "+rs.getString(2));
+                String emitKey = rs.getString(1);
+                emitKeys.add(emitKey);
             }
         }
+        assertEquals(max, emitKeys.size());
+        for (int i = 0; i < max; i++) {
+            assertTrue(emitKeys.contains("emit-" + i));
+        }
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
index 9f07dec..5576f73 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -1,32 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 public class MockEmitter implements Initializable, Emitter {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private Connection connection;
     private String jdbc;
     private PreparedStatement insert;
 
+    public MockEmitter() {
+        SimpleModule module = new SimpleModule();
+        module.addSerializer(Metadata.class, new JsonMetadataSerializer());
+        objectMapper.registerModule(module);
+    }
+
     @Field
     public void setJdbc(String jdbc) {
         this.jdbc = jdbc;
@@ -38,10 +63,10 @@ public class MockEmitter implements Initializable, Emitter {
     }
 
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
-        emit(Collections.singletonList(
-                new EmitData(
-                new EmitKey(getName(), emitKey), metadataList)));
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
+        emit(Collections
+                .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
     }
 
     @Override
@@ -50,7 +75,8 @@ public class MockEmitter implements Initializable, Emitter {
             String json = objectMapper.writeValueAsString(d);
             try {
                 insert.clearParameters();
-                insert.setString(1, json);
+                insert.setString(1, d.getEmitKey().getEmitKey());
+                insert.setString(2, json);
                 insert.execute();
             } catch (SQLException e) {
                 throw new TikaEmitterException("problem inserting", e);
@@ -62,7 +88,7 @@ public class MockEmitter implements Initializable, Emitter {
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         try {
             connection = DriverManager.getConnection(jdbc);
-            String sql = "insert into emitted (json) values (?)";
+            String sql = "insert into emitted (emitkey, json) values (?, ?)";
             insert = connection.prepareStatement(sql);
         } catch (SQLException e) {
             throw new TikaConfigException("problem w connection", e);
@@ -70,7 +96,8 @@ public class MockEmitter implements Initializable, Emitter {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
 
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index 8c99ecf..7a74f2a 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -1,21 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.Fetcher;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
 public class MockFetcher implements Fetcher {
 
-    private static byte[] BYTES = new String("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"+
-            "<mock>"+
-            "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>"+
-            "<write element=\"p\">main_content</write>"+
-        "</mock>").getBytes(StandardCharsets.UTF_8);
+    private static final byte[] BYTES = ("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>" +
+            "<write element=\"p\">main_content</write>" + "</mock>")
+            .getBytes(StandardCharsets.UTF_8);
 
     @Override
     public String getName() {
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
new file mode 100644
index 0000000..ebbd8f0
--- /dev/null
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.junit.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
+
+public class SerializationTest {
+
+    @Test
+    public void testBasic() throws Exception {
+        String json = "{\"taskId\":49,\"fetchKey\":{\"fetcherName\":\"mock\"," +
+                "\"fetchKey\":\"key-48\"},\"emitKey\":{\"emitterName\":\"mock\"," +
+                "\"emitKey\":\"emit-48\"},\"onParseException\":\"EMIT\",\"metadataList\":" +
+                "[{\"X-TIKA:Parsed-By\":" +
+                "\"org.apache.tika.parser.EmptyParser\",\"X-TIKA:parse_time_millis\":" +
+                "\"0\",\"X-TIKA:embedded_depth\":\"0\"}]}";
+
+        ObjectMapper mapper = new ObjectMapper();
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
+        mapper.registerModule(module);
+        AsyncData asyncData = mapper.readValue(json, AsyncData.class);
+        assertEquals(49, asyncData.getTaskId());
+        assertEquals("mock", asyncData.getFetchKey().getFetcherName());
+        assertEquals(1, asyncData.getMetadataList().size());
+    }
+
+}
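
SerializationTest above only exercises the read side (JsonMetadataDeserializer). The write side, as configured by Worker and MockEmitter in this diff, registers JsonMetadataSerializer on the same ObjectMapper. A small round-trip sketch under those assumptions follows; the AsyncData constructor arguments come from the diff, but the wrapper class and helper method are illustrative, not part of this branch.

// Sketch only: serialize an AsyncData the way Worker/MockEmitter configure Jackson,
// then read it back with the deserializer exercised in the test above.
import java.util.Collections;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
import org.apache.tika.pipes.async.AsyncData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;

class AsyncDataRoundTripSketch {
    static AsyncData roundTrip(long taskId) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        SimpleModule module = new SimpleModule();
        module.addSerializer(Metadata.class, new JsonMetadataSerializer());
        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
        mapper.registerModule(module);

        AsyncData data = new AsyncData(taskId, new FetchKey("mock", "key-" + taskId),
                new EmitKey("mock", "emit-" + taskId), FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT,
                Collections.singletonList(new Metadata()));
        String json = mapper.writeValueAsString(data);
        return mapper.readValue(json, AsyncData.class);
    }
}
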
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
deleted file mode 100644
index 394fb16..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.tika.pipes.async;
-
-
-import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestPipesDriver {
-
-    static Path TMP_DIR;
-    static Path DB;
-
-    static AtomicInteger PROCESSED = new AtomicInteger(0);
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        TMP_DIR = Files.createTempDirectory("pipes-driver-");
-        DB = Files.createTempFile(TMP_DIR, "", "");
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception {
-        FileUtils.deleteDirectory(TMP_DIR.toFile());
-    }
-
-
-    @Test
-    public void testQueue() throws Exception {
-        int numThreads = 20;
-        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10000 + numThreads);
-        for (int i = 0; i < 10000; i++) {
-            queue.add(1);
-        }
-        for (int i = 0; i < numThreads; i++) {
-            queue.offer(-1);
-        }
-        ExecutorService service = Executors.newFixedThreadPool(numThreads);
-        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(service);
-
-        long start = System.currentTimeMillis();
-        executorCompletionService.submit(new Watcher(queue));
-        for (int i = 0; i < numThreads; i++) {
-            executorCompletionService.submit(new QueueWorker(queue));
-        }
-        int finished = 0;
-        while (finished++ < numThreads) {
-            executorCompletionService.take();
-        }
-        long elapsed = System.currentTimeMillis() - start;
-        System.out.println("elapsed: " + elapsed);
-        service.shutdownNow();
-    }
-
-    private static class Watcher implements Callable<Integer> {
-        private final ArrayBlockingQueue<Integer> queue;
-
-        Watcher(ArrayBlockingQueue<Integer> queue) {
-            this.queue = queue;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            long start = System.currentTimeMillis();
-            while (true) {
-                long elapsed = System.currentTimeMillis() - start;
-                Thread.sleep(1000);
-            }
-        }
-    }
-
-    private static class QueueWorker implements Callable<Integer> {
-        static AtomicInteger counter = new AtomicInteger(0);
-
-
-        private final int id;
-        private final ArrayBlockingQueue<Integer> queue;
-
-        QueueWorker(ArrayBlockingQueue<Integer> queue) {
-            id = counter.incrementAndGet();
-            this.queue = queue;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            while (true) {
-                Integer val = queue.poll(1, TimeUnit.SECONDS);
-                if (val != null) {
-                    if (val < 0) {
-                        return 1;
-                    } else {
-                        long sleep = id * 100;
-                        Thread.sleep(sleep);
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 332813f..7fbba83 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -17,23 +17,6 @@
 
 package org.apache.tika.pipes;
 
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.s3.S3Emitter;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Ignore;
-import org.junit.Test;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -49,6 +32,24 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.iterable.S3Objects;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.s3.S3Emitter;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
 @Ignore("turn these into actual tests")
 public class PipeIntegrationTests {
 
@@ -59,10 +60,8 @@ public class PipeIntegrationTests {
         String region = "";
         String profile = "";
         String bucket = "";
-        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
-                .withRegion(region)
-                .withCredentials(new ProfileCredentialsProvider(profile))
-                .build();
+        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region)
+                .withCredentials(new ProfileCredentialsProvider(profile)).build();
         s3Client.listObjects(bucket);
         int cnt = 0;
         long sz = 0;
@@ -72,17 +71,18 @@ public class PipeIntegrationTests {
             if (Files.isRegularFile(targ)) {
                 continue;
             }
-            if (! Files.isDirectory(targ.getParent())) {
+            if (!Files.isDirectory(targ.getParent())) {
                 Files.createDirectories(targ.getParent());
             }
-            System.out.println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
+            System.out
+                    .println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
             S3Object s3Object = s3Client.getObject(bucket, summary.getKey());
             Files.copy(s3Object.getObjectContent(), targ);
             summary.getSize();
             cnt++;
             sz += summary.getSize();
         }
-        System.out.println("iterated: "+cnt + " sz: "+sz);
+        System.out.println("iterated: " + cnt + " sz: " + sz);
     }
 
     @Test
@@ -94,8 +94,8 @@ public class PipeIntegrationTests {
         ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
         for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new FSFetcherEmitter(
-                    queue, tikaConfig.getFetcherManager().getFetcher("s3"), null));
+            completionService.submit(new FSFetcherEmitter(queue,
+                    tikaConfig.getFetcherManager().getFetcher("s3"), null));
         }
         for (FetchEmitTuple t : it) {
             queue.offer(t);
@@ -105,7 +105,7 @@ public class PipeIntegrationTests {
         }
         int finished = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers + 1) {
                 Future<Integer> future = completionService.take();
                 future.get();
             }
@@ -122,9 +122,9 @@ public class PipeIntegrationTests {
         ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
         for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new S3FetcherEmitter(
-                    queue, tikaConfig.getFetcherManager().getFetcher("s3f"),
-                    (S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e")));
+            completionService.submit(new S3FetcherEmitter(queue,
+                    tikaConfig.getFetcherManager().getFetcher("s3f"),
+                    (S3Emitter) tikaConfig.getEmitterManager().getEmitter("s3e")));
         }
         FetchIterator it = tikaConfig.getFetchIterator();
         for (FetchEmitTuple t : it) {
@@ -135,7 +135,7 @@ public class PipeIntegrationTests {
         }
         int finished = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers + 1) {
                 Future<Integer> future = completionService.take();
                 future.get();
             }
@@ -145,8 +145,7 @@ public class PipeIntegrationTests {
     }
 
     private TikaConfig getConfig(String fileName) throws Exception {
-        try (InputStream is =
-                     PipeIntegrationTests.class.getResourceAsStream("/"+fileName)) {
+        try (InputStream is = PipeIntegrationTests.class.getResourceAsStream("/" + fileName)) {
             return new TikaConfig(is);
         }
     }
@@ -159,8 +158,8 @@ public class PipeIntegrationTests {
         private final Emitter emitter;
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
 
-        FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
-                fetcher, Emitter emitter) {
+        FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
+                         Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
             this.emitter = emitter;
@@ -182,12 +181,12 @@ public class PipeIntegrationTests {
         }
 
         private void process(FetchEmitTuple t) throws IOException, TikaException {
-            Path targ = OUTDIR.resolve(t.getFetchKey().getKey());
+            Path targ = OUTDIR.resolve(t.getFetchKey().getFetchKey());
             if (Files.isRegularFile(targ)) {
                 return;
             }
-            try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
-                System.out.println(counter.getAndIncrement() + " : "+t );
+            try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
+                System.out.println(counter.getAndIncrement() + " : " + t);
                 Files.createDirectories(targ.getParent());
                 Files.copy(is, targ);
             }
@@ -201,8 +200,8 @@ public class PipeIntegrationTests {
         private final S3Emitter emitter;
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
 
-        S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
-                fetcher, S3Emitter emitter) {
+        S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
+                         S3Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
             this.emitter = emitter;
@@ -227,7 +226,7 @@ public class PipeIntegrationTests {
             Metadata userMetadata = new Metadata();
             userMetadata.set("project", "my-project");
 
-            try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
+            try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
                 emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
             }
         }
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties b/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=info,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3024f6a..78ba9d2 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -133,7 +133,7 @@ public class JsonFetchEmitTuple {
     static void writeTuple(FetchEmitTuple t, JsonGenerator jsonGenerator) throws IOException {
         jsonGenerator.writeStartObject();
         jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
-        jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
+        jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getFetchKey());
         jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
         if (!StringUtils.isBlank(t.getEmitKey().getEmitKey())) {
             jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getEmitKey());
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index 06612a1..b33c2a9 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -26,19 +26,13 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
-public class JsonMetadata extends StdSerializer<Metadata> {
+public class JsonMetadata {
 
     static volatile boolean PRETTY_PRINT = false;
 
-    public JsonMetadata() {
-        super(Metadata.class);
-    }
-
     /**
      * Serializes a Metadata object to Json.  This does not flush or close the writer.
      *
@@ -146,10 +140,4 @@ public class JsonMetadata extends StdSerializer<Metadata> {
         PRETTY_PRINT = prettyPrint;
     }
 
-    @Override
-    public void serialize(Metadata metadata,
-                          JsonGenerator jsonGenerator,
-                          SerializerProvider serializerProvider) throws IOException {
-        writeMetadataObject(metadata, jsonGenerator, false);
-    }
 }
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java
new file mode 100644
index 0000000..5a9b7d0
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import static org.apache.tika.metadata.serialization.JsonMetadata.readMetadataObject;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import org.apache.tika.metadata.Metadata;
+
+public class JsonMetadataDeserializer extends StdDeserializer<Metadata> {
+
+    public JsonMetadataDeserializer() {
+        super(Metadata.class);
+    }
+
+    @Override
+    public Metadata deserialize(JsonParser jsonParser,
+                                DeserializationContext deserializationContext)
+            throws IOException, JsonProcessingException {
+        return readMetadataObject(jsonParser);
+    }
+}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java
new file mode 100644
index 0000000..afcf012
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import static org.apache.tika.metadata.serialization.JsonMetadata.writeMetadataObject;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import org.apache.tika.metadata.Metadata;
+
+public class JsonMetadataSerializer extends StdSerializer<Metadata> {
+
+    public JsonMetadataSerializer() {
+        super(Metadata.class);
+    }
+
+    @Override
+    public void serialize(Metadata metadata,
+                          JsonGenerator jsonGenerator,
+                          SerializerProvider serializerProvider) throws IOException {
+        writeMetadataObject(metadata, jsonGenerator, false);
+    }
+}
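
The two new files above split the Jackson integration out of JsonMetadata into a StdSerializer/StdDeserializer pair that wraps writeMetadataObject and readMetadataObject. A minimal usage sketch follows; the ObjectMapper wiring is illustrative only and not part of this commit, and it assumes nothing beyond the standard Jackson SimpleModule registration API:

    // Illustrative only -- not in this commit. Registers the new Metadata
    // (de)serializers on a plain Jackson ObjectMapper.
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
    import org.apache.tika.metadata.serialization.JsonMetadataSerializer;

    public class MetadataMapperExample {
        public static ObjectMapper metadataMapper() {
            SimpleModule module = new SimpleModule();
            module.addSerializer(Metadata.class, new JsonMetadataSerializer());
            module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
            ObjectMapper mapper = new ObjectMapper();
            mapper.registerModule(module);
            return mapper;
        }
    }
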
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index efd9a5c..d68cc11 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -64,7 +64,7 @@ public class AsyncParser implements Callable<Integer> {
                     if (!offered) {
                         //TODO: deal with this
                         LOG.warn("Failed to add ({}) " + "to emit queue after 10 minutes.",
-                                request.getFetchKey().getKey());
+                                request.getFetchKey().getFetchKey());
                     }
                 }
             } else {
@@ -76,14 +76,14 @@ public class AsyncParser implements Callable<Integer> {
     private boolean checkForParseException(FetchEmitTuple request, EmitData emitData) {
         if (emitData == null || emitData.getMetadataList() == null ||
                 emitData.getMetadataList().size() == 0) {
-            LOG.warn("empty or null emit data ({})", request.getFetchKey().getKey());
+            LOG.warn("empty or null emit data ({})", request.getFetchKey().getFetchKey());
             return false;
         }
         boolean shouldEmit = true;
         Metadata container = emitData.getMetadataList().get(0);
         String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
         if (stack != null) {
-            LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getKey(),
+            LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getFetchKey(),
                     stack);
             if (request.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
                 shouldEmit = false;
@@ -95,7 +95,7 @@ public class AsyncParser implements Callable<Integer> {
             String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
             if (embeddedStack != null) {
                 LOG.warn("fetchKey ({}) embedded parse exception ({})",
-                        request.getFetchKey().getKey(), embeddedStack);
+                        request.getFetchKey().getFetchKey(), embeddedStack);
             }
         }
         return shouldEmit;
@@ -105,7 +105,7 @@ public class AsyncParser implements Callable<Integer> {
         Metadata userMetadata = t.getMetadata();
         Metadata metadata = new Metadata();
         String fetcherName = t.getFetchKey().getFetcherName();
-        String fetchKey = t.getFetchKey().getKey();
+        String fetchKey = t.getFetchKey().getFetchKey();
         List<Metadata> metadataList = null;
         try (InputStream stream = TikaResource.getConfig().getFetcherManager()
                 .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 47dbc50..001926b 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -138,7 +138,7 @@ public class AsyncResource {
     private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
         List<String> fetchKeys = new ArrayList<>();
         for (FetchEmitTuple t : notAdded) {
-            fetchKeys.add(t.getFetchKey().getKey());
+            fetchKeys.add(t.getFetchKey().getFetchKey());
         }
         Map<String, Object> map = new HashMap<>();
         map.put("status", "throttled");
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 306732a..16aa76e 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -71,7 +71,7 @@ public class EmitterResource {
         //TODO: clean this up?
         EmitKey emitKey = t.getEmitKey();
         if (StringUtils.isBlank(emitKey.getEmitKey())) {
-            emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getKey());
+            emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
         }
         return emitKey;
     }
@@ -171,7 +171,7 @@ public class EmitterResource {
         List<Metadata> metadataList = null;
         try (InputStream stream = TikaResource.getConfig().getFetcherManager()
                 .getFetcher(t.getFetchKey().getFetcherName())
-                .fetch(t.getFetchKey().getKey(), metadata)) {
+                .fetch(t.getFetchKey().getFetchKey(), metadata)) {
 
             metadataList = RecursiveMetadataResource
                     .parseMetadata(stream, metadata, httpHeaders.getRequestHeaders(), info, "text");
@@ -213,7 +213,7 @@ public class EmitterResource {
                 shouldEmit = false;
             }
             LOG.warn("fetchKey ({}) caught container parse exception ({})",
-                    t.getFetchKey().getKey(), stack);
+                    t.getFetchKey().getFetchKey(), stack);
         }
 
         for (int i = 1; i < metadataList.size(); i++) {
@@ -221,7 +221,7 @@ public class EmitterResource {
             String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
             if (embeddedStack != null) {
                 LOG.warn("fetchKey ({}) caught embedded parse exception ({})",
-                        t.getFetchKey().getKey(), embeddedStack);
+                        t.getFetchKey().getFetchKey(), embeddedStack);
             }
         }
 

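Most of the call-site churn in the diff above is the rename of FetchKey#getKey() to FetchKey#getFetchKey(), applied across the pipes, serialization, and server modules. A rough sketch of the renamed accessor; the field and constructor shapes are assumed from the call sites, not copied from the actual class:

    // Sketch only -- shapes assumed, not taken from
    // org.apache.tika.pipes.fetcher.FetchKey.
    public class FetchKey {
        private final String fetcherName;
        private final String fetchKey;

        public FetchKey(String fetcherName, String fetchKey) {
            this.fetcherName = fetcherName;
            this.fetchKey = fetchKey;
        }

        public String getFetcherName() {
            return fetcherName;
        }

        // Formerly getKey(); callers now use t.getFetchKey().getFetchKey().
        public String getFetchKey() {
            return fetchKey;
        }
    }
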
[tika] 02/02: checkstyle

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 9dbbdd3f8c434e0adee597339c2d9f232f7e5bc5
Author: tballison <ta...@apache.org>
AuthorDate: Fri Mar 26 17:48:52 2021 -0400

    checkstyle
---
 .../main/java/org/apache/tika/server/core/resource/AsyncParser.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index d68cc11..bf1019e 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -83,7 +83,8 @@ public class AsyncParser implements Callable<Integer> {
         Metadata container = emitData.getMetadataList().get(0);
         String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
         if (stack != null) {
-            LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getFetchKey(),
+            LOG.warn("fetchKey ({}) container parse exception ({})",
+                    request.getFetchKey().getFetchKey(),
                     stack);
             if (request.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
                 shouldEmit = false;