You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/02/02 22:44:47 UTC

[tika] branch TIKA-3288 updated: TIKA-3288 -- tests pass. More work remains with unit tests and documentation, but I think we're good.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-3288 by this push:
     new 24a37e8  TIKA-3288 -- tests pass.  More work remains with unit tests and documentation, but I think we're good.
24a37e8 is described below

commit 24a37e8eb4fcd8ed377f90c08ea91f408e6fdfe9
Author: tballison <ta...@apache.org>
AuthorDate: Tue Feb 2 17:44:33 2021 -0500

    TIKA-3288 -- tests pass.  More work remains with unit tests and documentation, but I think we're good.
---
 tika-serialization/pom.xml                         |   1 +
 .../tika/metadata/serialization/JsonEmitData.java  |  45 ++++++
 .../metadata/serialization/JsonFetchEmitTuple.java |  45 ++++--
 .../serialization/JsonFetchEmitTupleList.java      |  68 +++++++++
 .../tika/metadata/serialization/JsonMetadata.java  |   2 +-
 .../metadata/serialization/JsonMetadataList.java   |   2 +-
 .../serialization/JsonFetchEmitTupleListTest.java  |  62 ++++++++
 .../serialization/JsonMetadataListTest.java        |   4 +-
 tika-server/tika-server-client/pom.xml             |  10 +-
 .../org/apache/tika/server/client/TikaClient.java  |  55 ++-----
 .../apache/tika/server/client/TikaClientCLI.java   |  39 ++++-
 .../apache/tika/server/client/TikaHttpClient.java  |  38 +++--
 .../apache/tika/server/core/TikaServerProcess.java |  50 +++---
 .../tika/server/core/resource/AsyncEmitter.java    | 170 ++++++++++++---------
 .../tika/server/core/resource/AsyncParser.java     | 108 +++++++++++++
 .../tika/server/core/resource/AsyncRequest.java    |  24 ++-
 .../tika/server/core/resource/AsyncResource.java   |  94 +++++++++---
 .../server/core/resource/MetadataResource.java     |   2 +-
 .../core/resource/RecursiveMetadataResource.java   |   4 +-
 .../tika/server/core/resource/TikaResource.java    |  13 +-
 .../server/core/resource/UnpackerResource.java     |   2 +-
 ...st.java => TikaServerAsyncIntegrationTest.java} | 158 +++++++------------
 .../core/TikaServerEmitterIntegrationTest.java     |  90 +++++------
 23 files changed, 711 insertions(+), 375 deletions(-)

diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index a8a7e09..64e3aa2 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -48,6 +48,7 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-core</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
new file mode 100644
index 0000000..21b6377
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+
+import java.io.IOException;
+import java.io.Writer;
+
+public class JsonEmitData {
+
+    public static void toJson(EmitData emitData, Writer writer) throws IOException {
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            EmitKey key = emitData.getEmitKey();
+            jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITTER, key.getEmitterName());
+            jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITKEY, key.getKey());
+            jsonGenerator.writeFieldName("data");
+            jsonGenerator.writeStartArray();
+            for (Metadata m : emitData.getMetadataList()) {
+                JsonMetadata.writeMetadataObject(m, jsonGenerator, false);
+            }
+            jsonGenerator.writeEndArray();
+            jsonGenerator.writeEndObject();
+        }
+    }
+}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 2dfcd3b..345873f 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -28,14 +28,15 @@ import org.apache.tika.utils.StringUtils;
 
 import java.io.IOException;
 import java.io.Reader;
+import java.io.StringWriter;
 import java.io.Writer;
 
 public class JsonFetchEmitTuple {
 
     private static final String FETCHER = "fetcher";
     private static final String FETCHKEY = "fetchKey";
-    private static final String EMITTER = "emitter";
-    private static final String EMITKEY = "emitKey";
+    public static final String EMITTER = "emitter";
+    public static final String EMITKEY = "emitKey";
     private static final String METADATAKEY = "metadata";
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
@@ -48,8 +49,11 @@ public class JsonFetchEmitTuple {
     }
 
 
-    private static FetchEmitTuple parseFetchEmitTuple(JsonParser jParser) throws IOException {
+    static FetchEmitTuple parseFetchEmitTuple(JsonParser jParser) throws IOException {
         JsonToken token = jParser.nextToken();
+        if (token == JsonToken.START_OBJECT) {
+            token = jParser.nextToken();
+        }
         String fetcherName = null;
         String fetchKey = null;
         String emitterName = null;
@@ -92,21 +96,32 @@ public class JsonFetchEmitTuple {
         return jParser.getValueAsString();
     }
 
+    public static String toJson(FetchEmitTuple t) throws IOException {
+        StringWriter writer = new StringWriter();
+        toJson(t, writer);
+        return writer.toString();
+    }
+
     public static void toJson(FetchEmitTuple t, Writer writer) throws IOException {
 
         try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
-            jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
-            jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
-            if (!StringUtils.isBlank(t.getEmitKey().getKey())) {
-                jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getKey());
-            }
-            if (t.getMetadata().size() > 0) {
-                jsonGenerator.writeFieldName(METADATAKEY);
-                JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator, false);
-            }
-            jsonGenerator.writeEndObject();
+            writeTuple(t, jsonGenerator);
         }
     }
