Posted to commits@tika.apache.org by ta...@apache.org on 2021/03/26 21:49:07 UTC

[tika] 01/02: TIKA-3304 -- basically works...lots more remains

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3304
in repository https://gitbox.apache.org/repos/asf/tika.git

commit c3ca99a8d1032c3011ed8d2fc83dc6f0d3d5c0d9
Author: tballison <ta...@apache.org>
AuthorDate: Fri Mar 26 17:35:30 2021 -0400

    TIKA-3304 -- basically works...lots more remains
---
 pom.xml                                            |   2 +-
 .../org/apache/tika/pipes/fetcher/FetchKey.java    |   2 +-
 .../fetchiterator/FileSystemFetchIteratorTest.java |   2 +-
 tika-pipes/pom.xml                                 |  37 ++-
 tika-pipes/tika-emitters/tika-emitter-fs/pom.xml   |  13 +-
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   |  44 ++-
 tika-pipes/tika-emitters/tika-emitter-s3/pom.xml   |  13 +-
 .../apache/tika/pipes/emitter/s3/S3Emitter.java    | 103 +++---
 tika-pipes/tika-emitters/tika-emitter-solr/pom.xml |  13 +-
 .../tika/pipes/emitter/solr/SolrEmitter.java       | 129 ++++----
 .../apache/tika/pipes/emitter/solr/TestBasic.java  |  34 +-
 .../src/test/resources/log4j.properties            |   6 +-
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |  82 ++---
 .../src/test/java/TestCSVFetchIterator.java        |  23 +-
 .../tika-fetch-iterator-jdbc/pom.xml               |  17 +-
 .../fetchiterator/jdbc/JDBCFetchIterator.java      |  82 ++---
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |  66 ++--
 .../src/test/resources/log4j.properties            |   4 +-
 .../tika-fetch-iterator-s3/pom.xml                 |  13 +-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  39 ++-
 .../fetchiterator/s3/TestS3FetchIterator.java      |  17 +-
 .../src/test/resources/log4j.properties            |   4 +-
 tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml |  13 +-
 .../tika/pipes/fetcher/http/HttpFetcher.java       |  33 +-
 .../tika/pipes/fetcher/http/HttpFetcherTest.java   |  50 +--
 .../apache/tika/pipes/fetcher/s3/S3Fetcher.java    |  70 ++--
 .../tika/pipes/fetcher/s3/TestS3Fetcher.java       |  23 +-
 .../org/apache/tika/client/HttpClientFactory.java  | 147 ++++-----
 .../org/apache/tika/client/HttpClientUtil.java     |  26 +-
 tika-pipes/tika-pipes-async/pom.xml                |   4 +-
 .../java/org/apache/tika/pipes/async/AsyncCli.java | 128 ++++----
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  44 ++-
 .../org/apache/tika/pipes/async/AsyncData.java     |  53 ++-
 .../org/apache/tika/pipes/async/AsyncEmitHook.java |  16 +
 .../org/apache/tika/pipes/async/AsyncEmitter.java  | 130 ++------
 .../tika/pipes/async/AsyncEmitterProcess.java      | 309 +++++++++++------
 .../tika/pipes/async/AsyncPipesEmitHook.java       |  28 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    | 364 ++++++++++++++-------
 .../org/apache/tika/pipes/async/AsyncTask.java     |  40 ++-
 .../org/apache/tika/pipes/async/AsyncWorker.java   | 157 +++++----
 .../tika/pipes/async/AsyncWorkerProcess.java       | 298 ++++++++---------
 .../src/main/resources/log4j.properties            |   4 +-
 .../org/apache/tika/pipes/async/AsyncCliTest.java  |  48 ---
 .../tika/pipes/async/AsyncProcessorTest.java       |  89 +++--
 .../org/apache/tika/pipes/async/MockEmitter.java   |  59 +++-
 .../org/apache/tika/pipes/async/MockFetcher.java   |  33 +-
 .../apache/tika/pipes/async/SerializationTest.java |  49 +++
 .../apache/tika/pipes/async/TestPipesDriver.java   | 109 ------
 .../apache/tika/pipes/PipeIntegrationTests.java    |  81 +++--
 .../src/test/resources/log4j.properties            |   4 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |   2 +-
 .../tika/metadata/serialization/JsonMetadata.java  |  14 +-
 .../serialization/JsonMetadataDeserializer.java    |  43 +++
 .../serialization/JsonMetadataSerializer.java      |  41 +++
 .../tika/server/core/resource/AsyncParser.java     |  10 +-
 .../tika/server/core/resource/AsyncResource.java   |   2 +-
 .../tika/server/core/resource/EmitterResource.java |   8 +-
 57 files changed, 1794 insertions(+), 1480 deletions(-)

diff --git a/pom.xml b/pom.xml
index 17bc12d..1ad3c85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,11 +37,11 @@
   <modules>
     <module>tika-parent</module>
     <module>tika-core</module>
+    <module>tika-serialization</module>
     <module>tika-pipes</module>
     <module>tika-parsers</module>
     <module>tika-bundles</module>
     <module>tika-xmp</module>
-    <module>tika-serialization</module>
     <module>tika-batch</module>
     <module>tika-langdetect</module>
     <module>tika-app</module>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index deb4232..2c0ea64 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -38,7 +38,7 @@ public class FetchKey {
         return fetcherName;
     }
 
-    public String getKey() {
+    public String getFetchKey() {
         return fetchKey;
     }
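
For orientation (a caller-side sketch, not part of the diff): after the rename, callers read the key via getFetchKey(). The two-argument constructor is taken from its use in the fetch iterators later in this commit; getFetcherName() is assumed from the accessor shown above.

    // FetchKey is org.apache.tika.pipes.fetcher.FetchKey
    FetchKey key = new FetchKey("my-fetcher", "path/to/input.pdf");
    String fetcher = key.getFetcherName();   // unchanged accessor
    String target = key.getFetchKey();       // formerly key.getKey()
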
 
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index 7d99828..4e314bc 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -61,7 +61,7 @@ public class FileSystemFetchIteratorTest {
 
         Set<String> iteratorSet = new HashSet<>();
         for (FetchEmitTuple p : it) {
-            iteratorSet.add(p.getFetchKey().getKey());
+            iteratorSet.add(p.getFetchKey().getFetchKey());
         }
 
         assertEquals(truthSet, iteratorSet);
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 560ed2d..21a5a49 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -14,7 +14,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <packaging>pom</packaging>
 
@@ -39,4 +40,38 @@
         <module>tika-pipes-integration-tests</module>
     </modules>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${checkstyle.plugin.version}</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>com.puppycrawl.tools</groupId>
+                        <artifactId>checkstyle</artifactId>
+                        <version>8.41</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <configuration>
+                            <configLocation>checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>false</consoleOutput>
+                            <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                            <testSourceDirectories>${project.basedir}/src/test/java</testSourceDirectories>
+                            <violationSeverity>error</violationSeverity>
+                            <failOnViolation>true</failOnViolation>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml b/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
index cf214da..df61b96 100644
--- a/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-fs/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-emitters</artifactId>
@@ -90,15 +90,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index d4a0448..285142b 100644
--- a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -16,16 +16,6 @@
  */
 package org.apache.tika.pipes.emitter.fs;
 
-import org.apache.tika.config.Field;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.StreamEmitter;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Writer;
@@ -35,9 +25,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.StreamEmitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
 
 /**
  * Emitter to write to a file system.
@@ -56,7 +52,8 @@ import java.util.Set;
  *                  &lt;param name="basePath" type="string"&gt;/path/to/output&lt;/param&gt;
  *                  &lt;!-- optional; default is 'json' --&gt;
  *                  &lt;param name="fileExtension" type="string"&gt;json&lt;/param&gt;
- *                  &lt;!-- optional; if the file already exists, options ('skip', 'replace', 'exception')
+ *                  &lt;!-- optional; if the file already exists,
+ *                       options ('skip', 'replace', 'exception')
  *                  default is 'exception' --&gt;
  *                  &lt;param name="onExists" type="string"&gt;skip&lt;/param&gt;
  *              &lt;/params&gt;
@@ -66,17 +63,13 @@ import java.util.Set;
  */
 public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter {
 
-    enum ON_EXISTS {
-        SKIP, EXCEPTION, REPLACE
-    }
-
     private Path basePath = null;
     private String fileExtension = "json";
     private ON_EXISTS onExists = ON_EXISTS.EXCEPTION;
 
-
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
         Path output;
         if (metadataList == null || metadataList.size() == 0) {
             throw new TikaEmitterException("metadata list must not be null or of size 0");
@@ -124,15 +117,14 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter
         } else if (onExists.equals("exception")) {
             this.onExists = ON_EXISTS.EXCEPTION;
         } else {
-            throw new IllegalArgumentException(
-                    "Don't understand '" + onExists +
-                            "'; must be one of: 'skip', 'replace', 'exception'");
+            throw new IllegalArgumentException("Don't understand '" + onExists +
+                    "'; must be one of: 'skip', 'replace', 'exception'");
         }
     }
 
     @Override
-    public void emit(String path, InputStream inputStream, Metadata userMetadata) throws IOException,
-            TikaEmitterException {
+    public void emit(String path, InputStream inputStream, Metadata userMetadata)
+            throws IOException, TikaEmitterException {
         Path target = basePath.resolve(path);
 
         if (!Files.isDirectory(target.getParent())) {
@@ -152,4 +144,8 @@ public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter
             }
         }
     }
+
+    enum ON_EXISTS {
+        SKIP, EXCEPTION, REPLACE
+    }
 }
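
A minimal usage sketch (not part of the commit), following the config-driven pattern that TestBasic uses further down; the config file name "tika-config-fs.xml" and the emitter name "fs1" are hypothetical and would correspond to an &lt;emitter&gt; block like the one in the class javadoc above.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.tika.config.TikaConfig;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.metadata.TikaCoreProperties;
    import org.apache.tika.pipes.emitter.Emitter;

    public class FileSystemEmitterSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical config defining a FileSystemEmitter named "fs1"
            TikaConfig tikaConfig = new TikaConfig(new File("tika-config-fs.xml"));
            Emitter emitter = tikaConfig.getEmitterManager().getEmitter("fs1");

            List<Metadata> metadataList = new ArrayList<>();
            Metadata m = new Metadata();
            m.set(TikaCoreProperties.TIKA_CONTENT, "extracted text");
            metadataList.add(m);

            // writes <basePath>/some/emit/key.json by default, honoring the onExists policy
            emitter.emit("some/emit/key", metadataList);
        }
    }
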
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
index 7b7def3..773865c 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-emitters</artifactId>
@@ -137,15 +137,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index 68fc1ee..6f9c745 100644
--- a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -16,15 +16,31 @@
  */
 package org.apache.tika.pipes.emitter.s3;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.http.client.CredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -33,31 +49,13 @@ import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.StreamEmitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.metadata.Metadata;
 import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 /**
  * Emits to existing s3 bucket
@@ -71,17 +69,21 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *                  &lt;!-- required --&gt;
  *                  &lt;param name="region" type="string"&gt;us-east-1&lt;/param&gt;
  *                  &lt;!-- required --&gt;
- *                  &lt;param name="credentialsProvider" type="string"&gt;(profile|instance)&lt;/param&gt;
+ *                  &lt;param name="credentialsProvider"
+ *                       type="string"&gt;(profile|instance)&lt;/param&gt;
  *                  &lt;!-- required if credentialsProvider=profile--&gt;
  *                  &lt;param name="profile" type="string"&gt;my-profile&lt;/param&gt;
  *                  &lt;!-- required --&gt;
  *                  &lt;param name="bucket" type="string"&gt;my-bucket&lt;/param&gt;
- *                  &lt;!-- optional; prefix to add to the path before emitting; default is no prefix --&gt;
+ *                  &lt;!-- optional; prefix to add to the path before emitting;
+ *                       default is no prefix --&gt;
  *                  &lt;param name="prefix" type="string"&gt;my-prefix&lt;/param&gt;
  *                  &lt;!-- optional; default is 'json' this will be added to the SOURCE_PATH
- *                                    if no emitter key is specified. Do not add a "." before the extension --&gt;
+ *                                    if no emitter key is specified. Do not add a "."
+ *                                     before the extension --&gt;
  *                  &lt;param name="fileExtension" type="string"&gt;json&lt;/param&gt;
- *                  &lt;!-- optional; default is 'true'-- whether to copy the json to a local file before putting to s3 --&gt;
+ *                  &lt;!-- optional; default is 'true'-- whether to copy the
+ *                     json to a local file before putting to s3 --&gt;
  *                  &lt;param name="spoolToTemp" type="bool"&gt;true&lt;/param&gt;
  *              &lt;/params&gt;
  *          &lt;/emitter&gt;
@@ -108,15 +110,16 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
      * @throws TikaException
      */
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
         if (metadataList == null || metadataList.size() == 0) {
             throw new TikaEmitterException("metadata list must not be null or of size 0");
         }
 
-        if (! spoolToTemp) {
+        if (!spoolToTemp) {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            try (Writer writer =
-                         new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
+            try (Writer writer = new BufferedWriter(
+                    new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
                 JsonMetadataList.toJson(metadataList, writer);
             } catch (IOException e) {
                 throw new TikaEmitterException("can't jsonify", e);
@@ -129,8 +132,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
             TemporaryResources tmp = new TemporaryResources();
             try {
                 Path tmpPath = tmp.createTempFile();
-                try (Writer writer = Files.newBufferedWriter(tmpPath,
-                        StandardCharsets.UTF_8, StandardOpenOption.CREATE)) {
+                try (Writer writer = Files.newBufferedWriter(tmpPath, StandardCharsets.UTF_8,
+                        StandardOpenOption.CREATE)) {
                     JsonMetadataList.toJson(metadataList, writer);
                 } catch (IOException e) {
                     throw new TikaEmitterException("can't jsonify", e);
@@ -145,28 +148,27 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     }
 
     /**
-     *
-     * @param path -- object path, not including the bucket
-     * @param is inputStream to copy
+     * @param path         -- object path, not including the bucket
+     * @param is           inputStream to copy
      * @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
      * @throws TikaEmitterException or IOexception if there is a Runtime s3 client exception
      */
     @Override
-    public void emit(String path, InputStream is, Metadata userMetadata) throws IOException, TikaEmitterException {
+    public void emit(String path, InputStream is, Metadata userMetadata)
+            throws IOException, TikaEmitterException {
 
         if (!StringUtils.isBlank(prefix)) {
             path = prefix + "/" + path;
         }
 
-        if (! StringUtils.isBlank(fileExtension)) {
+        if (!StringUtils.isBlank(fileExtension)) {
             path += "." + fileExtension;
         }
 
-        LOGGER.debug("about to emit to target bucket: ({}) path:({})",
-                bucket, path);
+        LOGGER.debug("about to emit to target bucket: ({}) path:({})", bucket, path);
         long length = -1;
         if (is instanceof TikaInputStream) {
-            if (((TikaInputStream)is).hasFile()) {
+            if (((TikaInputStream) is).hasFile()) {
                 try {
                     length = ((TikaInputStream) is).getLength();
                 } catch (IOException e) {
@@ -183,7 +185,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
                 String[] vals = userMetadata.getValues(n);
                 if (vals.length > 1) {
                     LOGGER.warn("Can only write the first value for key {}. I see {} values.",
-                            n, vals.length);
+                            n,
+                            vals.length);
                 }
                 objectMetadata.addUserMetadata(n, vals[0]);
             }
@@ -197,6 +200,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
      * Whether or not to spool the metadatalist to a tmp file before putting object.
      * Default: <code>true</code>.  If this is set to <code>false</code>,
      * this emitter writes the json object to memory and then puts that into s3.
+     *
      * @param spoolToTemp
      */
     @Field
@@ -223,7 +227,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     public void setPrefix(String prefix) {
         //strip final "/" if it exists
         if (prefix.endsWith("/")) {
-            this.prefix = prefix.substring(0, prefix.length()-1);
+            this.prefix = prefix.substring(0, prefix.length() - 1);
         } else {
             this.prefix = prefix;
         }
@@ -231,8 +235,9 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
 
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
-        if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
-            throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+        if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+            throw new IllegalArgumentException(
+                    "credentialsProvider must be either 'profile' or instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -240,6 +245,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     /**
      * If you want to customize the output file's file extension.
      * Do not include the "."
+     *
      * @param fileExtension
      */
     @Field
@@ -248,10 +254,10 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     }
 
 
-
     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
      * e.g. AmazonClientException in a TikaConfigException.
+     *
      * @param params params to use for initialization
      * @throws TikaConfigException
      */
@@ -261,7 +267,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
         AWSCredentialsProvider provider = null;
         if ("instance".equals(credentialsProvider)) {
             provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)){
+        } else if ("profile".equals(credentialsProvider)) {
             provider = new ProfileCredentialsProvider(profile);
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
@@ -269,9 +275,7 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
         }
 
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withRegion(region)
-                    .withCredentials(provider)
+            s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 emitter", e);
@@ -279,7 +283,8 @@ public class S3Emitter extends AbstractEmitter implements Initializable, StreamE
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
         mustNotBeEmpty("bucket", this.bucket);
         mustNotBeEmpty("region", this.region);
     }
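
A sketch of the StreamEmitter side of this class (not from the commit): user metadata becomes S3 object user metadata, and prefix/fileExtension are applied to the path. setCredentialsProvider and setPrefix appear in the diff above; the no-arg constructor and the setBucket/setRegion/setProfile setters are assumptions based on the @Field params named in the javadoc.

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Collections;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.emitter.s3.S3Emitter;

    public class S3StreamEmitSketch {
        public static void main(String[] args) throws Exception {
            S3Emitter emitter = new S3Emitter();
            // setBucket/setRegion/setProfile are assumed setters matching the javadoc params
            emitter.setBucket("my-bucket");
            emitter.setRegion("us-east-1");
            emitter.setCredentialsProvider("profile");
            emitter.setProfile("my-profile");
            emitter.setPrefix("my-prefix");
            emitter.initialize(Collections.emptyMap());   // builds the AmazonS3 client

            Metadata userMetadata = new Metadata();
            userMetadata.set("source", "crawl-2021-03");  // only the first value per key is written
            try (InputStream is = Files.newInputStream(Paths.get("report.json"))) {
                // with prefix=my-prefix and the default fileExtension=json this puts
                // s3://my-bucket/my-prefix/docs/report.json
                emitter.emit("docs/report", is, userMetadata);
            }
        }
    }
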
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index e36d1c8..a831fa9 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-emitters</artifactId>
@@ -102,15 +102,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 7fb7f1b..f43963e 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,9 +16,23 @@
  */
 package org.apache.tika.pipes.emitter.solr;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.http.client.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.client.HttpClientFactory;
 import org.apache.tika.client.HttpClientUtil;
 import org.apache.tika.client.TikaClientException;
@@ -26,39 +40,19 @@ import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.zip.GZIPOutputStream;
 
 public class SolrEmitter extends AbstractEmitter implements Initializable {
 
-    enum AttachmentStrategy {
-        SKIP,
-        CONCATENATE_CONTENT,
-        PARENT_CHILD,
-        //anything else?
-    }
     private static final String ATTACHMENTS = "attachments";
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class);
     //one day this will be allowed or can be configured?
     private final boolean gzipJson = false;
-
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
     private String url;
     private String contentField = "content";
@@ -66,55 +60,48 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     private int commitWithin = 100;
     private HttpClientFactory httpClientFactory;
     private HttpClient httpClient;
-
     public SolrEmitter() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
     }
+
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException,
-            TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
 
         if (metadataList == null || metadataList.size() == 0) {
             LOG.warn("metadataList is null or empty");
             return;
         }
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Writer writer = gzipJson ?
-                new BufferedWriter(
-                        new OutputStreamWriter(
-                                new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
-                new BufferedWriter(
-                        new OutputStreamWriter(bos, StandardCharsets.UTF_8));
-        try (
-            JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+        Writer writer = gzipJson ? new BufferedWriter(
+                new OutputStreamWriter(new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+                new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartArray();
             jsonify(jsonGenerator, emitKey, metadataList);
             jsonGenerator.writeEndArray();
         }
-        LOG.debug("emitting json ({})",
-                new String(bos.toByteArray(), StandardCharsets.UTF_8));
+        LOG.debug("emitting json ({})", new String(bos.toByteArray(), StandardCharsets.UTF_8));
         try {
-            HttpClientUtil.postJson(httpClient,
-                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), bos.toByteArray(), gzipJson);
+            HttpClientUtil
+                    .postJson(httpClient, url + UPDATE_PATH +
+                                    "?commitWithin=" + getCommitWithin(),
+                            bos.toByteArray(), gzipJson);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("can't post", e);
         }
     }
 
     @Override
-    public void emit(List<? extends EmitData> batch) throws IOException,
-            TikaEmitterException {
+    public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
         if (batch == null || batch.size() == 0) {
             LOG.warn("batch is null or empty");
             return;
         }
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Writer writer = gzipJson ?
-                new BufferedWriter(
-                        new OutputStreamWriter(
-                                new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
-            new BufferedWriter(
-                new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+        Writer writer = gzipJson ? new BufferedWriter(
+                new OutputStreamWriter(new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+                new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
         try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartArray();
             for (EmitData d : batch) {
@@ -122,21 +109,22 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             }
             jsonGenerator.writeEndArray();
         }
-        LOG.debug("emitting json ({})",
-                new String(bos.toByteArray(), StandardCharsets.UTF_8));
+        LOG.debug("emitting json ({})", new String(bos.toByteArray(), StandardCharsets.UTF_8));
         try {
-            HttpClientUtil.postJson(httpClient,
-                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(),
-                    bos.toByteArray(), gzipJson);
+            HttpClientUtil
+                    .postJson(httpClient, url + UPDATE_PATH +
+                                    "?commitWithin=" + getCommitWithin(),
+                            bos.toByteArray(), gzipJson);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("can't post", e);
         }
     }
 
-    private void jsonify(JsonGenerator jsonGenerator, String emitKey, List<Metadata> metadataList) throws IOException {
+    private void jsonify(JsonGenerator jsonGenerator, String emitKey,
+                         List<Metadata> metadataList)
+            throws IOException {
         metadataList.get(0).set(idField, emitKey);
-        if (attachmentStrategy == AttachmentStrategy.SKIP ||
-            metadataList.size() == 1) {
+        if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
             jsonify(metadataList.get(0), jsonGenerator);
         } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
             //this only handles text for now, not xhtml
@@ -162,12 +150,13 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             jsonGenerator.writeEndArray();
             jsonGenerator.writeEndObject();
         } else {
-            throw new IllegalArgumentException("I don't yet support this attachment strategy: "
-                    + attachmentStrategy);
+            throw new IllegalArgumentException(
+                    "I don't yet support this attachment strategy: " + attachmentStrategy);
         }
     }
 
-    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject) throws IOException {
+    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject)
+            throws IOException {
         jsonGenerator.writeStartObject();
         for (String n : metadata.names()) {
             String[] vals = metadata.getValues(n);
@@ -213,29 +202,35 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         } else if (attachmentStrategy.equals("parent-child")) {
             this.attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
         } else {
-            throw new IllegalArgumentException("Expected 'skip', 'concatenate-content' or "+
+            throw new IllegalArgumentException("Expected 'skip', 'concatenate-content' or " +
                     "'parent-child'. I regret I do not recognize: " + attachmentStrategy);
         }
     }
 
     /**
      * Specify the url for Solr
+     *
      * @param url
      */
     @Field
     public void setUrl(String url) {
         if (url.endsWith("/")) {
-            url = url.substring(0, url.length()-1);
+            url = url.substring(0, url.length() - 1);
         }
         this.url = url;
     }
 
+    public String getContentField() {
+        return contentField;
+    }
+
     /**
      * This is the field _after_ metadata mappings have been applied
      * that contains the "content" for each metadata object.
-     *
+     * <p>
      * This is the field that is used if {@link #attachmentStrategy}
      * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
+     *
      * @param contentField
      */
     @Field
@@ -243,8 +238,8 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.contentField = contentField;
     }
 
-    public String getContentField() {
-        return contentField;
+    public int getCommitWithin() {
+        return commitWithin;
     }
 
     @Field
@@ -252,10 +247,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.commitWithin = commitWithin;
     }
 
-    public int getCommitWithin() {
-        return commitWithin;
-    }
-
     /**
      * Specify the field in the first Metadata that should be
      * used as the id field for the document.
@@ -300,8 +291,14 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
 
     }
 
+    enum AttachmentStrategy {
+        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+        //anything else?
+    }
+
 }
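
A sketch of wiring this emitter programmatically (not in this commit). setUrl and setCommitWithin are shown in the diff above; setAttachmentStrategy(String) is assumed from the strategy-string parsing, and initialize(...) follows the Initializable pattern (it is assumed to build the HttpClient from the HttpClientFactory).

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.metadata.TikaCoreProperties;
    import org.apache.tika.pipes.emitter.solr.SolrEmitter;

    public class SolrEmitterSketch {
        public static void main(String[] args) throws Exception {
            SolrEmitter emitter = new SolrEmitter();
            emitter.setUrl("http://localhost:8983/solr/tika-test"); // a trailing '/' is stripped
            emitter.setCommitWithin(1000);
            // "parent-child" maps to AttachmentStrategy.PARENT_CHILD (the default)
            emitter.setAttachmentStrategy("parent-child");
            emitter.initialize(Collections.emptyMap());

            List<Metadata> metadataList = new ArrayList<>();
            Metadata parent = new Metadata();
            parent.set(TikaCoreProperties.TIKA_CONTENT, "parent content");
            metadataList.add(parent);

            // posts JSON to <url>/update?commitWithin=1000
            emitter.emit("doc-1", metadataList);
        }
    }
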
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 069e7a2..049fe96 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -17,19 +17,20 @@
 package org.apache.tika.pipes.emitter.solr;
 
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.filter.MetadataFilter;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
 
 @Ignore("requires solr to be up and running")
 public class TestBasic {
@@ -39,8 +40,7 @@ public class TestBasic {
         TikaConfig tikaConfig = new TikaConfig(
                 TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
         Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr1");
-        List<Metadata> metadataList = getParentChild(tikaConfig,
-                "id1", 2);
+        List<Metadata> metadataList = getParentChild(tikaConfig, "id1", 2);
 
         emitter.emit("1", metadataList);
     }
@@ -52,17 +52,15 @@ public class TestBasic {
         Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr2");
         List<EmitData> emitData = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
-            List<Metadata> metadataList = getParentChild(tikaConfig,
-                    "batch_"+i, 4);
-            emitData.add(new EmitData(
-                    new EmitKey(emitter.getName(),  "batch_"+i),
-                    metadataList));
+            List<Metadata> metadataList = getParentChild(tikaConfig, "batch_" + i, 4);
+            emitData.add(new EmitData(new EmitKey(emitter.getName(),
+                    "batch_" + i), metadataList));
         }
         emitter.emit(emitData);
     }
 
-    private List<Metadata> getParentChild(TikaConfig tikaConfig,
-                                          String id, int numChildren) throws TikaException {
+    private List<Metadata> getParentChild(TikaConfig tikaConfig, String id, int numChildren)
+            throws TikaException {
         List<Metadata> metadataList = new ArrayList<>();
         MetadataFilter filter = tikaConfig.getMetadataFilter();
 
@@ -75,7 +73,7 @@ public class TestBasic {
         m1.add(TikaCoreProperties.CREATOR, "secondAuthor");
         filter.filter(m1);
         metadataList.add(m1);
-        for (int i = 1; i < numChildren; i++ ) {
+        for (int i = 1; i < numChildren; i++) {
             Metadata m2 = new Metadata();
             m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH, "/path_to_this.txt");
             m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy " + i);
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
index 92b6d56..11e5887 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=debug,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
-log4j.appender.stderr.layout.ConversionPattern= %-5p %m%n
+log4j.appender.stderr.layout.ConversionPattern=%-5p %m%n
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 7cb0c02..3063124 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -16,8 +16,24 @@
  */
 package org.apache.tika.pipes.fetchiterator.csv;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -29,21 +45,6 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 /**
  * Iterates through a UTF-8 CSV file. This adds all columns
@@ -51,17 +52,20 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  * to the metadata object.
  * <p>
  *  <ul>
- *      <li>If a 'fetchKeyColumn' is specified, this will use that column's value as the fetchKey.</li>
- *      <li>If no 'fetchKeyColumn' is specified, this will send the metadata from the other columns.</li>
+ *      <li>If a 'fetchKeyColumn' is specified, this will use that
+ *      column's value as the fetchKey.</li>
+ *      <li>If no 'fetchKeyColumn' is specified, this will send the
+ *      metadata from the other columns.</li>
  *      <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
  *  </ul>
  * <p>
  *  <ul>
- *      <li>If an 'emitKeyColumn' is specified, this will use that column's value as the emit key.</li>
- *      <li>If an 'emitKeyColumn' is not specified, this will use the value from the 'fetchKeyColumn'.</li>
+ *      <li>If an 'emitKeyColumn' is specified, this will use that
+ *      column's value as the emit key.</li>
+ *      <li>If an 'emitKeyColumn' is not specified, this will use
+ *      the value from the 'fetchKeyColumn'.</li>
  *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
  *  </ul>
- *
  */
 public class CSVFetchIterator extends FetchIterator implements Initializable {
 
@@ -110,24 +114,21 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             for (CSVRecord record : records) {
                 String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
                 String emitKey = getEmitKey(fetchEmitKeyIndices, record);
-                if (StringUtils.isBlank(fetchKey) && ! StringUtils.isBlank(fetcherName)) {
+                if (StringUtils.isBlank(fetchKey) && !StringUtils.isBlank(fetcherName)) {
                     LOGGER.debug("Fetcher specified ({}), but no fetchkey was found in ({})",
                             fetcherName, record);
                 }
                 if (StringUtils.isBlank(emitKey)) {
-                    throw new IOException("emitKey must not be blank in :"+record);
+                    throw new IOException("emitKey must not be blank in :" + record);
                 }
                 Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, record);
-                tryToAdd(new FetchEmitTuple(
-                        new FetchKey(fetcherName, fetchKey),
-                        new EmitKey(emitterName, emitKey), metadata,
-                        getOnParseException()));
+                tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+                        new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
             }
         }
     }
 
-    private void checkFetchEmitValidity(String fetcherName,
-                                        String emitterName,
+    private void checkFetchEmitValidity(String fetcherName, String emitterName,
                                         FetchEmitKeyIndices fetchEmitKeyIndices,
                                         List<String> headers) throws IOException {
 
@@ -135,7 +136,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             throw new IOException(new TikaConfigException("must specify at least an emitterName"));
         }
 
-        if (StringUtils.isBlank(fetcherName) && ! StringUtils.isBlank(fetchKeyColumn)) {
+        if (StringUtils.isBlank(fetcherName) && !StringUtils.isBlank(fetchKeyColumn)) {
             throw new IOException(new TikaConfigException("If specifying a 'fetchKeyColumn', " +
                     "you must also specify a 'fetcherName'"));
         }
@@ -145,17 +146,17 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
         }
 
         //if a fetchkeycolumn is specified, make sure that it was found
-        if (! StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find fetchKeyColumn ("+
-                    fetchKeyColumn+" in header.\n" +
-                    "These are the headers I see: " + headers));
+        if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
+            throw new IOException(new TikaConfigException(
+                    "Couldn't find fetchKeyColumn (" + fetchKeyColumn + " in header.\n" +
+                            "These are the headers I see: " + headers));
         }
 
         //if an emitkeycolumn is specified, make sure that it was found
-        if (! StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find emitKeyColumn ("+
-                    emitKeyColumn+" in header.\n" +
-                    "These are the headers I see: " + headers));
+        if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
+            throw new IOException(new TikaConfigException(
+                    "Couldn't find emitKeyColumn (" + emitKeyColumn + " in header.\n" +
+                            "These are the headers I see: " + headers));
         }
 
         if (StringUtils.isBlank(emitKeyColumn)) {
@@ -179,7 +180,8 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
         return getFetchKey(fetchEmitKeyIndices, record);
     }
 
-    private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers, CSVRecord record) {
+    private Metadata loadMetadata(FetchEmitKeyIndices fetchEmitKeyIndices, List<String> headers,
+                                  CSVRecord record) {
         Metadata metadata = new Metadata();
         for (int i = 0; i < record.size(); i++) {
             if (fetchEmitKeyIndices.shouldSkip(i)) {
@@ -192,7 +194,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
 
 
     private FetchEmitKeyIndices loadHeaders(CSVRecord record, List<String> headers)
-        throws IOException {
+            throws IOException {
         int fetchKeyColumnIndex = -1;
         int emitKeyColumnIndex = -1;
 
@@ -200,7 +202,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             String header = record.get(col);
             if (StringUtils.isBlank(header)) {
                 throw new IOException(
-                        new TikaException("Header in column (" +col +") must not be empty"));
+                        new TikaException("Header in column (" + col + ") must not be empty"));
             }
             headers.add(header);
             if (header.equals(fetchKeyColumn)) {
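
For readers skimming the patch: the iterator above only fills an ArrayBlockingQueue<FetchEmitTuple>; consumers drain the queue and stop on the COMPLETED_SEMAPHORE sentinel, which is the pattern the MockFetcher classes in the tests below follow. A minimal standalone consumer sketch (the class name and the "project" column are placeholders taken from the test fixture, not part of this commit):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
    import org.apache.tika.pipes.fetchiterator.FetchIterator;

    public class QueueConsumerSketch implements Callable<Integer> {
        private final ArrayBlockingQueue<FetchEmitTuple> queue;

        public QueueConsumerSketch(ArrayBlockingQueue<FetchEmitTuple> queue) {
            this.queue = queue;
        }

        @Override
        public Integer call() throws Exception {
            int processed = 0;
            while (true) {
                // block briefly for the next tuple; the iterator enqueues one per CSV row
                FetchEmitTuple t = queue.poll(1, TimeUnit.MINUTES);
                if (t == null || t == FetchIterator.COMPLETED_SEMAPHORE) {
                    // the iterator adds COMPLETED_SEMAPHORE once it has finished enqueuing
                    return processed;
                }
                // fetch key and per-row metadata as populated by loadMetadata(...);
                // "project" is just a column from the sample CSV used in the test below
                String fetchKey = t.getFetchKey().getFetchKey();
                String project = t.getMetadata().get("project");
                System.out.println(fetchKey + " (project=" + project + ")");
                processed++;
            }
        }
    }
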
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index 55ae655..fd86a05 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
-import org.junit.Test;
+
+import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
+import static org.junit.Assert.assertEquals;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -25,15 +24,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
 
 public class TestCSVFetchIterator {
 
@@ -76,10 +76,8 @@ public class TestCSVFetchIterator {
         for (MockFetcher f : fetchers) {
             for (FetchEmitTuple t : f.pairs) {
                 String id = t.getMetadata().get("id");
-                assertEquals("path/to/my/file"+id,
-                        t.getFetchKey().getKey());
-                assertEquals("project"+
-                                (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
+                assertEquals("path/to/my/file" + id, t.getFetchKey().getFetchKey());
+                assertEquals("project" + (Integer.parseInt(id) % 2 == 1 ? "a" : "b"),
                         t.getMetadata().get("project"));
             }
         }
@@ -98,12 +96,13 @@ public class TestCSVFetchIterator {
     }
 
     private Path get(String testFileName) throws Exception {
-        return Paths.get(TestCSVFetchIterator.class.getResource("/"+testFileName).toURI());
+        return Paths.get(TestCSVFetchIterator.class.getResource("/" + testFileName).toURI());
     }
 
     private static class MockFetcher implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
         private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
index 2813ed4..702554a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.tika</groupId>
@@ -29,10 +29,10 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>tika-fetch-iterator-jdbc</artifactId>
-    
+
     <name>Apache Tika Fetch Iterator - jdbc</name>
     <url>http://tika.apache.org/</url>
-    
+
     <dependencies>
         <dependency>
             <groupId>${project.groupId}</groupId>
@@ -97,15 +97,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index 112f52a..f1ca52c 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -16,19 +16,7 @@
  */
 package org.apache.tika.pipes.fetchiterator.jdbc;
 
-import org.apache.tika.config.Field;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -42,7 +30,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.StringUtils;
 
 /**
  * Iterates through the results from a sql call via jdbc. This adds all columns
@@ -50,8 +51,10 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  * to the metadata object.
  * <p>
  *  <ul>
- *      <li>If a 'fetchKeyColumn' is specified, this will use that column's value as the fetchKey.</li>
- *      <li>If no 'fetchKeyColumn' is specified, this will send the metadata from the other columns.</li>
+ *      <li>If a 'fetchKeyColumn' is specified, this will use that
+ *      column's value as the fetchKey.</li>
+ *      <li>If no 'fetchKeyColumn' is specified, this will send the
+ *      metadata from the other columns.</li>
  *      <li>The 'fetchKeyColumn' value is not added to the metadata.</li>
  *  </ul>
  * <p>
@@ -59,7 +62,6 @@ import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
  *      <li>An 'emitKeyColumn' must be specified</li>
  *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
  *  </ul>
- *
  */
 public class JDBCFetchIterator extends FetchIterator implements Initializable {
 
@@ -88,15 +90,15 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         this.connection = connection;
     }
 
+    public String getSelect() {
+        return select;
+    }
+
     @Field
     public void setSelect(String select) {
         this.select = select;
     }
 
-    public String getSelect() {
-        return select;
-    }
-
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherName = getFetcherName();
@@ -110,16 +112,17 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
                 while (rs.next()) {
                     if (headers.size() == 0) {
                         fetchEmitKeyIndices = loadHeaders(rs.getMetaData(), headers);
-                        checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices, headers);
+                        checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices,
+                                headers);
                     }
                     try {
                         processRow(fetcherName, emitterName, headers, fetchEmitKeyIndices, rs);
                     } catch (SQLException e) {
-                        LOGGER.warn("Failed to insert: "+rs, e);
+                        LOGGER.warn("Failed to insert: " + rs, e);
                     }
                     rowCount++;
                     if (rowCount % 1000 == 0) {
-                        LOGGER.info("added "+rowCount + " rows to the queue");
+                        LOGGER.info("added " + rowCount + " rows to the queue");
                     }
                 }
             }
@@ -129,21 +132,23 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         } finally {
             try {
                 db.close();
-            } catch (SQLException e){
+            } catch (SQLException e) {
                 LOGGER.warn("failed to close connection", e);
             }
         }
     }
-    private void checkFetchEmitValidity(String fetcherName,
-                                        String emitterName,
+
+    private void checkFetchEmitValidity(String fetcherName, String emitterName,
                                         FetchEmitKeyIndices fetchEmitKeyIndices,
                                         List<String> headers) throws IOException {
 
-        if (! StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find column: "+fetchKeyColumn));
+        if (!StringUtils.isBlank(fetchKeyColumn) && fetchEmitKeyIndices.fetchKeyIndex < 0) {
+            throw new IOException(
+                    new TikaConfigException("Couldn't find column: " + fetchKeyColumn));
         }
-        if (! StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
-            throw new IOException(new TikaConfigException("Couldn't find column: "+emitKeyColumn));
+        if (!StringUtils.isBlank(emitKeyColumn) && fetchEmitKeyIndices.emitKeyIndex < 0) {
+            throw new IOException(
+                    new TikaConfigException("Couldn't find column: " + emitKeyColumn));
         }
     }
 
@@ -165,7 +170,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
             if (i == fetchEmitKeyIndices.emitKeyIndex) {
                 emitKey = getString(i, rs);
                 if (emitKey == null) {
-                    LOGGER.debug("emitKey is empty for record "+toString(rs));
+                    LOGGER.debug("emitKey is empty for record " + toString(rs));
                 }
                 emitKey = (emitKey == null) ? "" : emitKey;
                 continue;
@@ -176,10 +181,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
             }
         }
 
-        tryToAdd(new FetchEmitTuple(
-                new FetchKey(fetcherName, fetchKey),
-                new EmitKey(emitterName, emitKey),
-                metadata, getOnParseException()));
+        tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+                new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
     }
 
     private String toString(ResultSet rs) throws SQLException {
@@ -188,7 +191,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
             String val = rs.getString(i);
             val = (val == null) ? "" : val;
             val = (val.length() > 100) ? val.substring(0, 100) : val;
-            sb.append(rs.getMetaData().getColumnLabel(i)+":"+val+"\n");
+            sb.append(rs.getMetaData().getColumnLabel(i) + ":" + val + "\n");
         }
         return sb.toString();
     }
@@ -203,7 +206,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
     }
 
 
-    private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, List<String> headers) throws SQLException {
+    private FetchEmitKeyIndices loadHeaders(ResultSetMetaData metaData, List<String> headers)
+            throws SQLException {
         int fetchKeyIndex = -1;
         int emitKeyIndex = -1;
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -236,7 +240,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         mustNotBeEmpty("emitterName", this.getEmitterName());
         mustNotBeEmpty("emitKeyColumn", this.emitKeyColumn);
 
-        if (StringUtils.isBlank(getFetcherName()) && ! StringUtils.isBlank(fetchKeyColumn)) {
+        if (StringUtils.isBlank(getFetcherName()) && !StringUtils.isBlank(fetchKeyColumn)) {
             throw new TikaConfigException(
                     "If you specify a 'fetchKeyColumn', you must specify a 'fetcherName'");
         }
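
As a worked illustration of the column-aliasing the javadoc above describes, the sketch below (not part of this commit) runs the same kind of aliased select against an in-memory H2 database, assuming the H2 driver is on the classpath as it is for the test module. loadHeaders() keys off these column labels; with fetchKeyColumn/emitKeyColumn set to my_fetchkey, that column feeds the FetchKey/EmitKey and the remaining labels become metadata names:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    public class JdbcAliasSketch {
        public static void main(String[] args) throws Exception {
            // in-memory H2 database, mirroring the fixture in TestJDBCFetchIterator below
            try (Connection db = DriverManager.getConnection("jdbc:h2:mem:fetchkeys")) {
                try (Statement st = db.createStatement()) {
                    st.execute("create table fetchkeys (id varchar(128), " +
                            "project varchar(128), fetchKey varchar(128))");
                    st.execute("insert into fetchkeys values ('id7', 'projectb', 'fk7')");
                }
                String select = "select id as my_id, project as my_project, " +
                        "fetchKey as my_fetchkey from fetchkeys";
                try (Statement st = db.createStatement();
                        ResultSet rs = st.executeQuery(select)) {
                    ResultSetMetaData md = rs.getMetaData();
                    while (rs.next()) {
                        for (int i = 1; i <= md.getColumnCount(); i++) {
                            // these labels are what loadHeaders() sees; 'fk7' would become
                            // the fetch/emit key, MY_ID and MY_PROJECT the metadata fields
                            System.out.println(md.getColumnLabel(i) + " = " + rs.getString(i));
                        }
                    }
                }
            }
        }
    }
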
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index 9a92fc5..7eebe31 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -16,13 +16,9 @@
  */
 package org.apache.tika.pipes.fetchiterator.jdbc;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -44,35 +40,41 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class TestJDBCFetchIterator {
 
     static final String TABLE = "fetchkeys";
-    static Connection CONNECTION;
-    static Path DB_DIR;
     static final String db = "mydb";
     private static final int NUM_ROWS = 1000;
+    static Connection CONNECTION;
+    static Path DB_DIR;
 
     @BeforeClass
     public static void setUp() throws Exception {
         DB_DIR = Files.createTempDirectory("tika-jdbc-fetchiterator-test-");
 
-        CONNECTION = DriverManager.getConnection("jdbc:h2:file:"+DB_DIR.toAbsolutePath()+"/"+db);
-        String sql = "create table "+TABLE +
-                " (id varchar(128), " +
+        CONNECTION =
+                DriverManager.getConnection("jdbc:h2:file:" +
+                        DB_DIR.toAbsolutePath() + "/" + db);
+        String sql = "create table " + TABLE + " (id varchar(128), " +
                 "project varchar(128), " +
                 "fetchKey varchar(128))";
         CONNECTION.createStatement().execute(sql);
 
         for (int i = 0; i < NUM_ROWS; i++) {
-            sql = "insert into "+TABLE + " (id, project, fetchKey) values ('id"+i+"','project"+
-                    (i%2 == 0 ? "a" : "b") +"','fk"+i+"')";
+            sql = "insert into " + TABLE + " (id, project, fetchKey) values ('id" + i +
+                    "','project" + (i % 2 == 0 ? "a" : "b") + "','fk" + i + "')";
             CONNECTION.createStatement().execute(sql);
         }
-        sql = "select count(1) from "+ TABLE;
+        sql = "select count(1) from " + TABLE;
         ResultSet rs = CONNECTION.createStatement().executeQuery(sql);
         while (rs.next()) {
             int cnt = rs.getInt(1);
@@ -90,7 +92,7 @@ public class TestJDBCFetchIterator {
         TikaConfig tk = getConfig();
         int numConsumers = 5;
         FetchIterator fetchIterator = tk.getFetchIterator();
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService =
                 new ExecutorCompletionService<>(es);
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(100);
@@ -118,16 +120,16 @@ public class TestJDBCFetchIterator {
         Matcher m = Pattern.compile("fk(\\d+)").matcher("");
         for (MockFetcher f : fetchers) {
             for (FetchEmitTuple p : f.pairs) {
-                String k = p.getFetchKey().getKey();
+                String k = p.getFetchKey().getFetchKey();
                 String num = "";
                 if (m.reset(k).find()) {
                     num = m.group(1);
                 } else {
-                    fail("failed to find key pattern: "+k);
+                    fail("failed to find key pattern: " + k);
                 }
                 String aOrB = Integer.parseInt(num) % 2 == 0 ? "a" : "b";
-                assertEquals("id"+num, p.getMetadata().get("MY_ID"));
-                assertEquals("project"+aOrB, p.getMetadata().get("MY_PROJECT"));
+                assertEquals("id" + num, p.getMetadata().get("MY_ID"));
+                assertEquals("project" + aOrB, p.getMetadata().get("MY_PROJECT"));
                 assertNull(p.getMetadata().get("fetchKey"));
                 assertNull(p.getMetadata().get("MY_FETCHKEY"));
                 cnt++;
@@ -139,17 +141,22 @@ public class TestJDBCFetchIterator {
     private TikaConfig getConfig() throws Exception {
         String config = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?><properties>\n" +
                 "    <fetchIterators>\n" +
-                "        <fetchIterator class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
+                "        <fetchIterator " +
+                "       class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
                 "            <params>\n" +
                 "                <param name=\"fetcherName\" type=\"string\">s3f</param>\n" +
                 "                <param name=\"emitterName\" type=\"string\">s3e</param>\n" +
                 "                <param name=\"queueSize\" type=\"int\">57</param>\n" +
-                "                <param name=\"fetchKeyColumn\" type=\"string\">my_fetchkey</param>\n" +
-                "                <param name=\"emitKeyColumn\" type=\"string\">my_fetchkey</param>\n" +
+                "                <param name=\"fetchKeyColumn\" " +
+                "                     type=\"string\">my_fetchkey</param>\n" +
+                "                <param name=\"emitKeyColumn\" " +
+                "                    type=\"string\">my_fetchkey</param>\n" +
                 "                <param name=\"select\" type=\"string\">" +
-                "select id as my_id, project as my_project, fetchKey as my_fetchKey from fetchkeys</param>\n" +
-                "                <param name=\"connection\" type=\"string\">jdbc:h2:file:"+
-                DB_DIR.toAbsolutePath()+"/"+db +"</param>\n" +
+                "select id as my_id, project as my_project, fetchKey as my_fetchKey " +
+                "from fetchkeys</param>\n" +
+                "                <param name=\"connection\" " +
+                "                type=\"string\">jdbc:h2:file:" + DB_DIR.toAbsolutePath() + "/" +
+                    db + "</param>\n" +
                 "            </params>\n" +
                 "        </fetchIterator>\n" +
                 "    </fetchIterators>\n" +
@@ -160,6 +167,7 @@ public class TestJDBCFetchIterator {
     private static class MockFetcher implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
         private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=info,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
index ef8854d..24be277 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.tika</groupId>
@@ -133,15 +133,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index b8897a8..988f149 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -16,6 +16,12 @@
  */
 package org.apache.tika.pipes.fetchiterator.s3;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -24,6 +30,9 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.iterable.S3Objects;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -34,14 +43,6 @@ import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 public class S3FetchIterator extends FetchIterator implements Initializable {
 
@@ -76,8 +77,9 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
 
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
-        if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
-            throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+        if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+            throw new IllegalArgumentException(
+                    "credentialsProvider must be either 'profile' or 'instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -85,6 +87,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
      * e.g. AmazonClientException in a TikaConfigException.
+     *
      * @param params params to use for initialization
      * @throws TikaConfigException
      */
@@ -94,7 +97,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         AWSCredentialsProvider provider = null;
         if ("instance".equals(credentialsProvider)) {
             provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)){
+        } else if ("profile".equals(credentialsProvider)) {
             provider = new ProfileCredentialsProvider(profile);
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
@@ -102,9 +105,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         }
 
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withRegion(region)
-                    .withCredentials(provider)
+            s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 fetchiterator");
@@ -128,12 +129,10 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
 
             long elapsed = System.currentTimeMillis() - start;
-            LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(),
-                    elapsed);
-            tryToAdd(new FetchEmitTuple(
-                    new FetchKey(fetcherName, summary.getKey()),
-                    new EmitKey(emitterName, summary.getKey()),
-                    new Metadata(), getOnParseException()));
+            LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(), elapsed);
+            tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, summary.getKey()),
+                    new EmitKey(emitterName, summary.getKey()), new Metadata(),
+                    getOnParseException()));
             count++;
         }
         long elapsed = System.currentTimeMillis() - start;
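
The enqueue() loop above is a thin wrapper around the AWS SDK's prefix listing. A minimal standalone sketch of the same calls, assuming 'profile' credentials as configured via setCredentialsProvider(); the bucket, prefix, region and profile values are placeholders:

    import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.iterable.S3Objects;
    import com.amazonaws.services.s3.model.S3ObjectSummary;

    public class S3ListingSketch {
        public static void main(String[] args) {
            // placeholder values; supply your own bucket/prefix/region/profile
            String region = "us-east-1";
            String profile = "default";
            String bucket = "my-bucket";
            String prefix = "path/to/inputs/";

            AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withRegion(region)
                    .withCredentials(new ProfileCredentialsProvider(profile))
                    .build();

            // same listing call the iterator uses; each key would become both the
            // FetchKey and the EmitKey of a FetchEmitTuple
            for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
                System.out.println(summary.getKey());
            }
        }
    }
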
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 567920d..50cb819 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 package org.apache.tika.pipes.fetchiterator.s3;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Ignore;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,7 +29,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 @Ignore("turn into an actual unit test")
 public class TestS3FetchIterator {
@@ -48,7 +50,7 @@ public class TestS3FetchIterator {
         int numConsumers = 6;
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
 
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers+1);
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService c = new ExecutorCompletionService(es);
         List<MockFetcher> fetchers = new ArrayList<>();
         for (int i = 0; i < numConsumers; i++) {
@@ -65,7 +67,7 @@ public class TestS3FetchIterator {
         int finished = 0;
         int completed = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers + 1) {
                 Future<Integer> f = c.take();
                 completed += f.get();
             }
@@ -79,6 +81,7 @@ public class TestS3FetchIterator {
     private static class MockFetcher implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
         private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.queue = queue;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=info,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
index 09dcb97..ae76a52 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>tika-fetchers</artifactId>
@@ -97,15 +97,18 @@
                                 </filter>
                             </filters>
                             <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/LICENSE</resource>
                                     <file>target/classes/META-INF/LICENSE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/NOTICE</resource>
                                     <file>target/classes/META-INF/NOTICE</file>
                                 </transformer>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                                     <resource>META-INF/DEPENDENCIES</resource>
                                     <file>target/classes/META-INF/DEPENDENCIES</file>
                                 </transformer>
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index a6d2325..be39765 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -17,11 +17,19 @@
 package org.apache.tika.pipes.fetcher.http;
 
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.client.HttpClientFactory;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
@@ -32,13 +40,6 @@ import org.apache.tika.exception.TikaException;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
 
 /**
  * Based on Apache httpclient
@@ -52,9 +53,9 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
     public HttpFetcher() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
     }
+
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata)
-            throws IOException, TikaException {
+    public InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException {
         HttpGet get = new HttpGet(fetchKey);
         return get(get);
     }
@@ -62,7 +63,7 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
     public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
             throws IOException, TikaException {
         HttpGet get = new HttpGet(fetchKey);
-        get.setHeader("Range", "bytes="+startRange+"-"+endRange);
+        get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
         return get(get);
     }
 
@@ -70,21 +71,18 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
         HttpResponse response = httpClient.execute(get);
         int code = response.getStatusLine().getStatusCode();
         if (code < 200 || code > 299) {
-            throw new IOException("bad status code: "+
-                    code
-                    + " :: " +
+            throw new IOException("bad status code: " + code + " :: " +
                     responseToString(response.getEntity().getContent()));
         }
 
         //spool to local
         long start = System.currentTimeMillis();
-        TikaInputStream tis = TikaInputStream.get(
-                response.getEntity().getContent());
+        TikaInputStream tis = TikaInputStream.get(response.getEntity().getContent());
         tis.getPath();
         if (response instanceof CloseableHttpResponse) {
             ((CloseableHttpResponse) response).close();
         }
-        long elapsed = System.currentTimeMillis()-start;
+        long elapsed = System.currentTimeMillis() - start;
         LOG.debug("took {} ms to copy to local tmp file", elapsed);
         return tis;
     }
@@ -149,7 +147,8 @@ public class HttpFetcher extends AbstractFetcher implements Initializable {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
 
     }
 }
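
HttpFetcher's ranged fetch(fetchKey, startRange, endRange, metadata) boils down to setting an HTTP Range header before executing the GET; the fetcher itself then spools the response to a TikaInputStream via HttpClientFactory. A minimal standalone sketch of the header mechanism with a default client (the URL and byte range are placeholders):

    import java.io.InputStream;

    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;

    public class RangeGetSketch {
        public static void main(String[] args) throws Exception {
            String url = "https://example.com/archive.warc.gz";
            long start = 0;
            long end = 1023;

            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpGet get = new HttpGet(url);
                // same header HttpFetcher builds for a ranged fetch
                get.setHeader("Range", "bytes=" + start + "-" + end);
                HttpResponse response = client.execute(get);
                int code = response.getStatusLine().getStatusCode();
                // 206 Partial Content is expected when the server honors the range
                try (InputStream is = response.getEntity().getContent()) {
                    System.out.println("status=" + code + ", bytes=" + is.readAllBytes().length);
                }
            }
        }
    }
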
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
index 8a2d8a1..3a2e127 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
@@ -16,13 +16,7 @@
  */
 package org.apache.tika.pipes.fetcher.http;
 
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TemporaryResources;
-import org.apache.tika.metadata.Metadata;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.xml.sax.SAXException;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,32 +25,40 @@ import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.zip.GZIPInputStream;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TemporaryResources;
+import org.apache.tika.metadata.Metadata;
 
 @Ignore("requires network connectivity")
 public class HttpFetcherTest {
 
-        @Test
-        public void testRange() throws Exception {
-            String url =
-                    "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
-            long start = 969596307;
-            long end = start + 1408 - 1;
-            Metadata metadata = new Metadata();
-            HttpFetcher httpFetcher = (HttpFetcher) getConfig("tika-config-http.xml")
-                    .getFetcherManager().getFetcher("http");
-            try (TemporaryResources tmp = new TemporaryResources()) {
-                Path tmpPath = tmp.createTempFile();
-                try (InputStream is = httpFetcher.fetch(url, start, end, metadata)) {
-                    Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
-                }
-                assertEquals(2461, Files.size(tmpPath));
+    @Test
+    public void testRange() throws Exception {
+        String url =
+                "https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-45/segments/1603107869785.9/warc/CC-MAIN-20201020021700-20201020051700-00529.warc.gz";
+        long start = 969596307;
+        long end = start + 1408 - 1;
+        Metadata metadata = new Metadata();
+        HttpFetcher httpFetcher =
+                (HttpFetcher) getConfig("tika-config-http.xml").getFetcherManager()
+                        .getFetcher("http");
+        try (TemporaryResources tmp = new TemporaryResources()) {
+            Path tmpPath = tmp.createTempFile();
+            try (InputStream is = httpFetcher.fetch(url, start, end, metadata)) {
+                Files.copy(new GZIPInputStream(is), tmpPath, StandardCopyOption.REPLACE_EXISTING);
             }
+            assertEquals(2461, Files.size(tmpPath));
         }
+    }
 
 
     TikaConfig getConfig(String path) throws TikaException, IOException, SAXException {
-            return new TikaConfig(HttpFetcherTest.class.getResourceAsStream("/"+path));
+        return new TikaConfig(HttpFetcherTest.class.getResourceAsStream("/" + path));
     }
 
 
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index 506a040..a6b34dd 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -16,6 +16,13 @@
  */
 package org.apache.tika.pipes.fetcher.s3;
 
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.regex.Pattern;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
@@ -24,24 +31,18 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.fetcher.AbstractFetcher;
 import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
 
 /**
  * Fetches files from s3. Example string: s3://my_bucket/path/to/my_file.pdf
@@ -61,27 +62,23 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     private boolean spoolToTemp = true;
 
     @Override
-    public InputStream fetch(String fetchKey, Metadata metadata)
-            throws TikaException, IOException {
+    public InputStream fetch(String fetchKey, Metadata metadata) throws TikaException, IOException {
 
-        LOGGER.debug("about to fetch fetchkey={} from bucket ({})",
-                fetchKey, bucket);
+        LOGGER.debug("about to fetch fetchkey={} from bucket ({})", fetchKey, bucket);
 
         try {
             S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey));
             if (extractUserMetadata) {
-                for (Map.Entry<String, String> e :
-                        s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
+                        .entrySet()) {
                     metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
                 }
             }
             if (!spoolToTemp) {
-                return TikaInputStream.get(
-                        s3Object.getObjectContent());
+                return TikaInputStream.get(s3Object.getObjectContent());
             } else {
                 long start = System.currentTimeMillis();
-                TikaInputStream tis = TikaInputStream.get(
-                        s3Object.getObjectContent());
+                TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
                 tis.getPath();
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
@@ -95,26 +92,24 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     public InputStream fetch(String fetchKey, long startRange, long endRange, Metadata metadata)
             throws TikaException, IOException {
         //TODO -- figure out how to integrate this
-        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})",
-                fetchKey, startRange, endRange, bucket);
+        LOGGER.debug("about to fetch fetchkey={} (start={} end={}) from bucket ({})", fetchKey,
+                startRange, endRange, bucket);
 
         try {
-            S3Object s3Object = s3Client.getObject(new GetObjectRequest(bucket, fetchKey)
-                    .withRange(startRange, endRange));
+            S3Object s3Object = s3Client.getObject(
+                    new GetObjectRequest(bucket, fetchKey).withRange(startRange, endRange));
 
             if (extractUserMetadata) {
-                for (Map.Entry<String, String> e :
-                        s3Object.getObjectMetadata().getUserMetadata().entrySet()) {
+                for (Map.Entry<String, String> e : s3Object.getObjectMetadata().getUserMetadata()
+                        .entrySet()) {
                     metadata.add(PREFIX + ":" + e.getKey(), e.getValue());
                 }
             }
             if (!spoolToTemp) {
-                return TikaInputStream.get(
-                        s3Object.getObjectContent());
+                return TikaInputStream.get(s3Object.getObjectContent());
             } else {
                 long start = System.currentTimeMillis();
-                TikaInputStream tis = TikaInputStream.get(
-                        s3Object.getObjectContent());
+                TikaInputStream tis = TikaInputStream.get(s3Object.getObjectContent());
                 tis.getPath();
                 long elapsed = System.currentTimeMillis() - start;
                 LOGGER.debug("took {} ms to copy to local tmp file", elapsed);
@@ -157,8 +152,9 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
 
     @Field
     public void setCredentialsProvider(String credentialsProvider) {
-        if (! credentialsProvider.equals("profile") && ! credentialsProvider.equals("instance")) {
-            throw new IllegalArgumentException("credentialsProvider must be either 'profile' or instance'");
+        if (!credentialsProvider.equals("profile") && !credentialsProvider.equals("instance")) {
+            throw new IllegalArgumentException(
+                    "credentialsProvider must be either 'profile' or 'instance'");
         }
         this.credentialsProvider = credentialsProvider;
     }
@@ -166,6 +162,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     /**
      * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
      * e.g. AmazonClientException in a TikaConfigException.
+     *
      * @param params params to use for initialization
      * @throws TikaConfigException
      */
@@ -175,7 +172,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         AWSCredentialsProvider provider = null;
         if ("instance".equals(credentialsProvider)) {
             provider = InstanceProfileCredentialsProvider.getInstance();
-        } else if ("profile".equals(credentialsProvider)){
+        } else if ("profile".equals(credentialsProvider)) {
             provider = new ProfileCredentialsProvider(profile);
         } else {
             throw new TikaConfigException("credentialsProvider must be set and " +
@@ -183,9 +180,7 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
         }
 
         try {
-            s3Client = AmazonS3ClientBuilder.standard()
-                    .withRegion(region)
-                    .withCredentials(provider)
+            s3Client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
             throw new TikaConfigException("can't initialize s3 fetcher", e);
@@ -193,7 +188,8 @@ public class S3Fetcher extends AbstractFetcher implements Initializable {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
         mustNotBeEmpty("bucket", this.bucket);
         mustNotBeEmpty("region", this.region);
     }
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
index 450d933..9be1ab4 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
@@ -15,11 +15,6 @@
  * limitations under the License.
  */
 package org.apache.tika.pipes.fetcher.s3;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.metadata.Metadata;
-import org.junit.Ignore;
-import org.junit.Test;
 
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -28,12 +23,19 @@ import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collections;
 
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
 @Ignore("write actual unit tests")
 public class TestS3Fetcher {
     private static final String FETCH_STRING = "";
-    private Path outputFile = Paths.get("");
-    private String region = "us-east-1";
-    private String profile = "";
+    private final Path outputFile = Paths.get("");
+    private final String region = "us-east-1";
+    private final String profile = "";
 
     @Test
     public void testBasic() throws Exception {
@@ -50,9 +52,8 @@ public class TestS3Fetcher {
 
     @Test
     public void testConfig() throws Exception {
-        TikaConfig config = new TikaConfig(
-                this.getClass().getResourceAsStream("/tika-config-s3.xml")
-        );
+        TikaConfig config =
+                new TikaConfig(this.getClass().getResourceAsStream("/tika-config-s3.xml"));
         Fetcher fetcher = config.getFetcherManager().getFetcher("s3");
         Metadata metadata = new Metadata();
         try (InputStream is = fetcher.fetch(FETCH_STRING, metadata)) {
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
index 8cbc26b..77619f9 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientFactory.java
@@ -16,6 +16,27 @@
  */
 package org.apache.tika.client;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.spec.SecretKeySpec;
+import javax.net.ssl.SSLContext;
+
 import org.apache.http.Header;
 import org.apache.http.HeaderElement;
 import org.apache.http.HeaderElementIterator;
@@ -52,35 +73,15 @@ import org.apache.http.message.BasicHeaderElementIterator;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.ssl.SSLContexts;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.crypto.BadPaddingException;
-import javax.crypto.Cipher;
-import javax.crypto.IllegalBlockSizeException;
-import javax.crypto.NoSuchPaddingException;
-import javax.crypto.spec.SecretKeySpec;
-import javax.net.ssl.SSLContext;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.security.InvalidKeyException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.utils.StringUtils;
 
 /**
  * This holds quite a bit of state and is not thread safe.  Beware!
- *
+ * <p>
  * Also, we're currently ignoring the SSL checks.  Please open a ticket/PR
  * if you need robust SSL.
  */
@@ -104,7 +105,7 @@ public class HttpClientFactory {
     private String password;
     private String ntDomain;//if using nt credentials
     private String authScheme = "basic"; //ntlm or basic
-    private boolean credentialsAESEncrypted = false;
+    private final boolean credentialsAESEncrypted = false;
 
 
     public HttpClientFactory() throws TikaConfigException {
@@ -116,6 +117,7 @@ public class HttpClientFactory {
             aes = new AES();
         }
     }
+
     public String getProxyHost() {
         return proxyHost;
     }
@@ -218,6 +220,7 @@ public class HttpClientFactory {
 
     /**
      * only basic and ntlm are supported
+     *
      * @param authScheme
      */
     public void setAuthScheme(String authScheme) {
@@ -230,19 +233,18 @@ public class HttpClientFactory {
         TrustStrategy acceptingTrustStrategy = (cert, authType) -> true;
         SSLContext sslContext = null;
         try {
-            sslContext = SSLContexts.custom().loadTrustMaterial(null,
-                    acceptingTrustStrategy).build();
+            sslContext =
+                    SSLContexts.custom().loadTrustMaterial(
+                            null, acceptingTrustStrategy).build();
         } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
             throw new TikaConfigException("", e);
         }
-        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext,
-                NoopHostnameVerifier.INSTANCE);
+        SSLConnectionSocketFactory sslsf =
+                new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
 
         Registry<ConnectionSocketFactory> socketFactoryRegistry =
-                RegistryBuilder.<ConnectionSocketFactory>create()
-                        .register("https", sslsf)
-                        .register("http", new PlainConnectionSocketFactory())
-                        .build();
+                RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslsf)
+                        .register("http", new PlainConnectionSocketFactory()).build();
 
         PoolingHttpClientConnectionManager manager =
                 new PoolingHttpClientConnectionManager(socketFactoryRegistry);
@@ -253,19 +255,13 @@ public class HttpClientFactory {
         addCredentialsProvider(builder);
         addProxy(builder);
         return builder.setConnectionManager(manager)
-                .setRedirectStrategy(
-                        new CustomRedirectStrategy(allowedHostsForRedirect))
-                .setDefaultRequestConfig(RequestConfig.custom()
-                        .setTargetPreferredAuthSchemes(Arrays.asList(AuthSchemes.BASIC,
-                                AuthSchemes.NTLM))
-                        .setConnectionRequestTimeout((int) requestTimeout)
-                        .setConnectionRequestTimeout(connectTimeout)
-                        .setSocketTimeout(socketTimeout)
-                        .build()
-                )
-                .setKeepAliveStrategy(getKeepAliveStrategy())
-                .setSSLSocketFactory(sslsf)
-                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+                .setRedirectStrategy(new CustomRedirectStrategy(allowedHostsForRedirect))
+                .setDefaultRequestConfig(RequestConfig.custom().setTargetPreferredAuthSchemes(
+                        Arrays.asList(AuthSchemes.BASIC, AuthSchemes.NTLM))
+                        .setConnectionRequestTimeout(requestTimeout)
+                        .setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout)
+                        .build()).setKeepAliveStrategy(getKeepAliveStrategy())
+                .setSSLSocketFactory(sslsf).setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                 .build();
     }
 
@@ -285,33 +281,32 @@ public class HttpClientFactory {
 
         if ((StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) ||
                 (StringUtils.isBlank(password) && !StringUtils.isBlank(userName))) {
-            throw new IllegalArgumentException("can't have one of 'username', " +
-                    "'password' null and the other not");
+            throw new IllegalArgumentException(
+                    "can't have one of 'username', " + "'password' null and the other not");
         }
 
         String finalUserName = decrypt(userName);
         String finalPassword = decrypt(password);
         String finalDomain = decrypt(ntDomain);
-            CredentialsProvider provider = new BasicCredentialsProvider();
-            Credentials credentials = null;
-            Registry<AuthSchemeProvider> authSchemeRegistry = null;
-            if (authScheme.equals("basic")) {
-                credentials = new UsernamePasswordCredentials(finalUserName, finalPassword);
-                authSchemeRegistry = RegistryBuilder
-                        .<AuthSchemeProvider>create()
-                        .register("basic", new BasicSchemeFactory())
-                        .build();
-            } else if (authScheme.equals("ntlm")) {
-                if (StringUtils.isBlank(ntDomain)) {
-                    throw new IllegalArgumentException("must specify 'ntDomain'");
-                }
-                credentials = new NTCredentials(finalUserName, finalPassword, null, finalDomain);
-                authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
-                        .register("ntlm", new NTLMSchemeFactory()).build();
+        CredentialsProvider provider = new BasicCredentialsProvider();
+        Credentials credentials = null;
+        Registry<AuthSchemeProvider> authSchemeRegistry = null;
+        if (authScheme.equals("basic")) {
+            credentials = new UsernamePasswordCredentials(finalUserName, finalPassword);
+            authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
+                    .register("basic", new BasicSchemeFactory()).build();
+        } else if (authScheme.equals("ntlm")) {
+            if (StringUtils.isBlank(ntDomain)) {
+                throw new IllegalArgumentException("must specify 'ntDomain'");
             }
-            provider.setCredentials(AuthScope.ANY, credentials);
-            builder.setDefaultCredentialsProvider(provider);
-            builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
+            credentials = new NTCredentials(finalUserName, finalPassword,
+                    null, finalDomain);
+            authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
+                    .register("ntlm", new NTLMSchemeFactory()).build();
+        }
+        provider.setCredentials(AuthScope.ANY, credentials);
+        builder.setDefaultCredentialsProvider(provider);
+        builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
 
     }
 
@@ -350,7 +345,7 @@ public class HttpClientFactory {
     private static class CustomRedirectStrategy extends LaxRedirectStrategy {
 
         private static final Logger LOG = LoggerFactory.getLogger(CustomRedirectStrategy.class);
-        private Set<String> allowedHosts;
+        private final Set<String> allowedHosts;
 
         public CustomRedirectStrategy(Set<String> allowedHosts) {
             this.allowedHosts = allowedHosts;
@@ -373,7 +368,9 @@ public class HttpClientFactory {
         }
 
         @Override
-        public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException {
+        public boolean isRedirected(HttpRequest request, HttpResponse response,
+                                    HttpContext context)
+                throws ProtocolException {
             boolean isRedirectedSuper = super.isRedirected(request, response, context);
             if (isRedirectedSuper) {
                 Header locationHeader = response.getFirstHeader("Location");
@@ -422,9 +419,10 @@ public class HttpClientFactory {
             try {
                 Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
-                return Base64.getEncoder()
-                        .encodeToString(cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8)));
-            } catch (NoSuchAlgorithmException|InvalidKeyException|NoSuchPaddingException|BadPaddingException|IllegalBlockSizeException e) {
+                return Base64.getEncoder().encodeToString(
+                        cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8)));
+            } catch (NoSuchAlgorithmException | InvalidKeyException |
+                    NoSuchPaddingException | BadPaddingException | IllegalBlockSizeException e) {
                 throw new TikaConfigException("bad encryption info", e);
             }
         }
@@ -435,11 +433,8 @@ public class HttpClientFactory {
                 cipher.init(Cipher.DECRYPT_MODE, secretKey);
                 return new String(cipher.doFinal(Base64.getDecoder().decode(strToDecrypt)),
                         StandardCharsets.UTF_8);
-            } catch (NoSuchAlgorithmException|
-                    InvalidKeyException|
-                    NoSuchPaddingException|
-                    BadPaddingException|
-                    IllegalBlockSizeException e) {
+            } catch (NoSuchAlgorithmException | InvalidKeyException |
+                    NoSuchPaddingException | BadPaddingException | IllegalBlockSizeException e) {
                 throw new TikaConfigException("bad encryption info", e);
             }
         }
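
As the class javadoc warns, the factory deliberately trusts all certificates. A minimal standalone sketch of that trust-all SSL setup, using the same HttpClient APIs, in case it helps to see it outside the larger builder chain (for testing only, not production):

    import javax.net.ssl.SSLContext;

    import org.apache.http.conn.ssl.NoopHostnameVerifier;
    import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.ssl.SSLContexts;
    import org.apache.http.ssl.TrustStrategy;

    public class TrustAllClientSketch {
        public static CloseableHttpClient build() throws Exception {
            //trusts every certificate; for testing only
            TrustStrategy acceptAll = (chain, authType) -> true;
            SSLContext sslContext =
                    SSLContexts.custom().loadTrustMaterial(null, acceptAll).build();
            SSLConnectionSocketFactory sslsf =
                    new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
            return HttpClients.custom()
                    .setSSLSocketFactory(sslsf)
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                    .build();
        }
    }
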
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
index 11ebb3c..3de32da 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
@@ -16,20 +16,19 @@
  */
 package org.apache.tika.client;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.util.EntityUtils;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
 public class HttpClientUtil {
 
-    public static boolean postJson(HttpClient client, String url, String json) throws IOException,
-            TikaClientException {
+    public static boolean postJson(HttpClient client, String url, String json)
+            throws IOException, TikaClientException {
         HttpPost post = new HttpPost(url);
         post.setHeader("Content-Encoding", "gzip");
         ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
@@ -41,9 +40,8 @@ public class HttpClientUtil {
 
         if (response.getStatusLine().getStatusCode() != 200) {
             String msg = EntityUtils.toString(response.getEntity());
-            throw new TikaClientException("Bad status: " +
-                    response.getStatusLine().getStatusCode() + " : "+
-                    msg);
+            throw new TikaClientException(
+                    "Bad status: " + response.getStatusLine().getStatusCode() + " : " + msg);
         } else {
             String msg = EntityUtils.toString(response.getEntity());
             System.out.println("httputil: " + msg);
@@ -51,9 +49,8 @@ public class HttpClientUtil {
         return true;
     }
 
-    public static boolean postJson(HttpClient client, String url,
-                                   byte[] bytes, boolean gzipped) throws IOException,
-            TikaClientException {
+    public static boolean postJson(HttpClient client, String url, byte[] bytes, boolean gzipped)
+            throws IOException, TikaClientException {
         HttpPost post = new HttpPost(url);
         if (gzipped) {
             post.setHeader("Content-Encoding", "gzip");
@@ -67,9 +64,8 @@ public class HttpClientUtil {
 
         if (response.getStatusLine().getStatusCode() != 200) {
             String msg = EntityUtils.toString(response.getEntity());
-            throw new TikaClientException("Bad status: " +
-                    response.getStatusLine().getStatusCode() + " : "+
-                    msg);
+            throw new TikaClientException(
+                    "Bad status: " + response.getStatusLine().getStatusCode() + " : " + msg);
         } else {
             String msg = EntityUtils.toString(response.getEntity());
             System.out.println("httputil: " + msg);
diff --git a/tika-pipes/tika-pipes-async/pom.xml b/tika-pipes/tika-pipes-async/pom.xml
index ded8a0a..5f4e418 100644
--- a/tika-pipes/tika-pipes-async/pom.xml
+++ b/tika-pipes/tika-pipes-async/pom.xml
@@ -17,8 +17,8 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.tika</groupId>
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index 4860299..cb8347f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -1,16 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -29,6 +36,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
 public class AsyncCli {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
@@ -37,7 +54,7 @@ public class AsyncCli {
         Path configPath = Paths.get(args[0]);
         int maxConsumers = 20;
         AsyncCli asyncCli = new AsyncCli();
-        Path dbDir = Paths.get("/Users/allison/Desktop/tmp-db");//Files.createTempDirectory("tika-async-db-");
+        Path dbDir = Files.createTempDirectory("tika-async-db-");
         try {
             asyncCli.execute(dbDir, configPath, maxConsumers);
         } finally {
@@ -46,6 +63,18 @@ public class AsyncCli {
 
     }
 
+    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+        PreparedStatement findActiveWorkers =
+                connection.prepareStatement("select worker_id from workers");
+        List<Integer> workers = new ArrayList<>();
+        try (ResultSet rs = findActiveWorkers.executeQuery()) {
+            while (rs.next()) {
+                workers.add(rs.getInt(1));
+            }
+        }
+        return workers;
+    }
+
     private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
         TikaConfig tikaConfig = new TikaConfig(configPath);
 
@@ -60,18 +89,19 @@ public class AsyncCli {
             if (fetchIterator instanceof EmptyFetchIterator) {
                 throw new IllegalArgumentException("can't have empty fetch iterator");
             }
-            ArrayBlockingQueue<FetchEmitTuple> q = new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
+            ArrayBlockingQueue<FetchEmitTuple> q =
+                    new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
             AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
             executorCompletionService.submit(fetchIterator);
             executorCompletionService.submit(enqueuer);
             executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
 
             for (int i = 0; i < maxConsumers; i++) {
-                executorCompletionService.submit(new AsyncWorker(connection,
-                        connectionString, i, configPath));
+                executorCompletionService
+                        .submit(new AsyncWorker(connection, connectionString, i, configPath));
             }
             int completed = 0;
-            while (completed < maxConsumers+3) {
+            while (completed < maxConsumers + 3) {
                 Future<Integer> future = executorCompletionService.take();
                 if (future != null) {
                     int val = future.get();
@@ -86,17 +116,13 @@ public class AsyncCli {
 
     private String setupTables(Path dbDir) throws SQLException {
         Path dbFile = dbDir.resolve("tika-async");
-        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
-                ";AUTO_SERVER=TRUE";
+        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
         Connection connection = DriverManager.getConnection(url);
 
-        String sql = "create table task_queue " +
-                "(id bigint auto_increment primary key," +
-                "status tinyint," +//byte
-                "worker_id integer," +
-                "retry smallint," + //short
-                "time_stamp timestamp," +
-                "json varchar(64000))";
+        String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
+                "status tinyint," + //byte
+                "worker_id integer," + "retry smallint," + //short
+                "time_stamp timestamp," + "json varchar(64000))";
         connection.createStatement().execute(sql);
         //no clear benefit to creating an index on timestamp
 //        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
@@ -106,17 +132,13 @@ public class AsyncCli {
         sql = "create table workers_shutdown (worker_id int primary key)";
         connection.createStatement().execute(sql);
 
-        sql = "create table error_log (task_id bigint, " +
-                "fetch_key varchar(10000)," +
-                "time_stamp timestamp," +
-                "retry integer," +
-                "error_code tinyint)";
+        sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
+                "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
         connection.createStatement().execute(sql);
 
         return url;
     }
 
-
     //this reads fetchemittuples from the queue and inserts them in the db
     //for the workers to read
     private static class AsyncTaskEnqueuer implements Callable<Integer> {
@@ -128,8 +150,8 @@ public class AsyncCli {
 
         private volatile boolean isComplete = false;
 
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
-                          Connection connection) throws SQLException {
+        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
+                throws SQLException {
             this.queue = queue;
             this.connection = connection;
             String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
@@ -142,7 +164,7 @@ public class AsyncCli {
             List<Integer> workers = new ArrayList<>();
             while (true) {
                 FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
-                LOG.debug("enqueing to db "+t);
+                LOG.debug("enqueuing to db " + t);
                 if (t == null) {
                     //log.trace?
                 } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
@@ -155,7 +177,7 @@ public class AsyncCli {
                     while (workers.size() == 0 && elapsed < 600000) {
                         workers = getActiveWorkers(connection);
                         Thread.sleep(100);
-                        elapsed = System.currentTimeMillis()-start;
+                        elapsed = System.currentTimeMillis() - start;
                     }
                     insert(t, workers);
                 }
@@ -165,7 +187,9 @@ public class AsyncCli {
         boolean isComplete() {
             return isComplete;
         }
-        private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+
+        private void insert(FetchEmitTuple t, List<Integer> workers)
+                throws IOException, SQLException {
             int workerId = workers.size() == 1 ? workers.get(0) :
                     workers.get(random.nextInt(workers.size()));
             insert.clearParameters();
@@ -190,12 +214,12 @@ public class AsyncCli {
         private final Random random = new Random();
 
 
-        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer)
+                throws SQLException {
             this.connection = connection;
             this.enqueuer = enqueuer;
             //this gets workers and # of tasks in desc order of number of tasks
-            String sql = "select w.worker_id, p.cnt " +
-                    "from workers w " +
+            String sql = "select w.worker_id, p.cnt " + "from workers w " +
                     "left join (select worker_id, count(1) as cnt from task_queue " +
                     "where status=0 group by worker_id)" +
                     " p on p.worker_id=w.worker_id order by p.cnt desc";
@@ -213,14 +237,12 @@ public class AsyncCli {
             //current strategy reallocate tasks from longest queue to shortest
             //TODO: might consider randomly shuffling or other algorithms
             sql = "update task_queue set worker_id= ? where id in " +
-                    "(select id from task_queue where " +
-                    "worker_id = ? and " +
-                    "rand() < 0.8 " +
+                    "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
                     "and status=0 for update)";
             reallocate = connection.prepareStatement(sql);
 
-            sql = "select count(1) from task_queue where status="
-                    + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
+            sql = "select count(1) from task_queue where status=" +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
             countAvailableTasks = connection.prepareStatement(sql);
 
             sql = "insert into workers_shutdown (worker_id) values (?)";
@@ -251,7 +273,7 @@ public class AsyncCli {
         }
 
         private boolean isComplete() throws SQLException {
-            if (! enqueuer.isComplete) {
+            if (!enqueuer.isComplete) {
                 return false;
             }
             try (ResultSet rs = countAvailableTasks.executeQuery()) {
@@ -299,7 +321,8 @@ public class AsyncCli {
 
         }
 
-        private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+        private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
+                throws SQLException {
 
             if (missingWorkers.size() == 0) {
                 return;
@@ -316,8 +339,7 @@ public class AsyncCli {
                 allocateNonworkersToWorkers.setInt(1, active);
                 allocateNonworkersToWorkers.setInt(2, missing);
                 allocateNonworkersToWorkers.execute();
-                LOG.debug("allocating missing working ({}) to ({})",
-                        missing, active);
+                LOG.debug("reallocating tasks from missing worker ({}) to ({})", missing, active);
             }
         }
 
@@ -333,16 +355,4 @@ public class AsyncCli {
             return missingWorkers;
         }
     }
-
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        PreparedStatement findActiveWorkers = connection.prepareStatement(
-                "select worker_id from workers");
-        List<Integer> workers = new ArrayList<>();
-        try (ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
 }
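
The CLI above coordinates the parent and forked processes through an H2 file database opened with AUTO_SERVER=TRUE, so several JVMs can share the same task_queue table. A minimal sketch of that pattern, assuming the H2 driver is on the classpath:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class H2QueueSketch {
        public static void main(String[] args) throws Exception {
            Path dbDir = Files.createTempDirectory("tika-async-db-");
            //AUTO_SERVER=TRUE lets several JVMs open the same file-backed database
            String url = "jdbc:h2:file:" + dbDir.resolve("tika-async").toAbsolutePath() +
                    ";AUTO_SERVER=TRUE";
            try (Connection connection = DriverManager.getConnection(url)) {
                connection.createStatement().execute(
                        "create table task_queue (id bigint auto_increment primary key, " +
                        "status tinyint, worker_id integer, retry smallint, " +
                        "time_stamp timestamp, json varchar(64000))");
                connection.createStatement().execute(
                        "insert into task_queue (status, worker_id, retry, time_stamp, json) " +
                        "values (0, 1, 0, CURRENT_TIMESTAMP(), '{}')");
                try (ResultSet rs = connection.createStatement()
                        .executeQuery("select count(1) from task_queue where status=0")) {
                    rs.next();
                    System.out.println("available tasks: " + rs.getLong(1));
                }
            }
        }
    }
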
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index a86f563..33964b3 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -1,38 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.tika.utils.StringUtils;
-
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
+import org.apache.tika.utils.StringUtils;
+
 public class AsyncConfig {
 
+    private final int queueSize = 1000;
+    private final int numWorkers = 10;
+    private final int numEmitters = 1;
+    private String jdbcString;
+    private Path dbDir;
+
     public static AsyncConfig load(Path p) throws IOException {
         AsyncConfig asyncConfig = new AsyncConfig();
 
         if (StringUtils.isBlank(asyncConfig.getJdbcString())) {
             asyncConfig.dbDir = Files.createTempDirectory("tika-async-");
             Path dbFile = asyncConfig.dbDir.resolve("tika-async");
-            asyncConfig.setJdbcString("jdbc:h2:file:" + dbFile.toAbsolutePath().toString() +
-                    ";AUTO_SERVER=TRUE");
+            asyncConfig.setJdbcString(
+                    "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE");
         } else {
             asyncConfig.dbDir = null;
         }
         return asyncConfig;
     }
 
-    private int queueSize = 1000;
-    private int maxConsumers = 10;
-    private String jdbcString;
-    private Path dbDir;
-
     public int getQueueSize() {
         return queueSize;
     }
 
-    public int getMaxConsumers() {
-        return maxConsumers;
+    public int getNumWorkers() {
+        return numWorkers;
+    }
+
+    public int getNumEmitters() {
+        return numEmitters;
     }
 
     public String getJdbcString() {
@@ -46,6 +67,7 @@ public class AsyncConfig {
     /**
      * If no jdbc connection was specified, this
      * dir contains the h2 database.  Otherwise, null.
+     *
      * @return
      */
     public Path getTempDBDir() {
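
A minimal sketch of loading the config above; note that, as written, load() does not yet read the file at the given path, so the getters return the defaults declared in the class:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.tika.pipes.async.AsyncConfig;

    public class AsyncConfigSketch {
        public static void main(String[] args) throws Exception {
            //hypothetical path; the current load() only sets up the temp H2 db
            Path configPath = Paths.get("tika-async-config.xml");
            AsyncConfig config = AsyncConfig.load(configPath);
            System.out.println("jdbc: " + config.getJdbcString());
            System.out.println("workers: " + config.getNumWorkers());
            System.out.println("emitters: " + config.getNumEmitters());
            System.out.println("queue size: " + config.getQueueSize());
            System.out.println("temp db dir: " + config.getTempDBDir());
        }
    }
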
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
index d7e058d..df80929 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
@@ -1,20 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitData;
-
-import java.util.List;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncData extends EmitData {
 
-    private final AsyncTask asyncTask;
+    private final long taskId;
+    private final FetchKey fetchKey;
+    private final FetchEmitTuple.ON_PARSE_EXCEPTION onParseException;
+
+    public AsyncData(@JsonProperty("taskId") long taskId,
+                     @JsonProperty("fetchKey") FetchKey fetchKey,
+                     @JsonProperty("emitKey") EmitKey emitKey, @JsonProperty("onParseException")
+                             FetchEmitTuple.ON_PARSE_EXCEPTION onParseException,
+                     @JsonProperty("metadataList") List<Metadata> metadataList) {
+        super(emitKey, metadataList);
+        this.taskId = taskId;
+        this.fetchKey = fetchKey;
+        this.onParseException = onParseException;
+    }
+
+    public FetchKey getFetchKey() {
+        return fetchKey;
+    }
 
-    public AsyncData(AsyncTask asyncTask, List<Metadata> metadataList) {
-        super(asyncTask.getEmitKey(), metadataList);
-        this.asyncTask = asyncTask;
+    public long getTaskId() {
+        return taskId;
     }
 
-    public AsyncTask getAsyncTask() {
-        return asyncTask;
+    public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
+        return onParseException;
     }
 }
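
AsyncData instances are serialized with Jackson, LZ4-compressed, and stored with their uncompressed size (see the emits handling in AsyncEmitterProcess below). A minimal sketch of that compress/decompress round trip with a plain JSON payload, assuming the lz4 library used above is on the classpath:

    import java.nio.charset.StandardCharsets;

    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4FastDecompressor;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) {
            byte[] original = "{\"taskId\":1}".getBytes(StandardCharsets.UTF_8);

            LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
            byte[] compressed = compressor.compress(original);

            //the uncompressed length must be stored alongside the blob,
            //which is what the uncompressed_size column is for
            LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
            byte[] restored = new byte[original.length];
            decompressor.decompress(compressed, 0, restored, 0, original.length);

            System.out.println(new String(restored, StandardCharsets.UTF_8));
        }
    }
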
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
index 02d7fec..502c8b4 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
 public interface AsyncEmitHook {
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 071a1fa..4c44f9c 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -17,38 +17,19 @@
 package org.apache.tika.pipes.async;
 
 
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AsyncEmitter implements Callable<Integer> {
 
@@ -56,42 +37,27 @@ public class AsyncEmitter implements Callable<Integer> {
 
 
     private final String connectionString;
-    private final int workerId;
+    private final int emitterId;
     private final Path tikaConfigPath;
     private final Connection connection;
     private final PreparedStatement finished;
     private final PreparedStatement restarting;
-    private final PreparedStatement selectActiveTasks;
-    private final PreparedStatement insertErrorLog;
-    private final PreparedStatement resetStatus;
 
-    public AsyncEmitter(Connection connection,
-                       String connectionString, int workerId,
-                       Path tikaConfigPath) throws SQLException {
+    public AsyncEmitter(Connection connection, String connectionString, int emitterId,
+                        Path tikaConfigPath) throws SQLException {
         this.connectionString = connectionString;
-        this.workerId = workerId;
+        this.emitterId = emitterId;
         this.tikaConfigPath = tikaConfigPath;
         this.connection = connection;
-        String sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
-                " where worker_id = (" + workerId + ")";
+        String sql = "update emitters set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
+                " where emitter_id = (" + emitterId + ")";
         finished = connection.prepareStatement(sql);
 
-        sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
-                " where worker_id = (" + workerId + ")";
+        sql = "update emitters set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
+                " where emitter_id = (" + emitterId + ")";
         restarting = connection.prepareStatement(sql);
-        //this checks if the process was able to reset the status
-        sql = "select id, retry, json from task_queue where worker_id="
-                + workerId +
-                " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
-        selectActiveTasks = connection.prepareStatement(sql);
-
-        //if not, this is called to insert into the error log
-        insertErrorLog = prepareInsertErrorLog(connection);
-
-        //and this is called to reset the status on error
-        resetStatus = prepareReset(connection);
     }
 
     @Override
@@ -105,7 +71,7 @@ public class AsyncEmitter implements Callable<Integer> {
                 if (finished) {
                     int exitValue = p.exitValue();
                     if (exitValue == 0) {
-                        LOG.info("forked emitter process finished with exitValue=0");
+                        LOG.debug("forked emitter process finished with exitValue=0");
                         return 1;
                     }
                     reportCrash(++restarts, exitValue);
@@ -121,72 +87,24 @@ public class AsyncEmitter implements Callable<Integer> {
     }
 
     private Process start() throws IOException {
-        String[] args = new String[]{
-                "java", "-Djava.awt.headless=true",
-                "-cp", System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncEmitterProcess",
-                Integer.toString(workerId)
-        };
+        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
+                System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncEmitterProcess", Integer.toString(emitterId)};
         ProcessBuilder pb = new ProcessBuilder(args);
         pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
-                tikaConfigPath.toAbsolutePath().toString());
+        pb.environment()
+                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
         pb.inheritIO();
         return pb.start();
     }
 
     private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
-        LOG.warn("worker id={} terminated, exitValue={}",
-                workerId, exitValue);
+        LOG.warn("emitter id={} terminated, exitValue={}", emitterId, exitValue);
         restarting.execute();
-        List<AsyncTask> activeTasks = new ArrayList<>();
-        try (ResultSet rs = selectActiveTasks.executeQuery()) {
-            long taskId = rs.getLong(1);
-            short retry = rs.getShort(2);
-            String json = rs.getString(3);
-            FetchEmitTuple tuple = JsonFetchEmitTuple.fromJson(new StringReader(json));
-            activeTasks.add(new AsyncTask(taskId, retry, tuple));
-        }
-        if (activeTasks.size() == 0) {
-            LOG.info("worker reset active tasks, nothing extra to report");
-            return;
-        }
-        if (activeTasks.size() > 1) {
-            LOG.warn("more than one active task? this should never happen!");
-        }
-
-        for (AsyncTask t : activeTasks) {
-            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
-                    insertErrorLog, resetStatus, LOG);
-        }
-
-    }
-
-    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
-                               PreparedStatement insertErrorLog, PreparedStatement resetStatus,
-                               Logger logger) {
-        try {
-            insertErrorLog.clearParameters();
-            insertErrorLog.setLong(1, task.getTaskId());
-            insertErrorLog.setString(2, task.getFetchKey().getKey());
-            insertErrorLog.setInt(3, task.getRetry());
-            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
-            insertErrorLog.execute();
-        } catch (SQLException e) {
-            logger.error("Can't update error log", e);
-        }
-
-        try {
-            resetStatus.clearParameters();
-            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            resetStatus.setShort(2, (short)(task.getRetry()+1));
-            resetStatus.setLong(3, task.getTaskId());
-            resetStatus.execute();
-        } catch (SQLException e) {
-            logger.error("Can't reset try status", e);
-        }
+        //should we unassign emit tasks here?
     }
 
+ /*
     static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
         //if not, this is called to insert into the error log
         return connection.prepareStatement(
@@ -203,5 +121,5 @@ public class AsyncEmitter implements Callable<Integer> {
                         "time_stamp=CURRENT_TIMESTAMP(), " +
                         "retry=? " +
                         "where id=?");
-    }
+    }*/
 }
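
start() above forks a second JVM that inherits the parent's classpath and picks up its configuration from environment variables. A minimal sketch of that fork pattern; the main class and environment key below are hypothetical stand-ins:

    import java.util.concurrent.TimeUnit;

    public class ForkSketch {
        public static void main(String[] args) throws Exception {
            //ChildMain is hypothetical; AsyncEmitter forks AsyncEmitterProcess the same way
            String[] cmd = new String[]{"java", "-Djava.awt.headless=true", "-cp",
                    System.getProperty("java.class.path"), "org.example.ChildMain", "0"};
            ProcessBuilder pb = new ProcessBuilder(cmd);
            //illustrative key/value; the real code passes the jdbc url and config path
            pb.environment().put("MY_CONFIG_PATH", "/tmp/config.xml");
            pb.inheritIO();
            Process p = pb.start();
            boolean finished = p.waitFor(60, TimeUnit.SECONDS);
            if (finished) {
                System.out.println("child exited with " + p.exitValue());
            } else {
                p.destroyForcibly();
            }
        }
    }
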
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
index 37c5f3a..03f5b93 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
@@ -16,21 +16,9 @@
  */
 package org.apache.tika.pipes.async;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.commons.io.IOUtils;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Paths;
 import java.sql.Blob;
 import java.sql.Connection;
@@ -42,95 +30,184 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.ExceptionUtils;
 
 public class AsyncEmitterProcess {
 
-    //TODO -- parameterize these
-    private long emitWithinMs = 10000;
-    private long emitMaxBytes = 10_000_000;
     private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitterProcess.class);
-
-    private final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+    private final LZ4FastDecompressor decompressor =
+            LZ4Factory.fastestInstance().fastDecompressor();
     private final ObjectMapper objectMapper = new ObjectMapper();
+    private final FutureTask<Integer> stdinWatcher;
     int recordsPerPulse = 10;
+    //TODO -- parameterize these
+    private final long emitWithinMs = 10000;
+    private final long emitMaxBytes = 10_000_000;
     private PreparedStatement markForSelecting;
     private PreparedStatement selectForProcessing;
     private PreparedStatement emitStatusUpdate;
-    private PreparedStatement checkForShutdown;
+    private PreparedStatement checkForCanShutdown;
+    private PreparedStatement checkForShouldShutdown;
+    private PreparedStatement updateEmitterStatus;
+
+    private AsyncEmitterProcess(InputStream stdin) {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
+        objectMapper.registerModule(module);
+        stdinWatcher = new FutureTask<>(new ForkWatcher(stdin));
+        new Thread(stdinWatcher).start();
+    }
 
     public static void main(String[] args) throws Exception {
         String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
-        TikaConfig tikaConfig = new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
+        TikaConfig tikaConfig =
+                new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
         int workerId = Integer.parseInt(args[0]);
         LOG.debug("trying to get connection {} >{}<", workerId, db);
         try (Connection connection = DriverManager.getConnection(db)) {
-            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess();
+            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess(System.in);
             asyncEmitter.execute(connection, workerId, tikaConfig);
         }
         System.exit(0);
     }
 
-    private void execute(Connection connection, int workerId,
-                         TikaConfig tikaConfig) throws SQLException,
-            InterruptedException {
+    private static void reportEmitStatus(List<Long> ids,
+                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
+                                         PreparedStatement emitStatusUpdate) throws SQLException {
+        for (long id : ids) {
+            emitStatusUpdate.clearParameters();
+            emitStatusUpdate.setByte(1, (byte) emitted.ordinal());
+            emitStatusUpdate.setLong(2, id);
+            emitStatusUpdate.addBatch();
+        }
+        emitStatusUpdate.executeBatch();
+    }
+
+    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
+            throws SQLException, InterruptedException {
         prepareStatements(connection, workerId);
+        updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.ACTIVE.ordinal());
         EmitterManager emitterManager = tikaConfig.getEmitterManager();
-        EmitDataCache emitDataCache = new EmitDataCache(emitterManager, emitMaxBytes,
-                emitStatusUpdate);
-        int shouldShutdown = 0;
+        EmitDataCache emitDataCache =
+                new EmitDataCache(emitterManager, emitMaxBytes, emitStatusUpdate);
+        try {
+            mainLoop(emitDataCache);
+        } finally {
+            emitDataCache.emitAll();
+            updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal());
+        }
+    }
+
+    private void mainLoop(EmitDataCache emitDataCache) throws InterruptedException, SQLException {
+
         while (true) {
+            if (shouldShutdown()) {
+                LOG.debug("received should shutdown signal");
+                return;
+            }
             int toEmit = markForSelecting.executeUpdate();
+            if (toEmit == 0 && canShutdown()) {
+                //avoid race condition; double check there's nothing
+                //left to emit
+                toEmit = markForSelecting.executeUpdate();
+                if (toEmit == 0) {
+                    LOG.debug("received can shutdown and didn't update any for selecting");
+                    return;
+                }
+            }
             if (toEmit > 0) {
-                try (ResultSet rs = selectForProcessing.executeQuery()) {
-                    while (rs.next()) {
-                        long id = rs.getLong(1);
-                        Timestamp ts = rs.getTimestamp(2);
-                        int uncompressedSize = rs.getInt(3);
-                        Blob blob = rs.getBlob(4);
-                        try {
-                            tryToEmit(id, ts, uncompressedSize, blob,
-                                    emitDataCache);
-                        } catch (SQLException|IOException e) {
-                            reportEmitStatus(
-                                    Collections.singletonList(id),
-                                    AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
-                                    emitStatusUpdate
-                            );
-                        }
-                    }
+                try {
+                    tryToEmitNextBatch(emitDataCache);
+                } catch (IOException e) {
+                    LOG.warn("IOException trying to emit", e);
                 }
             }
             if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
                 emitDataCache.emitAll();
             }
             Thread.sleep(500);
-            if (shouldShutdown()) {
-                shouldShutdown++;
+        }
+    }
+
+    private void tryToEmitNextBatch(EmitDataCache emitDataCache) throws IOException, SQLException {
+        List<AsyncEmitTuple> toEmitList = new ArrayList<>();
+        try (ResultSet rs = selectForProcessing.executeQuery()) {
+            while (rs.next()) {
+                long id = rs.getLong(1);
+                Timestamp ts = rs.getTimestamp(2);
+                int uncompressedSize = rs.getInt(3);
+                Blob blob = rs.getBlob(4);
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                IOUtils.copyLarge(blob.getBinaryStream(), bos);
+                byte[] bytes = bos.toByteArray();
+                toEmitList.add(new AsyncEmitTuple(id, ts, uncompressedSize, bytes));
             }
-            //make sure to test twice
-            if (shouldShutdown > 1) {
-                emitDataCache.emitAll();
-                return;
+        }
+        List<Long> successes = new ArrayList<>();
+        List<Long> exceptions = new ArrayList<>();
+        for (AsyncEmitTuple tuple : toEmitList) {
+            try {
+                tryToEmit(tuple, emitDataCache);
+                successes.add(tuple.id);
+            } catch (IOException | SQLException e) {
+                exceptions.add(tuple.id);
             }
         }
+        reportEmitStatus(successes, AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS_EMIT,
+                emitStatusUpdate);
+        reportEmitStatus(exceptions, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
+                emitStatusUpdate);
+
+    }
+
+    private void updateStatus(byte status) throws SQLException {
+        updateEmitterStatus.clearParameters();
+        updateEmitterStatus.setByte(1, status);
+        updateEmitterStatus.executeUpdate();
     }
 
-    private void tryToEmit(long id, Timestamp ts,
-                           int decompressedLength,
-                           Blob blob, EmitDataCache emitDataCache)
+    private void tryToEmit(AsyncEmitTuple asyncEmitTuple, EmitDataCache emitDataCache)
             throws SQLException, IOException {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        IOUtils.copyLarge(blob.getBinaryStream(), bos);
-        AsyncData asyncData = deserialize(bos.toByteArray(), decompressedLength);
+        AsyncData asyncData = deserialize(asyncEmitTuple.bytes, asyncEmitTuple.uncompressedSize);
         emitDataCache.add(asyncData);
     }
 
     boolean shouldShutdown() throws SQLException {
-        try (ResultSet rs = checkForShutdown.executeQuery()) {
+        if (stdinWatcher.isDone()) {
+            LOG.info("parent input stream closed; shutting down now");
+            return true;
+        }
+        try (ResultSet rs = checkForShouldShutdown.executeQuery()) {
+            if (rs.next()) {
+                int val = rs.getInt(1);
+                return val > 0;
+            }
+        }
+        return false;
+    }
+
+    boolean canShutdown() throws SQLException {
+        try (ResultSet rs = checkForCanShutdown.executeQuery()) {
             if (rs.next()) {
                 int val = rs.getInt(1);
                 return val > 0;
@@ -141,54 +218,57 @@ public class AsyncEmitterProcess {
 
     private void prepareStatements(Connection connection, int workerId) throws SQLException {
         String sql = "update task_queue set status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal()+
-                ", worker_id="+workerId+", time_stamp=CURRENT_TIMESTAMP()"+
-                " where id in " +
-                " (select id from task_queue "+//where worker_id = " + workerId +
-                " where status="+ AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
-                " order by time_stamp asc limit "+recordsPerPulse+" for update)";
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + ", worker_id=" +
+                workerId + ", time_stamp=CURRENT_TIMESTAMP()" + " where id in " +
+                " (select id from task_queue " + //where worker_id = " + workerId +
+                " where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+                " order by time_stamp asc limit " + recordsPerPulse + " for update)";
         markForSelecting = connection.prepareStatement(sql);
 
-        sql = "select q.id, q.time_stamp, uncompressed_size, bytes from emits e " +
-                "join task_queue q " +
-                "where q.status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() +
-                " and worker_id=" + workerId +
-                " order by time_stamp asc";
+        sql = "select q.id, q.time_stamp, uncompressed_size, bytes " + "from emits e " +
+                "join task_queue q on e.id=q.id " + "where q.status=" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + " and worker_id=" +
+                workerId + " order by time_stamp asc";
         selectForProcessing = connection.prepareStatement(sql);
 
-        sql = "update task_queue set status=?"+
-                ", time_stamp=CURRENT_TIMESTAMP()"+
-                " where id=?";
+        //only update the status if it is not already emitted or failed emit
+        sql = "update task_queue set status=?" + ", time_stamp=CURRENT_TIMESTAMP()" +
+                " where id=? and status not in (" +
+                AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED.ordinal() + ", " +
+                AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT.ordinal() + ")";
+
         emitStatusUpdate = connection.prepareStatement(sql);
 
-        sql = "select count(1) from workers where worker_id=" + workerId +
-                " and status="+ AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
-        checkForShutdown = connection.prepareStatement(sql);
-    }
+        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
+        checkForCanShutdown = connection.prepareStatement(sql);
 
+        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+        checkForShouldShutdown = connection.prepareStatement(sql);
 
-    private AsyncData deserialize(byte[] compressed, int decompressedLength)
-            throws IOException {
-        byte[] restored = new byte[decompressedLength];
-        int compressedLength2 = decompressor.decompress(compressed, 0, restored,
-                0, decompressedLength);
+        sql = "merge into emitters key (emitter_id) " + "values (" + workerId + ", ? )";
+        updateEmitterStatus = connection.prepareStatement(sql);
+    }
 
-        return objectMapper.readerFor(AsyncTask.class).readValue(restored);
+    private AsyncData deserialize(byte[] compressed, int decompressedLength) throws IOException {
+        byte[] restored = new byte[decompressedLength];
+        int compressedLength2 =
+                decompressor.decompress(compressed, 0, restored, 0, decompressedLength);
+        return objectMapper.readerFor(AsyncData.class).readValue(restored);
     }
 
     private static class EmitDataCache {
         private final EmitterManager emitterManager;
         private final long maxBytes;
         private final PreparedStatement emitStatusUpdate;
-        private Instant lastAdded = Instant.now();
-
         long estimatedSize = 0;
         int size = 0;
         Map<String, List<AsyncData>> map = new HashMap<>();
+        private Instant lastAdded = Instant.now();
 
-        public EmitDataCache(EmitterManager emitterManager,
-                             long maxBytes, PreparedStatement emitStatusUpdate) {
+        public EmitDataCache(EmitterManager emitterManager, long maxBytes,
+                             PreparedStatement emitStatusUpdate) {
             this.emitterManager = emitterManager;
             this.maxBytes = maxBytes;
             this.emitStatusUpdate = emitStatusUpdate;
@@ -201,10 +281,11 @@ public class AsyncEmitterProcess {
         void add(AsyncData data) {
 
             size++;
-            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
+            long sz = AbstractEmitter
+                    .estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
             if (estimatedSize + sz > maxBytes) {
                 LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
-                        (estimatedSize+sz), maxBytes);
+                        (estimatedSize + sz), maxBytes);
                 emitAll();
             }
             List<AsyncData> cached = map.get(data.getEmitKey().getEmitterName());
@@ -240,7 +321,7 @@ public class AsyncEmitterProcess {
                 throws SQLException {
             List<Long> ids = new ArrayList<>();
             for (AsyncData d : cachedEmitData) {
-                ids.add(d.getAsyncTask().getTaskId());
+                ids.add(d.getTaskId());
             }
             try {
                 emitter.emit(cachedEmitData);
@@ -250,27 +331,49 @@ public class AsyncEmitterProcess {
                 reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
                         emitStatusUpdate);
             }
-            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED,
-                    emitStatusUpdate);
+            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED, emitStatusUpdate);
             return 1;
         }
 
 
         public boolean exceedsEmitWithin(long emitWithinMs) {
-            return ChronoUnit.MILLIS.between(lastAdded, Instant.now())
-                    > emitWithinMs;
+            return ChronoUnit.MILLIS.between(lastAdded, Instant.now()) > emitWithinMs;
         }
     }
 
-    private static void reportEmitStatus(List<Long> ids,
-                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
-                                         PreparedStatement emitStatusUpdate)
-            throws SQLException {
-        for (long id : ids) {
-            emitStatusUpdate.clearParameters();
-            emitStatusUpdate.setByte(1, (byte)emitted.ordinal());
-            emitStatusUpdate.setLong(2, id);
-            emitStatusUpdate.executeUpdate();
+    private static class ForkWatcher implements Callable<Integer> {
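+        //watches the stream inherited from the forking process; when that stream closes,
+        //the parent is assumed dead and this process exits immediately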
+        private final InputStream in;
+
+        public ForkWatcher(InputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            //this should block forever
+            //if the forking process dies,
+            // this will either throw an IOException or read -1.
+            try {
+                in.read();
+            } finally {
+                LOG.warn("forking process has shut down; exiting now");
+                System.exit(0);
+            }
+            return 1;
+        }
+    }
+
+    private static class AsyncEmitTuple {
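+        //simple holder for a row claimed from the emits table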
+        final long id;
+        final Timestamp timestamp;
+        final int uncompressedSize;
+        final byte[] bytes;
+
+        public AsyncEmitTuple(long id, Timestamp timestamp, int uncompressedSize, byte[] bytes) {
+            this.id = id;
+            this.timestamp = timestamp;
+            this.uncompressedSize = uncompressedSize;
+            this.bytes = bytes;
         }
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
index 3828b24..79ed80d 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
@@ -1,12 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class AsyncPipesEmitHook implements AsyncEmitHook {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncPipesEmitHook.class);
@@ -14,7 +30,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
     private final PreparedStatement markSuccess;
     private final PreparedStatement markFailure;
 
-    public AsyncPipesEmitHook(Connection connection) throws SQLException  {
+    public AsyncPipesEmitHook(Connection connection) throws SQLException {
         String sql = "delete from task_queue where id=?";
         markSuccess = connection.prepareStatement(sql);
         //TODO --fix this
@@ -28,7 +44,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
             markSuccess.setLong(1, task.getTaskId());
             markSuccess.execute();
         } catch (SQLException e) {
-            LOG.warn("problem with on success: "+task.getTaskId(), e);
+            LOG.warn("problem with on success: " + task.getTaskId(), e);
         }
     }
 
@@ -39,7 +55,7 @@ public class AsyncPipesEmitHook implements AsyncEmitHook {
             markFailure.setLong(1, task.getTaskId());
             markFailure.execute();
         } catch (SQLException e) {
-            LOG.warn("problem with on fail: "+task.getTaskId(), e);
+            LOG.warn("problem with on fail: " + task.getTaskId(), e);
         }
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index b4d8a8a..efd6fec 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -1,14 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
 import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -17,8 +28,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -28,44 +41,69 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
 public class AsyncProcessor implements Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
     protected static String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYC_JDBC_KEY";
     protected static String TIKA_ASYNC_CONFIG_FILE_KEY = "TIKA_ASYNC_CONFIG_FILE_KEY";
     private final Path tikaConfigPath;
-    private AsyncConfig asyncConfig;
     private final ArrayBlockingQueue<FetchEmitTuple> queue;
     private final Connection connection;
-    private int finishedThreads = 0;
     private final int totalThreads;
+    private final AsyncConfig asyncConfig;
+    private PreparedStatement emittersCanShutdown;
+    private volatile boolean isShuttingDown = false;
+    private AssignmentManager assignmentManager;
+    private int finishedThreads = 0;
     private ExecutorService executorService;
     private ExecutorCompletionService<Integer> executorCompletionService;
 
+    private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
+        this.tikaConfigPath = tikaConfigPath;
+        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
+        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
+        this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
+        this.totalThreads = asyncConfig.getNumWorkers() + asyncConfig.getNumEmitters() +
+                2;//assignment manager and enqueuer threads
+    }
+
     public static AsyncProcessor build(Path tikaConfigPath) throws AsyncRuntimeException {
         try {
             AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
-
             processor.init();
             return processor;
-        } catch (SQLException|IOException e) {
+        } catch (SQLException | IOException e) {
             throw new AsyncRuntimeException(e);
         }
     }
 
-    private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
-        this.tikaConfigPath = tikaConfigPath;
-        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
-        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
-        this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
-        this.totalThreads = asyncConfig.getMaxConsumers() + 2 + 1;
+    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
+        PreparedStatement findActiveWorkers =
+                connection.prepareStatement("select worker_id from workers");
+        List<Integer> workers = new ArrayList<>();
+        try (ResultSet rs = findActiveWorkers.executeQuery()) {
+            while (rs.next()) {
+                workers.add(rs.getInt(1));
+            }
+        }
+        return workers;
     }
 
-    public synchronized boolean offer (
-            List<FetchEmitTuple> fetchEmitTuples, long offerMs) throws
-            AsyncRuntimeException, InterruptedException {
+    public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
         if (queue == null) {
             throw new IllegalStateException("queue hasn't been initialized yet.");
+        } else if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
         }
         long start = System.currentTimeMillis();
         long elapsed = System.currentTimeMillis() - start;
@@ -87,6 +125,12 @@ public class AsyncProcessor implements Closeable {
 
     public synchronized boolean offer(FetchEmitTuple t, long offerMs)
             throws AsyncRuntimeException, InterruptedException {
+        if (queue == null) {
+            throw new IllegalStateException("queue hasn't been initialized yet.");
+        } else if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
+        }
         checkActive();
         return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
     }
@@ -99,8 +143,7 @@ public class AsyncProcessor implements Closeable {
      * @throws AsyncRuntimeException
      * @throws InterruptedException
      */
-    public synchronized boolean checkActive()
-            throws AsyncRuntimeException, InterruptedException {
+    public synchronized boolean checkActive() throws AsyncRuntimeException, InterruptedException {
         Future<Integer> future = executorCompletionService.poll();
         if (future != null) {
             try {
@@ -110,74 +153,87 @@ public class AsyncProcessor implements Closeable {
             }
             finishedThreads++;
         }
-        if (finishedThreads == totalThreads) {
-            return false;
-        }
-        return true;
+        return finishedThreads != totalThreads;
     }
 
     private void init() throws SQLException {
 
         setupTables();
+        String sql = "update emitters set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
+        this.emittersCanShutdown = connection.prepareStatement(sql);
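+        //executed from close() once all parse tasks have completed, so emitters can drain and exit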
+        executorService = Executors.newFixedThreadPool(totalThreads);
+        executorCompletionService = new ExecutorCompletionService<>(executorService);
 
-        executorService = Executors.newFixedThreadPool(
-                totalThreads);
-        executorCompletionService =
-                new ExecutorCompletionService<>(executorService);
+        AsyncTaskEnqueuer taskEnqueuer = new AsyncTaskEnqueuer(queue, connection);
 
-        AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
+        executorCompletionService.submit(taskEnqueuer);
 
-        executorCompletionService.submit(enqueuer);
-        executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
-        //executorCompletionService.submit(new )
-        for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
-            executorCompletionService.submit(new AsyncWorker(connection,
-                    asyncConfig.getJdbcString(), i, tikaConfigPath));
+        List<AsyncWorker> workers = buildWorkers(connection, asyncConfig, tikaConfigPath);
+        int maxRetries = 0;
+        for (AsyncWorker worker : workers) {
+            if (worker.getMaxRetries() > maxRetries) {
+                maxRetries = worker.getMaxRetries();
+            }
+            executorCompletionService.submit(worker);
+        }
+        assignmentManager = new AssignmentManager(connection, taskEnqueuer, maxRetries);
+        executorCompletionService.submit(assignmentManager);
+        for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
+            executorCompletionService
+                    .submit(new AsyncEmitter(connection, asyncConfig.getJdbcString(),
+                            asyncConfig.getNumWorkers() + i, tikaConfigPath));
         }
-        executorCompletionService.submit(new AsyncEmitter(connection,
-                asyncConfig.getJdbcString(), asyncConfig.getMaxConsumers(),
-                tikaConfigPath));
+    }
+
+    private List<AsyncWorker> buildWorkers(Connection connection, AsyncConfig asyncConfig,
+                                           Path tikaConfigPath) throws SQLException {
+        //TODO -- make these workers configurable via the tika config, e.g. max retries
+        //and jvm args, etc.
+        List<AsyncWorker> workers = new ArrayList<>();
+        for (int i = 0; i < asyncConfig.getNumWorkers(); i++) {
+            workers.add(
+                    new AsyncWorker(connection, asyncConfig.getJdbcString(), i, tikaConfigPath));
+        }
+        return workers;
     }
 
     private void setupTables() throws SQLException {
 
-        String sql = "create table task_queue " +
-                "(id bigint auto_increment primary key," +
-                "status tinyint," +//byte
-                "worker_id integer," +
-                "retry smallint," + //short
+        String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
+                "status tinyint," + //byte
+                "worker_id integer," + "retry smallint," + //short
                 "time_stamp timestamp," +
                 "json varchar(64000))";//this is the AsyncTask ... not the emit data!
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
-        //no clear benefit to creating an index on timestamp
-//        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
         sql = "create table workers (worker_id int primary key, status tinyint)";
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
 
-        sql = "create table error_log (task_id bigint, " +
-                "fetch_key varchar(10000)," +
-                "time_stamp timestamp," +
-                "retry integer," +
-                "error_code tinyint)";
+        sql = "create table emitters (emitter_id int primary key, status tinyint)";
+        try (Statement st = connection.createStatement()) {
+            st.execute(sql);
+        }
+
+        sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
+                "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
+
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
 
-        sql = "create table emits (" +
-                "id bigint primary key, " +
-                "time_stamp timestamp, "+
-                "uncompressed_size bigint, " +
-                "bytes blob)";
+        sql = "create table emits (" + "id bigint primary key, " + "time_stamp timestamp, " +
+                "uncompressed_size bigint, " + "bytes blob)";
         try (Statement st = connection.createStatement()) {
             st.execute(sql);
         }
     }
 
     public void shutdownNow() throws IOException, AsyncRuntimeException {
+        isShuttingDown = true;
         try {
             executorService.shutdownNow();
         } finally {
@@ -189,50 +245,65 @@ public class AsyncProcessor implements Closeable {
     }
 
     /**
-     * This is a blocking close.  If you need to shutdown immediately,
-     * try {@link #shutdownNow()}.
+     * This is a blocking close.  It will wait for all tasks successfully submitted before this
+     * call to close() to complete before closing.  If you need to shutdown immediately, try
+     * {@link #shutdownNow()}.
+     *
      * @throws IOException
      */
     @Override
     public void close() throws IOException {
+        isShuttingDown = true;
         try {
-            for (int i = 0; i < asyncConfig.getMaxConsumers(); i++) {
-                try {
-                    //blocking
-                    queue.put(FetchIterator.COMPLETED_SEMAPHORE);
-                } catch (InterruptedException e) {
-                    //swallow
-                }
-            }
-            //TODO: clean this up
-            String sql = "update workers set status="+
-                    AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
-                    " where worker_id = (" + asyncConfig.getMaxConsumers() + ")";
-            try {
-                connection.prepareStatement(sql).execute();
-            } catch (SQLException throwables) {
-                throwables.printStackTrace();
-            }
-            long start = System.currentTimeMillis();
-            long elapsed = System.currentTimeMillis() - start;
-            try {
-                boolean isActive = checkActive();
-                while (isActive) {
-                    isActive = checkActive();
-                    elapsed = System.currentTimeMillis();
-                }
-            } catch (InterruptedException e) {
-                return;
-            }
+            completeAndShutdown();
+        } catch (SQLException | InterruptedException e) {
+            throw new IOException(e);
         } finally {
             executorService.shutdownNow();
+            SQLException ex = null;
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                ex = e;
+            }
             //close down processes and db
             if (asyncConfig.getTempDBDir() != null) {
                 FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
             }
+            if (ex != null) {
+                throw new IOException(ex);
+            }
         }
     }
 
+    //this will block until everything finishes
+    private void completeAndShutdown() throws SQLException, InterruptedException {
+
+        //blocking...notify taskEnqueuer
+        queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+
+        //wait for assignmentManager to finish
+        //it will only complete after the task enqueuer has completed
+        //and there are no more parse tasks available, selected or in process
+        while (!assignmentManager.hasCompletedTasks()) {
+            Thread.sleep(100);
+        }
+
+        emittersCanShutdown.executeUpdate();
+
+        //wait for emitters to finish
+        try {
+            boolean isActive = checkActive();
+            while (isActive) {
+                Thread.sleep(100);
+                isActive = checkActive();
+            }
+        } catch (InterruptedException e) {
+            return;
+        }
+    }
 
     //this reads fetchemittuples from the queue and inserts them in the db
     //for the workers to read
@@ -245,8 +316,8 @@ public class AsyncProcessor implements Closeable {
 
         private volatile boolean isComplete = false;
 
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue,
-                          Connection connection) throws SQLException {
+        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
+                throws SQLException {
             this.queue = queue;
             this.connection = connection;
             String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
@@ -268,7 +339,7 @@ public class AsyncProcessor implements Closeable {
                 } else {
                     long start = System.currentTimeMillis();
                     long elapsed = System.currentTimeMillis() - start;
-                    //TODO -- fix this
+                    //TODO -- fix this -- this loop waits for workers to register
                     while (workers.size() == 0 && elapsed < 600000) {
                         workers = getActiveWorkers(connection);
                         Thread.sleep(100);
@@ -283,7 +354,8 @@ public class AsyncProcessor implements Closeable {
             return isComplete;
         }
 
-        private void insert(FetchEmitTuple t, List<Integer> workers) throws IOException, SQLException {
+        private void insert(FetchEmitTuple t, List<Integer> workers)
+                throws IOException, SQLException {
             int workerId = workers.size() == 1 ? workers.get(0) :
                     workers.get(random.nextInt(workers.size()));
             insert.clearParameters();
@@ -305,18 +377,24 @@ public class AsyncProcessor implements Closeable {
         private final PreparedStatement reallocate;
         private final PreparedStatement countAvailableTasks;
         private final PreparedStatement shutdownWorker;
+        private final PreparedStatement findMaxRetrieds;
+        private final PreparedStatement logMaxRetrieds;
+        private final PreparedStatement removeMaxRetrieds;
         private final Random random = new Random();
+        private final int maxRetries;
+        private volatile boolean hasCompleted = false;
 
 
-        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer) throws SQLException {
+        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer, int maxRetries)
+                throws SQLException {
             this.connection = connection;
             this.enqueuer = enqueuer;
+            this.maxRetries = maxRetries;
             //this gets workers and # of tasks in desc order of number of tasks
-            String sql = "select w.worker_id, p.cnt " +
-                    "from workers w " +
+            String sql = "select w.worker_id, p.cnt " + "from workers w " +
                     "left join (select worker_id, count(1) as cnt from task_queue " +
-                    "where status=0 group by worker_id)" +
-                    " p on p.worker_id=w.worker_id order by p.cnt desc";
+                    "where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() +
+                    " group by worker_id)" + " p on p.worker_id=w.worker_id order by p.cnt desc";
             getQueueDistribution = connection.prepareStatement(sql);
             //find workers that have assigned tasks but are not in the
             //workers table
@@ -331,26 +409,46 @@ public class AsyncProcessor implements Closeable {
             //current strategy reallocate tasks from longest queue to shortest
             //TODO: might consider randomly shuffling or other algorithms
             sql = "update task_queue set worker_id= ? where id in " +
-                    "(select id from task_queue where " +
-                    "worker_id = ? and " +
-                    "rand() < 0.8 " +
+                    "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
                     "and status=0 for update)";
             reallocate = connection.prepareStatement(sql);
 
-            sql = "select count(1) from task_queue where status="
-                    + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
+            //get those tasks that are in the parse phase
+            //if they are selected or in process, it is possible that
+            //they'll need to be retried.  So, include all statuses
+            //meaning that the parse has not completed.
+            sql = "select count(1) from task_queue where status in (" +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() + ", " +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED.ordinal() + ", " +
+                    AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal() + ")";
             countAvailableTasks = connection.prepareStatement(sql);
 
-            sql = "update workers set status="+
+            sql = "update workers set status=" +
                     AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
-                " where worker_id = ?";
+                    " where worker_id = ?";
             shutdownWorker = connection.prepareStatement(sql);
+
+            sql = "select id, retry, json from task_queue where retry >=" + maxRetries;
+            findMaxRetrieds = connection.prepareStatement(sql);
+
+            sql = "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code)" +
+                    "values (?,?,CURRENT_TIMESTAMP(), ?," +
+                    AsyncWorkerProcess.ERROR_CODES.MAX_RETRIES.ordinal() + ")";
+            logMaxRetrieds = connection.prepareStatement(sql);
+
+            sql = "delete from task_queue where id=?";
+            removeMaxRetrieds = connection.prepareStatement(sql);
+        }
+
+        protected boolean hasCompletedTasks() {
+            return hasCompleted;
         }
 
         @Override
         public Integer call() throws Exception {
 
             while (true) {
+                removeMaxRetrieds();
                 List<Integer> missingWorkers = getMissingWorkers();
                 reallocateFromMissingWorkers(missingWorkers);
                 redistribute();
@@ -362,6 +460,39 @@ public class AsyncProcessor implements Closeable {
             }
         }
 
+        private void removeMaxRetrieds() throws SQLException {
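+            //tasks that have reached maxRetries are copied into error_log and removed from task_queue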
+            Set<Long> toRemove = new HashSet<>();
+            try (ResultSet rs = findMaxRetrieds.executeQuery()) {
+                while (rs.next()) {
+                    long id = rs.getLong(1);
+                    int retries = rs.getInt(2);
+                    String json = rs.getString(3);
+                    toRemove.add(id);
+                    FetchEmitTuple t;
+                    try (Reader reader = new StringReader(json)) {
+                        t = JsonFetchEmitTuple.fromJson(reader);
+                    } catch (Exception e) {
+                        LOG.warn("problem deserializing json for task id={}", id, e);
+                        //TODO -- log this failure in the error_log table as well
+                        continue;
+                    }
+                    logMaxRetrieds.clearParameters();
+                    logMaxRetrieds.setLong(1, id);
+                    logMaxRetrieds.setString(2, t.getFetchKey().getFetchKey());
+                    logMaxRetrieds.setInt(3, retries);
+                    logMaxRetrieds.addBatch();
+                }
+            }
+            logMaxRetrieds.executeBatch();
+
+            for (Long id : toRemove) {
+                removeMaxRetrieds.clearParameters();
+                removeMaxRetrieds.setLong(1, id);
+                removeMaxRetrieds.addBatch();
+            }
+            removeMaxRetrieds.executeBatch();
+        }
+
         private void notifyWorkers() throws SQLException {
             for (int workerId : getActiveWorkers(connection)) {
                 shutdownWorker.clearParameters();
@@ -371,12 +502,19 @@ public class AsyncProcessor implements Closeable {
         }
 
         private boolean isComplete() throws SQLException {
+            if (hasCompleted) {
+                return hasCompleted;
+            }
             if (!enqueuer.isComplete) {
                 return false;
             }
             try (ResultSet rs = countAvailableTasks.executeQuery()) {
                 while (rs.next()) {
-                    return rs.getInt(1) == 0;
+                    int availTasks = rs.getInt(1);
+                    if (availTasks == 0) {
+                        hasCompleted = true;
+                        return true;
+                    }
                 }
             }
             return false;
@@ -419,7 +557,8 @@ public class AsyncProcessor implements Closeable {
 
         }
 
-        private void reallocateFromMissingWorkers(List<Integer> missingWorkers) throws SQLException {
+        private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
+                throws SQLException {
 
             if (missingWorkers.size() == 0) {
                 return;
@@ -436,8 +575,7 @@ public class AsyncProcessor implements Closeable {
                 allocateNonworkersToWorkers.setInt(1, active);
                 allocateNonworkersToWorkers.setInt(2, missing);
                 allocateNonworkersToWorkers.execute();
-                LOG.debug("allocating missing working ({}) to ({})",
-                        missing, active);
+                LOG.debug("allocating missing working ({}) to ({})", missing, active);
             }
         }
 
@@ -453,16 +591,4 @@ public class AsyncProcessor implements Closeable {
             return missingWorkers;
         }
     }
-
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        PreparedStatement findActiveWorkers = connection.prepareStatement(
-                "select worker_id from workers");
-        List<Integer> workers = new ArrayList<>();
-        try (ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index 24acdf8..e0c214a 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -1,20 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.tika.pipes.emitter.EmitKey;
+
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncTask extends FetchEmitTuple {
 
-    public static final AsyncTask SHUTDOWN_SEMAPHORE
-            = new AsyncTask(-1, (short)-1, new FetchEmitTuple(null, null, null));
-
-    private long taskId;
+    public static final AsyncTask SHUTDOWN_SEMAPHORE =
+            new AsyncTask(-1, (short) -1, new FetchEmitTuple(null, null, null));
     private final short retry;
+    private long taskId;
 
     public AsyncTask(@JsonProperty("taskId") long taskId, @JsonProperty("retry") short retry,
                      @JsonProperty("fetchEmitTuple") FetchEmitTuple fetchEmitTuple) {
-        super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(), fetchEmitTuple.getMetadata());
+        super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(),
+                fetchEmitTuple.getMetadata());
         this.taskId = taskId;
         this.retry = retry;
     }
@@ -23,18 +39,16 @@ public class AsyncTask extends FetchEmitTuple {
         return taskId;
     }
 
+    public void setTaskId(long taskId) {
+        this.taskId = taskId;
+    }
+
     public short getRetry() {
         return retry;
     }
 
-    public void setTaskId(long taskId) {
-        this.taskId = taskId;
-    }
     @Override
     public String toString() {
-        return "AsyncTask{" +
-                "taskId=" + taskId +
-                ", retry=" + retry +
-                '}';
+        return "AsyncTask{" + "taskId=" + taskId + ", retry=" + retry + '}';
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index 0b685a2..c26ee0f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.ProcessUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
+import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
 
 import java.io.IOException;
 import java.io.StringReader;
@@ -18,8 +31,11 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 /**
  * This controls monitoring of the AsyncWorkerProcess
@@ -39,26 +55,26 @@ public class AsyncWorker implements Callable<Integer> {
     private final PreparedStatement selectActiveTasks;
     private final PreparedStatement insertErrorLog;
     private final PreparedStatement resetStatus;
+    //TODO: make this configurable
+    private final int maxRetries = 2;
 
-    public AsyncWorker(Connection connection,
-                       String connectionString, int workerId,
+    public AsyncWorker(Connection connection, String connectionString, int workerId,
                        Path tikaConfigPath) throws SQLException {
         this.connectionString = connectionString;
         this.workerId = workerId;
         this.tikaConfigPath = tikaConfigPath;
         this.connection = connection;
-        String sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.SHUTDOWN.ordinal()+
+        String sql = "update workers set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
                 " where worker_id = (" + workerId + ")";
         finished = connection.prepareStatement(sql);
 
-        sql = "update workers set status="+
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal()+
+        sql = "update workers set status=" +
+                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
                 " where worker_id = (" + workerId + ")";
         restarting = connection.prepareStatement(sql);
         //this checks if the process was able to reset the status
-        sql = "select id, retry, json from task_queue where worker_id="
-                + workerId +
+        sql = "select id, retry, json from task_queue where worker_id=" + workerId +
                 " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
         selectActiveTasks = connection.prepareStatement(sql);
 
@@ -69,6 +85,45 @@ public class AsyncWorker implements Callable<Integer> {
         resetStatus = prepareReset(connection);
     }
 
+    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
+                               PreparedStatement insertErrorLog, PreparedStatement resetStatus,
+                               Logger logger) {
+        try {
+            insertErrorLog.clearParameters();
+            insertErrorLog.setLong(1, task.getTaskId());
+            insertErrorLog.setString(2, task.getFetchKey().getFetchKey());
+            insertErrorLog.setInt(3, task.getRetry());
+            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
+            insertErrorLog.execute();
+        } catch (SQLException e) {
+            logger.error("Can't update error log", e);
+        }
+
+        try {
+            resetStatus.clearParameters();
+            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
+            resetStatus.setShort(2, (short) (task.getRetry() + 1));
+            resetStatus.setLong(3, task.getTaskId());
+            resetStatus.execute();
+        } catch (SQLException e) {
+            logger.error("Can't reset try status", e);
+        }
+    }
+
+    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
+        //if not, this is called to insert into the error log
+        return connection.prepareStatement(
+                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
+                        " values (?,?,CURRENT_TIMESTAMP(),?,?)");
+    }
+
+    static PreparedStatement prepareReset(Connection connection) throws SQLException {
+        //and this is called to reset the status on error
+        return connection.prepareStatement(
+                "update task_queue set " + "status=?, " + "time_stamp=CURRENT_TIMESTAMP(), " +
+                        "retry=? " + "where id=?");
+    }
+
     @Override
     public Integer call() throws Exception {
         Process p = null;
@@ -80,7 +135,7 @@ public class AsyncWorker implements Callable<Integer> {
                 if (finished) {
                     int exitValue = p.exitValue();
                     if (exitValue == 0) {
-                        LOG.info("forked worker process finished with exitValue=0");
+                        LOG.debug("forked worker process finished with exitValue=0");
                         return 1;
                     }
                     reportCrash(++restarts, exitValue);
@@ -95,24 +150,24 @@ public class AsyncWorker implements Callable<Integer> {
         }
     }
 
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
     private Process start() throws IOException {
-        String[] args = new String[]{
-                "java", "-Djava.awt.headless=true",
-                "-cp", System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncWorkerProcess",
-                Integer.toString(workerId)
-        };
+        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
+                System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncWorkerProcess", Integer.toString(workerId)};
         ProcessBuilder pb = new ProcessBuilder(args);
         pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment().put(TIKA_ASYNC_CONFIG_FILE_KEY,
-                tikaConfigPath.toAbsolutePath().toString());
+        pb.environment()
+                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
         pb.inheritIO();
         return pb.start();
     }
 
     private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
-        LOG.warn("worker id={} terminated, exitValue={}",
-                workerId, exitValue);
+        LOG.warn("worker id={} terminated, exitValue={}", workerId, exitValue);
         restarting.execute();
         List<AsyncTask> activeTasks = new ArrayList<>();
         try (ResultSet rs = selectActiveTasks.executeQuery()) {
@@ -123,60 +178,18 @@ public class AsyncWorker implements Callable<Integer> {
             activeTasks.add(new AsyncTask(taskId, retry, tuple));
         }
         if (activeTasks.size() == 0) {
-            LOG.info("worker reset active tasks, nothing extra to report");
+            LOG.debug("worker reset active tasks, nothing extra to report");
             return;
         }
+
         if (activeTasks.size() > 1) {
             LOG.warn("more than one active task? this should never happen!");
         }
 
         for (AsyncTask t : activeTasks) {
-            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE,
-                    insertErrorLog, resetStatus, LOG);
-        }
-
-    }
-
-    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
-                             PreparedStatement insertErrorLog, PreparedStatement resetStatus,
-                             Logger logger) {
-        try {
-            insertErrorLog.clearParameters();
-            insertErrorLog.setLong(1, task.getTaskId());
-            insertErrorLog.setString(2, task.getFetchKey().getKey());
-            insertErrorLog.setInt(3, task.getRetry());
-            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
-            insertErrorLog.execute();
-        } catch (SQLException e) {
-            logger.error("Can't update error log", e);
-        }
-
-        try {
-            resetStatus.clearParameters();
-            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            resetStatus.setShort(2, (short)(task.getRetry()+1));
-            resetStatus.setLong(3, task.getTaskId());
-            resetStatus.execute();
-        } catch (SQLException e) {
-            logger.error("Can't reset try status", e);
+            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE, insertErrorLog,
+                    resetStatus, LOG);
         }
-    }
-
-    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
-        //if not, this is called to insert into the error log
-        return connection.prepareStatement(
-                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
-                " values (?,?,CURRENT_TIMESTAMP(),?,?)"
-        );
-    }
 
-    static PreparedStatement prepareReset(Connection connection) throws SQLException {
-        //and this is called to reset the status on error
-        return connection.prepareStatement(
-                "update task_queue set " +
-                        "status=?, " +
-                        "time_stamp=CURRENT_TIMESTAMP(), " +
-                        "retry=? " +
-                        "where id=?");
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index 8675f22..988748f 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -1,27 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.EncryptedDocumentException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.sax.RecursiveParserWrapperHandler;
-import org.apache.tika.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
+import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
+import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
+import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -44,52 +42,37 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
-import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
-
-public class AsyncWorkerProcess {
-
-    enum TASK_STATUS_CODES {
-        AVAILABLE,
-        SELECTED,
-        IN_PROCESS,
-        AVAILABLE_EMIT,
-        SELECTED_EMIT,
-        IN_PROCESS_EMIT,
-        FAILED_EMIT,
-        EMITTED
-    }
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import net.jpountz.lz4.LZ4Factory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
 
-    public enum WORKER_STATUS_CODES {
-        ACTIVE,
-        RESTARTING,
-        HIBERNATING,
-        SHOULD_SHUTDOWN,
-        SHUTDOWN
-    }
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.StringUtils;
 
-    enum ERROR_CODES {
-        TIMEOUT,
-        SECURITY_EXCEPTION,
-        OTHER_EXCEPTION,
-        OOM,
-        OTHER_ERROR,
-        UNKNOWN_PARSE,
-        EMIT_SERIALIZATION,
-        EMIT_SQL_INSERT_EXCEPTION,
-        EMIT_SQL_SELECT_EXCEPTION,
-        EMIT_DESERIALIZATION,
-        EMIT_EXCEPTION
-    }
+public class AsyncWorkerProcess {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerProcess.class);
-
     //make these all configurable
     private static final long SHUTDOWN_AFTER_MS = 120000;
-    private static long PULSE_MS = 1000;
-    private long parseTimeoutMs = 60000;
+    private static final long PULSE_MS = 1000;
+    private final long parseTimeoutMs = 60000;
 
     public static void main(String[] args) throws Exception {
         Path tikaConfigPath = Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
@@ -104,15 +87,15 @@ public class AsyncWorkerProcess {
         System.exit(0);
     }
 
-    private void execute(Connection connection,
-                         int workerId, TikaConfig tikaConfig) throws SQLException {
-
+    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
+            throws SQLException {
+        //3 = worker + forkwatcher + active task
         ExecutorService service = Executors.newFixedThreadPool(3);
         ExecutorCompletionService<Integer> executorCompletionService =
                 new ExecutorCompletionService<>(service);
 
-        executorCompletionService.submit(new Worker(connection, workerId,
-                tikaConfig, parseTimeoutMs));
+        executorCompletionService
+                .submit(new Worker(connection, workerId, tikaConfig, parseTimeoutMs));
         executorCompletionService.submit(new ForkWatcher(System.in));
 
         int completed = 0;
@@ -135,6 +118,23 @@ public class AsyncWorkerProcess {
         return;
     }
 
+    enum TASK_STATUS_CODES {
+        AVAILABLE, SELECTED, IN_PROCESS, AVAILABLE_EMIT, SELECTED_EMIT, IN_PROCESS_EMIT,
+        FAILED_EMIT, EMITTED
+    }
+
+    public enum WORKER_STATUS_CODES {
+        ACTIVE, RESTARTING, HIBERNATING, CAN_SHUTDOWN, // if there's nothing else to process, shut down
+        SHOULD_SHUTDOWN, // shut down now whether or not there's anything else to process
+        HAS_SHUTDOWN
+    }
+
+    enum ERROR_CODES {
+        TIMEOUT, SECURITY_EXCEPTION, OTHER_EXCEPTION, OOM, OTHER_ERROR, UNKNOWN_PARSE, MAX_RETRIES,
+        EMIT_SERIALIZATION, EMIT_SQL_INSERT_EXCEPTION, EMIT_SQL_SELECT_EXCEPTION,
+        EMIT_DESERIALIZATION, EMIT_EXCEPTION
+    }
+
     private static class TaskQueue {
         private final Connection connection;
         private final int workerId;
@@ -149,28 +149,22 @@ public class AsyncWorkerProcess {
             this.connection = connection;
             this.workerId = workerId;
             //TODO -- need to update timestamp
-            String sql = "update task_queue set status=" +
-                    TASK_STATUS_CODES.SELECTED.ordinal()+
-                    ", time_stamp=CURRENT_TIMESTAMP()"+
-                    " where id = " +
-                    " (select id from task_queue where worker_id = " + workerId +
-                    " and status="+ TASK_STATUS_CODES.AVAILABLE.ordinal()+
+            String sql = "update task_queue set status=" + TASK_STATUS_CODES.SELECTED.ordinal() +
+                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id = " +
+                    " (select id from task_queue where worker_id = " + workerId + " and status=" +
+                    TASK_STATUS_CODES.AVAILABLE.ordinal() +
                     " order by time_stamp asc limit 1 for update)";
             markForSelecting = connection.prepareStatement(sql);
             sql = "select id, retry, json from task_queue where status=" +
-                    TASK_STATUS_CODES.SELECTED.ordinal() +
-                    " and " +
-                    " worker_id=" + workerId +
+                    TASK_STATUS_CODES.SELECTED.ordinal() + " and " + " worker_id=" + workerId +
                     " order by time_stamp asc limit 1";
             selectForProcessing = connection.prepareStatement(sql);
-            sql = "update task_queue set status="+
-                    TASK_STATUS_CODES.IN_PROCESS.ordinal()+
-                    ", time_stamp=CURRENT_TIMESTAMP()"+
-                    " where id=?";
+            sql = "update task_queue set status=" + TASK_STATUS_CODES.IN_PROCESS.ordinal() +
+                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
             markForProcessing = connection.prepareStatement(sql);
 
-            sql = "select count(1) from workers where worker_id=" + workerId +
-            " and status="+WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
+            sql = "select count(1) from workers where worker_id=" + workerId + " and status=" +
+                    WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
             checkForShutdown = connection.prepareStatement(sql);
         }
 
@@ -183,7 +177,7 @@ public class AsyncWorkerProcess {
                 }
                 int i = markForSelecting.executeUpdate();
                 if (i == 0) {
-                    debugQueue();
+//                   debugQueue();
                     Thread.sleep(PULSE_MS);
                 } else {
                     long taskId = -1;
@@ -213,13 +207,12 @@ public class AsyncWorkerProcess {
         }
 
         private void debugQueue() throws SQLException {
-            try (ResultSet rs = connection.createStatement().executeQuery(
-                    "select * from task_queue limit 10")) {
+            try (ResultSet rs = connection.createStatement()
+                    .executeQuery("select id, status, worker_id from task_queue limit 10")) {
                 while (rs.next()) {
-                    for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
-                        System.out.print(rs.getString(i)+ " ");
-                    }
-                    System.out.println("");
+                    System.out.println(
+                            "id: " + rs.getInt(1) + " status: " + rs.getInt(2) + " worker_id: " +
+                                    rs.getInt(3));
                 }
             }
         }
@@ -243,18 +236,17 @@ public class AsyncWorkerProcess {
         private final RecursiveParserWrapper parser;
         private final TikaConfig tikaConfig;
         private final long parseTimeoutMs;
-        private ExecutorService executorService;
-        private ExecutorCompletionService<AsyncData> executorCompletionService;
         private final PreparedStatement insertErrorLog;
         private final PreparedStatement resetStatus;
         private final PreparedStatement insertEmitData;
         private final PreparedStatement updateStatusForEmit;
         private final ObjectMapper objectMapper = new ObjectMapper();
         LZ4Factory factory = LZ4Factory.fastestInstance();
+        private final ExecutorService executorService;
+        private final ExecutorCompletionService<AsyncData> executorCompletionService;
 
-        public Worker(Connection connection,
-                      int workerId,
-                      TikaConfig tikaConfig, long parseTimeoutMs) throws SQLException {
+        public Worker(Connection connection, int workerId, TikaConfig tikaConfig,
+                      long parseTimeoutMs) throws SQLException {
             this.connection = connection;
             this.workerId = workerId;
             this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
@@ -264,22 +256,24 @@ public class AsyncWorkerProcess {
             this.parseTimeoutMs = parseTimeoutMs;
 
             SimpleModule module = new SimpleModule();
-            module.addSerializer(Metadata.class, new JsonMetadata());
+            module.addSerializer(Metadata.class, new JsonMetadataSerializer());
             objectMapper.registerModule(module);
-            String sql = "insert into workers (worker_id, status) " +
-                    "values (" + workerId + ", "+
-                    WORKER_STATUS_CODES.ACTIVE.ordinal()+")";
+            String sql = "merge into workers key (worker_id) " + "values (" + workerId + ", " +
+                    WORKER_STATUS_CODES.ACTIVE.ordinal() + ")";
             connection.createStatement().execute(sql);
             insertErrorLog = prepareInsertErrorLog(connection);
             resetStatus = prepareReset(connection);
             insertEmitData = prepareInsertEmitData(connection);
-            sql = "update task_queue set status="+
-                    TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal()+
-                    ", time_stamp=CURRENT_TIMESTAMP()" +
-                    " where id=?";
+            sql = "update task_queue set status=" + TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
+                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
             updateStatusForEmit = connection.prepareStatement(sql);
         }
 
+        static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
+            return connection.prepareStatement(
+                    "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
+                            " values (?,CURRENT_TIMESTAMP(),?,?)");
+        }
 
         public Integer call() throws Exception {
             AsyncTask task = null;
@@ -306,26 +300,22 @@ public class AsyncWorkerProcess {
                     }
                 }
             } catch (TimeoutException e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.TIMEOUT,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.TIMEOUT, insertErrorLog, resetStatus, LOG);
             } catch (SecurityException e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION, insertErrorLog, resetStatus,
+                        LOG);
             } catch (Exception e) {
                 e.printStackTrace();
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION, insertErrorLog, resetStatus, LOG);
             } catch (OutOfMemoryError e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.OOM,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.OOM, insertErrorLog, resetStatus, LOG);
             } catch (Error e) {
-                LOG.warn(task.getFetchKey().getKey(), e);
-                reportAndReset(task, ERROR_CODES.OTHER_ERROR,
-                        insertErrorLog, resetStatus, LOG);
+                LOG.warn(task.getFetchKey().getFetchKey(), e);
+                reportAndReset(task, ERROR_CODES.OTHER_ERROR, insertErrorLog, resetStatus, LOG);
             } finally {
                 executorService.shutdownNow();
                 return 1;
@@ -338,15 +328,16 @@ public class AsyncWorkerProcess {
                 LOG.debug("received shutdown notification");
                 return;
             } else {
-                executorCompletionService.submit(new TaskProcessor(task, tikaConfig, parser,
-                        workerId));
-                Future<AsyncData> future = executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
+                executorCompletionService
+                        .submit(new TaskProcessor(task, tikaConfig, parser, workerId));
+                Future<AsyncData> future =
+                        executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
                 if (future == null) {
-                    handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+                    handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
                 } else {
                     AsyncData asyncData = future.get(1000, TimeUnit.MILLISECONDS);
                     if (asyncData == null) {
-                        handleTimeout(task.getTaskId(), task.getFetchKey().getKey());
+                        handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
                     }
                     boolean shouldEmit = checkForParseException(asyncData);
                     if (shouldEmit) {
@@ -354,13 +345,11 @@ public class AsyncWorkerProcess {
                             emit(asyncData);
                         } catch (JsonProcessingException e) {
                             e.printStackTrace();
-                            recordBadEmit(task.getTaskId(),
-                                    task.getFetchKey().getKey(),
+                            recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
                                     ERROR_CODES.EMIT_SERIALIZATION.ordinal());
                         } catch (SQLException e) {
                             e.printStackTrace();
-                            recordBadEmit(task.getTaskId(),
-                                    task.getFetchKey().getKey(),
+                            recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
                                     ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
                         }
                     }
@@ -372,18 +361,16 @@ public class AsyncWorkerProcess {
             //stub
         }
 
-        private void emit(AsyncData asyncData) throws SQLException,
-                JsonProcessingException {
+        private void emit(AsyncData asyncData) throws SQLException, JsonProcessingException {
             insertEmitData.clearParameters();
-            insertEmitData.setLong(1, asyncData.getAsyncTask().getTaskId());
+            insertEmitData.setLong(1, asyncData.getTaskId());
             byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
             byte[] compressed = factory.fastCompressor().compress(bytes);
             insertEmitData.setLong(2, bytes.length);
             insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
             insertEmitData.execute();
             updateStatusForEmit.clearParameters();
-            updateStatusForEmit.setLong(1,
-                    asyncData.getAsyncTask().getTaskId());
+            updateStatusForEmit.setLong(1, asyncData.getTaskId());
             updateStatusForEmit.execute();
         }
 
@@ -392,12 +379,10 @@ public class AsyncWorkerProcess {
             throw new TimeoutException(key);
         }
 
-
         private boolean checkForParseException(AsyncData asyncData) {
             if (asyncData == null || asyncData.getMetadataList() == null ||
                     asyncData.getMetadataList().size() == 0) {
-                LOG.warn("empty or null emit data ({})", asyncData.getAsyncTask()
-                        .getFetchKey().getKey());
+                LOG.warn("empty or null emit data ({})", asyncData.getFetchKey().getFetchKey());
                 return false;
             }
             boolean shouldEmit = true;
@@ -405,9 +390,8 @@ public class AsyncWorkerProcess {
             String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
             if (stack != null) {
                 LOG.warn("fetchKey ({}) container parse exception ({})",
-                        asyncData.getAsyncTask().getFetchKey().getKey(), stack);
-                if (asyncData.getAsyncTask().getOnParseException()
-                        == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
+                        asyncData.getFetchKey().getFetchKey(), stack);
+                if (asyncData.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
                     shouldEmit = false;
                 }
             }
@@ -417,18 +401,11 @@ public class AsyncWorkerProcess {
                 String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
                 if (embeddedStack != null) {
                     LOG.warn("fetchKey ({}) embedded parse exception ({})",
-                            asyncData.getAsyncTask().getFetchKey().getKey(), embeddedStack);
+                            asyncData.getFetchKey().getFetchKey(), embeddedStack);
                 }
             }
             return shouldEmit;
         }
-
-        static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
-            return connection.prepareStatement(
-                    "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
-                            " values (?,CURRENT_TIMESTAMP(),?,?)"
-            );
-        }
     }
 
     private static class TaskProcessor implements Callable<AsyncData> {
@@ -438,9 +415,7 @@ public class AsyncWorkerProcess {
         private final TikaConfig tikaConfig;
         private final int workerId;
 
-        public TaskProcessor(AsyncTask task,
-                             TikaConfig tikaConfig,
-                             Parser parser, int workerId) {
+        public TaskProcessor(AsyncTask task, TikaConfig tikaConfig, Parser parser, int workerId) {
             this.task = task;
             this.parser = parser;
             this.tikaConfig = tikaConfig;
@@ -451,14 +426,11 @@ public class AsyncWorkerProcess {
             Metadata userMetadata = task.getMetadata();
             Metadata metadata = new Metadata();
             String fetcherName = task.getFetchKey().getFetcherName();
-            String fetchKey = task.getFetchKey().getKey();
+            String fetchKey = task.getFetchKey().getFetchKey();
             List<Metadata> metadataList = null;
-            try (InputStream stream = tikaConfig.getFetcherManager()
-                    .getFetcher(fetcherName)
+            try (InputStream stream = tikaConfig.getFetcherManager().getFetcher(fetcherName)
                     .fetch(fetchKey, metadata)) {
-                metadataList = parseMetadata(task.getFetchKey(),
-                        stream,
-                        metadata);
+                metadataList = parseMetadata(task.getFetchKey(), stream, metadata);
             } catch (SecurityException e) {
                 throw e;
             }
@@ -468,11 +440,12 @@ public class AsyncWorkerProcess {
                 emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
                 task.setEmitKey(emitKey);
             }
-            return new AsyncData(task, metadataList);
+            return new AsyncData(task.getTaskId(), task.getFetchKey(), task.getEmitKey(),
+                    task.getOnParseException(), metadataList);
         }
 
-        private List<Metadata> parseMetadata(FetchKey fetchKey,
-                                             InputStream stream, Metadata metadata) {
+        private List<Metadata> parseMetadata(FetchKey fetchKey, InputStream stream,
+                                             Metadata metadata) {
             //make these configurable
             BasicContentHandlerFactory.HANDLER_TYPE type =
                     BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
@@ -486,16 +459,16 @@ public class AsyncWorkerProcess {
             try {
                 parser.parse(stream, handler, metadata, parseContext);
             } catch (SAXException e) {
-                LOG.warn("problem:" + fetchKey.getKey(), e);
+                LOG.warn("problem:" + fetchKey.getFetchKey(), e);
             } catch (EncryptedDocumentException e) {
-                LOG.warn("encrypted:" + fetchKey.getKey(), e);
+                LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
             } catch (SecurityException e) {
-                LOG.warn("security exception: " + fetchKey.getKey());
+                LOG.warn("security exception: " + fetchKey.getFetchKey());
                 throw e;
             } catch (Exception e) {
-                LOG.warn("exception: " + fetchKey.getKey());
+                LOG.warn("exception: " + fetchKey.getFetchKey());
             } catch (OutOfMemoryError e) {
-                LOG.error("oom: " + fetchKey.getKey());
+                LOG.error("oom: " + fetchKey.getFetchKey());
                 throw e;
             }
             return handler.getMetadataList();
@@ -514,6 +487,7 @@ public class AsyncWorkerProcess {
 
     private static class ForkWatcher implements Callable<Integer> {
         private final InputStream in;
+
         public ForkWatcher(InputStream in) {
             this.in = in;
         }
diff --git a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
index 585b03b..43553a4 100644
--- a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
+++ b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=debug,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
deleted file mode 100644
index c653267..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncCliTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.tika.pipes.async;
-
-import org.junit.Test;
-
-public class AsyncCliTest {
-    @Test
-    public void testBasic() throws Exception {
-        String[] args = {
-                "/Users/allison/Desktop/tika-tmp/tika-config.xml"
-        };
-        AsyncCli.main(args);
-    }
-
-    @Test
-    public void testUnhandled() throws InterruptedException {
-        Thread t = new Thread(new Task());
-
-        t.start();
-        t.join();
-        for (StackTraceElement el : t.getStackTrace()) {
-            System.out.println(el);
-        }
-    }
-
-    private static class Task implements Runnable {
-
-        @Override
-        public void run() {
-            Thread.currentThread().setUncaughtExceptionHandler(new MyUncaught());
-            for (int i = 0; i < 5; i++) {
-                System.out.println(i);
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException();
-                }
-            }
-            throw new RuntimeException("kaboom");
-        }
-    }
-
-    private static class MyUncaught implements Thread.UncaughtExceptionHandler {
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-            throw new RuntimeException("bad");
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 920634e..04890b3 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -1,13 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -18,6 +28,18 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncProcessorTest {
 
@@ -30,29 +52,21 @@ public class AsyncProcessorTest {
     public void setUp() throws SQLException, IOException {
         dbDir = Files.createTempDirectory("async-db");
         dbFile = dbDir.resolve("emitted-db");
-        String jdbc = "jdbc:h2:file:"+dbFile.toAbsolutePath().toString()+";AUTO_SERVER=TRUE";
-        String sql = "create table emitted (id int auto_increment primary key, json varchar(20000))";
+        String jdbc = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
+        String sql = "create table emitted (id int auto_increment primary key, " +
+                "emitkey varchar(2000), json varchar(20000))";
 
         connection = DriverManager.getConnection(jdbc);
         connection.createStatement().execute(sql);
         tikaConfigPath = dbDir.resolve("tika-config.xml");
-        String xml = "" +
-                "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" +
-                "<properties>" +
-                "  <emitters>"+
-                "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
-                "    <params>\n" +
-                "      <param name=\"name\" type=\"string\">mock</param>\n"+
-                "      <param name=\"jdbc\" type=\"string\">"+jdbc+"</param>\n"+
-                "    </params>" +
-                "  </emitter>" +
-                "  </emitters>"+
-                "  <fetchers>" +
+        String xml = "" + "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" +
+                "  <emitters>" + "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
+                "    <params>\n" + "      <param name=\"name\" type=\"string\">mock</param>\n" +
+                "      <param name=\"jdbc\" type=\"string\">" + jdbc + "</param>\n" +
+                "    </params>" + "  </emitter>" + "  </emitters>" + "  <fetchers>" +
                 "    <fetcher class=\"org.apache.tika.pipes.async.MockFetcher\">" +
-                "      <param name=\"name\" type=\"string\">mock</param>\n"+
-                "    </fetcher>" +
-                "  </fetchers>"+
-                "</properties>";
+                "      <param name=\"name\" type=\"string\">mock</param>\n" + "    </fetcher>" +
+                "  </fetchers>" + "</properties>";
         Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
     }
 
@@ -68,21 +82,24 @@ public class AsyncProcessorTest {
 
 
         AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
-        for (int i = 0 ; i < 100; i++) {
-            FetchEmitTuple t = new FetchEmitTuple(
-                    new FetchKey("mock", "key-"+i),
-                    new EmitKey("mock", "emit-"+i),
-                    new Metadata()
-            );
+        int max = 100;
+        for (int i = 0; i < max; i++) {
+            FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", "key-" + i),
+                    new EmitKey("mock", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
         }
         processor.close();
-        String sql = "select * from emitted";
-        try (Statement st = connection.createStatement();
-             ResultSet rs = st.executeQuery(sql)) {
+        String sql = "select emitkey from emitted";
+        Set<String> emitKeys = new HashSet<>();
+        try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
             while (rs.next()) {
-                System.out.println(rs.getInt(1) + " : "+rs.getString(2));
+                String emitKey = rs.getString(1);
+                emitKeys.add(emitKey);
             }
         }
+        assertEquals(max, emitKeys.size());
+        for (int i = 0; i < max; i++) {
+            assertTrue(emitKeys.contains("emit-" + i));
+        }
     }
 }
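
For readability, the single-line configuration string written by setUp() above corresponds to roughly this XML (whitespace added; the jdbc value is the temp-file H2 URL built at runtime):

    <?xml version="1.0" encoding="UTF-8" ?>
    <properties>
      <emitters>
        <emitter class="org.apache.tika.pipes.async.MockEmitter">
          <params>
            <param name="name" type="string">mock</param>
            <param name="jdbc" type="string">jdbc:h2:file:...;AUTO_SERVER=TRUE</param>
          </params>
        </emitter>
      </emitters>
      <fetchers>
        <fetcher class="org.apache.tika.pipes.async.MockFetcher">
          <param name="name" type="string">mock</param>
        </fetcher>
      </fetchers>
    </properties>
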
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
index 9f07dec..5576f73 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -1,32 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 public class MockEmitter implements Initializable, Emitter {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private Connection connection;
     private String jdbc;
     private PreparedStatement insert;
 
+    public MockEmitter() {
+        SimpleModule module = new SimpleModule();
+        module.addSerializer(Metadata.class, new JsonMetadataSerializer());
+        objectMapper.registerModule(module);
+    }
+
     @Field
     public void setJdbc(String jdbc) {
         this.jdbc = jdbc;
@@ -38,10 +63,10 @@ public class MockEmitter implements Initializable, Emitter {
     }
 
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
-        emit(Collections.singletonList(
-                new EmitData(
-                new EmitKey(getName(), emitKey), metadataList)));
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
+        emit(Collections
+                .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
     }
 
     @Override
@@ -50,7 +75,8 @@ public class MockEmitter implements Initializable, Emitter {
             String json = objectMapper.writeValueAsString(d);
             try {
                 insert.clearParameters();
-                insert.setString(1, json);
+                insert.setString(1, d.getEmitKey().getEmitKey());
+                insert.setString(2, json);
                 insert.execute();
             } catch (SQLException e) {
                 throw new TikaEmitterException("problem inserting", e);
@@ -62,7 +88,7 @@ public class MockEmitter implements Initializable, Emitter {
     public void initialize(Map<String, Param> params) throws TikaConfigException {
         try {
             connection = DriverManager.getConnection(jdbc);
-            String sql = "insert into emitted (json) values (?)";
+            String sql = "insert into emitted (emitkey, json) values (?, ?)";
             insert = connection.prepareStatement(sql);
         } catch (SQLException e) {
             throw new TikaConfigException("problem w connection", e);
@@ -70,7 +96,8 @@ public class MockEmitter implements Initializable, Emitter {
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException {
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
 
     }
 }
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index 8c99ecf..7a74f2a 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -1,21 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.pipes.async;
 
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.fetcher.Fetcher;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.Fetcher;
+
 public class MockFetcher implements Fetcher {
 
-    private static byte[] BYTES = new String("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"+
-            "<mock>"+
-            "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>"+
-            "<write element=\"p\">main_content</write>"+
-        "</mock>").getBytes(StandardCharsets.UTF_8);
+    private static final byte[] BYTES = ("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>" +
+            "<write element=\"p\">main_content</write>" + "</mock>")
+            .getBytes(StandardCharsets.UTF_8);
 
     @Override
     public String getName() {
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
new file mode 100644
index 0000000..ebbd8f0
--- /dev/null
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.junit.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
+
+public class SerializationTest {
+
+    @Test
+    public void testBasic() throws Exception {
+        String json = "{\"taskId\":49,\"fetchKey\":{\"fetcherName\":\"mock\"," +
+                "\"fetchKey\":\"key-48\"},\"emitKey\":{\"emitterName\":\"mock\"," +
+                "\"emitKey\":\"emit-48\"},\"onParseException\":\"EMIT\",\"metadataList\":" +
+                "[{\"X-TIKA:Parsed-By\":" +
+                "\"org.apache.tika.parser.EmptyParser\",\"X-TIKA:parse_time_millis\":" +
+                "\"0\",\"X-TIKA:embedded_depth\":\"0\"}]}";
+
+        ObjectMapper mapper = new ObjectMapper();
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
+        mapper.registerModule(module);
+        AsyncData asyncData = mapper.readValue(json, AsyncData.class);
+        assertEquals(49, asyncData.getTaskId());
+        assertEquals("mock", asyncData.getFetchKey().getFetcherName());
+        assertEquals(1, asyncData.getMetadataList().size());
+    }
+
+}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
deleted file mode 100644
index 394fb16..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/TestPipesDriver.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.tika.pipes.async;
-
-
-import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestPipesDriver {
-
-    static Path TMP_DIR;
-    static Path DB;
-
-    static AtomicInteger PROCESSED = new AtomicInteger(0);
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        TMP_DIR = Files.createTempDirectory("pipes-driver-");
-        DB = Files.createTempFile(TMP_DIR, "", "");
-    }
-
-    @AfterClass
-    public static void tearDown() throws Exception {
-        FileUtils.deleteDirectory(TMP_DIR.toFile());
-    }
-
-
-    @Test
-    public void testQueue() throws Exception {
-        int numThreads = 20;
-        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10000 + numThreads);
-        for (int i = 0; i < 10000; i++) {
-            queue.add(1);
-        }
-        for (int i = 0; i < numThreads; i++) {
-            queue.offer(-1);
-        }
-        ExecutorService service = Executors.newFixedThreadPool(numThreads);
-        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(service);
-
-        long start = System.currentTimeMillis();
-        executorCompletionService.submit(new Watcher(queue));
-        for (int i = 0; i < numThreads; i++) {
-            executorCompletionService.submit(new QueueWorker(queue));
-        }
-        int finished = 0;
-        while (finished++ < numThreads) {
-            executorCompletionService.take();
-        }
-        long elapsed = System.currentTimeMillis() - start;
-        System.out.println("elapsed: " + elapsed);
-        service.shutdownNow();
-    }
-
-    private static class Watcher implements Callable<Integer> {
-        private final ArrayBlockingQueue<Integer> queue;
-
-        Watcher(ArrayBlockingQueue<Integer> queue) {
-            this.queue = queue;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            long start = System.currentTimeMillis();
-            while (true) {
-                long elapsed = System.currentTimeMillis() - start;
-                Thread.sleep(1000);
-            }
-        }
-    }
-
-    private static class QueueWorker implements Callable<Integer> {
-        static AtomicInteger counter = new AtomicInteger(0);
-
-
-        private final int id;
-        private final ArrayBlockingQueue<Integer> queue;
-
-        QueueWorker(ArrayBlockingQueue<Integer> queue) {
-            id = counter.incrementAndGet();
-            this.queue = queue;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            while (true) {
-                Integer val = queue.poll(1, TimeUnit.SECONDS);
-                if (val != null) {
-                    if (val < 0) {
-                        return 1;
-                    } else {
-                        long sleep = id * 100;
-                        Thread.sleep(sleep);
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 332813f..7fbba83 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -17,23 +17,6 @@
 
 package org.apache.tika.pipes;
 
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.s3.S3Emitter;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.junit.Ignore;
-import org.junit.Test;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -49,6 +32,24 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.iterable.S3Objects;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.s3.S3Emitter;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
 @Ignore("turn these into actual tests")
 public class PipeIntegrationTests {
 
@@ -59,10 +60,8 @@ public class PipeIntegrationTests {
         String region = "";
         String profile = "";
         String bucket = "";
-        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
-                .withRegion(region)
-                .withCredentials(new ProfileCredentialsProvider(profile))
-                .build();
+        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region)
+                .withCredentials(new ProfileCredentialsProvider(profile)).build();
         s3Client.listObjects(bucket);
         int cnt = 0;
         long sz = 0;
@@ -72,17 +71,18 @@ public class PipeIntegrationTests {
             if (Files.isRegularFile(targ)) {
                 continue;
             }
-            if (! Files.isDirectory(targ.getParent())) {
+            if (!Files.isDirectory(targ.getParent())) {
                 Files.createDirectories(targ.getParent());
             }
-            System.out.println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
+            System.out
+                    .println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
             S3Object s3Object = s3Client.getObject(bucket, summary.getKey());
             Files.copy(s3Object.getObjectContent(), targ);
             summary.getSize();
             cnt++;
             sz += summary.getSize();
         }
-        System.out.println("iterated: "+cnt + " sz: "+sz);
+        System.out.println("iterated: " + cnt + " sz: " + sz);
     }
 
     @Test
@@ -94,8 +94,8 @@ public class PipeIntegrationTests {
         ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
         for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new FSFetcherEmitter(
-                    queue, tikaConfig.getFetcherManager().getFetcher("s3"), null));
+            completionService.submit(new FSFetcherEmitter(queue,
+                    tikaConfig.getFetcherManager().getFetcher("s3"), null));
         }
         for (FetchEmitTuple t : it) {
             queue.offer(t);
@@ -105,7 +105,7 @@ public class PipeIntegrationTests {
         }
         int finished = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers + 1) {
                 Future<Integer> future = completionService.take();
                 future.get();
             }
@@ -122,9 +122,9 @@ public class PipeIntegrationTests {
         ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
         for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new S3FetcherEmitter(
-                    queue, tikaConfig.getFetcherManager().getFetcher("s3f"),
-                    (S3Emitter)tikaConfig.getEmitterManager().getEmitter("s3e")));
+            completionService.submit(new S3FetcherEmitter(queue,
+                    tikaConfig.getFetcherManager().getFetcher("s3f"),
+                    (S3Emitter) tikaConfig.getEmitterManager().getEmitter("s3e")));
         }
         FetchIterator it = tikaConfig.getFetchIterator();
         for (FetchEmitTuple t : it) {
@@ -135,7 +135,7 @@ public class PipeIntegrationTests {
         }
         int finished = 0;
         try {
-            while (finished++ < numConsumers+1) {
+            while (finished++ < numConsumers + 1) {
                 Future<Integer> future = completionService.take();
                 future.get();
             }
@@ -145,8 +145,7 @@ public class PipeIntegrationTests {
     }
 
     private TikaConfig getConfig(String fileName) throws Exception {
-        try (InputStream is =
-                     PipeIntegrationTests.class.getResourceAsStream("/"+fileName)) {
+        try (InputStream is = PipeIntegrationTests.class.getResourceAsStream("/" + fileName)) {
             return new TikaConfig(is);
         }
     }
@@ -159,8 +158,8 @@ public class PipeIntegrationTests {
         private final Emitter emitter;
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
 
-        FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
-                fetcher, Emitter emitter) {
+        FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
+                         Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
             this.emitter = emitter;
@@ -182,12 +181,12 @@ public class PipeIntegrationTests {
         }
 
         private void process(FetchEmitTuple t) throws IOException, TikaException {
-            Path targ = OUTDIR.resolve(t.getFetchKey().getKey());
+            Path targ = OUTDIR.resolve(t.getFetchKey().getFetchKey());
             if (Files.isRegularFile(targ)) {
                 return;
             }
-            try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
-                System.out.println(counter.getAndIncrement() + " : "+t );
+            try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
+                System.out.println(counter.getAndIncrement() + " : " + t);
                 Files.createDirectories(targ.getParent());
                 Files.copy(is, targ);
             }
@@ -201,8 +200,8 @@ public class PipeIntegrationTests {
         private final S3Emitter emitter;
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
 
-        S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher
-                fetcher, S3Emitter emitter) {
+        S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
+                         S3Emitter emitter) {
             this.queue = queue;
             this.fetcher = fetcher;
             this.emitter = emitter;
@@ -227,7 +226,7 @@ public class PipeIntegrationTests {
             Metadata userMetadata = new Metadata();
             userMetadata.set("project", "my-project");
 
-            try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
+            try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
                 emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
             }
         }
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties b/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
index 1bee240..2b2da1a 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/log4j.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,13 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 #info,debug, error,fatal ...
 log4j.rootLogger=info,stderr
-
 #console
 log4j.appender.stderr=org.apache.log4j.ConsoleAppender
 log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
 log4j.appender.stderr.Target=System.err
-
 log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 3024f6a..78ba9d2 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -133,7 +133,7 @@ public class JsonFetchEmitTuple {
     static void writeTuple(FetchEmitTuple t, JsonGenerator jsonGenerator) throws IOException {
         jsonGenerator.writeStartObject();
         jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
-        jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
+        jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getFetchKey());
         jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
         if (!StringUtils.isBlank(t.getEmitKey().getEmitKey())) {
             jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getEmitKey());
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index 06612a1..b33c2a9 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -26,19 +26,13 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
-public class JsonMetadata extends StdSerializer<Metadata> {
+public class JsonMetadata {
 
     static volatile boolean PRETTY_PRINT = false;
 
-    public JsonMetadata() {
-        super(Metadata.class);
-    }
-
     /**
      * Serializes a Metadata object to Json.  This does not flush or close the writer.
      *
@@ -146,10 +140,4 @@ public class JsonMetadata extends StdSerializer<Metadata> {
         PRETTY_PRINT = prettyPrint;
     }
 
-    @Override
-    public void serialize(Metadata metadata,
-                          JsonGenerator jsonGenerator,
-                          SerializerProvider serializerProvider) throws IOException {
-        writeMetadataObject(metadata, jsonGenerator, false);
-    }
 }
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java
new file mode 100644
index 0000000..5a9b7d0
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import static org.apache.tika.metadata.serialization.JsonMetadata.readMetadataObject;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import org.apache.tika.metadata.Metadata;
+
+public class JsonMetadataDeserializer extends StdDeserializer<Metadata> {
+
+    public JsonMetadataDeserializer() {
+        super(Metadata.class);
+    }
+
+    @Override
+    public Metadata deserialize(JsonParser jsonParser,
+                                DeserializationContext deserializationContext)
+            throws IOException, JsonProcessingException {
+        return readMetadataObject(jsonParser);
+    }
+}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java
new file mode 100644
index 0000000..afcf012
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import static org.apache.tika.metadata.serialization.JsonMetadata.writeMetadataObject;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import org.apache.tika.metadata.Metadata;
+
+public class JsonMetadataSerializer extends StdSerializer<Metadata> {
+
+    public JsonMetadataSerializer() {
+        super(Metadata.class);
+    }
+
+    @Override
+    public void serialize(Metadata metadata,
+                          JsonGenerator jsonGenerator,
+                          SerializerProvider serializerProvider) throws IOException {
+        writeMetadataObject(metadata, jsonGenerator, false);
+    }
+}
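
[Editor's note, not part of this commit] For readers wiring the new classes into Jackson: a minimal sketch of how JsonMetadataSerializer and JsonMetadataDeserializer might be registered on an ObjectMapper through a SimpleModule. The module name, the "dc:title" key, and the round-trip flow below are illustrative assumptions, not code from this change.

    // Illustrative sketch only -- not part of this commit.
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
    import org.apache.tika.metadata.serialization.JsonMetadataSerializer;

    public class JsonMetadataRoundTripExample {
        public static void main(String[] args) throws Exception {
            // Register the new (de)serializers for Metadata with Jackson.
            SimpleModule module = new SimpleModule("tika-metadata");
            module.addSerializer(Metadata.class, new JsonMetadataSerializer());
            module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());

            ObjectMapper mapper = new ObjectMapper();
            mapper.registerModule(module);

            Metadata metadata = new Metadata();
            metadata.set("dc:title", "example"); // illustrative key/value

            // Serialize to JSON, then read it back through the registered module.
            String json = mapper.writeValueAsString(metadata);
            Metadata roundTripped = mapper.readValue(json, Metadata.class);
            System.out.println(json + " -> " + roundTripped.get("dc:title"));
        }
    }
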
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index efd9a5c..d68cc11 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -64,7 +64,7 @@ public class AsyncParser implements Callable<Integer> {
                     if (!offered) {
                         //TODO: deal with this
                         LOG.warn("Failed to add ({}) " + "to emit queue after 10 minutes.",
-                                request.getFetchKey().getKey());
+                                request.getFetchKey().getFetchKey());
                     }
                 }
             } else {
@@ -76,14 +76,14 @@ public class AsyncParser implements Callable<Integer> {
     private boolean checkForParseException(FetchEmitTuple request, EmitData emitData) {
         if (emitData == null || emitData.getMetadataList() == null ||
                 emitData.getMetadataList().size() == 0) {
-            LOG.warn("empty or null emit data ({})", request.getFetchKey().getKey());
+            LOG.warn("empty or null emit data ({})", request.getFetchKey().getFetchKey());
             return false;
         }
         boolean shouldEmit = true;
         Metadata container = emitData.getMetadataList().get(0);
         String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
         if (stack != null) {
-            LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getKey(),
+            LOG.warn("fetchKey ({}) container parse exception ({})", request.getFetchKey().getFetchKey(),
                     stack);
             if (request.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
                 shouldEmit = false;
@@ -95,7 +95,7 @@ public class AsyncParser implements Callable<Integer> {
             String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
             if (embeddedStack != null) {
                 LOG.warn("fetchKey ({}) embedded parse exception ({})",
-                        request.getFetchKey().getKey(), embeddedStack);
+                        request.getFetchKey().getFetchKey(), embeddedStack);
             }
         }
         return shouldEmit;
@@ -105,7 +105,7 @@ public class AsyncParser implements Callable<Integer> {
         Metadata userMetadata = t.getMetadata();
         Metadata metadata = new Metadata();
         String fetcherName = t.getFetchKey().getFetcherName();
-        String fetchKey = t.getFetchKey().getKey();
+        String fetchKey = t.getFetchKey().getFetchKey();
         List<Metadata> metadataList = null;
         try (InputStream stream = TikaResource.getConfig().getFetcherManager()
                 .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
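
[Editor's note, not part of this commit] The getKey() -> getFetchKey() rename above is applied to every FetchKey call site in tika-server-core; a small hedged sketch of the renamed accessor, assuming FetchKey keeps a (fetcherName, fetchKey) two-argument constructor on this branch:

    // Illustrative sketch only; the two-arg constructor is an assumption.
    FetchKey fetchKey = new FetchKey("my-fetcher", "docs/example.pdf");
    String fetcherName = fetchKey.getFetcherName(); // unchanged accessor
    String key = fetchKey.getFetchKey();            // renamed from getKey()
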
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 47dbc50..001926b 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -138,7 +138,7 @@ public class AsyncResource {
     private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
         List<String> fetchKeys = new ArrayList<>();
         for (FetchEmitTuple t : notAdded) {
-            fetchKeys.add(t.getFetchKey().getKey());
+            fetchKeys.add(t.getFetchKey().getFetchKey());
         }
         Map<String, Object> map = new HashMap<>();
         map.put("status", "throttled");
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 306732a..16aa76e 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -71,7 +71,7 @@ public class EmitterResource {
         //TODO: clean this up?
         EmitKey emitKey = t.getEmitKey();
         if (StringUtils.isBlank(emitKey.getEmitKey())) {
-            emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getKey());
+            emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey());
         }
         return emitKey;
     }
@@ -171,7 +171,7 @@ public class EmitterResource {
         List<Metadata> metadataList = null;
         try (InputStream stream = TikaResource.getConfig().getFetcherManager()
                 .getFetcher(t.getFetchKey().getFetcherName())
-                .fetch(t.getFetchKey().getKey(), metadata)) {
+                .fetch(t.getFetchKey().getFetchKey(), metadata)) {
 
             metadataList = RecursiveMetadataResource
                     .parseMetadata(stream, metadata, httpHeaders.getRequestHeaders(), info, "text");
@@ -213,7 +213,7 @@ public class EmitterResource {
                 shouldEmit = false;
             }
             LOG.warn("fetchKey ({}) caught container parse exception ({})",
-                    t.getFetchKey().getKey(), stack);
+                    t.getFetchKey().getFetchKey(), stack);
         }
 
         for (int i = 1; i < metadataList.size(); i++) {
@@ -221,7 +221,7 @@ public class EmitterResource {
             String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
             if (embeddedStack != null) {
                 LOG.warn("fetchKey ({}) caught embedded parse exception ({})",
-                        t.getFetchKey().getKey(), embeddedStack);
+                        t.getFetchKey().getFetchKey(), embeddedStack);
             }
         }