You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/02/01 22:46:46 UTC

[tika] branch TIKA-3288 created (now 73b0287)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git.


      at 73b0287  TIKA-3288 -- WIP do not merge.

This branch includes the following new commits:

     new 73b0287  TIKA-3288 -- WIP do not merge.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[tika] 01/01: TIKA-3288 -- WIP do not merge.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 73b0287742c54f21d3a2a6fa442b058d363de68a
Author: tballison <ta...@apache.org>
AuthorDate: Mon Feb 1 17:46:32 2021 -0500

    TIKA-3288 -- WIP do not merge.
---
 .../apache/tika/pipes/emitter/AbstractEmitter.java |  33 ++++++
 .../org/apache/tika/pipes/emitter/EmitData.java    |  32 ++++++
 .../org/apache/tika/pipes/emitter/EmitKey.java     |   2 +-
 .../org/apache/tika/pipes/emitter/Emitter.java     |  10 +-
 .../apache/tika/pipes/emitter/EmptyEmitter.java    |   7 +-
 tika-pipes/tika-emitters/tika-emitter-solr/pom.xml |   5 +
 .../tika/pipes/emitter/solr/SolrEmitter.java       |  92 +++++++++------
 tika-pipes/tika-httpclient-commons/pom.xml         |   5 -
 .../apache/tika/pipes/PipeIntegrationTests.java    |   2 +-
 .../org/apache/tika/server/client/TikaClient.java  |   2 +-
 .../org/apache/tika/server/core/ServerStatus.java  |   2 +-
 .../tika/server/core/ServerStatusWatcher.java      |   2 +-
 .../org/apache/tika/server/core/TikaServerCli.java |   5 +
 .../apache/tika/server/core/TikaServerProcess.java |  80 ++++++++++++--
 .../tika/server/core/resource/AsyncEmitter.java    | 123 +++++++++++++++++++++
 .../tika/server/core/resource/AsyncRequest.java    |  23 ++++
 .../tika/server/core/resource/AsyncResource.java   | 123 +++++++++++++++++++++
 .../server/core/TikaServerIntegrationTest.java     |  10 +-
 18 files changed, 489 insertions(+), 69 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 1117537..a54b708 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -17,6 +17,10 @@
 package org.apache.tika.pipes.emitter;
 
 import org.apache.tika.config.Field;