+
+    static void writeTuple(FetchEmitTuple t, JsonGenerator jsonGenerator) throws IOException {
+        jsonGenerator.writeStartObject();
+        jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
+        jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
+        jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
+        if (!StringUtils.isBlank(t.getEmitKey().getKey())) {
+            jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getKey());
+        }
+        if (t.getMetadata().size() > 0) {
+            jsonGenerator.writeFieldName(METADATAKEY);
+            JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator, false);
+        }
+        jsonGenerator.writeEndObject();
+
+    }
 }
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
new file mode 100644
index 0000000..4791aee
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.StringUtils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JsonFetchEmitTupleList {
+
+    public static List<FetchEmitTuple> fromJson(Reader reader) throws IOException {
+        JsonParser jParser = new JsonFactory().createParser(reader);
+        JsonToken token = jParser.nextToken();
+        if (token != JsonToken.START_ARRAY) {
+            throw new IOException("require start array, but see: "+token.name());
+        }
+        List<FetchEmitTuple> list = new ArrayList<>();
+        while (token != JsonToken.END_ARRAY) {
+            list.add(JsonFetchEmitTuple.parseFetchEmitTuple(jParser));
+            token = jParser.nextToken();
+        }
+        return list;
+    }
+
+    public static String toJson(List<FetchEmitTuple> list) throws IOException {
+        StringWriter writer = new StringWriter();
+        toJson(list, writer);
+        return writer.toString();
+    }
+
+    public static void toJson(List<FetchEmitTuple> list, Writer writer) throws IOException {
+
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+            jsonGenerator.writeStartArray();
+            for (FetchEmitTuple t : list) {
+                JsonFetchEmitTuple.writeTuple(t, jsonGenerator);
+            }
+            jsonGenerator.writeEndArray();
+        }
+    }
+}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index 63de2a5..b33c2a9 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -82,7 +82,7 @@ public class JsonMetadata {
      *
      * @param reader reader to read from
      * @return Metadata or null if nothing could be read from the reader
-     * @throws TikaException in case of parse failure by Gson or IO failure with Reader
+     * @throws IOException in case of parse failure or IO failure with Reader
      */
     public static Metadata fromJson(Reader reader) throws IOException {
         Metadata m = null;
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
index 9aa9be3..85f444e 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
@@ -65,7 +65,7 @@ public class JsonMetadataList {
      *
      * @param reader
      * @return Metadata or null if nothing could be read from the reader
-     * @throws org.apache.tika.exception.TikaException in case of parse failure by Gson or IO failure with Reader
+     * @throws IOException in case of parse failure or IO failure with Reader
      */
     public static List<Metadata> fromJson(Reader reader) throws IOException {
         List<Metadata> ms = null;
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
new file mode 100644
index 0000000..80c37f4
--- /dev/null
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.junit.Test;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class JsonFetchEmitTupleListTest {
+
+    @Test
+    public void testBasic() throws Exception {
+        List<FetchEmitTuple> list = new ArrayList<>();
+        for (int i = 0; i < 57; i++) {
+            list.add(getFetchEmitTuple(i));
+        }
+        StringWriter writer = new StringWriter();
+        JsonFetchEmitTupleList.toJson(list, writer);
+
+        Reader reader = new StringReader(writer.toString());
+        List<FetchEmitTuple> deserialized = JsonFetchEmitTupleList.fromJson(reader);
+        assertEquals(list, deserialized);
+    }
+
+    private FetchEmitTuple getFetchEmitTuple(int i) {
+        Metadata m = new Metadata();
+        m.add("m1", "v1-"+i);
+        m.add("m1", "v1-"+i);
+        m.add("m2", "v2-"+i);
+        m.add("m2", "v3-"+i);
+        m.add("m3", "v4-"+i);
+
+        return new FetchEmitTuple(
+                new FetchKey("fetcher-"+i, "fetchkey-"+i),
+                new EmitKey("emitter-"+i, "emitKey-"+i),
+                m);
+    }
+}
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
index 768cd35..543bbcd 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
@@ -93,7 +93,7 @@ public class JsonMetadataListTest {
     @Test
     public void testPrettyPrint() throws Exception {
         Metadata m1 = new Metadata();
-        m1.add("tika:content", "this is the content");
+        m1.add(TikaCoreProperties.TIKA_CONTENT, "this is the content");
         m1.add("zk1", "v1");
         m1.add("zk1", "v2");
         m1.add("zk1", "v3");
@@ -121,7 +121,7 @@ public class JsonMetadataListTest {
         assertTrue(writer.toString().startsWith("[ {\n" +
                 "  \"zk1\" : [ \"v1\", \"v2\", \"v3\", \"v4\", \"v4\" ],\n" +
                 "  \"zk2\" : \"v1\",\n" +
-                "  \"tika:content\" : \"this is the content\"\n" +
+                "  \"X-TIKA:content\" : \"this is the content\"\n" +
                 "},"));
 
 
diff --git a/tika-server/tika-server-client/pom.xml b/tika-server/tika-server-client/pom.xml
index d3daf68..920b644 100644
--- a/tika-server/tika-server-client/pom.xml
+++ b/tika-server/tika-server-client/pom.xml
@@ -32,16 +32,16 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tika-serialization</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
             <version>${httpcomponents.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>${gson.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </dependency>
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index 73ff3f4..336cb7d 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -16,27 +16,21 @@
  */
 package org.apache.tika.server.client;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 import java.io.IOException;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
 public class TikaClient {
 
-    private static final Gson GSON = new GsonBuilder().create();
-
     private final Random random = new Random();
-    private final TikaConfig tikaConfig;
     private List<TikaHttpClient> clients;
 
 
@@ -45,11 +39,11 @@ public class TikaClient {
         for (String url : tikaServers) {
             clients.add(TikaHttpClient.get(url));
         }
-        return new TikaClient(tikaConfig, clients);
+        return new TikaClient(clients);
     }
 
-    private TikaClient(TikaConfig tikaConfig, List<TikaHttpClient> clients) {
-        this.tikaConfig = tikaConfig;
+    private TikaClient(List<TikaHttpClient> clients) {
+
         this.clients = clients;
     }
 
@@ -57,37 +51,20 @@ public class TikaClient {
 
     }*/
 
-    public TikaEmitterResult parse(FetchEmitTuple fetchEmit, Metadata metadata)
+    public TikaEmitterResult parse(FetchEmitTuple fetchEmit)
             throws IOException, TikaException {
         TikaHttpClient client = getHttpClient();
-        String jsonRequest = jsonifyRequest(fetchEmit, metadata);
-        return client.postJson(jsonRequest);
-
+        StringWriter writer = new StringWriter();
+        JsonFetchEmitTuple.toJson(fetchEmit, writer);
+        return client.postJson(writer.toString());
     }
 
-    private String jsonifyRequest(FetchEmitTuple fetchEmit, Metadata metadata) {
-        JsonObject root = new JsonObject();
-        root.add("fetcher", new JsonPrimitive(fetchEmit.getFetchKey().getFetcherName()));
-        root.add("fetchKey", new JsonPrimitive(fetchEmit.getFetchKey().getKey()));
-        root.add("emitter", new JsonPrimitive(fetchEmit.getEmitKey().getEmitterName()));
-        root.add("emitKey", new JsonPrimitive(fetchEmit.getEmitKey().getKey()));
-        if (metadata.size() > 0) {
-            JsonObject m = new JsonObject();
-            for (String n : metadata.names()) {
-                String[] vals = metadata.getValues(n);
-                if (vals.length == 1) {
-                    m.add(n, new JsonPrimitive(vals[0]));
-                } else if (vals.length > 1) {
-                    JsonArray arr = new JsonArray();
-                    for (int i = 0; i < vals.length; i++) {
-                        arr.add(vals[i]);
-                    }
-                    m.add(n, arr);
-                }
-            }
-            root.add("metadata", m);
-        }
-        return GSON.toJson(root);
+    public TikaEmitterResult parseAsync(List<FetchEmitTuple> tuples)
+            throws IOException, TikaException {
+        StringWriter writer = new StringWriter();
+        JsonFetchEmitTupleList.toJson(tuples, writer);
+        TikaHttpClient client = getHttpClient();
+        return client.postJsonAsync(writer.toString());
     }
 
     private TikaHttpClient getHttpClient() {
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 4b438e2..d760ab9 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -27,6 +27,7 @@ import org.xml.sax.SAXException;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -108,6 +109,42 @@ public class TikaClientCLI {
                 servers, numThreads);
     }
 
+    private class AsyncFetchWorker implements Callable<Integer> {
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final TikaClient client;
+        public AsyncFetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
+            this.queue = queue;
+            this.client = client;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            List<FetchEmitTuple> localCache = new ArrayList<>();
+            while (true) {
+
+                FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+                if (t == null) {
+                    send(localCache);
+                    throw new TimeoutException("exceeded maxWaitMs");
+                }
+                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                    send(localCache);
+                    return 1;
+                }
+                if (localCache.size() > 20) {
+                    LOGGER.debug("about to send: {}", localCache.size());
+                    send(localCache);
+                    localCache.clear();
+                }
+                localCache.add(t);
+            }
+        }
+
+        private void send(List<FetchEmitTuple> localCache) {
+
+        }
+    }
+
     private class FetchWorker implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final TikaClient client;
@@ -130,7 +167,7 @@ public class TikaClientCLI {
                 }
                 try {
                     LOGGER.debug("about to parse: {}", t.getFetchKey());
-                    client.parse(t, t.getMetadata());
+                    client.parse(t);
                 } catch (IOException e) {
                     LOGGER.warn(t.getFetchKey().toString(), e);
                 } catch (TikaException e) {
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
index 621f5db..591064a 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
@@ -18,7 +18,6 @@ package org.apache.tika.server.client;
 
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
-import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
@@ -26,6 +25,7 @@ import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +39,14 @@ import java.nio.charset.StandardCharsets;
  */
 class TikaHttpClient {
 
-    private static final String ENDPOINT = "emit";
+    private static final String EMIT_ENDPOINT = "emit";
     private static final String TIKA_ENDPOINT = "tika";
+    private static final String ASYNC_ENDPOINT = "async";
     private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClient.class);
     private final HttpHost httpHost;
     private final HttpClient httpClient;
-    private final String endPointUrl;
+    private final String emitEndPointUrl;
+    private final String asyncEndPointUrl;
     private final String tikaUrl;
     private int maxRetries = 3;
     //if can't make contact with Tika server, max wait time in ms
@@ -53,38 +55,46 @@ class TikaHttpClient {
     private long pulseWaitForTikaMs = 1000;
 
     static TikaHttpClient get(String baseUrl) throws TikaClientConfigException {
-        String endPointUrl = baseUrl.endsWith("/") ? baseUrl+ENDPOINT : baseUrl+"/"+ENDPOINT;
-        String tikaUrl = baseUrl.endsWith("/") ? baseUrl+TIKA_ENDPOINT : baseUrl+"/"+TIKA_ENDPOINT;
         URI uri;
         try {
-            uri = new URI(endPointUrl);
+            uri = new URI(baseUrl);
         } catch (URISyntaxException e) {
             throw new TikaClientConfigException("bad URI", e);
         }
         HttpHost httpHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
         //TODO: need to add other configuration stuff? proxy, username, password, timeouts...
         HttpClient client = HttpClients.createDefault();
-        return new TikaHttpClient(endPointUrl, tikaUrl, httpHost, client);
+        return new TikaHttpClient(baseUrl, httpHost, client);
     }
 
     /**
      *
-     * @param endPointUrl full url to the tika-server including endpoint
-     * @param tikaUrl url to /tika endpoint to use to check on server status
+     * @param baseUrl url to base endpoint
      * @param httpHost
      * @param httpClient
      */
-    private TikaHttpClient(String endPointUrl, String tikaUrl, HttpHost httpHost, HttpClient httpClient) {
-        this.endPointUrl = endPointUrl;
-        this.tikaUrl = tikaUrl;
+    private TikaHttpClient(String baseUrl, HttpHost httpHost, HttpClient httpClient) {
+        if (! baseUrl.endsWith("/")) {
+            baseUrl += "/";
+        }
+        this.emitEndPointUrl = baseUrl+EMIT_ENDPOINT;
+        this.asyncEndPointUrl = baseUrl+ASYNC_ENDPOINT;
+        this.tikaUrl = baseUrl+TIKA_ENDPOINT;
         this.httpHost = httpHost;
         this.httpClient = httpClient;
     }
 
+    public TikaEmitterResult postJsonAsync(String jsonRequest) {
+        return postJson(asyncEndPointUrl, jsonRequest);
+    }
 
     public TikaEmitterResult postJson(String jsonRequest) {
-        System.out.println("NED:"+endPointUrl);
-        HttpPost post = new HttpPost(endPointUrl);
+        return postJson(emitEndPointUrl, jsonRequest);
+    }
+
+    private TikaEmitterResult postJson(String endPoint, String jsonRequest) {
+        System.out.println("NED:"+endPoint);
+        HttpPost post = new HttpPost(endPoint);
         ByteArrayEntity entity = new ByteArrayEntity(jsonRequest.getBytes(StandardCharsets.UTF_8));
         post.setEntity(entity);
         post.setHeader("Content-Type", "application/json");
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 1bacbc5..5135f55 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -37,7 +37,10 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.digestutils.BouncyCastleDigester;
 import org.apache.tika.parser.digestutils.CommonsDigester;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.server.core.resource.AsyncEmitter;
+import org.apache.tika.server.core.resource.AsyncParser;
 import org.apache.tika.server.core.resource.AsyncResource;
 import org.apache.tika.server.core.resource.DetectorResource;
 import org.apache.tika.server.core.resource.EmitterResource;
@@ -151,37 +154,42 @@ public class TikaServerProcess {
 
     private static void mainLoop(CommandLine commandLine, Options options) throws Exception {
         AsyncResource asyncResource = null;
-        ArrayBlockingQueue asyncQueue = null;
-        int numAsyncThreads = 10;
+        ArrayBlockingQueue<FetchEmitTuple> asyncFetchEmitQueue = null;
+        ArrayBlockingQueue<EmitData> asyncEmitData = null;
+        int numAsyncParserThreads = 10;
         if (commandLine.hasOption(ENABLE_UNSECURE_FEATURES)) {
             asyncResource = new AsyncResource();
-            asyncQueue = asyncResource.getQueue(numAsyncThreads);
+            asyncFetchEmitQueue = asyncResource.getFetchEmitQueue(10000);
+            asyncEmitData = asyncResource.getEmitDataQueue(1000);
         }
 
         ServerDetails serverDetails = initServer(commandLine, asyncResource);
-        ExecutorService executorService = Executors.newFixedThreadPool(numAsyncThreads+1);
+        ExecutorService executorService = Executors.newFixedThreadPool(numAsyncParserThreads+1);
         ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
 
-        executorCompletionService.submit(new ServerThread(serverDetails));
-        if (asyncQueue != null) {
-            for (int i = 0; i < numAsyncThreads; i++) {
-                executorCompletionService.submit(new AsyncEmitter(asyncQueue));
+        if (asyncFetchEmitQueue != null) {
+            executorCompletionService.submit(new AsyncEmitter(asyncEmitData));
+            for (int i = 0; i < numAsyncParserThreads; i++) {
+                executorCompletionService.submit(new AsyncParser(asyncFetchEmitQueue, asyncEmitData));
             }
         }
+        //start the server
+        Server server = serverDetails.sf.create();
+        LOG.info("Started Apache Tika server {} at {}",
+                serverDetails.serverId,
+                serverDetails.url);
+
         while (true) {
             Future<Integer> future = executorCompletionService.poll(1, TimeUnit.MINUTES);
             if (future != null) {
-                System.out.println("future val: " + future.get());
+                LOG.warn("future val: " + future.get());
             }
         }
     }
 
-    //This starts the server in this process.  This can
-    //be either a direct call from -noFork or the process that is forked
-    //in the 2.0 default mode.
+    //This returns the server, configured and ready to be started.
     private static ServerDetails initServer(CommandLine line,
                                      AsyncResource asyncResource) throws Exception {
-
         String host = null;
 
         if (line.hasOption("host")) {
@@ -459,22 +467,6 @@ public class TikaServerProcess {
         return serverTimeouts;
     }
 
-    private static class ServerThread implements Callable<Integer> {
-        private final ServerDetails serverDetails;
-        public ServerThread(ServerDetails serverDetails) {
-            this.serverDetails = serverDetails;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-
-            Server server = serverDetails.sf.create();
-            LOG.info("Started Apache Tika server {} at {}",
-                        serverDetails.serverId,
-                        serverDetails.url);
-            return 2;
-        }
-    }
 
     private static class ServerDetails {
         JAXRSServerFactoryBean sf;
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
index f473699..315927a 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
@@ -1,122 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.server.core.resource;
 
-import org.apache.cxf.jaxrs.impl.UriInfoImpl;
-import org.apache.cxf.message.MessageImpl;
-import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.utils.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.core.MultivaluedHashMap;
 import java.io.IOException;
-import java.io.InputStream;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Worker thread that takes ASyncRequests off the queue and
- * processes them.
+ * Worker thread that takes EmitData off the queue, batches it
+ * and tries to emit it as a batch
  */
 public class AsyncEmitter implements Callable<Integer> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
 
+    //TODO -- need to configure these
+    private final long emitWithinMs = 1000;
+
+    private long maxEstimatedBytes = 10_000_000;
 
-    private long maxCacheSizeBytes = 10_000_000;
+    private final ArrayBlockingQueue<EmitData> emitDataQueue;
 
-    private final ArrayBlockingQueue<AsyncRequest> queue;
+    Instant lastEmitted = Instant.now();
 
-    public AsyncEmitter(ArrayBlockingQueue<AsyncRequest> queue) {
-        this.queue = queue;
+    public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData) {
+        this.emitDataQueue = emitData;
     }
 
     @Override
     public Integer call() throws Exception {
+        EmitDataCache cache = new EmitDataCache(maxEstimatedBytes);
+
         while (true) {
-            AsyncRequest request = queue.poll(1, TimeUnit.MINUTES);
-            if (request != null) {
-                processTuple(request);
+            EmitData emitData = emitDataQueue.poll(100, TimeUnit.MILLISECONDS);
+            if (emitData != null) {
+                //this can block on emitAll
+                cache.add(emitData);
             } else {
                 LOG.trace("Nothing on the async queue");
             }
+            LOG.debug("cache size: ({}) bytes and count: {}",
+                    cache.estimatedSize, cache.size);
+            long elapsed = ChronoUnit.MILLIS.between(lastEmitted, Instant.now());
+            if (elapsed > emitWithinMs) {
+                LOG.debug("{} elapsed > {}, going to emitAll",
+                        elapsed, emitWithinMs);
+                //this can block
+                cache.emitAll();
+            }
         }
     }
 
-    private void processTuple(AsyncRequest request) {
-        LOG.debug("Starting request id ({}) of size ({})",
-                request.getId(), request.getTuples().size());
-        List<EmitData> cachedEmitData = new ArrayList<>();
-        Emitter emitter = TikaResource.getConfig()
-                .getEmitterManager()
-                .getEmitter(
-                    request.getTuples().get(0).getEmitKey().getEmitterName());
-        long currSize = 0;
-        for (FetchEmitTuple t : request.getTuples()) {
-            EmitData emitData = processTuple(t);
-            long estimated = AbstractEmitter.estimateSizeInBytes(
-                    emitData.getEmitKey().getKey(), emitData.getMetadataList());
-            if (estimated + currSize > maxCacheSizeBytes) {
-                tryToEmit(emitter, cachedEmitData, request);
-                cachedEmitData.clear();
-            }
-            cachedEmitData.add(emitData);
-            currSize += estimated;
+    private class EmitDataCache {
+        private final long maxBytes;
+
+        long estimatedSize = 0;
+        int size = 0;
+        Map<String, List<EmitData>> map = new HashMap<>();
+
+        public EmitDataCache(long maxBytes) {
+            this.maxBytes = maxBytes;
         }
-        tryToEmit(emitter, cachedEmitData, request);
-        cachedEmitData.clear();
-        LOG.debug("Completed request id ({})",
-                request.getId(), request.getTuples().size());
-    }
 
-    private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData,
-                           AsyncRequest request) {
-        try {
-            emitter.emit(cachedEmitData);
-        } catch (IOException|TikaEmitterException e) {
-            LOG.warn("async id ({}) emitter class ({}): {}",
-                    request.getId(), emitter.getClass(),
-                    ExceptionUtils.getStackTrace(e));
+        void updateEstimatedSize(long newBytes) {
+            estimatedSize += newBytes;
         }
-    }
 
-    private EmitData processTuple(FetchEmitTuple t) {
-        Metadata userMetadata = t.getMetadata();
-        Metadata metadata = new Metadata();
-        String fetcherName = t.getFetchKey().getFetcherName();
-        String fetchKey = t.getFetchKey().getKey();
-        List<Metadata> metadataList = null;
-        try (InputStream stream =
-                     TikaResource.getConfig().getFetcherManager()
-                             .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
-
-            metadataList = RecursiveMetadataResource.parseMetadata(
-                    stream,
-                    metadata,
-                    new MultivaluedHashMap<>(),
-                    new UriInfoImpl(new MessageImpl()), "text");
-        } catch (SecurityException e) {
-            throw e;
-        } catch (Exception e) {
-            LOG.warn(t.toString(), e);
+        void add(EmitData data) {
+            size++;
+            long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(), data.getMetadataList());
+            if (estimatedSize + sz > maxBytes) {
+                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
+                        (estimatedSize+sz), maxBytes);
+                emitAll();
+            }
+            List<EmitData> cached = map.get(data.getEmitKey().getEmitterName());
+            if (cached == null) {
+                cached = new ArrayList<>();
+                map.put(data.getEmitKey().getEmitterName(), cached);
+            }
+            updateEstimatedSize(sz);
+            cached.add(data);
         }
-        injectUserMetadata(userMetadata, metadataList);
-        return new EmitData(t.getEmitKey(), metadataList);
-    }
 
-    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
-        for (String n : userMetadata.names()) {
-            //overwrite whatever was there
-            metadataList.get(0).set(n, null);
-            for (String val : userMetadata.getValues(n)) {
-                metadataList.get(0).add(n, val);
+        private void emitAll() {
+            int emitted = 0;
+            LOG.debug("about to emit {}", size);
+            for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
+                Emitter emitter = TikaResource.getConfig()
+                        .getEmitterManager().getEmitter(e.getKey());
+                tryToEmit(emitter, e.getValue());
+                emitted += e.getValue().size();
+            }
+            LOG.debug("emitted: {}", emitted);
+            estimatedSize = 0;
+            size = 0;
+            map.clear();
+            lastEmitted = Instant.now();
+        }
+
+        private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData) {
+
+            try {
+                emitter.emit(cachedEmitData);
+            } catch (IOException|TikaEmitterException e) {
+                LOG.warn("emitter class ({}): {}", emitter.getClass(),
+                        ExceptionUtils.getStackTrace(e));
             }
         }
     }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
new file mode 100644
index 0000000..46098e9
--- /dev/null
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server.core.resource;
+
+import org.apache.cxf.jaxrs.impl.UriInfoImpl;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedHashMap;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Worker thread that takes {@link FetchEmitTuple} off the queue, parses
+ * the file and puts the {@link EmitData} on the queue for the {@link AsyncEmitter}.
+ *
+ */
+public class AsyncParser implements Callable<Integer> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
+
+    private final ArrayBlockingQueue<FetchEmitTuple> queue;
+    private final ArrayBlockingQueue<EmitData> emitDataQueue;
+
+    public AsyncParser(ArrayBlockingQueue<FetchEmitTuple> queue,
+                       ArrayBlockingQueue<EmitData> emitData) {
+        this.queue = queue;
+        this.emitDataQueue = emitData;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+        int parsed = 0;
+        while (true) {
+            FetchEmitTuple request = queue.poll(1, TimeUnit.MINUTES);
+            if (request != null) {
+                EmitData emitData = processTuple(request);
+                boolean offered = emitDataQueue.offer(emitData, 10, TimeUnit.MINUTES);
+                parsed++;
+                if (! offered) {
+                    //TODO: deal with this
+                }
+            } else {
+                LOG.trace("Nothing on the async queue");
+            }
+        }
+    }
+
+    private EmitData processTuple(FetchEmitTuple t) {
+        Metadata userMetadata = t.getMetadata();
+        Metadata metadata = new Metadata();
+        String fetcherName = t.getFetchKey().getFetcherName();
+        String fetchKey = t.getFetchKey().getKey();
+        List<Metadata> metadataList = null;
+        try (InputStream stream =
+                     TikaResource.getConfig().getFetcherManager()
+                             .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
+            metadataList = RecursiveMetadataResource.parseMetadata(
+                    stream,
+                    metadata,
+                    new MultivaluedHashMap<>(),
+                    new UriInfoImpl(new MessageImpl()), "text");
+        } catch (SecurityException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn(t.toString(), e);
+        }
+        injectUserMetadata(userMetadata, metadataList);
+        EmitKey emitKey = t.getEmitKey();
+        if (StringUtils.isBlank(emitKey.getKey())) {
+            emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+        }
+        return new EmitData(emitKey, metadataList);
+    }
+
+    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
+        for (String n : userMetadata.names()) {
+            //overwrite whatever was there
+            metadataList.get(0).set(n, null);
+            for (String val : userMetadata.getValues(n)) {
+                metadataList.get(0).add(n, val);
+            }
+        }
+    }
+}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
index f46105c..9e3566c 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.tika.server.core.resource;
 
 import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
@@ -5,18 +21,12 @@ import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import java.util.List;
 
 public class AsyncRequest {
-    private final String id;
     private final List<FetchEmitTuple> tuples;
 
-    public AsyncRequest(String id, List<FetchEmitTuple> tuples) {
-        this.id = id;
+    public AsyncRequest(List<FetchEmitTuple> tuples) {
         this.tuples = tuples;
     }
 
-    public String getId() {
-        return id;
-    }
-
     public List<FetchEmitTuple> getTuples() {
         return tuples;
     }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 240933f..248ebb0 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -17,9 +17,10 @@
 
 package org.apache.tika.server.core.resource;
 
-import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.FetchKey;
@@ -28,6 +29,7 @@ import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.BadRequestException;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -35,10 +37,17 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
 import java.io.InputStream;
 
-import java.util.Collections;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -48,14 +57,18 @@ public class AsyncResource {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncResource.class);
 
-    private static final int DEFAULT_QUEUE_SIZE = 100;
-    private int queueSize = DEFAULT_QUEUE_SIZE;
-    private ArrayBlockingQueue<AsyncRequest> queue;
+    private static final int DEFAULT_FETCH_EMIT_QUEUE_SIZE = 10000;
+    long maxQueuePauseMs = 60000;
+    private ArrayBlockingQueue<FetchEmitTuple> queue;
 
-    public ArrayBlockingQueue<AsyncRequest> getQueue(int numThreads) {
-        this.queue = new ArrayBlockingQueue<>(queueSize+numThreads);
+    public ArrayBlockingQueue<FetchEmitTuple> getFetchEmitQueue(int queueSize) {
+        this.queue = new ArrayBlockingQueue<>(queueSize);
         return queue;
     }
+
+    public ArrayBlockingQueue<EmitData> getEmitDataQueue(int size) {
+        return new ArrayBlockingQueue<>(size);
+    }
     /**
      * The client posts a json request.  At a minimum, this must be a
      * json object that contains an emitter and a fetcherString key with
@@ -74,14 +87,18 @@ public class AsyncResource {
      */
     @POST
     @Produces("application/json")
-    public Map<String, String> post(InputStream is,
+    public Map<String, Object> post(InputStream is,
                                          @Context HttpHeaders httpHeaders,
                                          @Context UriInfo info
     ) throws Exception {
 
         AsyncRequest request = deserializeASyncRequest(is);
-        FetcherManager fetcherManager = TikaConfig.getDefaultConfig().getFetcherManager();
-        EmitterManager emitterManager = TikaConfig.getDefaultConfig().getEmitterManager();
+
+        //make sure that there are no problems with
+        //the requested fetchers and emitters
+        //throw early
+        FetcherManager fetcherManager = TikaResource.getConfig().getFetcherManager();
+        EmitterManager emitterManager = TikaResource.getConfig().getEmitterManager();
         for (FetchEmitTuple t : request.getTuples()) {
             if (! fetcherManager.getSupported().contains(t.getFetchKey().getFetcherName())) {
                 return badFetcher(t.getFetchKey());
@@ -90,34 +107,61 @@ public class AsyncResource {
                 return badEmitter(t.getEmitKey());
             }
         }
+        Instant start = Instant.now();
+        long elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
+        List<FetchEmitTuple> notAdded = new ArrayList<>();
+        int addedCount = 0;
+        for (FetchEmitTuple t : request.getTuples()) {
+            boolean offered = false;
+            while (!offered && elapsed < maxQueuePauseMs) {
+                offered = queue.offer(t, 10, TimeUnit.MILLISECONDS);
+                elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
+            }
+            if (! offered) {
+                notAdded.add(t);
+            } else {
+                addedCount++;
+            }
+        }
 
-        //parameterize
-        boolean offered = queue.offer(request, 60, TimeUnit.SECONDS);
-        if (! offered) {
-            return throttleResponse();
+        if (notAdded.size() > 0) {
+            return throttle(notAdded, addedCount);
         }
-        return ok(request.getId(), request.getTuples().size());
+        return ok(request.getTuples().size());
     }
 
-    private Map<String, String> ok(String id, int size) {
-        return null;
+    private Map<String, Object> ok(int size) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("status", "ok");
+        map.put("added", size);
+        return map;
     }
 
-    private Map<String, String> throttleResponse() {
-        Map<String, String> map = new HashMap<>();
+    private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
+        List<String> fetchKeys = new ArrayList<>();
+        for (FetchEmitTuple t : notAdded) {
+            fetchKeys.add(t.getFetchKey().getKey());
+        }
+        Map<String, Object> map = new HashMap<>();
+        map.put("status", "throttled");
+        map.put("added", added);
+        map.put("skipped", notAdded.size());
+        map.put("skippedFetchKeys", fetchKeys);
         return map;
     }
 
-    private Map<String, String> badEmitter(EmitKey emitKey) {
-        return null;
+    private Map<String, Object> badEmitter(EmitKey emitKey) {
+        throw new BadRequestException("can't find emitter for "+emitKey.getEmitterName());
     }
 
-    private Map<String, String> badFetcher(FetchKey fetchKey) {
-        return null;
+    private Map<String, Object> badFetcher(FetchKey fetchKey) {
+        throw new BadRequestException("can't find fetcher for " + fetchKey.getFetcherName());
     }
 
-    private AsyncRequest deserializeASyncRequest(InputStream is) {
-        return new AsyncRequest("", Collections.EMPTY_LIST);
+    private AsyncRequest deserializeASyncRequest(InputStream is) throws IOException {
+        try (Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
+            return new AsyncRequest(JsonFetchEmitTupleList.fromJson(reader));
+        }
     }
 
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java
index 8668b16..7a634fd 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java
@@ -132,7 +132,7 @@ public class MetadataResource {
         //no need to parse embedded docs
         context.set(DocumentSelector.class, metadata1 -> false);
 
-        TikaResource.logRequest(LOG, info, metadata);
+        TikaResource.logRequest(LOG, "/meta", metadata);
         TikaResource.parse(parser, LOG, info.getPath(), is,
                 new LanguageHandler() {
                     public void endDocument() {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index e01a4fa..f6f340c 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -144,7 +144,7 @@ public class RecursiveMetadataResource {
 
         fillMetadata(parser, metadata, httpHeaders);
         fillParseContext(httpHeaders, metadata, context);
-        TikaResource.logRequest(LOG, info, metadata);
+        TikaResource.logRequest(LOG, "/rmeta", metadata);
 
         int writeLimit = -1;
         if (httpHeaders.containsKey("writeLimit")) {
@@ -162,7 +162,7 @@ public class RecursiveMetadataResource {
                 new BasicContentHandlerFactory(type, writeLimit), maxEmbeddedResources,
                 TikaResource.getConfig().getMetadataFilter());
         try {
-            TikaResource.parse(wrapper, LOG, info.getPath(), is, handler, metadata, context);
+            TikaResource.parse(wrapper, LOG, "/rmeta", is, handler, metadata, context);
         } catch (TikaServerParseException e) {
             //do nothing
         } catch (SecurityException|WebApplicationException e) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
index 46cf093..d2c33e9 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
@@ -354,11 +354,12 @@ public class TikaResource {
         }
     }
 
-    public static void logRequest(Logger logger, UriInfo info, Metadata metadata) {
+    public static void logRequest(Logger logger, String endpoint, Metadata metadata) {
+
         if (metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE) == null) {
-            logger.info("{} (autodetecting type)", info.getPath());
+            logger.info("{} (autodetecting type)", endpoint);
         } else {
-            logger.info("{} ({})", info.getPath(), metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE));
+            logger.info("{} ({})", endpoint, metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE));
         }
     }
 
@@ -403,7 +404,7 @@ public class TikaResource {
         fillMetadata(parser, metadata, httpHeaders);
         fillParseContext(httpHeaders, metadata, context);
 
-        logRequest(LOG, info, metadata);
+        logRequest(LOG, "/tika", metadata);
 
         return new StreamingOutput() {
             public void write(OutputStream outputStream) throws IOException, WebApplicationException {
@@ -432,7 +433,7 @@ public class TikaResource {
         fillMetadata(parser, metadata, httpHeaders);
         fillParseContext(httpHeaders, metadata, context);
 
-        logRequest(LOG, info, metadata);
+        logRequest(LOG, "/tika", metadata);
 
         return new StreamingOutput() {
             public void write(OutputStream outputStream) throws IOException, WebApplicationException {
@@ -489,7 +490,7 @@ public class TikaResource {
         fillParseContext(httpHeaders, metadata, context);
 
 
-        logRequest(LOG, info, metadata);
+        logRequest(LOG, "/tika", metadata);
 
         return new StreamingOutput() {
             public void write(OutputStream outputStream)
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java
index ed0320b..a3d733a 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java
@@ -126,7 +126,7 @@ public class UnpackerResource {
         fillMetadata(parser, metadata, httpHeaders.getRequestHeaders());
         fillParseContext(httpHeaders.getRequestHeaders(), metadata, pc);
 
-        TikaResource.logRequest(LOG, info, metadata);
+        TikaResource.logRequest(LOG, "/unpack", metadata);
         //even though we aren't currently parsing embedded documents,
         //we need to add this to allow for "inline" use of other parsers.
         pc.set(Parser.class, parser);
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
similarity index 58%
copy from tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
copy to tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index ecf02de..2d188d9 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -17,15 +17,19 @@
 package org.apache.tika.server.core;
 
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,37 +40,35 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
-public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
+@Ignore("useful for development...need to turn it into a real unit test")
+public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TikaServerEmitterIntegrationTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
 
     private static Path TMP_DIR;
     private static Path TMP_OUTPUT_DIR;
     private static String TIKA_CONFIG_XML;
     private static Path TIKA_CONFIG;
 
+    private static final int NUM_FILES = 8034;
     private static final String EMITTER_NAME = "fse";
     private static final String FETCHER_NAME = "fsf";
 
+    private static List<String> FILE_LIST = new ArrayList<>();
     private static String[] FILES = new String[] {
             "hello_world.xml",
-            "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
+           // "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
     };
 
     @BeforeClass
@@ -77,10 +79,16 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
         Files.createDirectories(inputDir);
         Files.createDirectories(TMP_OUTPUT_DIR);
 
-        for (String mockFile : FILES) {
-            Files.copy(TikaEmitterTest.class.getResourceAsStream(
-                    "/test-documents/mock/"+mockFile),
-                    inputDir.resolve(mockFile));
+        for (int i = 0; i < NUM_FILES; i++) {
+            for (String mockFile : FILES) {
+                String targetName = i+"-"+mockFile;
+                Path target = inputDir.resolve(targetName);
+                FILE_LIST.add(targetName);
+                Files.copy(TikaEmitterTest.class.getResourceAsStream(
+                        "/test-documents/mock/" + mockFile),
+                        target);
+
+            }
         }
         TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
 
@@ -135,7 +143,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
                 TikaServerCli.main(
                         new String[]{
                                 "-enableUnsecureFeatures",
-                                "-maxFiles", "2000",
+                                "-maxFiles", "10000",
                                 "-p", INTEGRATION_TEST_PORT,
                                 "-tmpFilePrefix", "basic-",
                                 "-config", TIKA_CONFIG.toAbsolutePath().toString()
@@ -143,89 +151,27 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
             }
         };
         serverThread.start();
-        try {
-            testOne("hello_world.xml", true);
-        } finally {
-            serverThread.interrupt();
-        }
-    }
-
-    @Test(expected = ProcessingException.class)
-    public void testSystemExit() throws Exception {
 
-        Thread serverThread = new Thread() {
-            @Override
-            public void run() {
-                TikaServerCli.main(
-                        new String[]{
-                                "-enableUnsecureFeatures",
-                                "-maxFiles", "2000",
-                                "-p", INTEGRATION_TEST_PORT,
-                                "-tmpFilePrefix", "basic-",
-                                "-config", TIKA_CONFIG.toAbsolutePath().toString()
-                        });
-            }
-        };
-        serverThread.start();
         try {
-            testOne("system_exit.xml", false);
-        } finally {
-            serverThread.interrupt();
-        }
-    }
-
-    @Test
-    public void testOOM() throws Exception {
-
-        Thread serverThread = new Thread() {
-            @Override
-            public void run() {
-                TikaServerCli.main(
-                        new String[]{
-                                "-enableUnsecureFeatures",
-                                "-JXmx128m",
-                                "-maxFiles", "2000",
-                                "-p", INTEGRATION_TEST_PORT,
-                                "-tmpFilePrefix", "basic-",
-                                "-config", TIKA_CONFIG.toAbsolutePath().toString()
-                        });
+            JsonNode response = sendAsync(FILE_LIST);
+            System.out.println(response);
+            int targets = 0;
+            while (targets < NUM_FILES) {
+                System.out.println("targets "+targets);
+                targets = countTargets();
+                Thread.sleep(1000);
             }
-        };
-        serverThread.start();
-        try {
-            JsonNode response = testOne("real_oom.xml", false);
-            assertContains("heap space", response.get("parse_error").asText());
+            System.out.println("TARGETS: "+targets);
         } finally {
             serverThread.interrupt();
         }
     }
 
-    @Test(expected = ProcessingException.class)
-    public void testTimeout() throws Exception {
-
-        Thread serverThread = new Thread() {
-            @Override
-            public void run() {
-                TikaServerCli.main(
-                        new String[]{
-                                "-enableUnsecureFeatures",
-                                "-JXmx128m",
-                                "-taskTimeoutMillis", "2000", "-taskPulseMillis", "100",
-                                "-p", INTEGRATION_TEST_PORT,
-                                "-tmpFilePrefix", "basic-",
-                                "-config", TIKA_CONFIG.toAbsolutePath().toString()
-                        });
-            }
-        };
-        serverThread.start();
-        try {
-            JsonNode response = testOne("heavy_hang_30000.xml", false);
-            assertContains("heap space", response.get("parse_error").asText());
-        } finally {
-            serverThread.interrupt();
-        }
+    private int countTargets() {
+        return TMP_OUTPUT_DIR.toFile().listFiles().length;
     }
 
+
     private void awaitServerStartup() throws Exception {
         Instant started = Instant.now();
         long elapsed = Duration.between(started, Instant.now()).toMillis();
@@ -241,7 +187,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
                 }
                 LOG.debug("tika test client failed to connect to server with status: {}", response.getStatus());
 
-            } catch (javax.ws.rs.ProcessingException e) {
+            } catch (ProcessingException e) {
                 LOG.debug("tika test client failed to connect to server", e);
             }
 
@@ -252,28 +198,28 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
                 elapsed + " ms");
     }
 
-    private JsonNode testOne(String fileName, boolean shouldFileExist) throws Exception {
+    private JsonNode sendAsync(List<String> fileNames) throws Exception {
         awaitServerStartup();
+        List<FetchEmitTuple> tuples = new ArrayList<>();
+        for (String f : fileNames) {
+            tuples.add(getFetchEmitTuple(f));
+        }
+        String json = JsonFetchEmitTupleList.toJson(tuples);
+
         Response response = WebClient
-                .create(endPoint + "/emit")
+                .create(endPoint + "/async")
                 .accept("application/json")
-                .post(getJsonString(fileName));
-        if (shouldFileExist) {
-            Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
-            assertTrue(Files.size(targFile) > 1);
-        }
+                .post(json);
+        System.out.println("status: " + response.getStatus());
         Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
         return new ObjectMapper().readTree(reader);
     }
 
-    private String getJsonString(String fileName) throws IOException {
-        StringWriter writer = new StringWriter();
-        try (JsonGenerator generator = new JsonFactory().createGenerator(writer)) {
-            generator.writeStartObject();
-            generator.writeStringField("fetcher", FETCHER_NAME);
-            generator.writeStringField("fetchKey", fileName);
-            generator.writeStringField("emitter", EMITTER_NAME);
-        }
-        return writer.toString();
+    private FetchEmitTuple getFetchEmitTuple(String fileName) throws IOException {
+        return new FetchEmitTuple(
+                new FetchKey(FETCHER_NAME, fileName),
+                new EmitKey(EMITTER_NAME, ""),
+                new Metadata()
+        );
     }
 }
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index ecf02de..04488d1 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -17,12 +17,15 @@
 package org.apache.tika.server.core;
 
 
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.FileUtils;
 import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -36,21 +39,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.List;
-import java.util.Random;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
 
@@ -64,7 +63,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
     private static final String EMITTER_NAME = "fse";
     private static final String FETCHER_NAME = "fsf";
 
-    private static String[] FILES = new String[] {
+    private static String[] FILES = new String[]{
             "hello_world.xml",
             "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
     };
@@ -79,30 +78,30 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
 
         for (String mockFile : FILES) {
             Files.copy(TikaEmitterTest.class.getResourceAsStream(
-                    "/test-documents/mock/"+mockFile),
+                    "/test-documents/mock/" + mockFile),
                     inputDir.resolve(mockFile));
         }
         TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
 
-        TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"+
-                "<properties>"+
-                "<fetchers>"+
-                "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">"+
-                "<params>"+
-                "<param name=\"name\" type=\"string\">"+FETCHER_NAME+"</param>"+
-                "<param name=\"basePath\" type=\"string\">"+inputDir.toAbsolutePath()+"</param>"+
-                "</params>"+
-                "</fetcher>"+
-                "</fetchers>"+
-                "<emitters>"+
-                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">"+
-                "<params>"+
-                "<param name=\"name\" type=\"string\">"+EMITTER_NAME+"</param>"+
+        TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
+                "<properties>" +
+                "<fetchers>" +
+                "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+                "<params>" +
+                "<param name=\"name\" type=\"string\">" + FETCHER_NAME + "</param>" +
+                "<param name=\"basePath\" type=\"string\">" + inputDir.toAbsolutePath() + "</param>" +
+                "</params>" +
+                "</fetcher>" +
+                "</fetchers>" +
+                "<emitters>" +
+                "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+                "<params>" +
+                "<param name=\"name\" type=\"string\">" + EMITTER_NAME + "</param>" +
 
-                "<param name=\"basePath\" type=\"string\">"+ TMP_OUTPUT_DIR.toAbsolutePath()+"</param>"+
-                "</params>"+
-                "</emitter>"+
-                "</emitters>"+
+                "<param name=\"basePath\" type=\"string\">" + TMP_OUTPUT_DIR.toAbsolutePath() + "</param>" +
+                "</params>" +
+                "</emitter>" +
+                "</emitters>" +
                 "</properties>";
 
         FileUtils.write(TIKA_CONFIG.toFile(), TIKA_CONFIG_XML, UTF_8);
@@ -144,7 +143,10 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
         };
         serverThread.start();
         try {
-            testOne("hello_world.xml", true);
+            JsonNode node = testOne("hello_world.xml", true);
+            assertEquals("ok", node.get("status").asText());
+        } catch (Exception e) {
+            fail("shouldn't have an exception" + e.getMessage());
         } finally {
             serverThread.interrupt();
         }
@@ -220,7 +222,6 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
         serverThread.start();
         try {
             JsonNode response = testOne("heavy_hang_30000.xml", false);
-            assertContains("heap space", response.get("parse_error").asText());
         } finally {
             serverThread.interrupt();
         }
@@ -229,14 +230,14 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
     private void awaitServerStartup() throws Exception {
         Instant started = Instant.now();
         long elapsed = Duration.between(started, Instant.now()).toMillis();
-        WebClient client = WebClient.create(endPoint+"/tika").accept("text/plain");
+        WebClient client = WebClient.create(endPoint + "/tika").accept("text/plain");
         while (elapsed < MAX_WAIT_MS) {
             try {
                 Response response = client.get();
                 if (response.getStatus() == 200) {
                     elapsed = Duration.between(started, Instant.now()).toMillis();
                     LOG.info("client observes server successfully started after " +
-                            elapsed+ " ms");
+                            elapsed + " ms");
                     return;
                 }
                 LOG.debug("tika test client failed to connect to server with status: {}", response.getStatus());
@@ -258,22 +259,23 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
                 .create(endPoint + "/emit")
                 .accept("application/json")
                 .post(getJsonString(fileName));
-        if (shouldFileExist) {
-            Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
-            assertTrue(Files.size(targFile) > 1);
+        if (response.getStatus() == 200) {
+            if (shouldFileExist) {
+                Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
+                assertTrue(Files.size(targFile) > 1);
+            }
+            Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
+            return new ObjectMapper().readTree(reader);
         }
-        Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
-        return new ObjectMapper().readTree(reader);
+        return null;
     }
 
     private String getJsonString(String fileName) throws IOException {
-        StringWriter writer = new StringWriter();
-        try (JsonGenerator generator = new JsonFactory().createGenerator(writer)) {
-            generator.writeStartObject();
-            generator.writeStringField("fetcher", FETCHER_NAME);
-            generator.writeStringField("fetchKey", fileName);
-            generator.writeStringField("emitter", EMITTER_NAME);
-        }
-        return writer.toString();
+        FetchEmitTuple t = new FetchEmitTuple(
+                new FetchKey(FETCHER_NAME, fileName),
+                new EmitKey(EMITTER_NAME, ""),
+                new Metadata()
+        );
+        return JsonFetchEmitTuple.toJson(t);
     }
 }