Posted to commits@tika.apache.org by ta...@apache.org on 2021/04/28 15:45:29 UTC

[tika] branch main updated: TIKA-3370 -- refactor AsyncProcessor

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 05709ee  TIKA-3370 -- refactor AsyncProcessor
05709ee is described below

commit 05709eef16d1b290467aa0c2b96ed4f9cd8d2489
Author: tallison <ta...@apache.org>
AuthorDate: Wed Apr 28 11:45:00 2021 -0400

    TIKA-3370 -- refactor AsyncProcessor
---
 tika-core/pom.xml                                  |   6 +
 .../java/org/apache/tika/pipes/FetchEmitTuple.java |   3 +-
 .../java/org/apache/tika/pipes/HandlerConfig.java  |   8 +-
 .../org/apache/tika/pipes/async/AsyncClient.java   | 142 +++++
 .../apache/tika/pipes/async/AsyncClientConfig.java |  23 +-
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  21 +-
 .../org/apache/tika/pipes/async}/AsyncEmitter.java |  18 +-
 .../tika/pipes/async/AsyncEmitterConfig.java       |   9 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    | 199 +++++++
 .../tika/pipes/async/AsyncRuntimeException.java    |   0
 .../org/apache/tika/pipes/async/AsyncServer.java   | 337 ++++++++++++
 .../tika/pipes/async/OfferLargerThanQueueSize.java |  26 +-
 .../org/apache/tika/pipes/emitter/EmitData.java    |   7 +-
 .../org/apache/tika/pipes/emitter/EmitKey.java     |   9 +-
 .../org/apache/tika/pipes/fetcher/FetchKey.java    |   9 +-
 .../tika/pipes/async/AsyncProcessorTest.java       |  85 +--
 .../org/apache/tika/pipes/async/MockEmitter.java   |  62 +++
 .../org/apache/tika/pipes/async/MockFetcher.java   |   0
 tika-pipes/pom.xml                                 |   1 -
 tika-pipes/tika-pipes-async/pom.xml                | 101 ----
 .../java/org/apache/tika/pipes/async/AsyncCli.java | 358 -------------
 .../org/apache/tika/pipes/async/AsyncConfig.java   |  76 ---
 .../org/apache/tika/pipes/async/AsyncData.java     |  57 --
 .../org/apache/tika/pipes/async/AsyncEmitter.java  | 125 -----
 .../tika/pipes/async/AsyncEmitterProcess.java      | 379 -------------
 .../tika/pipes/async/AsyncPipesEmitHook.java       |  61 ---
 .../apache/tika/pipes/async/AsyncProcessor.java    | 594 ---------------------
 .../org/apache/tika/pipes/async/AsyncTask.java     |  54 --
 .../org/apache/tika/pipes/async/AsyncWorker.java   | 195 -------
 .../tika/pipes/async/AsyncWorkerProcess.java       | 505 ------------------
 .../src/main/resources/log4j.properties            |  22 -
 .../org/apache/tika/pipes/async/MockEmitter.java   | 103 ----
 .../apache/tika/pipes/async/SerializationTest.java |  49 --
 tika-server/tika-server-core/pom.xml               |   1 -
 .../org/apache/tika/server/core/TikaServerCli.java |   3 +-
 .../apache/tika/server/core/TikaServerConfig.java  |  13 +-
 .../apache/tika/server/core/TikaServerProcess.java |  63 +--
 .../tika/server/core/resource/AsyncParser.java     | 139 -----
 .../tika/server/core/resource/AsyncResource.java   |  51 +-
 .../core/TikaServerAsyncIntegrationTest.java       |  21 +-
 .../tika/server/core/TikaServerConfigTest.java     |   7 +
 41 files changed, 971 insertions(+), 2971 deletions(-)
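
For orientation, the refactored AsyncProcessor is driven much the way the new
AsyncProcessorTest below drives it: construct it from a tika-config, offer
FetchEmitTuples, offer one FetchIterator.COMPLETED_SEMAPHORE per parser thread,
then poll checkActive() until the workers drain. A minimal sketch along those
lines (the config path, the "fetcher"/"emitter" names, and the counts are
illustrative, not part of this commit):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.FetchEmitTuple;
    import org.apache.tika.pipes.async.AsyncProcessor;
    import org.apache.tika.pipes.emitter.EmitKey;
    import org.apache.tika.pipes.fetcher.FetchKey;
    import org.apache.tika.pipes.fetchiterator.FetchIterator;

    public class AsyncProcessorSketch {
        public static void main(String[] args) throws Exception {
            Path tikaConfig = Paths.get("tika-config.xml"); //illustrative path
            try (AsyncProcessor processor = new AsyncProcessor(tikaConfig)) {
                //queue the work; fetcher/emitter names must match the config
                for (int i = 0; i < 100; i++) {
                    processor.offer(new FetchEmitTuple(
                            new FetchKey("fetcher", i + ".xml"),
                            new EmitKey("emitter", "emit-" + i),
                            new Metadata()), 1000);
                }
                //one semaphore per parser thread signals end of input
                for (int i = 0; i < 10; i++) {
                    processor.offer(FetchIterator.COMPLETED_SEMAPHORE, 1000);
                }
                //poll until all parser and emitter futures have completed
                while (processor.checkActive()) {
                    Thread.sleep(100);
                }
            }
        }
    }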

diff --git a/tika-core/pom.xml b/tika-core/pom.xml
index 93a8dc2..ff98d41 100644
--- a/tika-core/pom.xml
+++ b/tika-core/pom.xml
@@ -89,6 +89,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>${log4j2.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 35621e9..ac5383c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -16,13 +16,14 @@
  */
 package org.apache.tika.pipes;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
-public class FetchEmitTuple {
+public class FetchEmitTuple implements Serializable {
 
     public static final ON_PARSE_EXCEPTION DEFAULT_ON_PARSE_EXCEPTION = ON_PARSE_EXCEPTION.EMIT;
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
index 93e7a98..5272642 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
@@ -16,11 +16,17 @@
  */
 package org.apache.tika.pipes;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import org.apache.tika.sax.BasicContentHandlerFactory;
 
-public class HandlerConfig {
+public class HandlerConfig implements Serializable {
+
+    /**
+     * Serial version UID
+     */
+    private static final long serialVersionUID = -3861669115439125268L;
 
     public static HandlerConfig DEFAULT_HANDLER_CONFIG =
             new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1, -1);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
new file mode 100644
index 0000000..83baa94
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.utils.ProcessUtils;
+
+public class AsyncClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncClient.class);
+
+    //TODO: make these configurable
+    private long parseTimeoutMillis = 30000;
+    private long waitTimeoutMillis = 500000;
+
+    private Process process;
+    private final Path tikaConfigPath;
+    private DataOutputStream output;
+    private DataInputStream input;
+
+    public AsyncClient(Path tikaConfigPath) {
+        this.tikaConfigPath = tikaConfigPath;
+    }
+
+    private int filesProcessed = 0;
+
+    public int getFilesProcessed() {
+        return filesProcessed;
+    }
+
+    private boolean ping() {
+        if (process == null || ! process.isAlive()) {
+            return false;
+        }
+        try {
+            output.write(AsyncServer.PING);
+            output.flush();
+            int ping = input.read();
+            if (ping == AsyncServer.PING) {
+                return true;
+            }
+        } catch (IOException e) {
+            return false;
+        }
+        return false;
+    }
+
+    @Override
+    public void close() {
+        process.destroyForcibly();
+    }
+
+    public EmitData process(FetchEmitTuple t) throws IOException {
+        if (! ping()) {
+            restart();
+        }
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+            objectOutputStream.writeObject(t);
+        }
+        byte[] bytes = bos.toByteArray();
+        output.write(AsyncServer.CALL);
+        output.writeInt(bytes.length);
+        output.write(bytes);
+        output.flush();
+
+        int status = input.read();
+
+        //TODO clean this up, never return null
+        if (status == AsyncServer.OOM) {
+            LOG.warn("{} oom", t.getFetchKey().getFetchKey());
+            return null;
+        } else if (status != AsyncServer.READY) {
+            throw new IOException("problem reading response from server " + status);
+        }
+        int length = input.readInt();
+        bytes = new byte[length];
+        input.readFully(bytes);
+        try (ObjectInputStream objectInputStream =
+                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            return (EmitData)objectInputStream.readObject();
+        } catch (ClassNotFoundException e) {
+            //this should be catastrophic
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void restart() throws IOException {
+        if (process != null) {
+            process.destroyForcibly();
+        }
+        LOG.debug("restarting process");
+        ProcessBuilder pb = new ProcessBuilder(getCommandline());
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        process = pb.start();
+        input = new DataInputStream(process.getInputStream());
+        output = new DataOutputStream(process.getOutputStream());
+    }
+
+    private String[] getCommandline() {
+        //TODO: make this all configurable
+        return new String[]{
+                "java",
+                "-cp",
+                System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncServer",
+                ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()),
+                Long.toString(parseTimeoutMillis),
+                Long.toString(waitTimeoutMillis),
+        };
+    }
+}
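
The request/response exchange above is a small length-prefixed framing protocol
over the forked JVM's stdin/stdout: a one-byte opcode (CALL, PING, READY,
OOM, ...), then, for object-bearing messages, an int length followed by the
Java-serialized payload. A condensed sketch of just the framing, mirroring
AsyncClient.process() and AsyncServer.write() (not part of the commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class FramingSketch {
        //write: one status byte, then an int length, then the serialized object
        static void writeFrame(DataOutputStream out, byte status, Serializable payload)
                throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(payload);
            }
            byte[] bytes = bos.toByteArray();
            out.write(status);
            out.writeInt(bytes.length);
            out.write(bytes);
            out.flush();
        }

        //read: the caller has already consumed the status byte
        static Object readFrame(DataInputStream in)
                throws IOException, ClassNotFoundException {
            int length = in.readInt();
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            }
        }
    }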
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
similarity index 64%
copy from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
copy to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
index 875cc90..c8b8ccb 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
@@ -16,12 +16,23 @@
  */
 package org.apache.tika.pipes.async;
 
-/**
- * Fatal exception that means that something went seriously wrong.
- */
-public class AsyncRuntimeException extends RuntimeException {
+import java.io.IOException;
+import java.nio.file.Path;
+
+class AsyncClientConfig {
+
+    private int fetchQueueSize = 20000;
+    private int numWorkers = 10;
+    private String[] workerJVMArgs;
+    private long parseTimeoutMs;
+    private long waitTimeoutMs;
+    private long maxFilesProcessed;
 
-    public AsyncRuntimeException(Throwable t) {
-        super(t);
+    public static AsyncClientConfig load(Path p) throws IOException {
+        AsyncClientConfig asyncConfig = new AsyncClientConfig();
+
+        return asyncConfig;
     }
+
+
 }
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
similarity index 69%
copy from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
copy to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 875cc90..d16dd16 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -16,12 +16,21 @@
  */
 package org.apache.tika.pipes.async;
 
-/**
- * Fatal exception that means that something went seriously wrong.
- */
-public class AsyncRuntimeException extends RuntimeException {
+import java.io.IOException;
+import java.nio.file.Path;
+
+class AsyncConfig {
+
+
+    private AsyncClientConfig asyncClientConfig;
+    private AsyncClientConfig asyncRetryClientConfig;
+    private AsyncEmitterConfig asyncEmitterConfig;
 
-    public AsyncRuntimeException(Throwable t) {
-        super(t);
+    public static AsyncConfig load(Path p) throws IOException {
+        AsyncConfig asyncConfig = new AsyncConfig();
+
+        return asyncConfig;
     }
+
+
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
similarity index 88%
rename from tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
rename to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 054a1d2..fa53764 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.server.core.resource;
+package org.apache.tika.pipes.async;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.utils.ExceptionUtils;
 
@@ -42,6 +43,9 @@ import org.apache.tika.utils.ExceptionUtils;
  */
 public class AsyncEmitter implements Callable<Integer> {
 
+    static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null);
+    static final int EMITTER_FUTURE_CODE = 2;
+
     private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
 
     //TODO -- need to configure these
@@ -49,12 +53,14 @@ public class AsyncEmitter implements Callable<Integer> {
 
     private long maxEstimatedBytes = 10_000_000;
 
+    private final EmitterManager emitterManager;
     private final ArrayBlockingQueue<EmitData> emitDataQueue;
 
     Instant lastEmitted = Instant.now();
 
-    public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData) {
+    public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData, EmitterManager emitterManager) {
         this.emitDataQueue = emitData;
+        this.emitterManager = emitterManager;
     }
 
     @Override
@@ -63,6 +69,10 @@ public class AsyncEmitter implements Callable<Integer> {
 
         while (true) {
             EmitData emitData = emitDataQueue.poll(100, TimeUnit.MILLISECONDS);
+            if (emitData == EMIT_DATA_STOP_SEMAPHORE) {
+                cache.emitAll();
+                return EMITTER_FUTURE_CODE;
+            }
             if (emitData != null) {
                 //this can block on emitAll
                 cache.add(emitData);
@@ -116,11 +126,11 @@ public class AsyncEmitter implements Callable<Integer> {
             int emitted = 0;
             LOG.debug("about to emit {}", size);
             for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
-                Emitter emitter = TikaResource.getConfig()
-                        .getEmitterManager().getEmitter(e.getKey());
+                Emitter emitter = emitterManager.getEmitter(e.getKey());
                 tryToEmit(emitter, e.getValue());
                 emitted += e.getValue().size();
             }
+
             LOG.debug("emitted: {}", emitted);
             estimatedSize = 0;
             size = 0;
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
similarity index 85%
rename from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
rename to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
index 502c8b4..ed6973c 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
@@ -16,9 +16,12 @@
  */
 package org.apache.tika.pipes.async;
 
-public interface AsyncEmitHook {
+class AsyncEmitterConfig {
+
+    private long emitWithinMs;
+    private long emitMaxEstimatedBytes;
+    private final int numEmitters = 1;
+
 
-    void onSuccess(AsyncTask task);
 
-    void onFail(AsyncTask task);
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
new file mode 100644
index 0000000..0f81bf7
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
+/**
+ * This is the main class for handling async requests. This manages
+ * AsyncClients and AsyncEmitters.
+ *
+ * The AsyncClient and AsyncServer communicate over the AsyncServer's
+ * STDIN/STDOUT; the server's logging is diverted to STDERR, because the default
+ * log4j 2 appender writes to STDOUT.  If you configure logging for the
+ * AsyncServer, DO NOT write to STDOUT.
+ */
+public class AsyncProcessor implements Closeable {
+
+    static final int PARSER_FUTURE_CODE = 1;
+    private final Path tikaConfigPath;
+    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
+    private final ArrayBlockingQueue<EmitData> emitData;
+    private final ExecutorCompletionService<Integer> executorCompletionService;
+    private final ExecutorService executorService;
+    private final int fetchEmitTupleQSize = 1000;
+    private int numParserThreads = 10;
+    private int numEmitterThreads = 2;
+    private int numParserThreadsFinished = 0;
+    private boolean addedEmitterSemaphores = false;
+    private int finished = 0;
+    boolean isShuttingDown = false;
+
+    public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
+        this.tikaConfigPath = tikaConfigPath;
+        this.fetchEmitTuples = new ArrayBlockingQueue<>(fetchEmitTupleQSize);
+        this.emitData = new ArrayBlockingQueue<>(100);
+        this.executorService = Executors.newFixedThreadPool(numParserThreads + numEmitterThreads);
+        this.executorCompletionService =
+                new ExecutorCompletionService<>(executorService);
+        for (int i = 0; i < numParserThreads; i++) {
+            executorCompletionService.submit(new FetchEmitWorker(tikaConfigPath, fetchEmitTuples,
+                    emitData));
+        }
+
+        EmitterManager emitterManager = new TikaConfig(tikaConfigPath).getEmitterManager();
+        for (int i = 0; i < numEmitterThreads; i++) {
+            executorCompletionService.submit(new AsyncEmitter(emitData, emitterManager));
+        }
+    }
+
+    public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
+        if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
+        }
+        if (newFetchEmitTuples.size() > fetchEmitTupleQSize) {
+            throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
+                    fetchEmitTupleQSize);
+        }
+        long start = System.currentTimeMillis();
+        long elapsed = System.currentTimeMillis() - start;
+        while (elapsed < offerMs) {
+            if (fetchEmitTuples.remainingCapacity() > newFetchEmitTuples.size()) {
+                try {
+                    fetchEmitTuples.addAll(newFetchEmitTuples);
+                    return true;
+                } catch (IllegalStateException e) {
+                    //the addAll failed because the queue couldn't take the
+                    //full list; fall through, sleep, and retry until offerMs elapses
+                    e.printStackTrace();
+                }
+            }
+            Thread.sleep(100);
+            elapsed = System.currentTimeMillis() - start;
+        }
+        return false;
+    }
+
+    public int getCapacity() {
+        return fetchEmitTuples.remainingCapacity();
+    }
+
+    public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
+        if (fetchEmitTuples == null) {
+            throw new IllegalStateException("queue hasn't been initialized yet.");
+        } else if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
+        }
+        checkActive();
+        return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
+    }
+
+    public boolean checkActive() {
+
+        Future<Integer> future = executorCompletionService.poll();
+        if (future != null) {
+            try {
+                Integer i = future.get();
+                if (i == PARSER_FUTURE_CODE) {
+                    numParserThreadsFinished++;
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+            finished++;
+        }
+        if (numParserThreadsFinished == numParserThreads && ! addedEmitterSemaphores) {
+            for (int i = 0; i < numEmitterThreads; i++) {
+                emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE);
+            }
+            addedEmitterSemaphores = true;
+        }
+        return finished != (numEmitterThreads + numParserThreads);
+    }
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdownNow();
+    }
+
+    private class FetchEmitWorker implements Callable<Integer> {
+
+        private final Path tikaConfigPath;
+        private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
+        private final ArrayBlockingQueue<EmitData> emitDataQueue;
+
+        private FetchEmitWorker(Path tikaConfigPath,
+                                ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
+                                ArrayBlockingQueue<EmitData> emitDataQueue) {
+            this.tikaConfigPath = tikaConfigPath;
+            this.fetchEmitTuples = fetchEmitTuples;
+            this.emitDataQueue = emitDataQueue;
+        }
+        @Override
+        public Integer call() throws Exception {
+
+            try (AsyncClient asyncClient = new AsyncClient(tikaConfigPath)) {
+                while (true) {
+                    FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
+                    if (t == null) {
+                        //skip
+                    } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                        return PARSER_FUTURE_CODE;
+                    } else {
+                        EmitData emitData = null;
+
+                        try {
+                            emitData = asyncClient.process(t);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            continue;
+                        }
+                        if (emitData != null) {
+                            //TODO -- add timeout, this currently hangs forever
+                            emitDataQueue.offer(emitData);
+                        }
+                    }
+                    checkActive();
+                }
+            }
+        }
+    }
+
+}
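
A note on the javadoc above: the protocol only stays clean because
AsyncServer.main (later in this diff) captures the real stdin/stdout for the
protocol first and then repoints System.out at stderr, so anything a logging
framework prints afterwards lands on the inherited STDERR instead of corrupting
the channel. A condensed, illustrative sketch of that arrangement:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class StreamDisciplineSketch {
        public static void main(String[] args) {
            //capture the real streams for the binary protocol *before* swapping
            DataInputStream protocolIn = new DataInputStream(System.in);
            DataOutputStream protocolOut = new DataOutputStream(System.out);
            //divert later writes to System.out (e.g. unconfigured log4j 2) to stderr
            System.setOut(System.err);
            //and starve anything else that might read protocol bytes from stdin
            System.setIn(new ByteArrayInputStream(new byte[0]));
            //...the protocol loop then uses protocolIn/protocolOut exclusively
        }
    }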
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
similarity index 100%
copy from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
copy to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
new file mode 100644
index 0000000..cc1e14b
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.StringUtils;
+
+public class AsyncServer implements Runnable {
+
+    public static final byte ERROR = -1;
+
+    public static final byte DONE = 0;
+
+    public static final byte CALL = 1;
+
+    public static final byte PING = 2;
+
+    public static final byte RESOURCE = 3;
+
+    public static final byte READY = 4;
+
+    public static final byte FAILED_TO_START = 5;
+
+    public static final byte OOM = 6;
+
+    public static final byte TIMEOUT = 7;
+
+    private final Object[] lock = new Object[0];
+    private final Path tikaConfigPath;
+    private final DataInputStream input;
+    private final DataOutputStream output;
+    private final long serverParseTimeoutMillis;
+    private final long serverWaitTimeoutMillis;
+    private Parser parser;
+    private TikaConfig tikaConfig;
+    private volatile boolean parsing;
+    private volatile long since;
+
+    //logging is fussy...the logging frameworks grab stderr/stdout
+    //before we can redirect.  slf4j complains on stderr, log4j2 unconfigured writes to stdout
+    //We can add logging later but it has to be done carefully...
+    public AsyncServer(Path tikaConfigPath, InputStream in, PrintStream out,
+                       long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
+            throws IOException, TikaException, SAXException {
+        this.tikaConfigPath = tikaConfigPath;
+        this.input = new DataInputStream(in);
+        this.output = new DataOutputStream(out);
+        this.serverParseTimeoutMillis = serverParseTimeoutMillis;
+        this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
+        this.parsing = false;
+        this.since = System.currentTimeMillis();
+    }
+
+
+    public static void main(String[] args) throws Exception {
+        Path tikaConfig = Paths.get(args[0]);
+        long serverParseTimeoutMillis = Long.parseLong(args[1]);
+        long serverWaitTimeoutMillis = Long.parseLong(args[2]);
+
+        AsyncServer server =
+                new AsyncServer(tikaConfig, System.in, System.out, serverParseTimeoutMillis,
+                        serverWaitTimeoutMillis);
+        System.setIn(new ByteArrayInputStream(new byte[0]));
+        System.setOut(System.err);
+
+        Thread watchdog = new Thread(server, "Tika Watchdog");
+        watchdog.setDaemon(true);
+        watchdog.start();
+
+        server.processRequests();
+    }
+
+    public void run() {
+        try {
+            while (true) {
+                synchronized (lock) {
+                    long elapsed = System.currentTimeMillis() - since;
+                    if (parsing && elapsed > serverParseTimeoutMillis) {
+                        //LOG.error("timeout");
+                        System.exit(1);
+                    } else if (!parsing && serverWaitTimeoutMillis > 0 &&
+                            elapsed > serverWaitTimeoutMillis) {
+                        //LOG.info("closing down from inactivity");
+                        System.exit(0);
+                    }
+                }
+                Thread.sleep(250);
+            }
+        } catch (InterruptedException e) {
+            //swallow
+        }
+    }
+
+    public void processRequests() {
+        //initialize
+        try {
+            initializeParser();
+        } catch (Throwable t) {
+            t.printStackTrace();
+            System.err.flush();
+            try {
+                output.writeByte(FAILED_TO_START);
+                output.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+                System.err.flush();
+            }
+            return;
+        }
+        //main loop
+        try {
+            while (true) {
+                int request = input.read();
+                if (request == -1) {
+                    break;
+                } else if (request == PING) {
+                    output.writeByte(PING);
+                    output.flush();
+                } else if (request == CALL) {
+                    EmitData emitData = parseOne();
+                    write(emitData);
+                } else {
+                    throw new IllegalStateException("Unexpected request");
+                }
+                output.flush();
+            }
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
+        System.err.flush();
+    }
+
+    private void write(EmitData emitData) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(emitData);
+            }
+            byte[] bytes = bos.toByteArray();
+            int len = bytes.length;
+            output.write(READY);
+            output.writeInt(len);
+            output.write(bytes);
+            output.flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+            //LOG.error("problem writing emit data", e);
+            exit(1);
+        }
+    }
+
+    private EmitData parseOne() throws FetchException {
+        synchronized (lock) {
+            parsing = true;
+            since = System.currentTimeMillis();
+        }
+        try {
+            FetchEmitTuple t = readFetchEmitTuple();
+            Metadata userMetadata = t.getMetadata();
+            Metadata metadata = new Metadata();
+            String fetcherName = t.getFetchKey().getFetcherName();
+            String fetchKey = t.getFetchKey().getFetchKey();
+            List<Metadata> metadataList = null;
+            Fetcher fetcher = null;
+            try {
+                fetcher = tikaConfig.getFetcherManager().getFetcher(fetcherName);
+            } catch (TikaException | IOException e) {
+                //LOG.error("can't get fetcher", e);
+                throw new FetchException(e);
+            }
+
+            try (InputStream stream = fetcher.fetch(fetchKey, metadata)) {
+                metadataList = parseMetadata(t, stream, metadata);
+            } catch (SecurityException e) {
+                throw e;
+            } catch (TikaException | IOException e) {
+                //LOG.error("problem reading from fetcher", e);
+                throw new FetchException(e);
+            } catch (OutOfMemoryError e) {
+                //LOG.error("oom", e);
+                handleOOM(e);
+            }
+
+            injectUserMetadata(userMetadata, metadataList);
+            EmitKey emitKey = t.getEmitKey();
+            if (StringUtils.isBlank(emitKey.getEmitKey())) {
+                emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+                t.setEmitKey(emitKey);
+            }
+            return new EmitData(t.getEmitKey(), metadataList);
+        } finally {
+            synchronized (lock) {
+                parsing = false;
+                since = System.currentTimeMillis();
+            }
+        }
+    }
+
+    private void handleOOM(OutOfMemoryError oom) {
+        try {
+            output.writeByte(OOM);
+            output.flush();
+        } catch (IOException e) {
+            //swallow at this point
+        }
+        exit(1);
+    }
+
+    private List<Metadata> parseMetadata(FetchEmitTuple fetchEmitTuple, InputStream stream,
+                                         Metadata metadata) {
+        //make these configurable
+        BasicContentHandlerFactory.HANDLER_TYPE type = BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+
+
+        RecursiveParserWrapperHandler handler =
+                new RecursiveParserWrapperHandler(new BasicContentHandlerFactory(type,
+                        fetchEmitTuple.getHandlerConfig().getWriteLimit()),
+                        fetchEmitTuple.getHandlerConfig().getMaxEmbeddedResources(),
+                        tikaConfig.getMetadataFilter());
+        ParseContext parseContext = new ParseContext();
+        FetchKey fetchKey = fetchEmitTuple.getFetchKey();
+        try {
+            parser.parse(stream, handler, metadata, parseContext);
+        } catch (SAXException e) {
+            //LOG.warn("problem:" + fetchKey.getFetchKey(), e);
+        } catch (EncryptedDocumentException e) {
+            //LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
+        } catch (SecurityException e) {
+            //LOG.warn("security exception: " + fetchKey.getFetchKey());
+            throw e;
+        } catch (Exception e) {
+            //LOG.warn("exception: " + fetchKey.getFetchKey());
+        } catch (OutOfMemoryError e) {
+            //TODO, maybe return file type gathered so far and then crash?
+            //LOG.error("oom: " + fetchKey.getFetchKey());
+            throw e;
+        }
+        return handler.getMetadataList();
+    }
+
+    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
+        for (String n : userMetadata.names()) {
+            //overwrite whatever was there
+            metadataList.get(0).set(n, null);
+            for (String val : userMetadata.getValues(n)) {
+                metadataList.get(0).add(n, val);
+            }
+        }
+    }
+
+
+    private void exit(int exitCode) {
+        System.exit(exitCode);
+    }
+
+
+    private FetchEmitTuple readFetchEmitTuple() {
+        try {
+            int length = input.readInt();
+            byte[] bytes = new byte[length];
+            input.readFully(bytes);
+            try (ObjectInputStream objectInputStream =
+                    new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+                return (FetchEmitTuple) objectInputStream.readObject();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.err.flush();
+            //LOG.error("problem reading tuple", e);
+            System.exit(1);
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+            System.err.flush();
+            //LOG.error("can't find class?!", e);
+            System.exit(1);
+        }
+        //unreachable, no?!
+        return null;
+    }
+
+    private void initializeParser() throws TikaException, IOException, SAXException {
+        //TODO allowed named configurations in tika config
+        this.tikaConfig = new TikaConfig(tikaConfigPath);
+        Parser autoDetectParser = new AutoDetectParser(this.tikaConfig);
+        this.parser = new RecursiveParserWrapper(autoDetectParser);
+
+    }
+
+    private static class FetchException extends IOException {
+        FetchException(Throwable t) {
+            super(t);
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
similarity index 58%
rename from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
rename to tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
index 875cc90..da96c80 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
@@ -16,12 +16,26 @@
  */
 package org.apache.tika.pipes.async;
 
-/**
- * Fatal exception that means that something went seriously wrong.
- */
-public class AsyncRuntimeException extends RuntimeException {
+public class OfferLargerThanQueueSize extends IllegalArgumentException {
+    private final int sizeOffered;
+    private final int queueSize;
+
+    public OfferLargerThanQueueSize(int sizeOffered, int queueSize) {
+        this.sizeOffered = sizeOffered;
+        this.queueSize = queueSize;
+    }
+
+    @Override
+    public String getMessage() {
+        return "sizeOffered (" + sizeOffered + ") is greater than queue size (" +
+                queueSize + ")";
+    }
+
+    public int getQueueSize() {
+        return queueSize;
+    }
 
-    public AsyncRuntimeException(Throwable t) {
-        super(t);
+    public int getSizeOffered() {
+        return sizeOffered;
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
index b3fec7f..797de58 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -16,11 +16,16 @@
  */
 package org.apache.tika.pipes.emitter;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.tika.metadata.Metadata;
 
-public class EmitData {
+public class EmitData implements Serializable {
+    /**
+     * Serial version UID
+     */
+    private static final long serialVersionUID = -3861669115439125268L;
 
     private final EmitKey emitKey;
     private final List<Metadata> metadataList;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index b565a43..08d798d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -16,7 +16,14 @@
  */
 package org.apache.tika.pipes.emitter;
 
-public class EmitKey {
+import java.io.Serializable;
+
+public class EmitKey implements Serializable {
+
+    /**
+     * Serial version UID
+     */
+    private static final long serialVersionUID = -3861669115439125268L;
 
     private String emitterName;
     private String emitKey;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index 2c0ea64..090001e 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -16,11 +16,18 @@
  */
 package org.apache.tika.pipes.fetcher;
 
+import java.io.Serializable;
+
 /**
  * Pair of fetcherName (which fetcher to call) and the key
  * to send to that fetcher to retrieve a specific file.
  */
-public class FetchKey {
+public class FetchKey implements Serializable {
+    /**
+     * Serial version UID
+     */
+    private static final long serialVersionUID = -3861669115439125268L;
+
     private String fetcherName;
     private String fetchKey;
 
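The Serializable additions above (FetchEmitTuple, HandlerConfig, EmitData,
EmitKey, FetchKey) are what let tuples and results cross the client/server
process boundary through ObjectOutputStream. A quick round-trip sketch of the
kind of serialization this enables (values are illustrative; assumes Metadata
is itself Serializable, which it is in tika-core):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.FetchEmitTuple;
    import org.apache.tika.pipes.emitter.EmitKey;
    import org.apache.tika.pipes.fetcher.FetchKey;

    public class RoundTripSketch {
        public static void main(String[] args) throws Exception {
            FetchEmitTuple t = new FetchEmitTuple(
                    new FetchKey("fetcher", "doc.xml"),
                    new EmitKey("emitter", "doc.xml"),
                    new Metadata());
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                //would throw NotSerializableException before this commit
                oos.writeObject(t);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                FetchEmitTuple copy = (FetchEmitTuple) ois.readObject();
                System.out.println(copy.getFetchKey().getFetchKey()); //doc.xml
            }
        }
    }
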
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
similarity index 52%
rename from tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
rename to tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 2408a63..7fbfef8 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -17,18 +17,14 @@
 package org.apache.tika.pipes.async;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.HashSet;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
@@ -38,68 +34,79 @@ import org.junit.Test;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.ProcessUtils;
 
 public class AsyncProcessorTest {
 
-    private Path dbDir;
-    private Path dbFile;
-    private Connection connection;
+    private final String OOM = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+            "<throw class=\"java.lang.OutOfMemoryError\">oom message</throw>\n</mock>";
+    private final String OK = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+            "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>" +
+            "<write element=\"p\">main_content</write>" + "</mock>";
     private Path tikaConfigPath;
+    private Path inputDir;
+    private final int totalFiles = 100;
+    private int ok = 0;
+    private int oom = 0;
 
     @Before
     public void setUp() throws SQLException, IOException {
-        dbDir = Files.createTempDirectory("async-db");
-        dbFile = dbDir.resolve("emitted-db");
-        String jdbc = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
-        String sql = "create table emitted (id int auto_increment primary key, " +
-                "emitkey varchar(2000), json varchar(20000))";
-
-        connection = DriverManager.getConnection(jdbc);
-        connection.createStatement().execute(sql);
-        tikaConfigPath = dbDir.resolve("tika-config.xml");
+        inputDir = Files.createTempDirectory("tika-async-");
+        tikaConfigPath = Files.createTempFile("tika-config-", ".xml");
         String xml = "" + "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" +
                 "  <emitters>" + "  <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
                 "    <params>\n" + "      <param name=\"name\" type=\"string\">mock</param>\n" +
-                "      <param name=\"jdbc\" type=\"string\">" + jdbc + "</param>\n" +
                 "    </params>" + "  </emitter>" + "  </emitters>" + "  <fetchers>" +
-                "    <fetcher class=\"org.apache.tika.pipes.async.MockFetcher\">" +
-                "      <param name=\"name\" type=\"string\">mock</param>\n" + "    </fetcher>" +
+                "    <fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+                "      <params><param name=\"name\" type=\"string\">mock</param>\n" +
+                "      <param name=\"basePath\" type=\"string\">" +
+                ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString())
+                + "</param></params>\n" +
+                "    </fetcher>" +
                 "  </fetchers>" + "</properties>";
         Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
+        Random r = new Random();
+        for (int i = 0; i < totalFiles; i++) {
+            if (r.nextFloat() < 0.1) {
+                Files.write(inputDir.resolve(i + ".xml"), OOM.getBytes(StandardCharsets.UTF_8));
+                oom++;
+            } else {
+                Files.write(inputDir.resolve(i + ".xml"), OK.getBytes(StandardCharsets.UTF_8));
+                ok++;
+            }
+        }
     }
 
     @After
     public void tearDown() throws SQLException, IOException {
-        connection.createStatement().execute("drop table emitted");
-        connection.close();
-        FileUtils.deleteDirectory(dbDir.toFile());
+        Files.delete(tikaConfigPath);
+        FileUtils.deleteDirectory(inputDir.toFile());
     }
 
     @Test
     public void testBasic() throws Exception {
-
-
-        AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
-        int max = 100;
-        for (int i = 0; i < max; i++) {
-            FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", "key-" + i),
+        AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+        for (int i = 0; i < totalFiles; i++) {
+            FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock",  i + ".xml"),
                     new EmitKey("mock", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
         }
+        for (int i = 0; i < 10; i++) {
+            processor.offer(FetchIterator.COMPLETED_SEMAPHORE, 1000);
+        }
+        //TODO clean this up
+        while (processor.checkActive()) {
+            Thread.sleep(100);
+        }
         processor.close();
-        String sql = "select emitkey from emitted";
         Set<String> emitKeys = new HashSet<>();
-        try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
-            while (rs.next()) {
-                String emitKey = rs.getString(1);
-                emitKeys.add(emitKey);
-            }
-        }
-        assertEquals(max, emitKeys.size());
-        for (int i = 0; i < max; i++) {
-            assertTrue(emitKeys.contains("emit-" + i));
+        for (EmitData d : MockEmitter.EMIT_DATA) {
+            emitKeys.add(d.getEmitKey().getEmitKey());
         }
+        assertEquals(ok, emitKeys.size());
     }
 }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
new file mode 100644
index 0000000..1672b33
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+public class MockEmitter implements Emitter {
+
+    public MockEmitter() {
+    }
+
+    static ArrayBlockingQueue<EmitData> EMIT_DATA = new ArrayBlockingQueue<>(10000);
+
+    @Override
+    public String getName() {
+        return "mock";
+    }
+
+    @Override
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
+        emit(Collections
+                .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
+    }
+
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
+        for (EmitData d : emitData) {
+            EMIT_DATA.offer(d);
+        }
+    }
+
+    public static List<EmitData> getData() {
+        return new ArrayList<>(EMIT_DATA);
+    }
+
+}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
similarity index 100%
rename from tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
rename to tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 97eed63..8614090 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,7 +36,6 @@
     <module>tika-fetchers</module>
     <module>tika-emitters</module>
     <module>tika-fetch-iterators</module>
-    <module>tika-pipes-async</module>
     <module>tika-pipes-integration-tests</module>
   </modules>
 
diff --git a/tika-pipes/tika-pipes-async/pom.xml b/tika-pipes/tika-pipes-async/pom.xml
deleted file mode 100644
index 08d1ff2..0000000
--- a/tika-pipes/tika-pipes-async/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <groupId>org.apache.tika</groupId>
-    <artifactId>tika-pipes</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>tika-pipes-async</artifactId>
-  <packaging>pom</packaging>
-  <name>Apache Tika emitters</name>
-  <url>https://tika.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>tika-core</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>tika-serialization</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>${commons.io.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.h2database</groupId>
-      <artifactId>h2</artifactId>
-      <version>${h2.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>net.jpountz.lz4</groupId>
-      <artifactId>lz4</artifactId>
-      <version>1.3.0</version>
-    </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>tika-emitter-fs</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>tika-core</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <!--
-    <dependency>
-        <groupId>${project.groupId}</groupId>
-        <artifactId>tika-parsers-classic-package</artifactId>
-        <version>${project.version}</version>
-        <scope>test</scope>
-    </dependency>-->
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
deleted file mode 100644
index 4321c2c..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-
-public class AsyncCli {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
-
-    public static void main(String[] args) throws Exception {
-        Path configPath = Paths.get(args[0]);
-        int maxConsumers = 20;
-        AsyncCli asyncCli = new AsyncCli();
-        Path dbDir = Files.createTempDirectory("tika-async-db-");
-        try {
-            asyncCli.execute(dbDir, configPath, maxConsumers);
-        } finally {
-            FileUtils.deleteDirectory(dbDir.toFile());
-        }
-
-    }
-
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        PreparedStatement findActiveWorkers =
-                connection.prepareStatement("select worker_id from workers");
-        List<Integer> workers = new ArrayList<>();
-        try (ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
-
-    private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
-        TikaConfig tikaConfig = new TikaConfig(configPath);
-
-        String connectionString = setupTables(dbDir);
-
-        ExecutorService executorService = Executors.newFixedThreadPool(maxConsumers + 3);
-        ExecutorCompletionService<Integer> executorCompletionService =
-                new ExecutorCompletionService<>(executorService);
-
-        try (Connection connection = DriverManager.getConnection(connectionString)) {
-            FetchIterator fetchIterator = tikaConfig.getFetchIterator();
-            if (fetchIterator instanceof EmptyFetchIterator) {
-                throw new IllegalArgumentException("can't have empty fetch iterator");
-            }
-            ArrayBlockingQueue<FetchEmitTuple> q =
-                    new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
-            AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
-            executorCompletionService.submit(fetchIterator);
-            executorCompletionService.submit(enqueuer);
-            executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
-
-            for (int i = 0; i < maxConsumers; i++) {
-                executorCompletionService
-                        .submit(new AsyncWorker(connection, connectionString, i, configPath));
-            }
-            int completed = 0;
-            while (completed < maxConsumers + 3) {
-                Future<Integer> future = executorCompletionService.take();
-                if (future != null) {
-                    int val = future.get();
-                    completed++;
-                    LOG.debug("finished " + val);
-                }
-            }
-        } finally {
-            executorService.shutdownNow();
-        }
-    }
-
-    private String setupTables(Path dbDir) throws SQLException {
-        Path dbFile = dbDir.resolve("tika-async");
-        String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
-        Connection connection = DriverManager.getConnection(url);
-
-        String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
-                "status tinyint," + //byte
-                "worker_id integer," + "retry smallint," + //short
-                "time_stamp timestamp," + "json varchar(64000))";
-        connection.createStatement().execute(sql);
-        //no clear benefit to creating an index on timestamp
-//        sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
-        sql = "create table workers (worker_id int primary key)";
-        connection.createStatement().execute(sql);
-
-        sql = "create table workers_shutdown (worker_id int primary key)";
-        connection.createStatement().execute(sql);
-
-        sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
-                "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
-        connection.createStatement().execute(sql);
-
-        return url;
-    }
-
-    //this reads FetchEmitTuples from the queue and inserts them into the db
-    //for the workers to read
-    private static class AsyncTaskEnqueuer implements Callable<Integer> {
-        private final PreparedStatement insert;
-
-        private final ArrayBlockingQueue<FetchEmitTuple> queue;
-        private final Connection connection;
-        private final Random random = new Random();
-
-        private volatile boolean isComplete = false;
-
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
-                throws SQLException {
-            this.queue = queue;
-            this.connection = connection;
-            String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
-                    "values (?,CURRENT_TIMESTAMP(),?,?,?)";
-            insert = connection.prepareStatement(sql);
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            List<Integer> workers = new ArrayList<>();
-            while (true) {
-                FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
-                LOG.debug("enqueing to db " + t);
-                if (t == null) {
-                    //log.trace?
-                } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
-                    isComplete = true;
-                    return 1;
-                } else {
-                    long start = System.currentTimeMillis();
-                    long elapsed = System.currentTimeMillis() - start;
-                    //TODO -- fix this
-                    while (workers.size() == 0 && elapsed < 600000) {
-                        workers = getActiveWorkers(connection);
-                        Thread.sleep(100);
-                        elapsed = System.currentTimeMillis() - start;
-                    }
-                    insert(t, workers);
-                }
-            }
-        }
-
-        boolean isComplete() {
-            return isComplete;
-        }
-
-        private void insert(FetchEmitTuple t, List<Integer> workers)
-                throws IOException, SQLException {
-            int workerId = workers.size() == 1 ? workers.get(0) :
-                    workers.get(random.nextInt(workers.size()));
-            insert.clearParameters();
-            insert.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            insert.setInt(2, workerId);
-            insert.setShort(3, (short) 0);
-            insert.setString(4, JsonFetchEmitTuple.toJson(t));
-            insert.execute();
-        }
-    }
-
-    private static class AssignmentManager implements Callable {
-
-        private final Connection connection;
-        private final AsyncTaskEnqueuer enqueuer;
-        private final PreparedStatement getQueueDistribution;
-        private final PreparedStatement findMissingWorkers;
-        private final PreparedStatement allocateNonworkersToWorkers;
-        private final PreparedStatement reallocate;
-        private final PreparedStatement countAvailableTasks;
-        private final PreparedStatement insertWorkersShutdown;
-        private final Random random = new Random();
-
-
-        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer)
-                throws SQLException {
-            this.connection = connection;
-            this.enqueuer = enqueuer;
-            //this gets workers and # of tasks in desc order of number of tasks
-            String sql = "select w.worker_id, p.cnt " + "from workers w " +
-                    "left join (select worker_id, count(1) as cnt from task_queue " +
-                    "where status=0 group by worker_id)" +
-                    " p on p.worker_id=w.worker_id order by p.cnt desc";
-            getQueueDistribution = connection.prepareStatement(sql);
-            //find workers that have assigned tasks but are not in the
-            //workers table
-            sql = "select p.worker_id, count(1) as cnt from task_queue p " +
-                    "left join workers w on p.worker_id=w.worker_id " +
-                    "where w.worker_id is null group by p.worker_id";
-            findMissingWorkers = connection.prepareStatement(sql);
-
-            sql = "update task_queue set worker_id=? where worker_id=?";
-            allocateNonworkersToWorkers = connection.prepareStatement(sql);
-
-            //current strategy reallocate tasks from longest queue to shortest
-            //TODO: might consider randomly shuffling or other algorithms
-            sql = "update task_queue set worker_id= ? where id in " +
-                    "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
-                    "and status=0 for update)";
-            reallocate = connection.prepareStatement(sql);
-
-            sql = "select count(1) from task_queue where status=" +
-                    AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
-            countAvailableTasks = connection.prepareStatement(sql);
-
-            sql = "insert into workers_shutdown (worker_id) values (?)";
-            insertWorkersShutdown = connection.prepareStatement(sql);
-        }
-
-        @Override
-        public Integer call() throws Exception {
-
-            while (true) {
-                List<Integer> missingWorkers = getMissingWorkers();
-                reallocateFromMissingWorkers(missingWorkers);
-                redistribute();
-                if (isComplete()) {
-                    notifyWorkers();
-                    return 1;
-                }
-                Thread.sleep(100);
-            }
-        }
-
-        private void notifyWorkers() throws SQLException {
-            for (int workerId : getActiveWorkers(connection)) {
-                insertWorkersShutdown.clearParameters();
-                insertWorkersShutdown.setInt(1, workerId);
-                insertWorkersShutdown.execute();
-            }
-        }
-
-        private boolean isComplete() throws SQLException {
-            if (!enqueuer.isComplete()) {
-                return false;
-            }
-            try (ResultSet rs = countAvailableTasks.executeQuery()) {
-                while (rs.next()) {
-                    return rs.getInt(1) == 0;
-                }
-            }
-            return false;
-        }
-
-        private void redistribute() throws SQLException {
-            //parallel lists of workerid = task queue size
-            List<Integer> workerIds = new ArrayList<>();
-            List<Integer> queueSize = new ArrayList<>();
-            int totalTasks = 0;
-
-            try (ResultSet rs = getQueueDistribution.executeQuery()) {
-                while (rs.next()) {
-                    int workerId = rs.getInt(1);
-                    int numTasks = rs.getInt(2);
-                    workerIds.add(workerId);
-                    queueSize.add(numTasks);
-                    LOG.debug("workerId: ({}) numTasks: ({})", workerId, numTasks);
-                    totalTasks += numTasks;
-                }
-            }
-            if (workerIds.size() == 0) {
-                return;
-            }
-            int averagePerWorker = Math.round((float) totalTasks / (float) workerIds.size());
-            int midPoint = Math.round((float) queueSize.size() / 2) + 1;
-            for (int i = queueSize.size() - 1, j = 0; i > midPoint && j < midPoint; i--, j++) {
-                int shortestQueue = queueSize.get(i);
-                int longestQueue = queueSize.get(j);
-                if ((shortestQueue < 5 && longestQueue > 5) ||
-                        longestQueue > 5 && longestQueue > (int) (1.5 * averagePerWorker)) {
-                    int shortestQueueWorker = workerIds.get(i);
-                    int longestQueueWorker = workerIds.get(j);
-                    reallocate.clearParameters();
-                    reallocate.setLong(1, shortestQueueWorker);
-                    reallocate.setLong(2, longestQueueWorker);
-                    reallocate.execute();
-                }
-            }
-
-        }
-
-        private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
-                throws SQLException {
-
-            if (missingWorkers.size() == 0) {
-                return;
-            }
-
-            List<Integer> activeWorkers = getActiveWorkers(connection);
-            if (activeWorkers.size() == 0) {
-                return;
-            }
-
-            for (int missing : missingWorkers) {
-                int active = activeWorkers.get(random.nextInt(activeWorkers.size()));
-                allocateNonworkersToWorkers.clearParameters();
-                allocateNonworkersToWorkers.setInt(1, active);
-                allocateNonworkersToWorkers.setInt(2, missing);
-                allocateNonworkersToWorkers.execute();
-                LOG.debug("allocating missing working ({}) to ({})", missing, active);
-            }
-        }
-
-        private List<Integer> getMissingWorkers() throws SQLException {
-            List<Integer> missingWorkers = new ArrayList<>();
-            try (ResultSet rs = findMissingWorkers.executeQuery()) {
-                while (rs.next()) {
-                    int workerId = rs.getInt(1);
-                    missingWorkers.add(workerId);
-                    LOG.debug("Worker ({}) no longer active", workerId);
-                }
-            }
-            return missingWorkers;
-        }
-    }
-}
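
For reference, the deleted AsyncCli and AsyncWorker classes coordinated entirely
through the H2 task_queue table created in setupTables() above. A minimal,
self-contained sketch of that claim-and-flip pattern follows; the status values
(0 = available, 1 = selected) and the single worker id are illustrative
stand-ins for the TASK_STATUS_CODES ordinals, not code from this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class TaskQueueSketch {
        public static void main(String[] args) throws Exception {
            try (Connection c = DriverManager.getConnection("jdbc:h2:mem:tika-async");
                 Statement st = c.createStatement()) {
                // schema copied from setupTables() above
                st.execute("create table task_queue (id bigint auto_increment primary key," +
                        " status tinyint, worker_id integer, retry smallint," +
                        " time_stamp timestamp, json varchar(64000))");
                st.execute("insert into task_queue (status, worker_id, retry, time_stamp, json)" +
                        " values (0, 1, 0, CURRENT_TIMESTAMP(), '{}')");
                // claim: flip available (0) rows assigned to worker 1 to selected (1),
                // the same update-with-subselect idiom the deleted workers used
                st.executeUpdate("update task_queue set status=1," +
                        " time_stamp=CURRENT_TIMESTAMP() where id in" +
                        " (select id from task_queue where status=0 and worker_id=1" +
                        " order by time_stamp asc limit 10 for update)");
                try (ResultSet rs = st.executeQuery(
                        "select id, json from task_queue where status=1 and worker_id=1")) {
                    while (rs.next()) {
                        System.out.println("claimed task " + rs.getLong(1) + ": " + rs.getString(2));
                    }
                }
            }
        }
    }
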
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
deleted file mode 100644
index 33964b3..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.tika.utils.StringUtils;
-
-public class AsyncConfig {
-
-    private final int queueSize = 1000;
-    private final int numWorkers = 10;
-    private final int numEmitters = 1;
-    private String jdbcString;
-    private Path dbDir;
-
-    public static AsyncConfig load(Path p) throws IOException {
-        AsyncConfig asyncConfig = new AsyncConfig();
-
-        if (StringUtils.isBlank(asyncConfig.getJdbcString())) {
-            asyncConfig.dbDir = Files.createTempDirectory("tika-async-");
-            Path dbFile = asyncConfig.dbDir.resolve("tika-async");
-            asyncConfig.setJdbcString(
-                    "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE");
-        } else {
-            asyncConfig.dbDir = null;
-        }
-        return asyncConfig;
-    }
-
-    public int getQueueSize() {
-        return queueSize;
-    }
-
-    public int getNumWorkers() {
-        return numWorkers;
-    }
-
-    public int getNumEmitters() {
-        return numEmitters;
-    }
-
-    public String getJdbcString() {
-        return jdbcString;
-    }
-
-    public void setJdbcString(String jdbcString) {
-        this.jdbcString = jdbcString;
-    }
-
-    /**
-     * If no jdbc connection string was specified, this directory
-     * contains the h2 database; otherwise it is null.
-     *
-     * @return the temporary database directory, or null if a jdbc string was set
-     */
-    public Path getTempDBDir() {
-        return dbDir;
-    }
-}
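
The fallback in load() above amounts to pointing H2 at a file inside a fresh
temp directory; AUTO_SERVER=TRUE is what let the forked worker and emitter
processes open the same database file. Stand-alone, the construction is just:

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class H2UrlSketch {
        public static void main(String[] args) throws Exception {
            Path dbDir = Files.createTempDirectory("tika-async-");
            Path dbFile = dbDir.resolve("tika-async");
            // AUTO_SERVER=TRUE lets multiple processes share the same file db
            String jdbcString = "jdbc:h2:file:" + dbFile.toAbsolutePath() + ";AUTO_SERVER=TRUE";
            System.out.println(jdbcString);
        }
    }
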
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
deleted file mode 100644
index b34f872..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.util.List;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-
-public class AsyncData extends EmitData {
-
-    private final long taskId;
-    private final FetchKey fetchKey;
-    private final FetchEmitTuple.ON_PARSE_EXCEPTION onParseException;
-
-    public AsyncData(@JsonProperty("taskId") long taskId,
-                     @JsonProperty("fetchKey") FetchKey fetchKey,
-                     @JsonProperty("emitKey") EmitKey emitKey, @JsonProperty("onParseException")
-                             FetchEmitTuple.ON_PARSE_EXCEPTION onParseException,
-                     @JsonProperty("metadataList") List<Metadata> metadataList) {
-        super(emitKey, metadataList);
-        this.taskId = taskId;
-        this.fetchKey = fetchKey;
-        this.onParseException = onParseException;
-    }
-
-    public FetchKey getFetchKey() {
-        return fetchKey;
-    }
-
-    public long getTaskId() {
-        return taskId;
-    }
-
-    public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
-        return onParseException;
-    }
-}
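
AsyncData relied on Jackson constructor binding: every constructor parameter
carries @JsonProperty so an immutable instance can round-trip through the
database as JSON. A minimal sketch of the same pattern with a stand-in class
(Task is illustrative, not a Tika type; @JsonCreator is added here to make the
creator detection explicit):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonBindingSketch {
        public static class Task {
            private final long taskId;
            private final String fetchKey;

            @JsonCreator
            public Task(@JsonProperty("taskId") long taskId,
                        @JsonProperty("fetchKey") String fetchKey) {
                this.taskId = taskId;
                this.fetchKey = fetchKey;
            }

            public long getTaskId() {
                return taskId;
            }

            public String getFetchKey() {
                return fetchKey;
            }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(new Task(42L, "docs/report.pdf"));
            // reconstruct the immutable object through the annotated constructor
            Task restored = mapper.readValue(json, Task.class);
            System.out.println(json + " -> taskId=" + restored.getTaskId());
        }
    }
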
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
deleted file mode 100644
index 4c44f9c..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AsyncEmitter implements Callable<Integer> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
-
-
-    private final String connectionString;
-    private final int emitterId;
-    private final Path tikaConfigPath;
-    private final Connection connection;
-    private final PreparedStatement finished;
-    private final PreparedStatement restarting;
-
-    public AsyncEmitter(Connection connection, String connectionString, int emitterId,
-                        Path tikaConfigPath) throws SQLException {
-        this.connectionString = connectionString;
-        this.emitterId = emitterId;
-        this.tikaConfigPath = tikaConfigPath;
-        this.connection = connection;
-        String sql = "update emitters set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
-                " where emitter_id = (" + emitterId + ")";
-        finished = connection.prepareStatement(sql);
-
-        sql = "update emitters set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
-                " where emitter_id = (" + emitterId + ")";
-        restarting = connection.prepareStatement(sql);
-    }
-
-    @Override
-    public Integer call() throws Exception {
-        Process p = null;
-        try {
-            p = start();
-            int restarts = 0;
-            while (true) {
-                boolean finished = p.waitFor(60, TimeUnit.SECONDS);
-                if (finished) {
-                    int exitValue = p.exitValue();
-                    if (exitValue == 0) {
-                        LOG.debug("forked emitter process finished with exitValue=0");
-                        return 1;
-                    }
-                    reportCrash(++restarts, exitValue);
-                    p = start();
-                }
-            }
-        } finally {
-            if (p != null) {
-                p.destroyForcibly();
-            }
-            finished.execute();
-        }
-    }
-
-    private Process start() throws IOException {
-        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
-                System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncEmitterProcess", Integer.toString(emitterId)};
-        ProcessBuilder pb = new ProcessBuilder(args);
-        pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment()
-                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
-        pb.inheritIO();
-        return pb.start();
-    }
-
-    private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
-        LOG.warn("emitter id={} terminated, exitValue={}", emitterId, exitValue);
-        restarting.execute();
-        //should we unassign emit tasks here?
-    }
-
- /*
-    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
-        //if not, this is called to insert into the error log
-        return connection.prepareStatement(
-                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
-                        " values (?,?,CURRENT_TIMESTAMP(),?,?)"
-        );
-    }
-
-    static PreparedStatement prepareReset(Connection connection) throws SQLException {
-        //and this is called to reset the status on error
-        return connection.prepareStatement(
-                "update task_queue set " +
-                        "status=?, " +
-                        "time_stamp=CURRENT_TIMESTAMP(), " +
-                        "retry=? " +
-                        "where id=?");
-    }*/
-}
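
The call() loop above is a watchdog: fork a child JVM, wait with a timeout so
shutdown flags can be polled between waits, and restart the child on any
nonzero exit. Stripped of the database bookkeeping, the skeleton looks like
this (ChildMain is a hypothetical stand-in for AsyncEmitterProcess):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class ForkWatchdogSketch {
        public static void main(String[] args) throws Exception {
            Process p = start();
            try {
                while (true) {
                    // bounded wait so a supervisor could also check shutdown
                    // state here, as AsyncEmitter did via the emitters table
                    if (p.waitFor(60, TimeUnit.SECONDS)) {
                        int exit = p.exitValue();
                        if (exit == 0) {
                            return; // clean shutdown of the child
                        }
                        System.err.println("child exited with " + exit + "; restarting");
                        p = start();
                    }
                }
            } finally {
                p.destroyForcibly();
            }
        }

        private static Process start() throws IOException {
            // ChildMain is hypothetical; the real code forked AsyncEmitterProcess
            ProcessBuilder pb = new ProcessBuilder("java", "-Djava.awt.headless=true",
                    "-cp", System.getProperty("java.class.path"), "ChildMain");
            pb.inheritIO();
            return pb.start();
        }
    }
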
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
deleted file mode 100644
index 03f5b93..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Paths;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-
-public class AsyncEmitterProcess {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitterProcess.class);
-    private final LZ4FastDecompressor decompressor =
-            LZ4Factory.fastestInstance().fastDecompressor();
-    private final ObjectMapper objectMapper = new ObjectMapper();
-    private final FutureTask<Integer> stdinWatcher;
-    int recordsPerPulse = 10;
-    //TODO -- parameterize these
-    private final long emitWithinMs = 10000;
-    private final long emitMaxBytes = 10_000_000;
-    private PreparedStatement markForSelecting;
-    private PreparedStatement selectForProcessing;
-    private PreparedStatement emitStatusUpdate;
-    private PreparedStatement checkForCanShutdown;
-    private PreparedStatement checkForShouldShutdown;
-    private PreparedStatement updateEmitterStatus;
-
-    private AsyncEmitterProcess(InputStream stdin) {
-        SimpleModule module = new SimpleModule();
-        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
-        objectMapper.registerModule(module);
-        stdinWatcher = new FutureTask<>(new ForkWatcher(stdin));
-        new Thread(stdinWatcher).start();
-    }
-
-    public static void main(String[] args) throws Exception {
-        String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
-        TikaConfig tikaConfig =
-                new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
-        int workerId = Integer.parseInt(args[0]);
-        LOG.debug("trying to get connection {} >{}<", workerId, db);
-        try (Connection connection = DriverManager.getConnection(db)) {
-            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess(System.in);
-            asyncEmitter.execute(connection, workerId, tikaConfig);
-        }
-        System.exit(0);
-    }
-
-    private static void reportEmitStatus(List<Long> ids,
-                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
-                                         PreparedStatement emitStatusUpdate) throws SQLException {
-        for (long id : ids) {
-            emitStatusUpdate.clearParameters();
-            emitStatusUpdate.setByte(1, (byte) emitted.ordinal());
-            emitStatusUpdate.setLong(2, id);
-            emitStatusUpdate.addBatch();
-        }
-        emitStatusUpdate.executeBatch();
-    }
-
-    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
-            throws SQLException, InterruptedException {
-        prepareStatements(connection, workerId);
-        updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.ACTIVE.ordinal());
-        EmitterManager emitterManager = tikaConfig.getEmitterManager();
-        EmitDataCache emitDataCache =
-                new EmitDataCache(emitterManager, emitMaxBytes, emitStatusUpdate);
-        try {
-            mainLoop(emitDataCache);
-        } finally {
-            emitDataCache.emitAll();
-            updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal());
-        }
-    }
-
-    private void mainLoop(EmitDataCache emitDataCache) throws InterruptedException, SQLException {
-
-        while (true) {
-            if (shouldShutdown()) {
-                LOG.debug("received should shutdown signal");
-                return;
-            }
-            int toEmit = markForSelecting.executeUpdate();
-            if (toEmit == 0 && canShutdown()) {
-                //avoid race condition; double check there's nothing
-                //left to emit
-                toEmit = markForSelecting.executeUpdate();
-                if (toEmit == 0) {
-                    LOG.debug("received can shutdown and didn't update any for selecting");
-                    return;
-                }
-            }
-            if (toEmit > 0) {
-                try {
-                    tryToEmitNextBatch(emitDataCache);
-                } catch (IOException e) {
-                    LOG.warn("IOException trying to emit", e);
-                }
-            }
-            if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
-                emitDataCache.emitAll();
-            }
-            Thread.sleep(500);
-        }
-    }
-
-    private void tryToEmitNextBatch(EmitDataCache emitDataCache) throws IOException, SQLException {
-        List<AsyncEmitTuple> toEmitList = new ArrayList<>();
-        try (ResultSet rs = selectForProcessing.executeQuery()) {
-            while (rs.next()) {
-                long id = rs.getLong(1);
-                Timestamp ts = rs.getTimestamp(2);
-                int uncompressedSize = rs.getInt(3);
-                Blob blob = rs.getBlob(4);
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                IOUtils.copyLarge(blob.getBinaryStream(), bos);
-                byte[] bytes = bos.toByteArray();
-                toEmitList.add(new AsyncEmitTuple(id, ts, uncompressedSize, bytes));
-            }
-        }
-        List<Long> successes = new ArrayList<>();
-        List<Long> exceptions = new ArrayList<>();
-        for (AsyncEmitTuple tuple : toEmitList) {
-            try {
-                tryToEmit(tuple, emitDataCache);
-                successes.add(tuple.id);
-            } catch (IOException | SQLException e) {
-                exceptions.add(tuple.id);
-            }
-        }
-        reportEmitStatus(successes, AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS_EMIT,
-                emitStatusUpdate);
-        reportEmitStatus(exceptions, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
-                emitStatusUpdate);
-
-    }
-
-    private void updateStatus(byte status) throws SQLException {
-        updateEmitterStatus.clearParameters();
-        updateEmitterStatus.setByte(1, status);
-        updateEmitterStatus.executeUpdate();
-    }
-
-    private void tryToEmit(AsyncEmitTuple asyncEmitTuple, EmitDataCache emitDataCache)
-            throws SQLException, IOException {
-        AsyncData asyncData = deserialize(asyncEmitTuple.bytes, asyncEmitTuple.uncompressedSize);
-        emitDataCache.add(asyncData);
-    }
-
-    boolean shouldShutdown() throws SQLException {
-        if (stdinWatcher.isDone()) {
-            LOG.info("parent input stream closed; shutting down now");
-            return true;
-        }
-        try (ResultSet rs = checkForShouldShutdown.executeQuery()) {
-            if (rs.next()) {
-                int val = rs.getInt(1);
-                return val > 0;
-            }
-        }
-        return false;
-    }
-
-    boolean canShutdown() throws SQLException {
-        try (ResultSet rs = checkForCanShutdown.executeQuery()) {
-            if (rs.next()) {
-                int val = rs.getInt(1);
-                return val > 0;
-            }
-        }
-        return false;
-    }
-
-    private void prepareStatements(Connection connection, int workerId) throws SQLException {
-        String sql = "update task_queue set status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + ", worker_id=" +
-                workerId + ", time_stamp=CURRENT_TIMESTAMP()" + " where id in " +
-                " (select id from task_queue " + //where worker_id = " + workerId +
-                " where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
-                " order by time_stamp asc limit " + recordsPerPulse + " for update)";
-        markForSelecting = connection.prepareStatement(sql);
-
-        sql = "select q.id, q.time_stamp, uncompressed_size, bytes " + "from emits e " +
-                "join task_queue q on e.id=q.id " + "where q.status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + " and worker_id=" +
-                workerId + " order by time_stamp asc";
-        selectForProcessing = connection.prepareStatement(sql);
-
-        //only update the status if it is not already emitted or failed emit
-        sql = "update task_queue set status=?" + ", time_stamp=CURRENT_TIMESTAMP()" +
-                " where id=? and status not in (" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED.ordinal() + ", " +
-                AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT.ordinal() + ")";
-
-        emitStatusUpdate = connection.prepareStatement(sql);
-
-        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
-        checkForCanShutdown = connection.prepareStatement(sql);
-
-        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
-        checkForShouldShutdown = connection.prepareStatement(sql);
-
-        sql = "merge into emitters key (emitter_id) " + "values (" + workerId + ", ? )";
-        updateEmitterStatus = connection.prepareStatement(sql);
-    }
-
-    private AsyncData deserialize(byte[] compressed, int decompressedLength) throws IOException {
-        byte[] restored = new byte[decompressedLength];
-        decompressor.decompress(compressed, 0, restored, 0, decompressedLength);
-        return objectMapper.readerFor(AsyncData.class).readValue(restored);
-    }
-
-    private static class EmitDataCache {
-        private final EmitterManager emitterManager;
-        private final long maxBytes;
-        private final PreparedStatement emitStatusUpdate;
-        long estimatedSize = 0;
-        int size = 0;
-        Map<String, List<AsyncData>> map = new HashMap<>();
-        private Instant lastAdded = Instant.now();
-
-        public EmitDataCache(EmitterManager emitterManager, long maxBytes,
-                             PreparedStatement emitStatusUpdate) {
-            this.emitterManager = emitterManager;
-            this.maxBytes = maxBytes;
-            this.emitStatusUpdate = emitStatusUpdate;
-        }
-
-        void updateEstimatedSize(long newBytes) {
-            estimatedSize += newBytes;
-        }
-
-        void add(AsyncData data) {
-
-            size++;
-            long sz = AbstractEmitter
-                    .estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
-            if (estimatedSize + sz > maxBytes) {
-                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
-                        (estimatedSize + sz), maxBytes);
-                emitAll();
-            }
-            List<AsyncData> cached = map.get(data.getEmitKey().getEmitterName());
-            if (cached == null) {
-                cached = new ArrayList<>();
-                map.put(data.getEmitKey().getEmitterName(), cached);
-            }
-            updateEstimatedSize(sz);
-            cached.add(data);
-            lastAdded = Instant.now();
-        }
-
-        private void emitAll() {
-            int emitted = 0;
-            LOG.debug("about to emit {}", size);
-            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
-                Emitter emitter = emitterManager.getEmitter(e.getKey());
-
-                try {
-                    tryToEmit(emitter, e.getValue());
-                } catch (SQLException ex) {
-                    LOG.warn("failed to emit batch", ex);
-                }
-                emitted += e.getValue().size();
-            }
-            LOG.debug("emitted: {}", emitted);
-            estimatedSize = 0;
-            size = 0;
-            map.clear();
-        }
-
-        private long tryToEmit(Emitter emitter, List<AsyncData> cachedEmitData)
-                throws SQLException {
-            List<Long> ids = new ArrayList<>();
-            for (AsyncData d : cachedEmitData) {
-                ids.add(d.getTaskId());
-            }
-            try {
-                emitter.emit(cachedEmitData);
-            } catch (IOException | TikaEmitterException e) {
-                LOG.warn("emitter class ({}): {}", emitter.getClass(),
-                        ExceptionUtils.getStackTrace(e));
-                reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
-                        emitStatusUpdate);
-            }
-            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED, emitStatusUpdate);
-            return 1;
-        }
-
-
-        public boolean exceedsEmitWithin(long emitWithinMs) {
-            return ChronoUnit.MILLIS.between(lastAdded, Instant.now()) > emitWithinMs;
-        }
-    }
-
-    private static class ForkWatcher implements Callable<Integer> {
-        private final InputStream in;
-
-        public ForkWatcher(InputStream in) {
-            this.in = in;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            //this should block forever
-            //if the forking process dies,
-            // this will either throw an IOException or read -1.
-            try {
-                int i = in.read();
-            } finally {
-                LOG.warn("forking process shutdown; exiting now");
-                System.exit(0);
-            }
-            return 1;
-        }
-    }
-
-    private static class AsyncEmitTuple {
-        final long id;
-        final Timestamp timestamp;
-        final int uncompressedSize;
-        final byte[] bytes;
-
-        public AsyncEmitTuple(long id, Timestamp timestamp, int uncompressedSize, byte[] bytes) {
-            this.id = id;
-            this.timestamp = timestamp;
-            this.uncompressedSize = uncompressedSize;
-            this.bytes = bytes;
-        }
-    }
-}
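
deserialize() above is one half of an LZ4 round trip: the emits table stored
the uncompressed size next to the compressed bytes because LZ4FastDecompressor
needs the exact destination length up front. A minimal round trip with the same
net.jpountz.lz4 API (the compression side shown here is the natural
counterpart, not code from this commit):

    import java.nio.charset.StandardCharsets;

    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4FastDecompressor;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) {
            LZ4Factory factory = LZ4Factory.fastestInstance();
            LZ4Compressor compressor = factory.fastCompressor();
            LZ4FastDecompressor decompressor = factory.fastDecompressor();

            byte[] original = "{\"emitKey\":\"doc1\"}".getBytes(StandardCharsets.UTF_8);
            byte[] compressed = compressor.compress(original);

            // the uncompressed length must travel with the payload,
            // hence the uncompressed_size column in the emits table
            byte[] restored = new byte[original.length];
            decompressor.decompress(compressed, 0, restored, 0, original.length);

            System.out.println(new String(restored, StandardCharsets.UTF_8));
        }
    }
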
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
deleted file mode 100644
index 79ed80d..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AsyncPipesEmitHook implements AsyncEmitHook {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncPipesEmitHook.class);
-
-    private final PreparedStatement markSuccess;
-    private final PreparedStatement markFailure;
-
-    public AsyncPipesEmitHook(Connection connection) throws SQLException {
-        String sql = "delete from task_queue where id=?";
-        markSuccess = connection.prepareStatement(sql);
-        //TODO --fix this
-        markFailure = connection.prepareStatement(sql);
-    }
-
-    @Override
-    public void onSuccess(AsyncTask task) {
-        try {
-            markSuccess.clearParameters();
-            markSuccess.setLong(1, task.getTaskId());
-            markSuccess.execute();
-        } catch (SQLException e) {
-            LOG.warn("problem with on success: " + task.getTaskId(), e);
-        }
-    }
-
-    @Override
-    public void onFail(AsyncTask task) {
-        try {
-            markFailure.clearParameters();
-            markFailure.setLong(1, task.getTaskId());
-            markFailure.execute();
-        } catch (SQLException e) {
-            LOG.warn("problem with on fail: " + task.getTaskId(), e);
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
deleted file mode 100644
index c718a48..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-
-public class AsyncProcessor implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
-    protected static String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYNC_JDBC_KEY";
-    protected static String TIKA_ASYNC_CONFIG_FILE_KEY = "TIKA_ASYNC_CONFIG_FILE_KEY";
-    private final Path tikaConfigPath;
-    private final ArrayBlockingQueue<FetchEmitTuple> queue;
-    private final Connection connection;
-    private final int totalThreads;
-    private final AsyncConfig asyncConfig;
-    private PreparedStatement emittersCanShutdown;
-    private volatile boolean isShuttingDown = false;
-    private AssignmentManager assignmentManager;
-    private int finishedThreads = 0;
-    private ExecutorService executorService;
-    private ExecutorCompletionService<Integer> executorCompletionService;
-
-    private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
-        this.tikaConfigPath = tikaConfigPath;
-        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
-        this.queue = new ArrayBlockingQueue<>(asyncConfig.getQueueSize());
-        this.connection = DriverManager.getConnection(asyncConfig.getJdbcString());
-        this.totalThreads = asyncConfig.getNumWorkers() + asyncConfig.getNumEmitters() +
-                2;//assignment manager and enqueuer threads
-    }
-
-    public static AsyncProcessor build(Path tikaConfigPath) throws AsyncRuntimeException {
-        try {
-            AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
-            processor.init();
-            return processor;
-        } catch (SQLException | IOException e) {
-            throw new AsyncRuntimeException(e);
-        }
-    }
-
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        PreparedStatement findActiveWorkers =
-                connection.prepareStatement("select worker_id from workers");
-        List<Integer> workers = new ArrayList<>();
-        try (ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
-
-    public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs)
-            throws AsyncRuntimeException, InterruptedException {
-        if (queue == null) {
-            throw new IllegalStateException("queue hasn't been initialized yet.");
-        } else if (isShuttingDown) {
-            throw new IllegalStateException(
-                    "Can't call offer after calling close() or " + "shutdownNow()");
-        }
-        long start = System.currentTimeMillis();
-        long elapsed = System.currentTimeMillis() - start;
-        while (elapsed < offerMs) {
-            checkActive();
-            if (queue.remainingCapacity() > fetchEmitTuples.size()) {
-                try {
-                    queue.addAll(fetchEmitTuples);
-                    return true;
-                } catch (IllegalStateException e) {
-                    //swallow
-                }
-            }
-            Thread.sleep(100);
-            elapsed = System.currentTimeMillis() - start;
-        }
-        return false;
-    }
-
-    public synchronized boolean offer(FetchEmitTuple t, long offerMs)
-            throws AsyncRuntimeException, InterruptedException {
-        if (queue == null) {
-            throw new IllegalStateException("queue hasn't been initialized yet.");
-        } else if (isShuttingDown) {
-            throw new IllegalStateException(
-                    "Can't call offer after calling close() or " + "shutdownNow()");
-        }
-        checkActive();
-        return queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * This polls the executorcompletionservice to check for execution exceptions
-     * and to make sure that some threads are still active.
-     *
-     * @return
-     * @throws AsyncRuntimeException
-     * @throws InterruptedException
-     */
-    public synchronized boolean checkActive() throws AsyncRuntimeException, InterruptedException {
-        Future<Integer> future = executorCompletionService.poll();
-        if (future != null) {
-            try {
-                future.get();
-            } catch (ExecutionException e) {
-                throw new AsyncRuntimeException(e);
-            }
-            finishedThreads++;
-        }
-        return finishedThreads != totalThreads;
-    }
-
-    private void init() throws SQLException {
-
-        setupTables();
-        String sql = "update emitters set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
-        this.emittersCanShutdown = connection.prepareStatement(sql);
-        executorService = Executors.newFixedThreadPool(totalThreads);
-        executorCompletionService = new ExecutorCompletionService<>(executorService);
-
-        AsyncTaskEnqueuer taskEnqueuer = new AsyncTaskEnqueuer(queue, connection);
-
-        executorCompletionService.submit(taskEnqueuer);
-
-        List<AsyncWorker> workers = buildWorkers(connection, asyncConfig, tikaConfigPath);
-        int maxRetries = 0;
-        for (AsyncWorker worker : workers) {
-            if (worker.getMaxRetries() > maxRetries) {
-                maxRetries = worker.getMaxRetries();
-            }
-            executorCompletionService.submit(worker);
-        }
-        assignmentManager = new AssignmentManager(connection, taskEnqueuer, maxRetries);
-        executorCompletionService.submit(assignmentManager);
-        for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
-            executorCompletionService
-                    .submit(new AsyncEmitter(connection, asyncConfig.getJdbcString(),
-                            asyncConfig.getNumWorkers() + i, tikaConfigPath));
-        }
-    }
-
-    private List<AsyncWorker> buildWorkers(Connection connection, AsyncConfig asyncConfig,
-                                           Path tikaConfigPath) throws SQLException {
-        //TODO -- make these workers configurable via the tika config, e.g. max retries
-        //and jvm args, etc.
-        List<AsyncWorker> workers = new ArrayList<>();
-        for (int i = 0; i < asyncConfig.getNumWorkers(); i++) {
-            workers.add(
-                    new AsyncWorker(connection, asyncConfig.getJdbcString(), i, tikaConfigPath));
-        }
-        return workers;
-    }
-
-    private void setupTables() throws SQLException {
-
-        String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
-                "status tinyint," + //byte
-                "worker_id integer," + "retry smallint," + //short
-                "time_stamp timestamp," +
-                "json varchar(64000))";//this is the AsyncTask ... not the emit data!
-        try (Statement st = connection.createStatement()) {
-            st.execute(sql);
-        }
-        sql = "create table workers (worker_id int primary key, status tinyint)";
-        try (Statement st = connection.createStatement()) {
-            st.execute(sql);
-        }
-
-        sql = "create table emitters (emitter_id int primary key, status tinyint)";
-        try (Statement st = connection.createStatement()) {
-            st.execute(sql);
-        }
-
-        sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
-                "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
-
-        try (Statement st = connection.createStatement()) {
-            st.execute(sql);
-        }
-
-        sql = "create table emits (" + "id bigint primary key, " + "time_stamp timestamp, " +
-                "uncompressed_size bigint, " + "bytes blob)";
-        try (Statement st = connection.createStatement()) {
-            st.execute(sql);
-        }
-    }
-
-    public void shutdownNow() throws IOException, AsyncRuntimeException {
-        isShuttingDown = true;
-        try {
-            executorService.shutdownNow();
-        } finally {
-            //close down processes and db
-            if (asyncConfig.getTempDBDir() != null) {
-                FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
-            }
-        }
-    }
-
-    /**
-     * This is a blocking close.  It will wait for all tasks successfully submitted before this
-     * call to close() to complete before closing.  If you need to shutdown immediately, try
-     * {@link #shutdownNow()}.
-     *
-     * @throws IOException
-     */
-    @Override
-    public void close() throws IOException {
-        isShuttingDown = true;
-        try {
-            completeAndShutdown();
-        } catch (SQLException | InterruptedException e) {
-            throw new IOException(e);
-        } finally {
-            executorService.shutdownNow();
-            SQLException ex = null;
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                ex = e;
-            }
-            //close down processes and db
-            if (asyncConfig.getTempDBDir() != null) {
-                FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
-            }
-            if (ex != null) {
-                throw new IOException(ex);
-            }
-        }
-    }
-
-    //this will block until everything finishes
-    private void completeAndShutdown() throws SQLException, InterruptedException {
-
-        //blocking...notify taskEnqueuer
-        queue.put(FetchIterator.COMPLETED_SEMAPHORE);
-
-        //wait for assignmentManager to finish
-        //it will only complete after the task enqueuer has completed
-        //and there are no more parse tasks available, selected or in process
-        while (!assignmentManager.hasCompletedTasks()) {
-            Thread.sleep(100);
-        }
-
-        emittersCanShutdown.executeUpdate();
-
-        //wait for emitters to finish
-        try {
-            while (checkActive()) {
-                //keep polling until the emitters report inactive
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    //reads FetchEmitTuples from the queue and inserts them into the db
-    //for the workers to read
-    private static class AsyncTaskEnqueuer implements Callable<Integer> {
-        private final PreparedStatement insert;
-
-        private final ArrayBlockingQueue<FetchEmitTuple> queue;
-        private final Connection connection;
-        private final Random random = new Random();
-
-        private volatile boolean isComplete = false;
-
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
-                throws SQLException {
-            this.queue = queue;
-            this.connection = connection;
-            String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
-                    "values (?,CURRENT_TIMESTAMP(),?,?,?)";
-            insert = connection.prepareStatement(sql);
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            List<Integer> workers = new ArrayList<>();
-            while (true) {
-                FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
-                LOG.debug("enqueing to db " + t);
-                if (t == null) {
-                    //log.trace?
-                } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
-                    isComplete = true;
-                    return 1;
-                } else {
-                    long start = System.currentTimeMillis();
-                    long elapsed = System.currentTimeMillis() - start;
-                    //TODO -- fix this -- this loop waits up to 10 minutes for workers
-                    //to register; if none ever registers, insert() below will fail
-                    //on an empty worker list
-                    while (workers.size() == 0 && elapsed < 600000) {
-                        workers = getActiveWorkers(connection);
-                        Thread.sleep(100);
-                        elapsed = System.currentTimeMillis() - start;
-                    }
-                    insert(t, workers);
-                }
-            }
-        }
-
-        boolean isComplete() {
-            return isComplete;
-        }
-
-        private void insert(FetchEmitTuple t, List<Integer> workers)
-                throws IOException, SQLException {
-            int workerId = workers.size() == 1 ? workers.get(0) :
-                    workers.get(random.nextInt(workers.size()));
-            insert.clearParameters();
-            insert.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            insert.setInt(2, workerId);
-            insert.setShort(3, (short) 0);
-            insert.setString(4, JsonFetchEmitTuple.toJson(t));
-            insert.execute();
-        }
-    }
-
-    private static class AssignmentManager implements Callable<Integer> {
-
-        private final Connection connection;
-        private final AsyncTaskEnqueuer enqueuer;
-        private final PreparedStatement getQueueDistribution;
-        private final PreparedStatement findMissingWorkers;
-        private final PreparedStatement allocateNonworkersToWorkers;
-        private final PreparedStatement reallocate;
-        private final PreparedStatement countAvailableTasks;
-        private final PreparedStatement shutdownWorker;
-        private final PreparedStatement findMaxRetrieds;
-        private final PreparedStatement logMaxRetrieds;
-        private final PreparedStatement removeMaxRetrieds;
-        private final Random random = new Random();
-        private final int maxRetries;
-        private volatile boolean hasCompleted = false;
-
-
-        public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer, int maxRetries)
-                throws SQLException {
-            this.connection = connection;
-            this.enqueuer = enqueuer;
-            this.maxRetries = maxRetries;
-            //this gets workers and # of tasks in desc order of number of tasks
-            String sql = "select w.worker_id, p.cnt " + "from workers w " +
-                    "left join (select worker_id, count(1) as cnt from task_queue " +
-                    "where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() +
-                    " group by worker_id)" + " p on p.worker_id=w.worker_id order by p.cnt desc";
-            getQueueDistribution = connection.prepareStatement(sql);
-            //find workers that have assigned tasks but are not in the
-            //workers table
-            sql = "select p.worker_id, count(1) as cnt from task_queue p " +
-                    "left join workers w on p.worker_id=w.worker_id " +
-                    "where w.worker_id is null group by p.worker_id";
-            findMissingWorkers = connection.prepareStatement(sql);
-
-            sql = "update task_queue set worker_id=? where worker_id=?";
-            allocateNonworkersToWorkers = connection.prepareStatement(sql);
-
-            //current strategy: reallocate tasks from the longest queue to the shortest
-            //TODO: might consider randomly shuffling or other algorithms
-            sql = "update task_queue set worker_id= ? where id in " +
-                    "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
-                    "and status=0 for update)";
-            reallocate = connection.prepareStatement(sql);
-
-            //get those tasks that are in the parse phase
-            //if they are selected or in process, it is possible that
-            //they'll need to be retried.  So, include all statuses
-            //meaning that the parse has not completed.
-            sql = "select count(1) from task_queue where status in (" +
-                    AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() + ", " +
-                    AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED.ordinal() + ", " +
-                    AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal() + ")";
-            countAvailableTasks = connection.prepareStatement(sql);
-
-            sql = "update workers set status=" +
-                    AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
-                    " where worker_id = ?";
-            shutdownWorker = connection.prepareStatement(sql);
-
-            sql = "select id, retry, json from task_queue where retry >=" + maxRetries;
-            findMaxRetrieds = connection.prepareStatement(sql);
-
-            sql = "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code)" +
-                    "values (?,?,CURRENT_TIMESTAMP(), ?," +
-                    AsyncWorkerProcess.ERROR_CODES.MAX_RETRIES.ordinal() + ")";
-            logMaxRetrieds = connection.prepareStatement(sql);
-
-            sql = "delete from task_queue where id=?";
-            removeMaxRetrieds = connection.prepareStatement(sql);
-        }
-
-        protected boolean hasCompletedTasks() {
-            return hasCompleted;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-
-            while (true) {
-                removeMaxRetrieds();
-                List<Integer> missingWorkers = getMissingWorkers();
-                reallocateFromMissingWorkers(missingWorkers);
-                redistribute();
-                if (isComplete()) {
-                    notifyWorkers();
-                    return 1;
-                }
-                Thread.sleep(200);
-            }
-        }
-
-        private void removeMaxRetrieds() throws SQLException {
-            Set<Long> toRemove = new HashSet<>();
-            try (ResultSet rs = findMaxRetrieds.executeQuery()) {
-                while (rs.next()) {
-                    long id = rs.getLong(1);
-                    //the select is "id, retry, json" -- read the columns in that order
-                    int retries = rs.getInt(2);
-                    String json = rs.getString(3);
-                    toRemove.add(id);
-                    FetchEmitTuple t;
-                    try (Reader reader = new StringReader(json)) {
-                        t = JsonFetchEmitTuple.fromJson(reader);
-                    } catch (Exception e) {
-                        LOG.warn("failed to deserialize task json", e);
-                        //TODO: record this in the error_log table as well
-                        continue;
-                    }
-                    logMaxRetrieds.clearParameters();
-                    logMaxRetrieds.setLong(1, id);
-                    logMaxRetrieds.setString(2, t.getFetchKey().getFetchKey());
-                    logMaxRetrieds.setInt(3, retries);
-                    logMaxRetrieds.addBatch();
-                }
-            }
-            logMaxRetrieds.executeBatch();
-
-            for (Long id : toRemove) {
-                removeMaxRetrieds.clearParameters();
-                removeMaxRetrieds.setLong(1, id);
-                removeMaxRetrieds.addBatch();
-            }
-            removeMaxRetrieds.executeBatch();
-        }
-
-        private void notifyWorkers() throws SQLException {
-            for (int workerId : getActiveWorkers(connection)) {
-                shutdownWorker.clearParameters();
-                shutdownWorker.setInt(1, workerId);
-                shutdownWorker.execute();
-            }
-        }
-
-        private boolean isComplete() throws SQLException {
-            if (hasCompleted) {
-                return true;
-            }
-            if (!enqueuer.isComplete()) {
-                return false;
-            }
-            try (ResultSet rs = countAvailableTasks.executeQuery()) {
-                while (rs.next()) {
-                    int availTasks = rs.getInt(1);
-                    if (availTasks == 0) {
-                        hasCompleted = true;
-                        return true;
-                    }
-                }
-            }
-            return false;
-        }
-
-        private void redistribute() throws SQLException {
-            //parallel lists: workerId and its task-queue size
-            List<Integer> workerIds = new ArrayList<>();
-            List<Integer> queueSize = new ArrayList<>();
-            int totalTasks = 0;
-
-            try (ResultSet rs = getQueueDistribution.executeQuery()) {
-                while (rs.next()) {
-                    int workerId = rs.getInt(1);
-                    int numTasks = rs.getInt(2);
-                    workerIds.add(workerId);
-                    queueSize.add(numTasks);
-                    LOG.debug("workerId: ({}) numTasks: ({})", workerId, numTasks);
-                    totalTasks += numTasks;
-                }
-            }
-            if (workerIds.size() == 0) {
-                return;
-            }
-            int averagePerWorker = Math.round((float) totalTasks / (float) workerIds.size());
-            int midPoint = Math.round((float) queueSize.size() / 2) + 1;
-            for (int i = queueSize.size() - 1, j = 0; i > midPoint && j < midPoint; i--, j++) {
-                int shortestQueue = queueSize.get(i);
-                int longestQueue = queueSize.get(j);
-                if ((shortestQueue < 5 && longestQueue > 5) ||
-                        longestQueue > 5 && longestQueue > (int) (1.5 * averagePerWorker)) {
-                    int shortestQueueWorker = workerIds.get(i);
-                    int longestQueueWorker = workerIds.get(j);
-                    reallocate.clearParameters();
-                    reallocate.setLong(1, shortestQueueWorker);
-                    reallocate.setLong(2, longestQueueWorker);
-                    reallocate.execute();
-                }
-            }
-
-        }
-
-        private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
-                throws SQLException {
-
-            if (missingWorkers.size() == 0) {
-                return;
-            }
-
-            List<Integer> activeWorkers = getActiveWorkers(connection);
-            if (activeWorkers.size() == 0) {
-                return;
-            }
-
-            for (int missing : missingWorkers) {
-                int active = activeWorkers.get(random.nextInt(activeWorkers.size()));
-                allocateNonworkersToWorkers.clearParameters();
-                allocateNonworkersToWorkers.setInt(1, active);
-                allocateNonworkersToWorkers.setInt(2, missing);
-                allocateNonworkersToWorkers.execute();
-                LOG.debug("allocating missing working ({}) to ({})", missing, active);
-            }
-        }
-
-        private List<Integer> getMissingWorkers() throws SQLException {
-            List<Integer> missingWorkers = new ArrayList<>();
-            try (ResultSet rs = findMissingWorkers.executeQuery()) {
-                while (rs.next()) {
-                    int workerId = rs.getInt(1);
-                    missingWorkers.add(workerId);
-                    LOG.debug("Worker ({}) no longer active", workerId);
-                }
-            }
-            return missingWorkers;
-        }
-    }
-}
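
The deleted AssignmentManager above reduces to one coordination pattern: poll,
rebalance, and exit only once the producer has finished and no tasks remain. A
minimal sketch of that loop, with hypothetical names (not Tika API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicBoolean;

    //sketch only: a stand-in for the AssignmentManager loop above
    class Rebalancer implements Callable<Integer> {
        private final AtomicBoolean producerDone;

        Rebalancer(AtomicBoolean producerDone) {
            this.producerDone = producerDone;
        }

        @Override
        public Integer call() throws Exception {
            while (true) {
                rebalance();
                if (producerDone.get() && pendingTasks() == 0) {
                    notifyConsumersToShutDown();
                    return 1;
                }
                Thread.sleep(200); //same cadence as the loop above
            }
        }

        private void rebalance() { /* move tasks from long queues to short ones */ }

        private int pendingTasks() { return 0; } //stub

        private void notifyConsumersToShutDown() { /* stub */ }
    }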
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
deleted file mode 100644
index d9cca42..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.tika.pipes.FetchEmitTuple;
-
-public class AsyncTask extends FetchEmitTuple {
-
-    public static final AsyncTask SHUTDOWN_SEMAPHORE =
-            new AsyncTask(-1, (short) -1, new FetchEmitTuple(null, null, null));
-    private final short retry;
-    private long taskId;
-
-    public AsyncTask(@JsonProperty("taskId") long taskId, @JsonProperty("retry") short retry,
-                     @JsonProperty("fetchEmitTuple") FetchEmitTuple fetchEmitTuple) {
-        super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(),
-                fetchEmitTuple.getMetadata());
-        this.taskId = taskId;
-        this.retry = retry;
-    }
-
-    public long getTaskId() {
-        return taskId;
-    }
-
-    public void setTaskId(long taskId) {
-        this.taskId = taskId;
-    }
-
-    public short getRetry() {
-        return retry;
-    }
-
-    @Override
-    public String toString() {
-        return "AsyncTask{" + "taskId=" + taskId + ", retry=" + retry + '}';
-    }
-}
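
One note on the deleted AsyncTask: it bound constructor parameters with
@JsonProperty. For multi-argument constructors, Jackson also needs an explicit
@JsonCreator unless a parameter-names module is registered. A self-contained
sketch of the pattern (the Task class here is hypothetical):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JacksonCreatorDemo {

        static class Task {
            private final long taskId;
            private final short retry;

            @JsonCreator
            Task(@JsonProperty("taskId") long taskId,
                 @JsonProperty("retry") short retry) {
                this.taskId = taskId;
                this.retry = retry;
            }

            public long getTaskId() { return taskId; }

            public short getRetry() { return retry; }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            Task t = mapper.readValue("{\"taskId\":49,\"retry\":0}", Task.class);
            System.out.println(t.getTaskId() + " " + t.getRetry());
        }
    }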
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
deleted file mode 100644
index ee3882b..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-
-/**
- * This controls monitoring of the AsyncWorkerProcess
- * and updates to the db on crashes etc.
- */
-public class AsyncWorker implements Callable<Integer> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncWorker.class);
-
-
-    private final String connectionString;
-    private final int workerId;
-    private final Path tikaConfigPath;
-    private final Connection connection;
-    private final PreparedStatement finished;
-    private final PreparedStatement restarting;
-    private final PreparedStatement selectActiveTasks;
-    private final PreparedStatement insertErrorLog;
-    private final PreparedStatement resetStatus;
-    //TODO: make this configurable
-    private final int maxRetries = 2;
-
-    public AsyncWorker(Connection connection, String connectionString, int workerId,
-                       Path tikaConfigPath) throws SQLException {
-        this.connectionString = connectionString;
-        this.workerId = workerId;
-        this.tikaConfigPath = tikaConfigPath;
-        this.connection = connection;
-        String sql = "update workers set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
-                " where worker_id = (" + workerId + ")";
-        finished = connection.prepareStatement(sql);
-
-        sql = "update workers set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
-                " where worker_id = (" + workerId + ")";
-        restarting = connection.prepareStatement(sql);
-        //this checks if the process was able to reset the status
-        sql = "select id, retry, json from task_queue where worker_id=" + workerId +
-                " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
-        selectActiveTasks = connection.prepareStatement(sql);
-
-        //if not, this is called to insert into the error log
-        insertErrorLog = prepareInsertErrorLog(connection);
-
-        //and this is called to reset the status on error
-        resetStatus = prepareReset(connection);
-    }
-
-    static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
-                               PreparedStatement insertErrorLog, PreparedStatement resetStatus,
-                               Logger logger) {
-        try {
-            insertErrorLog.clearParameters();
-            insertErrorLog.setLong(1, task.getTaskId());
-            insertErrorLog.setString(2, task.getFetchKey().getFetchKey());
-            insertErrorLog.setInt(3, task.getRetry());
-            insertErrorLog.setByte(4, (byte) errorCode.ordinal());
-            insertErrorLog.execute();
-        } catch (SQLException e) {
-            logger.error("Can't update error log", e);
-        }
-
-        try {
-            resetStatus.clearParameters();
-            resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            resetStatus.setShort(2, (short) (task.getRetry() + 1));
-            resetStatus.setLong(3, task.getTaskId());
-            resetStatus.execute();
-        } catch (SQLException e) {
-            logger.error("Can't reset try status", e);
-        }
-    }
-
-    static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
-        //if not, this is called to insert into the error log
-        return connection.prepareStatement(
-                "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
-                        " values (?,?,CURRENT_TIMESTAMP(),?,?)");
-    }
-
-    static PreparedStatement prepareReset(Connection connection) throws SQLException {
-        //and this is called to reset the status on error
-        return connection.prepareStatement(
-                "update task_queue set " + "status=?, " + "time_stamp=CURRENT_TIMESTAMP(), " +
-                        "retry=? " + "where id=?");
-    }
-
-    @Override
-    public Integer call() throws Exception {
-        Process p = null;
-        try {
-            p = start();
-            int restarts = 0;
-            while (true) {
-                //use a name that doesn't shadow the "finished" PreparedStatement field
-                boolean exited = p.waitFor(60, TimeUnit.SECONDS);
-                if (exited) {
-                    int exitValue = p.exitValue();
-                    if (exitValue == 0) {
-                        LOG.debug("forked worker process finished with exitValue=0");
-                        return 1;
-                    }
-                    reportCrash(++restarts, exitValue);
-                    p = start();
-                }
-            }
-        } finally {
-            if (p != null) {
-                p.destroyForcibly();
-            }
-            finished.execute();
-        }
-    }
-
-    public int getMaxRetries() {
-        return maxRetries;
-    }
-
-    private Process start() throws IOException {
-        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
-                System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncWorkerProcess", Integer.toString(workerId)};
-        ProcessBuilder pb = new ProcessBuilder(args);
-        pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment()
-                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
-        pb.inheritIO();
-        return pb.start();
-    }
-
-    private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
-        LOG.warn("worker id={} terminated, exitValue={}", workerId, exitValue);
-        restarting.execute();
-        List<AsyncTask> activeTasks = new ArrayList<>();
-        try (ResultSet rs = selectActiveTasks.executeQuery()) {
-            while (rs.next()) {
-                long taskId = rs.getLong(1);
-                short retry = rs.getShort(2);
-                String json = rs.getString(3);
-                FetchEmitTuple tuple = JsonFetchEmitTuple.fromJson(new StringReader(json));
-                activeTasks.add(new AsyncTask(taskId, retry, tuple));
-            }
-        }
-        if (activeTasks.size() == 0) {
-            LOG.debug("worker reset active tasks, nothing extra to report");
-            return;
-        }
-
-        if (activeTasks.size() > 1) {
-            LOG.warn("more than one active task? this should never happen!");
-        }
-
-        for (AsyncTask t : activeTasks) {
-            reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE, insertErrorLog,
-                    resetStatus, LOG);
-        }
-
-    }
-}
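
The supervision loop that AsyncWorker implemented -- fork a JVM, wait with a
timeout, restart on a non-zero exit, force-kill on shutdown -- is worth keeping
in mind for the refactor. A minimal standalone sketch (the worker entry point
com.example.WorkerMain is hypothetical):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class ForkSupervisor {

        public static void main(String[] args) throws Exception {
            Process p = start();
            try {
                while (true) {
                    //poll instead of blocking forever so the supervisor
                    //could also do per-tick housekeeping
                    if (p.waitFor(60, TimeUnit.SECONDS)) {
                        int exit = p.exitValue();
                        if (exit == 0) {
                            return; //clean shutdown
                        }
                        System.err.println("worker crashed, exit=" + exit + "; restarting");
                        p = start();
                    }
                }
            } finally {
                p.destroyForcibly();
            }
        }

        private static Process start() throws IOException {
            ProcessBuilder pb = new ProcessBuilder("java", "-cp",
                    System.getProperty("java.class.path"),
                    "com.example.WorkerMain"); //hypothetical entry point
            pb.inheritIO();
            return pb.start();
        }
    }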
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
deleted file mode 100644
index 43f96df..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
-import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.EncryptedDocumentException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.sax.RecursiveParserWrapperHandler;
-import org.apache.tika.utils.StringUtils;
-
-public class AsyncWorkerProcess {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerProcess.class);
-    //make these all configurable
-    private static final long SHUTDOWN_AFTER_MS = 120000;
-    private static final long PULSE_MS = 1000;
-    private final long parseTimeoutMs = 60000;
-
-    public static void main(String[] args) throws Exception {
-        Path tikaConfigPath = Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
-        String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
-        TikaConfig tikaConfig = new TikaConfig(tikaConfigPath);
-        int workerId = Integer.parseInt(args[0]);
-        LOG.debug("trying to get connection {} >{}<", workerId, db);
-        try (Connection connection = DriverManager.getConnection(db)) {
-            AsyncWorkerProcess asyncWorker = new AsyncWorkerProcess();
-            asyncWorker.execute(connection, workerId, tikaConfig);
-        }
-        System.exit(0);
-    }
-
-    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
-            throws SQLException {
-        //3 = worker + forkwatcher + active task
-        ExecutorService service = Executors.newFixedThreadPool(3);
-        ExecutorCompletionService<Integer> executorCompletionService =
-                new ExecutorCompletionService<>(service);
-
-        executorCompletionService
-                .submit(new Worker(connection, workerId, tikaConfig, parseTimeoutMs));
-        executorCompletionService.submit(new ForkWatcher(System.in));
-
-        int completed = 0;
-
-        //if either one stops, we need to stop
-        try {
-            while (completed < 1) {
-                Future<Integer> future = executorCompletionService.poll(60, TimeUnit.SECONDS);
-                if (future != null) {
-                    completed++;
-                    future.get();
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("worker " + workerId + " had a mainloop exception", e);
-        } finally {
-            service.shutdownNow();
-        }
-    }
-
-    enum TASK_STATUS_CODES {
-        AVAILABLE, SELECTED, IN_PROCESS, AVAILABLE_EMIT, SELECTED_EMIT, IN_PROCESS_EMIT,
-        FAILED_EMIT, EMITTED
-    }
-
-    public enum WORKER_STATUS_CODES {
-        ACTIVE, RESTARTING, HIBERNATING, CAN_SHUTDOWN,//if there's nothing else to process, shutdown
-        SHOULD_SHUTDOWN, //shutdown now whether or not there's anything else to process
-        HAS_SHUTDOWN
-    }
-
-    enum ERROR_CODES {
-        TIMEOUT, SECURITY_EXCEPTION, OTHER_EXCEPTION, OOM, OTHER_ERROR, UNKNOWN_PARSE, MAX_RETRIES,
-        EMIT_SERIALIZATION, EMIT_SQL_INSERT_EXCEPTION, EMIT_SQL_SELECT_EXCEPTION,
-        EMIT_DESERIALIZATION, EMIT_EXCEPTION
-    }
-
-    private static class TaskQueue {
-        private final Connection connection;
-        private final int workerId;
-
-        private final PreparedStatement markForSelecting;
-        private final PreparedStatement selectForProcessing;
-        private final PreparedStatement markForProcessing;
-        private final PreparedStatement checkForShutdown;
-
-
-        TaskQueue(Connection connection, int workerId) throws SQLException {
-            this.connection = connection;
-            this.workerId = workerId;
-            //TODO -- need to update timestamp
-            String sql = "update task_queue set status=" + TASK_STATUS_CODES.SELECTED.ordinal() +
-                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id = " +
-                    " (select id from task_queue where worker_id = " + workerId + " and status=" +
-                    TASK_STATUS_CODES.AVAILABLE.ordinal() +
-                    " order by time_stamp asc limit 1 for update)";
-            markForSelecting = connection.prepareStatement(sql);
-            sql = "select id, retry, json from task_queue where status=" +
-                    TASK_STATUS_CODES.SELECTED.ordinal() + " and " + " worker_id=" + workerId +
-                    " order by time_stamp asc limit 1";
-            selectForProcessing = connection.prepareStatement(sql);
-            sql = "update task_queue set status=" + TASK_STATUS_CODES.IN_PROCESS.ordinal() +
-                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
-            markForProcessing = connection.prepareStatement(sql);
-
-            sql = "select count(1) from workers where worker_id=" + workerId + " and status=" +
-                    WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
-            checkForShutdown = connection.prepareStatement(sql);
-        }
-
-        AsyncTask poll(long pollMs) throws InterruptedException, IOException, SQLException {
-            long start = System.currentTimeMillis();
-            long elapsed = System.currentTimeMillis() - start;
-            while (elapsed < pollMs) {
-                if (shouldShutdown()) {
-                    return SHUTDOWN_SEMAPHORE;
-                }
-                int i = markForSelecting.executeUpdate();
-                if (i == 0) {
-//                   debugQueue();
-                    Thread.sleep(PULSE_MS);
-                } else {
-                    long taskId = -1;
-                    short retry = -1;
-                    String json = "";
-                    try (ResultSet rs = selectForProcessing.executeQuery()) {
-                        while (rs.next()) {
-                            taskId = rs.getLong(1);
-                            retry = rs.getShort(2);
-                            json = rs.getString(3);
-                        }
-                    }
-                    markForProcessing.clearParameters();
-                    markForProcessing.setLong(1, taskId);
-                    markForProcessing.execute();
-
-                    FetchEmitTuple t = null;
-                    try (Reader reader = new StringReader(json)) {
-                        t = JsonFetchEmitTuple.fromJson(reader);
-                    }
-                    AsyncTask task = new AsyncTask(taskId, retry, t);
-                    return task;
-                }
-                elapsed = System.currentTimeMillis() - start;
-            }
-            return null;
-        }
-
-        private void debugQueue() throws SQLException {
-            try (ResultSet rs = connection.createStatement()
-                    .executeQuery("select id, status, worker_id from task_queue limit 10")) {
-                while (rs.next()) {
-                    System.out.println(
-                            "id: " + rs.getInt(1) + " status: " + rs.getInt(2) + " worker_id: " +
-                                    rs.getInt(3));
-                }
-            }
-        }
-
-        boolean shouldShutdown() throws SQLException {
-            try (ResultSet rs = checkForShutdown.executeQuery()) {
-                if (rs.next()) {
-                    int val = rs.getInt(1);
-                    return val > 0;
-                }
-            }
-            return false;
-        }
-    }
-
-
-    private static class Worker implements Callable<Integer> {
-
-        private final Connection connection;
-        private final int workerId;
-        private final RecursiveParserWrapper parser;
-        private final TikaConfig tikaConfig;
-        private final long parseTimeoutMs;
-        private final PreparedStatement insertErrorLog;
-        private final PreparedStatement resetStatus;
-        private final PreparedStatement insertEmitData;
-        private final PreparedStatement updateStatusForEmit;
-        private final ObjectMapper objectMapper = new ObjectMapper();
-        LZ4Factory factory = LZ4Factory.fastestInstance();
-        private final ExecutorService executorService;
-        private final ExecutorCompletionService<AsyncData> executorCompletionService;
-
-        public Worker(Connection connection, int workerId, TikaConfig tikaConfig,
-                      long parseTimeoutMs) throws SQLException {
-            this.connection = connection;
-            this.workerId = workerId;
-            this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
-            this.tikaConfig = tikaConfig;
-            this.executorService = Executors.newFixedThreadPool(1);
-            this.executorCompletionService = new ExecutorCompletionService<>(executorService);
-            this.parseTimeoutMs = parseTimeoutMs;
-
-            SimpleModule module = new SimpleModule();
-            module.addSerializer(Metadata.class, new JsonMetadataSerializer());
-            objectMapper.registerModule(module);
-            String sql = "merge into workers key (worker_id) " + "values (" + workerId + ", " +
-                    WORKER_STATUS_CODES.ACTIVE.ordinal() + ")";
-            connection.createStatement().execute(sql);
-            insertErrorLog = prepareInsertErrorLog(connection);
-            resetStatus = prepareReset(connection);
-            insertEmitData = prepareInsertEmitData(connection);
-            sql = "update task_queue set status=" + TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
-                    ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
-            updateStatusForEmit = connection.prepareStatement(sql);
-        }
-
-        static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
-            return connection.prepareStatement(
-                    "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
-                            " values (?,CURRENT_TIMESTAMP(),?,?)");
-        }
-
-        public Integer call() throws Exception {
-            AsyncTask task = null;
-            try {
-
-                TaskQueue queue = new TaskQueue(connection, workerId);
-
-                long lastProcessed = System.currentTimeMillis();
-
-                while (true) {
-
-                    task = queue.poll(1000);
-                    if (task == null) {
-                        long elapsed = System.currentTimeMillis() - lastProcessed;
-                        if (elapsed > SHUTDOWN_AFTER_MS) {
-                            LOG.debug("shutting down after no assignments in {}ms", elapsed);
-                            return 1;
-                        }
-                    } else if (task == SHUTDOWN_SEMAPHORE) {
-                        break;
-                    } else {
-                        processTask(task);
-                        lastProcessed = System.currentTimeMillis();
-                    }
-                }
-            } catch (TimeoutException e) {
-                LOG.warn(task.getFetchKey().getFetchKey(), e);
-                reportAndReset(task, ERROR_CODES.TIMEOUT, insertErrorLog, resetStatus, LOG);
-            } catch (SecurityException e) {
-                LOG.warn(task.getFetchKey().getFetchKey(), e);
-                reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION, insertErrorLog, resetStatus,
-                        LOG);
-            } catch (Exception e) {
-                LOG.warn(task.getFetchKey().getFetchKey(), e);
-                reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION, insertErrorLog, resetStatus, LOG);
-            } catch (OutOfMemoryError e) {
-                LOG.warn(task.getFetchKey().getFetchKey(), e);
-                reportAndReset(task, ERROR_CODES.OOM, insertErrorLog, resetStatus, LOG);
-            } catch (Error e) {
-                LOG.warn(task.getFetchKey().getFetchKey(), e);
-                reportAndReset(task, ERROR_CODES.OTHER_ERROR, insertErrorLog, resetStatus, LOG);
-            } finally {
-                //don't return from a finally block -- it silently swallows exceptions
-                executorService.shutdownNow();
-            }
-            return 1;
-        }
-
-        private void processTask(AsyncTask task) throws Exception {
-
-            if (task == SHUTDOWN_SEMAPHORE) {
-                LOG.debug("received shutdown notification");
-                return;
-            } else {
-                executorCompletionService
-                        .submit(new TaskProcessor(task, tikaConfig, parser, workerId));
-                Future<AsyncData> future =
-                        executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
-                if (future == null) {
-                    handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
-                } else {
-                    AsyncData asyncData = future.get(1000, TimeUnit.MILLISECONDS);
-                    if (asyncData == null) {
-                        handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
-                    }
-                    boolean shouldEmit = checkForParseException(asyncData);
-                    if (shouldEmit) {
-                        try {
-                            emit(asyncData);
-                        } catch (JsonProcessingException e) {
-                            LOG.warn("failed to serialize emit data", e);
-                            recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
-                                    ERROR_CODES.EMIT_SERIALIZATION.ordinal());
-                        } catch (SQLException e) {
-                            LOG.warn("failed to insert emit data", e);
-                            recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
-                                    ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
-                        }
-                    }
-                }
-            }
-        }
-
-        private void recordBadEmit(long taskId, String key, int ordinal) {
-            //stub
-        }
-
-        private void emit(AsyncData asyncData) throws SQLException, JsonProcessingException {
-            insertEmitData.clearParameters();
-            insertEmitData.setLong(1, asyncData.getTaskId());
-            byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
-            byte[] compressed = factory.fastCompressor().compress(bytes);
-            insertEmitData.setLong(2, bytes.length);
-            insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
-            insertEmitData.execute();
-            updateStatusForEmit.clearParameters();
-            updateStatusForEmit.setLong(1, asyncData.getTaskId());
-            updateStatusForEmit.execute();
-        }
-
-        private void handleTimeout(long taskId, String key) throws TimeoutException {
-            LOG.warn("timeout taskid:{} fetchKey:{}", taskId, key);
-            throw new TimeoutException(key);
-        }
-
-        private boolean checkForParseException(AsyncData asyncData) {
-            if (asyncData == null || asyncData.getMetadataList() == null ||
-                    asyncData.getMetadataList().size() == 0) {
-                LOG.warn("empty or null emit data ({})", asyncData.getFetchKey().getFetchKey());
-                return false;
-            }
-            boolean shouldEmit = true;
-            Metadata container = asyncData.getMetadataList().get(0);
-            String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
-            if (stack != null) {
-                LOG.warn("fetchKey ({}) container parse exception ({})",
-                        asyncData.getFetchKey().getFetchKey(), stack);
-                if (asyncData.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
-                    shouldEmit = false;
-                }
-            }
-
-            for (int i = 1; i < asyncData.getMetadataList().size(); i++) {
-                Metadata m = asyncData.getMetadataList().get(i);
-                String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
-                if (embeddedStack != null) {
-                    LOG.warn("fetchKey ({}) embedded parse exception ({})",
-                            asyncData.getFetchKey().getFetchKey(), embeddedStack);
-                }
-            }
-            return shouldEmit;
-        }
-    }
-
-    private static class TaskProcessor implements Callable<AsyncData> {
-
-        private final AsyncTask task;
-        private final Parser parser;
-        private final TikaConfig tikaConfig;
-        private final int workerId;
-
-        public TaskProcessor(AsyncTask task, TikaConfig tikaConfig, Parser parser, int workerId) {
-            this.task = task;
-            this.parser = parser;
-            this.tikaConfig = tikaConfig;
-            this.workerId = workerId;
-        }
-
-        public AsyncData call() throws Exception {
-            Metadata userMetadata = task.getMetadata();
-            Metadata metadata = new Metadata();
-            String fetcherName = task.getFetchKey().getFetcherName();
-            String fetchKey = task.getFetchKey().getFetchKey();
-            List<Metadata> metadataList = null;
-            try (InputStream stream = tikaConfig.getFetcherManager().getFetcher(fetcherName)
-                    .fetch(fetchKey, metadata)) {
-                metadataList = parseMetadata(task.getFetchKey(), stream, metadata);
-            } catch (SecurityException e) {
-                throw e;
-            }
-            injectUserMetadata(userMetadata, metadataList);
-            EmitKey emitKey = task.getEmitKey();
-            if (StringUtils.isBlank(emitKey.getEmitKey())) {
-                emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
-                task.setEmitKey(emitKey);
-            }
-            return new AsyncData(task.getTaskId(), task.getFetchKey(), task.getEmitKey(),
-                    task.getOnParseException(), metadataList);
-        }
-
-        private List<Metadata> parseMetadata(FetchKey fetchKey, InputStream stream,
-                                             Metadata metadata) {
-            //make these configurable
-            BasicContentHandlerFactory.HANDLER_TYPE type =
-                    BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
-            int writeLimit = -1;
-            int maxEmbeddedResources = 1000;
-
-            RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
-                    new BasicContentHandlerFactory(type, writeLimit), maxEmbeddedResources,
-                    tikaConfig.getMetadataFilter());
-            ParseContext parseContext = new ParseContext();
-            try {
-                parser.parse(stream, handler, metadata, parseContext);
-            } catch (SAXException e) {
-                LOG.warn("problem:" + fetchKey.getFetchKey(), e);
-            } catch (EncryptedDocumentException e) {
-                LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
-            } catch (SecurityException e) {
-                LOG.warn("security exception: " + fetchKey.getFetchKey(), e);
-                throw e;
-            } catch (Exception e) {
-                LOG.warn("exception: " + fetchKey.getFetchKey(), e);
-            } catch (OutOfMemoryError e) {
-                LOG.error("oom: " + fetchKey.getFetchKey(), e);
-                throw e;
-            }
-            return handler.getMetadataList();
-        }
-
-        private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
-            for (String n : userMetadata.names()) {
-                //overwrite whatever was there
-                metadataList.get(0).set(n, null);
-                for (String val : userMetadata.getValues(n)) {
-                    metadataList.get(0).add(n, val);
-                }
-            }
-        }
-    }
-
-    private static class ForkWatcher implements Callable<Integer> {
-        private final InputStream in;
-
-        public ForkWatcher(InputStream in) {
-            this.in = in;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            //this should block forever
-            //if the forking process dies,
-            // this will either throw an IOException or read -1.
-            int i = in.read();
-            LOG.info("forking process notified forked to shutdown");
-            return 1;
-        }
-    }
-}
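
A detail from the deleted emit path above that explains the schema: the emits
table carried an uncompressed_size column because LZ4's fast decompressor needs
the exact decompressed length up front. A minimal round trip with the same
lz4-java library the deleted code used:

    import java.nio.charset.StandardCharsets;

    import net.jpountz.lz4.LZ4Factory;

    public class Lz4RoundTrip {

        public static void main(String[] args) {
            LZ4Factory factory = LZ4Factory.fastestInstance();
            byte[] original = "emit data payload".getBytes(StandardCharsets.UTF_8);

            byte[] compressed = factory.fastCompressor().compress(original);

            //the fast decompressor requires the original length --
            //hence the uncompressed_size column in the emits table
            byte[] restored = factory.fastDecompressor()
                    .decompress(compressed, original.length);

            System.out.println(new String(restored, StandardCharsets.UTF_8));
        }
    }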
diff --git a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
deleted file mode 100644
index 43553a4..0000000
--- a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# log levels: info, debug, error, fatal ...
-log4j.rootLogger=debug,stderr
-#console
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
deleted file mode 100644
index 5576f73..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-import org.apache.tika.config.Field;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-
-public class MockEmitter implements Initializable, Emitter {
-    private final ObjectMapper objectMapper = new ObjectMapper();
-    private Connection connection;
-    private String jdbc;
-    private PreparedStatement insert;
-
-    public MockEmitter() {
-        SimpleModule module = new SimpleModule();
-        module.addSerializer(Metadata.class, new JsonMetadataSerializer());
-        objectMapper.registerModule(module);
-    }
-
-    @Field
-    public void setJdbc(String jdbc) {
-        this.jdbc = jdbc;
-    }
-
-    @Override
-    public String getName() {
-        return "mock";
-    }
-
-    @Override
-    public void emit(String emitKey, List<Metadata> metadataList)
-            throws IOException, TikaEmitterException {
-        emit(Collections
-                .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
-    }
-
-    @Override
-    public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
-        for (EmitData d : emitData) {
-            String json = objectMapper.writeValueAsString(d);
-            try {
-                insert.clearParameters();
-                insert.setString(1, d.getEmitKey().getEmitKey());
-                insert.setString(2, json);
-                insert.execute();
-            } catch (SQLException e) {
-                throw new TikaEmitterException("problem inserting", e);
-            }
-        }
-    }
-
-    @Override
-    public void initialize(Map<String, Param> params) throws TikaConfigException {
-        try {
-            connection = DriverManager.getConnection(jdbc);
-            String sql = "insert into emitted (emitkey, json) values (?, ?)";
-            insert = connection.prepareStatement(sql);
-        } catch (SQLException e) {
-            throw new TikaConfigException("problem w connection", e);
-        }
-    }
-
-    @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler)
-            throws TikaConfigException {
-
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
deleted file mode 100644
index ebbd8f0..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import static org.junit.Assert.assertEquals;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.junit.Test;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
-
-public class SerializationTest {
-
-    @Test
-    public void testBasic() throws Exception {
-        String json = "{\"taskId\":49,\"fetchKey\":{\"fetcherName\":\"mock\"," +
-                "\"fetchKey\":\"key-48\"},\"emitKey\":{\"emitterName\":\"mock\"," +
-                "\"emitKey\":\"emit-48\"},\"onParseException\":\"EMIT\",\"metadataList\":" +
-                "[{\"X-TIKA:Parsed-By\":" +
-                "\"org.apache.tika.parser.EmptyParser\",\"X-TIKA:parse_time_millis\":" +
-                "\"0\",\"X-TIKA:embedded_depth\":\"0\"}]}";
-
-        ObjectMapper mapper = new ObjectMapper();
-        SimpleModule module = new SimpleModule();
-        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
-        mapper.registerModule(module);
-        AsyncData asyncData = mapper.readValue(json, AsyncData.class);
-        assertEquals(49, asyncData.getTaskId());
-        assertEquals("mock", asyncData.getFetchKey().getFetcherName());
-        assertEquals(1, asyncData.getMetadataList().size());
-    }
-
-}
diff --git a/tika-server/tika-server-core/pom.xml b/tika-server/tika-server-core/pom.xml
index 3ed9dd6..5b60a59 100644
--- a/tika-server/tika-server-core/pom.xml
+++ b/tika-server/tika-server-core/pom.xml
@@ -36,7 +36,6 @@
   </pluginRepositories>
 
   <dependencies>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-translate</artifactId>
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index e5a4921..4081102 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -141,9 +141,10 @@ public class TikaServerCli {
         return ret.toArray(new String[0]);
     }
 
-    public static void noFork(TikaServerConfig tikaServerConfig) {
+    public static void noFork(TikaServerConfig tikaServerConfig) throws Exception {
         List<String> args = tikaServerConfig
                 .getForkedProcessArgs(tikaServerConfig.getPort(), tikaServerConfig.getIdBase());
+        args.add("-noFork");
         TikaServerProcess.main(args.toArray(new String[0]));
     }
 
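Note: in no-fork mode the CLI now simply re-invokes the server entry
point in the same JVM and appends "-noFork" so the server code can tell
it has no forking parent. A minimal sketch of that hand-off, with a
hypothetical ServerProcess standing in for TikaServerProcess:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class NoForkLauncher {
        public static void main(String[] rawArgs) throws Exception {
            //collect the args that would have gone to a forked child...
            List<String> args = new ArrayList<>(Arrays.asList(rawArgs));
            //...and tag them so the in-process server knows its mode
            args.add("-noFork");
            ServerProcess.main(args.toArray(new String[0]));
        }
    }

    //stand-in for TikaServerProcess; hypothetical
    class ServerProcess {
        public static void main(String[] args) {
            boolean noFork = Arrays.asList(args).contains("-noFork");
            System.out.println("noFork mode: " + noFork);
        }
    }
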
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
index e12dd37..1cba4cc 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
@@ -130,7 +130,7 @@ public class TikaServerConfig {
         TikaServerConfig config = null;
         Set<String> settings = new HashSet<>();
         if (commandLine.hasOption("c")) {
-            config = load(Paths.get(commandLine.getOptionValue("c")), settings);
+            config = load(Paths.get(commandLine.getOptionValue("c")), commandLine, settings);
             config.setConfigPath(commandLine.getOptionValue("c"));
         } else {
             config = new TikaServerConfig();
@@ -175,13 +175,14 @@ public class TikaServerConfig {
         return config;
     }
 
-    static TikaServerConfig load(Path p, Set<String> settings) throws IOException, TikaException {
+    static TikaServerConfig load(Path p, CommandLine commandLine, Set<String> settings) throws IOException,
+            TikaException {
         try (InputStream is = Files.newInputStream(p)) {
-            return TikaServerConfig.load(is, settings);
+            return TikaServerConfig.load(is, commandLine, settings);
         }
     }
 
-    static TikaServerConfig load(InputStream is, Set<String> settings)
+    static TikaServerConfig load(InputStream is, CommandLine commandLine, Set<String> settings)
             throws IOException, TikaException {
         Node properties = null;
         try {
@@ -201,6 +202,10 @@ public class TikaServerConfig {
                 loadServerConfig(child, config, settings);
             }
         }
+        //allow command-line options to override settings from the config file
+        if (commandLine.hasOption("noFork")) {
+            config.setNoFork(true);
+        }
         return config;
     }
 
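Note: the extra CommandLine parameter exists so that a handful of
command-line flags can win over what the XML config file says. A
minimal sketch of that precedence rule using commons-cli (Config here
is an illustrative stand-in for TikaServerConfig):

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Options;

    public class OverrideSketch {
        static class Config {
            boolean noFork; //normally parsed from the XML config file
        }

        //command-line flags override whatever the config file said
        static Config load(Config fromXml, CommandLine commandLine) {
            if (commandLine.hasOption("noFork")) {
                fromXml.noFork = true;
            }
            return fromXml;
        }

        public static void main(String[] args) throws Exception {
            Options options = new Options();
            options.addOption("noFork", false, "run the server in-process");
            CommandLine line =
                    new DefaultParser().parse(options, new String[]{"-noFork"});
            System.out.println(load(new Config(), line).noFork); //true
        }
    }
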
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 390d731..6f30eac 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -27,12 +27,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -57,10 +51,6 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.digestutils.BouncyCastleDigester;
 import org.apache.tika.parser.digestutils.CommonsDigester;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.server.core.resource.AsyncEmitter;
-import org.apache.tika.server.core.resource.AsyncParser;
 import org.apache.tika.server.core.resource.AsyncResource;
 import org.apache.tika.server.core.resource.DetectorResource;
 import org.apache.tika.server.core.resource.EmitterResource;
@@ -102,6 +92,7 @@ public class TikaServerProcess {
         options.addOption("i", "id", true,
                 "id to use for server in server status endpoint");
         options.addOption("?", "help", false, "this help message");
+        options.addOption("noFork", false, "run the server in-process; do not fork a child process");
         options.addOption("forkedStatusFile", true,
                 "Not allowed in -noFork: temporary file used to communicate " +
                         "with forking process -- do not use this! " +
@@ -114,44 +105,40 @@ public class TikaServerProcess {
         return options;
     }
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
         LOG.info("Starting {} server", new Tika());
+        AsyncResource asyncResource = null;
         try {
             Options options = getOptions();
             CommandLineParser cliParser = new DefaultParser();
             CommandLine line = cliParser.parse(options, args);
             TikaServerConfig tikaServerConfig = TikaServerConfig.load(line);
             LOG.debug("forked config: {}", tikaServerConfig);
-            mainLoop(tikaServerConfig);
+            if (tikaServerConfig.isEnableUnsecureFeatures()) {
+                final AsyncResource localAsyncResource =
+                        new AsyncResource(tikaServerConfig.getConfigPath());
+                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                    try {
+                        localAsyncResource.shutdownNow();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }));
+                asyncResource = localAsyncResource;
+            }
+
+
+            ServerDetails serverDetails = initServer(tikaServerConfig, asyncResource);
+            startServer(serverDetails);
+
         } catch (Exception e) {
             LOG.error("Can't start: ", e);
             System.exit(-1);
         }
     }
 
-    private static void mainLoop(TikaServerConfig tikaServerConfig) throws Exception {
-        AsyncResource asyncResource = null;
-        ArrayBlockingQueue<FetchEmitTuple> asyncFetchEmitQueue = null;
-        ArrayBlockingQueue<EmitData> asyncEmitData = null;
-        int numAsyncParserThreads = 10;
-        if (tikaServerConfig.isEnableUnsecureFeatures()) {
-            asyncResource = new AsyncResource();
-            asyncFetchEmitQueue = asyncResource.getFetchEmitQueue(10000);
-            asyncEmitData = asyncResource.getEmitDataQueue(1000);
-        }
-
-        ServerDetails serverDetails = initServer(tikaServerConfig, asyncResource);
-        ExecutorService executorService = Executors.newFixedThreadPool(numAsyncParserThreads + 1);
-        ExecutorCompletionService<Integer> executorCompletionService =
-                new ExecutorCompletionService<>(executorService);
+    private static void startServer(ServerDetails serverDetails) throws Exception {
 
-        if (asyncFetchEmitQueue != null) {
-            executorCompletionService.submit(new AsyncEmitter(asyncEmitData));
-            for (int i = 0; i < numAsyncParserThreads; i++) {
-                executorCompletionService
-                        .submit(new AsyncParser(asyncFetchEmitQueue, asyncEmitData));
-            }
-        }
         try {
             //start the server
             Server server = serverDetails.sf.create();
@@ -159,15 +146,7 @@ public class TikaServerProcess {
             LOG.warn("exception starting server", e);
             System.exit(DO_NOT_RESTART_EXIT_VALUE);
         }
-
         LOG.info("Started Apache Tika server {} at {}", serverDetails.serverId, serverDetails.url);
-
-        while (true) {
-            Future<Integer> future = executorCompletionService.poll(1, TimeUnit.MINUTES);
-            if (future != null) {
-                LOG.warn("Daemon should not stop: " + future.get());
-            }
-        }
     }
 
     //This returns the server, configured and ready to be started.
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
deleted file mode 100644
index 773297d..0000000
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.server.core.resource;
-
-import java.io.InputStream;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.MultivaluedHashMap;
-
-import org.apache.cxf.jaxrs.impl.UriInfoImpl;
-import org.apache.cxf.message.MessageImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.utils.StringUtils;
-
-/**
- * Worker thread that takes {@link FetchEmitTuple} off the queue, parses
- * the file and puts the {@link EmitData} on the emitDataQueue for the {@link AsyncEmitter}.
- */
-public class AsyncParser implements Callable<Integer> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
-
-    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTupleQueue;
-    private final ArrayBlockingQueue<EmitData> emitDataQueue;
-
-    public AsyncParser(ArrayBlockingQueue<FetchEmitTuple> queue,
-                       ArrayBlockingQueue<EmitData> emitData) {
-        this.fetchEmitTupleQueue = queue;
-        this.emitDataQueue = emitData;
-    }
-
-    @Override
-    public Integer call() throws Exception {
-        while (true) {
-            FetchEmitTuple request = fetchEmitTupleQueue.poll(1, TimeUnit.MINUTES);
-            if (request != null) {
-                EmitData emitData = processTuple(request);
-                boolean shouldEmit = checkForParseException(request, emitData);
-                if (shouldEmit) {
-                    boolean offered = emitDataQueue.offer(emitData, 10, TimeUnit.MINUTES);
-                    if (!offered) {
-                        //TODO: deal with this
-                        LOG.warn("Failed to add ({}) " + "to emit queue after 10 minutes.",
-                                request.getFetchKey().getFetchKey());
-                    }
-                }
-            } else {
-                LOG.trace("Nothing on the async queue");
-            }
-        }
-    }
-
-    private boolean checkForParseException(FetchEmitTuple request, EmitData emitData) {
-        if (emitData == null || emitData.getMetadataList() == null ||
-                emitData.getMetadataList().size() == 0) {
-            LOG.warn("empty or null emit data ({})", request.getFetchKey().getFetchKey());
-            return false;
-        }
-        boolean shouldEmit = true;
-        Metadata container = emitData.getMetadataList().get(0);
-        String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
-        if (stack != null) {
-            LOG.warn("fetchKey ({}) container parse exception ({})",
-                    request.getFetchKey().getFetchKey(),
-                    stack);
-            if (request.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
-                shouldEmit = false;
-            }
-        }
-
-        for (int i = 1; i < emitData.getMetadataList().size(); i++) {
-            Metadata m = emitData.getMetadataList().get(i);
-            String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
-            if (embeddedStack != null) {
-                LOG.warn("fetchKey ({}) embedded parse exception ({})",
-                        request.getFetchKey().getFetchKey(), embeddedStack);
-            }
-        }
-        return shouldEmit;
-    }
-
-    private EmitData processTuple(FetchEmitTuple t) {
-        Metadata userMetadata = t.getMetadata();
-        Metadata metadata = new Metadata();
-        String fetcherName = t.getFetchKey().getFetcherName();
-        String fetchKey = t.getFetchKey().getFetchKey();
-        List<Metadata> metadataList = null;
-        try (InputStream stream = TikaResource.getConfig().getFetcherManager()
-                .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
-            metadataList = RecursiveMetadataResource
-                    .parseMetadata(stream, metadata, new MultivaluedHashMap<>(),
-                            new UriInfoImpl(new MessageImpl()), t.getHandlerConfig());
-        } catch (SecurityException e) {
-            throw e;
-        } catch (Exception e) {
-            LOG.warn(t.toString(), e);
-        }
-
-        injectUserMetadata(userMetadata, metadataList);
-        EmitKey emitKey = t.getEmitKey();
-        if (StringUtils.isBlank(emitKey.getEmitKey())) {
-            emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
-        }
-        return new EmitData(emitKey, metadataList);
-    }
-
-    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
-        for (String n : userMetadata.names()) {
-            //overwrite whatever was there
-            metadataList.get(0).set(n, null);
-            for (String val : userMetadata.getValues(n)) {
-                metadataList.get(0).add(n, val);
-            }
-        }
-    }
-}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index dea21c4..023291f 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -23,13 +23,9 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -40,11 +36,14 @@ import javax.ws.rs.core.UriInfo;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
 
+import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
 import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.async.AsyncProcessor;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.EmitterManager;
@@ -58,8 +57,14 @@ public class AsyncResource {
 
     private static final int DEFAULT_FETCH_EMIT_QUEUE_SIZE = 10000;
     long maxQueuePauseMs = 60000;
+    private final AsyncProcessor asyncProcessor;
     private ArrayBlockingQueue<FetchEmitTuple> queue;
 
+    public AsyncResource(java.nio.file.Path tikaConfigPath)
+            throws TikaException, IOException, SAXException {
+        this.asyncProcessor = new AsyncProcessor(tikaConfigPath);
+    }
+
     public ArrayBlockingQueue<FetchEmitTuple> getFetchEmitQueue(int queueSize) {
         this.queue = new ArrayBlockingQueue<>(queueSize);
         return queue;
@@ -106,26 +111,12 @@ public class AsyncResource {
             }
         }
         Instant start = Instant.now();
-        long elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
-        List<FetchEmitTuple> notAdded = new ArrayList<>();
-        int addedCount = 0;
-        for (FetchEmitTuple t : request.getTuples()) {
-            boolean offered = false;
-            while (!offered && elapsed < maxQueuePauseMs) {
-                offered = queue.offer(t, 10, TimeUnit.MILLISECONDS);
-                elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
-            }
-            if (!offered) {
-                notAdded.add(t);
-            } else {
-                addedCount++;
-            }
+        boolean offered = asyncProcessor.offer(request.getTuples(), maxQueuePauseMs);
+        if (offered) {
+            return ok(request.getTuples().size());
+        } else {
+            return throttle(request.getTuples().size());
         }
-
-        if (notAdded.size() > 0) {
-            return throttle(notAdded, addedCount);
-        }
-        return ok(request.getTuples().size());
     }
 
     private Map<String, Object> ok(int size) {
@@ -135,16 +126,10 @@ public class AsyncResource {
         return map;
     }
 
-    private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
-        List<String> fetchKeys = new ArrayList<>();
-        for (FetchEmitTuple t : notAdded) {
-            fetchKeys.add(t.getFetchKey().getFetchKey());
-        }
+    private Map<String, Object> throttle(int requestSize) {
         Map<String, Object> map = new HashMap<>();
         map.put("status", "throttled");
-        map.put("added", added);
-        map.put("skipped", notAdded.size());
-        map.put("skippedFetchKeys", fetchKeys);
+        map.put("msg", "unable to accept a request of size " + requestSize + " at this time");
         return map;
     }
 
@@ -162,4 +147,8 @@ public class AsyncResource {
         }
     }
 
+    public void shutdownNow() throws Exception {
+        asyncProcessor.close();
+    }
+
 }
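
Note: the endpoint's backpressure is now all-or-nothing: either
AsyncProcessor.offer(...) queues the whole batch within maxQueuePauseMs
and the response is "ok", or nothing is queued and the response is
"throttled". A minimal sketch of that contract over a bounded queue
(OfferSketch is illustrative, not the real AsyncProcessor, and it skips
the wait that the real offer performs):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ArrayBlockingQueue;

    public class OfferSketch {
        private final ArrayBlockingQueue<String> queue =
                new ArrayBlockingQueue<>(2);

        //all-or-nothing: enqueue only if the whole batch fits
        synchronized boolean offer(List<String> tuples) {
            if (queue.remainingCapacity() < tuples.size()) {
                return false; //caller answers "throttled"
            }
            queue.addAll(tuples);
            return true;
        }

        Map<String, Object> respond(List<String> tuples) {
            Map<String, Object> map = new HashMap<>();
            if (offer(tuples)) {
                map.put("status", "ok");
                map.put("added", tuples.size());
            } else {
                map.put("status", "throttled");
                map.put("msg", "unable to accept a request of size " +
                        tuples.size() + " at this time");
            }
            return map;
        }

        public static void main(String[] args) {
            OfferSketch s = new OfferSketch();
            System.out.println(s.respond(List.of("a", "b"))); //status=ok
            System.out.println(s.respond(List.of("c")));      //status=throttled
        }
    }
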
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 44547c5..540e25f 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.tika.server.core;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -53,7 +54,7 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
-    private static final int NUM_FILES = 1000;
+    private static final int NUM_FILES = 450;
     private static final String EMITTER_NAME = "fse";
     private static final String FETCHER_NAME = "fsf";
     private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION =
@@ -63,7 +64,9 @@ public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
     private static String TIKA_CONFIG_XML;
     private static Path TIKA_CONFIG;
     private static List<String> FILE_LIST = new ArrayList<>();
-    private static String[] FILES = new String[]{"hello_world.xml", "null_pointer.xml"
+    private static String[] FILES = new String[]{
+            "hello_world.xml",
+            "null_pointer.xml"
             // "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
     };
 
@@ -131,21 +134,31 @@ public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
         Thread serverThread = new Thread() {
             @Override
             public void run() {
-                TikaServerCli.main(new String[]{"-p", INTEGRATION_TEST_PORT, "-config",
+                TikaServerCli.main(new String[]{
+                        //for debugging/development, uncomment -noFork below; otherwise use the default forked mode
+                        //"-noFork",
+                        "-p", INTEGRATION_TEST_PORT, "-config",
                         TIKA_CONFIG.toAbsolutePath().toString()});
             }
         };
         serverThread.start();
 
         try {
+            long start = System.currentTimeMillis();
+
             JsonNode response = sendAsync(FILE_LIST);
+            String status = response.get("status").asText();
+            if (! "ok".equals(status)) {
+                fail("bad status: '" + status + "' -> " + response.toPrettyString());
+            }
             int expected = (ON_PARSE_EXCEPTION == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) ?
                     FILE_LIST.size() : FILE_LIST.size() / 2;
             int targets = 0;
             while (targets < FILE_LIST.size()) {
                 targets = countTargets();
-                Thread.sleep(1000);
+                Thread.sleep(100);
             }
+            System.out.println("elapsed : " + (System.currentTimeMillis() - start));
         } finally {
             serverThread.interrupt();
         }
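
Note: the test's poll loop has no upper bound, so a wedged server would
hang the build rather than fail it. A bounded variant of the same
polling idiom (the timeout value is illustrative):

    import java.util.function.IntSupplier;

    public class PollUntilSketch {
        //poll every 100ms until count >= expected or the deadline passes
        static boolean pollUntil(IntSupplier counter, int expected,
                                 long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                if (counter.getAsInt() >= expected) {
                    return true;
                }
                Thread.sleep(100);
            }
            return false;
        }

        public static void main(String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
            //toy counter that "finds" one target every 50ms
            boolean done = pollUntil(
                    () -> (int) ((System.currentTimeMillis() - start) / 50),
                    10, 5000);
            System.out.println("reached target: " + done);
        }
    }
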
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
index 2cb2a2c..a67788d 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
@@ -22,6 +22,10 @@ import static org.junit.Assert.assertTrue;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.junit.Test;
 
 import org.apache.tika.config.TikaConfigTest;
@@ -31,8 +35,11 @@ public class TikaServerConfigTest {
     @Test
     public void testBasic() throws Exception {
         Set<String> settings = new HashSet<>();
+        CommandLineParser parser = new DefaultParser();
+        CommandLine emptyCommandLine = parser.parse(new Options(), new String[]{});
         TikaServerConfig config = TikaServerConfig
                 .load(TikaConfigTest.class.getResourceAsStream("/configs/tika-config-server.xml"),
+                        emptyCommandLine,
                         settings);
         assertEquals(-1, config.getMaxRestarts());
         assertEquals(54321, config.getTaskTimeoutMillis());