+import org.apache.tika.metadata.Metadata;
+
+import java.io.IOException;
+import java.util.List;
 
 public abstract class AbstractEmitter implements Emitter {
 
@@ -31,4 +35,33 @@ public abstract class AbstractEmitter implements Emitter {
     public String getName() {
         return name;
     }
+
+    /**
+     * The default behavior is to call {@link #emit(String, List)} on each item.
+     * Some implementations, e.g. Solr/ES/vespa, can benefit from subclassing this and
+     * emitting a bunch of docs at once.
+     *
+     * @param emitData
+     * @throws IOException
+     * @throws TikaEmitterException
+     */
+    @Override
+    public void emit(List<EmitData> emitData) throws IOException, TikaEmitterException {
+        for (EmitData d : emitData) {
+            emit(d.getEmitKey().getKey(), d.getMetadataList());
+        }
+    }
+
+    public static long estimateSizeInBytes(String id, List<Metadata> metadataList) {
+        long sz = 36 + id.length() * 2;
+        for (Metadata m : metadataList) {
+            for (String n : m.names()) {
+                sz += 36 + n.length() * 2;
+                for (String v : m.getValues(n)) {
+                    sz += 36 + v.length() * 2;
+                }
+            }
+        }
+        return sz;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
new file mode 100644
index 0000000..0063c6a
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -0,0 +1,32 @@
+package org.apache.tika.pipes.emitter;
+
+import org.apache.tika.metadata.Metadata;
+
+import java.util.List;
+
+public class EmitData {
+
+    private final EmitKey emitKey;
+    private final List<Metadata> metadataList;
+
+    public EmitData(EmitKey emitKey, List<Metadata> metadataList) {
+        this.emitKey = emitKey;
+        this.metadataList = metadataList;
+    }
+
+    public EmitKey getEmitKey() {
+        return emitKey;
+    }
+
+    public List<Metadata> getMetadataList() {
+        return metadataList;
+    }
+
+    @Override
+    public String toString() {
+        return "EmitData{" +
+                "emitKey=" + emitKey +
+                ", metadataList=" + metadataList +
+                '}';
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index 47a8ee7..aa53dfd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -30,7 +30,7 @@ public class EmitKey {
         return emitterName;
     }
 
-    public String getEmitKey() {
+    public String getKey() {
         return emitKey;
     }
 
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index 4f6bdae..dbea669 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -27,14 +27,8 @@ public interface Emitter {
 
     void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException;
 
+    void emit(List<EmitData> emitData) throws IOException, TikaEmitterException;
+    //TODO -- add this later for xhtml?
     //void emit(String txt, Metadata metadata) throws IOException, TikaException;
 
-   /*
-    TODO we can add this later?
-    void addBatch(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException;
-
-    void executeBatch() throws IOException, TikaEmitterException;
-
-    */
-
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index 8c0ebda..5f83db7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -16,13 +16,13 @@
  */
 package org.apache.tika.pipes.emitter;
 
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 
 import java.io.IOException;
 import java.util.List;
 
 public class EmptyEmitter implements Emitter {
+
     @Override
     public String getName() {
         return "empty";
@@ -32,4 +32,9 @@ public class EmptyEmitter implements Emitter {
     public void emit(String emitKey, List<Metadata> metadataList) throws IOException, TikaEmitterException {
 
     }
+
+    @Override
+    public void emit(List<EmitData> emitData) throws IOException, TikaEmitterException {
+
+    }
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index 1edfff0..e36d1c8 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -42,6 +42,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </dependency>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 5830b14..4c15889 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,9 +16,8 @@
  */
 package org.apache.tika.pipes.emitter.solr;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.http.client.HttpClient;
 import org.apache.tika.client.HttpClientFactory;
 import org.apache.tika.client.HttpClientUtil;
@@ -28,7 +27,7 @@ import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
@@ -36,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -48,7 +48,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         PARENT_CHILD,
         //anything else?
     }
-    private static final Gson GSON = new Gson();
     private static final String ATTACHMENTS = "attachments";
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class);
@@ -61,14 +60,23 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     private HttpClientFactory httpClientFactory;
     private HttpClient httpClient;
 
+    public SolrEmitter() throws TikaConfigException {
+        httpClientFactory = new HttpClientFactory();
+    }
     @Override
     public void emit(String emitKey, List<Metadata> metadataList) throws IOException,
             TikaEmitterException {
+
         if (metadataList == null || metadataList.size() == 0) {
             LOG.warn("metadataList is null or empty");
             return;
         }
-        String json = jsonify(emitKey, metadataList);
+        StringWriter writer = new StringWriter();
+        JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer);
+        jsonGenerator.writeStartArray();
+        jsonify(jsonGenerator, emitKey, metadataList);
+        jsonGenerator.writeEndArray();
+        String json = writer.toString();
         LOG.debug("emitting json:"+json);
         try {
             HttpClientUtil.postJson(httpClient,
@@ -78,10 +86,35 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         }
     }
 
-    private String jsonify(String emitKey, List<Metadata> metadataList) {
+    @Override
+    public void emit(List<EmitData> batch) throws IOException,
+            TikaEmitterException {
+        if (batch == null || batch.size() == 0) {
+            LOG.warn("batch is null or empty");
+            return;
+        }
+        StringWriter writer = new StringWriter();
+        JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer);
+        jsonGenerator.writeStartArray();
+        for (EmitData d : batch) {
+            jsonify(jsonGenerator, d.getKey(), d.getMetadataList());
+        }
+        jsonGenerator.writeEndArray();
+        String json = writer.toString();
+        LOG.debug("emitting json:"+json);
+        try {
+            HttpClientUtil.postJson(httpClient,
+                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), json);
+        } catch (TikaClientException e) {
+            throw new TikaEmitterException("can't post", e);
+        }
+    }
+
+    private void jsonify(JsonGenerator jsonGenerator, String emitKey, List<Metadata> metadataList) throws IOException {
         metadataList.get(0).set(idField, emitKey);
-        if (attachmentStrategy == AttachmentStrategy.SKIP) {
-            return toJsonString(jsonify(metadataList.get(0)));
+        if (attachmentStrategy == AttachmentStrategy.SKIP ||
+            metadataList.size() == 1) {
+            jsonify(metadataList.get(0), jsonGenerator);
         } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
             //this only handles text for now, not xhtml
             StringBuilder sb = new StringBuilder();
@@ -93,53 +126,40 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             }
             Metadata parent = metadataList.get(0);
             parent.set(getContentField(), sb.toString());
-            return toJsonString(jsonify(parent));
+            jsonify(parent, jsonGenerator);
         } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
-            if (metadataList.size() == 1) {
-                JsonObject obj = jsonify(metadataList.get(0));
-                return toJsonString(obj);
-            }
-            JsonObject parent = jsonify(metadataList.get(0));
-            JsonArray children = new JsonArray();
+            jsonify(metadataList.get(0), jsonGenerator);
+            jsonGenerator.writeArrayFieldStart(ATTACHMENTS);
+
             for (int i = 1; i < metadataList.size(); i++) {
                 Metadata m = metadataList.get(i);
                 m.set(idField, UUID.randomUUID().toString());
-                children.add(jsonify(m));
+                jsonify(m, jsonGenerator);
             }
-            parent.add(ATTACHMENTS, children);
-            return toJsonString(parent);
+            jsonGenerator.writeEndArray();
         } else {
             throw new IllegalArgumentException("I don't yet support this attachment strategy: "
                     + attachmentStrategy);
         }
     }
 
-    private String toJsonString(JsonObject obj) {
-        //wrap the document into an array
-        //so that Solr correctly interprets this as
-        //upload docs vs a command.
-        JsonArray docs = new JsonArray();
-        docs.add(obj);
-        return GSON.toJson(docs);
-    }
 
-    private JsonObject jsonify(Metadata metadata) {
-        JsonObject obj = new JsonObject();
+    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator) throws IOException {
+        jsonGenerator.writeStartObject();
         for (String n : metadata.names()) {
+
             String[] vals = metadata.getValues(n);
             if (vals.length == 0) {
                 continue;
             } else if (vals.length == 1) {
-                obj.addProperty(n, vals[0]);
+                jsonGenerator.writeStringField(n, vals[0]);
             } else if (vals.length > 1) {
-                JsonArray valArr = new JsonArray();
-                for (int i = 0; i < vals.length; i++) {
-                    valArr.add(vals[i]);
-                }
-                obj.add(n, valArr);
+                jsonGenerator.writeArrayFieldStart(n);
+                jsonGenerator.writeArray(vals, 0, vals.length);
+                jsonGenerator.writeEndArray();
             }
         }
-        return obj;
+        jsonGenerator.writeEndObject();
     }
 
     /**
diff --git a/tika-pipes/tika-httpclient-commons/pom.xml b/tika-pipes/tika-httpclient-commons/pom.xml
index ef7797a..c33d3fb 100644
--- a/tika-pipes/tika-httpclient-commons/pom.xml
+++ b/tika-pipes/tika-httpclient-commons/pom.xml
@@ -42,11 +42,6 @@
             <artifactId>httpclient</artifactId>
             <version>${httpcomponents.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>${gson.version}</version>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 7df2bb4..3831f07 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -218,7 +218,7 @@ public class PipeIntegrationTests {
             userMetadata.set("project", "my-project");
 
             try (InputStream is = fetcher.fetch(t.getFetchKey().getKey(), t.getMetadata())) {
-                emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
+                emitter.emit(t.getEmitKey().getKey(), is, userMetadata);
             }
         }
     }
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index 6e0e26e..73ff3f4 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -70,7 +70,7 @@ public class TikaClient {
         root.add("fetcher", new JsonPrimitive(fetchEmit.getFetchKey().getFetcherName()));
         root.add("fetchKey", new JsonPrimitive(fetchEmit.getFetchKey().getKey()));
         root.add("emitter", new JsonPrimitive(fetchEmit.getEmitKey().getEmitterName()));
-        root.add("emitKey", new JsonPrimitive(fetchEmit.getEmitKey().getEmitKey()));
+        root.add("emitKey", new JsonPrimitive(fetchEmit.getEmitKey().getKey()));
         if (metadata.size() > 0) {
             JsonObject m = new JsonObject();
             for (String n : metadata.names()) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
index a15f7a5..92068f5 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatus.java
@@ -41,7 +41,7 @@ public class ServerStatus {
     public enum STATUS {
         INITIALIZING(0),
         OPERATING(1),
-        HIT_MAX(2),
+        HIT_MAX_FILES(2),
         TIMEOUT(3),
         ERROR(4),
         PARENT_REQUESTED_SHUTDOWN(5),
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
index a2ad313..7ac0ffd 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/ServerStatusWatcher.java
@@ -136,7 +136,7 @@ public class ServerStatusWatcher implements Runnable {
         }
         long filesProcessed = serverStatus.getFilesProcessed();
         if (filesProcessed >= maxFiles) {
-            serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX);
+            serverStatus.setStatus(ServerStatus.STATUS.HIT_MAX_FILES);
         }
     }
 
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index dbcbeb1..4ca9bb4 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -140,7 +140,10 @@ public class TikaServerCli {
             try {
                 mainLoop(line, newArgs);
             } catch (InterruptedException e) {
+                e.printStackTrace();
                 //swallow
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
     }
@@ -173,11 +176,13 @@ public class TikaServerCli {
                     WatchDogResult result = future.get();
                     LOG.debug("main loop future: ({}); about to restart", result);
                     if (maxRestarts < 0 || result.getNumRestarts() < maxRestarts) {
+                        System.err.println("starting up again");
                         executorCompletionService.submit(
                                 new TikaServerWatchDog(args, result.getPort(),
                                 result.getId(),
                                 result.getNumRestarts(), serverTimeoutConfig));
                     } else {
+                        System.err.println("finished!");
                         LOG.warn("id {} with port {} has exceeded maxRestarts {}. Shutting down and not restarting.",
                                 result.getId(), result.getPort(), maxRestarts);
                         finished++;
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 0a8fd91..94f2190 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.cxf.binding.BindingFactoryManager;
+import org.apache.cxf.endpoint.Server;
 import org.apache.cxf.jaxrs.JAXRSBindingFactory;
 import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
 import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
@@ -36,6 +37,8 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.digestutils.BouncyCastleDigester;
 import org.apache.tika.parser.digestutils.CommonsDigester;
+import org.apache.tika.server.core.resource.AsyncEmitter;
+import org.apache.tika.server.core.resource.AsyncResource;
 import org.apache.tika.server.core.resource.DetectorResource;
 import org.apache.tika.server.core.resource.EmitterResource;
 import org.apache.tika.server.core.resource.LanguageResource;
@@ -72,13 +75,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class TikaServerProcess {
 
 
     //used in fork mode -- restart after processing this many files
     private static final long DEFAULT_MAX_FILES = 100000;
-
+    private static String ENABLE_UNSECURE_FEATURES = "enableUnsecureFeatures";
 
     private static final int DEFAULT_DIGEST_MARK_LIMIT = 20 * 1024 * 1024;
     public static final Set<String> LOG_LEVELS = new HashSet<>(Arrays.asList("debug", "info"));
@@ -132,7 +140,7 @@ public class TikaServerProcess {
 
             CommandLineParser cliParser = new DefaultParser();
             CommandLine line = cliParser.parse(options, args);
-            runServer(line, options);
+            mainLoop(line, options);
         } catch (Exception e) {
             e.printStackTrace();
             LOG.error("Can't start: ", e);
@@ -140,10 +148,35 @@ public class TikaServerProcess {
         }
     }
 
+    private static void mainLoop(CommandLine commandLine, Options options) throws Exception {
+        AsyncResource asyncResource = null;
+        ArrayBlockingQueue asyncQueue = null;
+        int numAsyncThreads = 10;
+        if (commandLine.hasOption("unsecureFeatures")) {
+            asyncResource = new AsyncResource();
+            asyncQueue = asyncResource.getQueue(numAsyncThreads);
+        }
+
+        ServerDetails serverDetails = initServer(commandLine, asyncResource);
+        ExecutorService executorService = Executors.newFixedThreadPool(numAsyncThreads+1);
+        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
+
+        executorCompletionService.submit(new ServerThread(serverDetails));
+        if (asyncQueue != null) {
+            for (int i = 0; i < numAsyncThreads; i++) {
+                executorCompletionService.submit(new AsyncEmitter(asyncQueue));
+            }
+        }
+        while (true) {
+
+        }
+    }
+
     //This starts the server in this process.  This can
     //be either a direct call from -noFork or the process that is forked
     //in the 2.0 default mode.
-    private static void runServer(CommandLine line, Options options) throws Exception {
+    private static ServerDetails initServer(CommandLine line,
+                                     AsyncResource asyncResource) throws Exception {
 
         String host = null;
 
@@ -224,12 +257,12 @@ public class TikaServerProcess {
         }
 
         InputStreamFactory inputStreamFactory = null;
-        if (line.hasOption("enableUnsecureFeatures")) {
+        if (line.hasOption(ENABLE_UNSECURE_FEATURES)) {
             inputStreamFactory = new FetcherStreamFactory(tika.getFetcherManager());
         } else {
             inputStreamFactory = new DefaultInputStreamFactory();
         }
-        logFetchersAndEmitters(line.hasOption("enableUnsecureFeatures"), tika);
+        logFetchersAndEmitters(line.hasOption(ENABLE_UNSECURE_FEATURES), tika);
         String serverId = line.hasOption("i") ? line.getOptionValue("i") : UUID.randomUUID().toString();
         LOG.debug("SERVER ID:" + serverId);
         ServerStatus serverStatus;
@@ -272,8 +305,9 @@ public class TikaServerProcess {
         rCoreProviders.add(new SingletonResourceProvider(new TikaDetectors()));
         rCoreProviders.add(new SingletonResourceProvider(new TikaParsers()));
         rCoreProviders.add(new SingletonResourceProvider(new TikaVersion()));
-        if (line.hasOption("enableUnsecureFeatures")) {
+        if (line.hasOption(ENABLE_UNSECURE_FEATURES)) {
             rCoreProviders.add(new SingletonResourceProvider(new EmitterResource()));
+            rCoreProviders.add(new SingletonResourceProvider(asyncResource));
         }
         rCoreProviders.addAll(loadResourceServices());
         if (line.hasOption("status")) {
@@ -315,11 +349,11 @@ public class TikaServerProcess {
         JAXRSBindingFactory factory = new JAXRSBindingFactory();
         factory.setBus(sf.getBus());
         manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory);
-        sf.create();
-        LOG.info("Started Apache Tika server {} at {}",
-                serverId,
-                url);
-
+        ServerDetails details = new ServerDetails();
+        details.sf = sf;
+        details.url = url;
+        details.serverId = serverId;
+        return details;
     }
 
     private static void logFetchersAndEmitters(boolean enableUnsecureFeatures, TikaConfig tika) {
@@ -421,4 +455,28 @@ public class TikaServerProcess {
         return serverTimeouts;
     }
 
+    private static class ServerThread implements Callable<Integer> {
+        private final ServerDetails serverDetails;
+        public ServerThread(ServerDetails serverDetails) {
+            this.serverDetails = serverDetails;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+
+            Server server = serverDetails.sf.create();
+            System.err.println("started : "+serverDetails.serverId);
+                LOG.info("Started Apache Tika server {} at {}",
+                        serverDetails.serverId,
+                        serverDetails.url);
+            System.err.println("returning : "+serverDetails.serverId);
+                return 2;
+        }
+    }
+
+    private static class ServerDetails {
+        JAXRSServerFactoryBean sf;
+        String serverId;
+        String url;
+    }
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
new file mode 100644
index 0000000..f473699
--- /dev/null
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
@@ -0,0 +1,123 @@
+package org.apache.tika.server.core.resource;
+
+import org.apache.cxf.jaxrs.impl.UriInfoImpl;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedHashMap;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Worker thread that takes ASyncRequests off the queue and
+ * processes them.
+ */
+public class AsyncEmitter implements Callable<Integer> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
+
+
+    private long maxCacheSizeBytes = 10_000_000;
+
+    private final ArrayBlockingQueue<AsyncRequest> queue;
+
+    public AsyncEmitter(ArrayBlockingQueue<AsyncRequest> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+        while (true) {
+            AsyncRequest request = queue.poll(1, TimeUnit.MINUTES);
+            if (request != null) {
+                processTuple(request);
+            } else {
+                LOG.trace("Nothing on the async queue");
+            }
+        }
+    }
+
+    private void processTuple(AsyncRequest request) {
+        LOG.debug("Starting request id ({}) of size ({})",
+                request.getId(), request.getTuples().size());
+        List<EmitData> cachedEmitData = new ArrayList<>();
+        Emitter emitter = TikaResource.getConfig()
+                .getEmitterManager()
+                .getEmitter(
+                    request.getTuples().get(0).getEmitKey().getEmitterName());
+        long currSize = 0;
+        for (FetchEmitTuple t : request.getTuples()) {
+            EmitData emitData = processTuple(t);
+            long estimated = AbstractEmitter.estimateSizeInBytes(
+                    emitData.getEmitKey().getKey(), emitData.getMetadataList());
+            if (estimated + currSize > maxCacheSizeBytes) {
+                tryToEmit(emitter, cachedEmitData, request);
+                cachedEmitData.clear();
+            }
+            cachedEmitData.add(emitData);
+            currSize += estimated;
+        }
+        tryToEmit(emitter, cachedEmitData, request);
+        cachedEmitData.clear();
+        LOG.debug("Completed request id ({})",
+                request.getId(), request.getTuples().size());
+    }
+
+    private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData,
+                           AsyncRequest request) {
+        try {
+            emitter.emit(cachedEmitData);
+        } catch (IOException|TikaEmitterException e) {
+            LOG.warn("async id ({}) emitter class ({}): {}",
+                    request.getId(), emitter.getClass(),
+                    ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    private EmitData processTuple(FetchEmitTuple t) {
+        Metadata userMetadata = t.getMetadata();
+        Metadata metadata = new Metadata();
+        String fetcherName = t.getFetchKey().getFetcherName();
+        String fetchKey = t.getFetchKey().getKey();
+        List<Metadata> metadataList = null;
+        try (InputStream stream =
+                     TikaResource.getConfig().getFetcherManager()
+                             .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
+
+            metadataList = RecursiveMetadataResource.parseMetadata(
+                    stream,
+                    metadata,
+                    new MultivaluedHashMap<>(),
+                    new UriInfoImpl(new MessageImpl()), "text");
+        } catch (SecurityException e) {
+            throw e;
+        } catch (Exception e) {
+            LOG.warn(t.toString(), e);
+        }
+        injectUserMetadata(userMetadata, metadataList);
+        return new EmitData(t.getEmitKey(), metadataList);
+    }
+
+    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
+        for (String n : userMetadata.names()) {
+            //overwrite whatever was there
+            metadataList.get(0).set(n, null);
+            for (String val : userMetadata.getValues(n)) {
+                metadataList.get(0).add(n, val);
+            }
+        }
+    }
+}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
new file mode 100644
index 0000000..f46105c
--- /dev/null
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
@@ -0,0 +1,23 @@
+package org.apache.tika.server.core.resource;
+
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+
+import java.util.List;
+
+public class AsyncRequest {
+    private final String id;
+    private final List<FetchEmitTuple> tuples;
+
+    public AsyncRequest(String id, List<FetchEmitTuple> tuples) {
+        this.id = id;
+        this.tuples = tuples;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public List<FetchEmitTuple> getTuples() {
+        return tuples;
+    }
+}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
new file mode 100644
index 0000000..240933f
--- /dev/null
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.server.core.resource;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.FetcherManager;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+import java.io.InputStream;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+@Path("/async")
+public class AsyncResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncResource.class);
+
+    private static final int DEFAULT_QUEUE_SIZE = 100;
+    private int queueSize = DEFAULT_QUEUE_SIZE;
+    private ArrayBlockingQueue<AsyncRequest> queue;
+
+    public ArrayBlockingQueue<AsyncRequest> getQueue(int numThreads) {
+        this.queue = new ArrayBlockingQueue<>(queueSize+numThreads);
+        return queue;
+    }
+    /**
+     * The client posts a json request.  At a minimum, this must be a
+     * json object that contains an emitter and a fetcherString key with
+     * the key to fetch the inputStream. Optionally, it may contain a metadata
+     * object that will be used to populate the metadata key for pass
+     * through of metadata from the client.
+     * <p>
+     * The extracted text content is stored with the key
+     * {@link TikaCoreProperties#TIKA_CONTENT}
+     * <p>
+     * Must specify a fetcherString and an emitter in the posted json.
+     *
+     * @param info uri info
+     * @return InputStream that can be deserialized as a list of {@link Metadata} objects
+     * @throws Exception
+     */
+    @POST
+    @Produces("application/json")
+    public Map<String, String> post(InputStream is,
+                                         @Context HttpHeaders httpHeaders,
+                                         @Context UriInfo info
+    ) throws Exception {
+
+        AsyncRequest request = deserializeASyncRequest(is);
+        FetcherManager fetcherManager = TikaConfig.getDefaultConfig().getFetcherManager();
+        EmitterManager emitterManager = TikaConfig.getDefaultConfig().getEmitterManager();
+        for (FetchEmitTuple t : request.getTuples()) {
+            if (! fetcherManager.getSupported().contains(t.getFetchKey().getFetcherName())) {
+                return badFetcher(t.getFetchKey());
+            }
+            if (! emitterManager.getSupported().contains(t.getEmitKey().getEmitterName())) {
+                return badEmitter(t.getEmitKey());
+            }
+        }
+
+        //parameterize
+        boolean offered = queue.offer(request, 60, TimeUnit.SECONDS);
+        if (! offered) {
+            return throttleResponse();
+        }
+        return ok(request.getId(), request.getTuples().size());
+    }
+
+    private Map<String, String> ok(String id, int size) {
+        return null;
+    }
+
+    private Map<String, String> throttleResponse() {
+        Map<String, String> map = new HashMap<>();
+        return map;
+    }
+
+    private Map<String, String> badEmitter(EmitKey emitKey) {
+        return null;
+    }
+
+    private Map<String, String> badFetcher(FetchKey fetchKey) {
+        return null;
+    }
+
+    private AsyncRequest deserializeASyncRequest(InputStream is) {
+        return new AsyncRequest("", Collections.EMPTY_LIST);
+    }
+
+}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
index 8d25d83..16699ed 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
@@ -64,7 +64,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
             public void run() {
                 TikaServerCli.main(
                         new String[]{
-                                "-maxFiles", "2000",
+                                "-maxFiles", "100",
                                 "-p", INTEGRATION_TEST_PORT,
                                 "-tmpFilePrefix", "basic-"
                         });
@@ -72,9 +72,12 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
         };
         serverThread.start();
         try {
-            testBaseline();
+            for (int i = 0; i < 500; i++) {
+                System.out.println("base "+i);
+                testBaseline();
+            }
         } finally {
-            serverThread.interrupt();
+            //serverThread.interrupt();
         }
     }
 
@@ -563,6 +566,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
                 .put(ClassLoader
                         .getSystemResourceAsStream(TEST_HELLO_WORLD));
         Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
+        System.out.println(response.getStatus());
         List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
         assertEquals(1, metadataList.size());
         assertEquals("Nikolai Lobachevsky", metadataList.get(0).get("author"));