You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/02/02 02:57:34 UTC

[tika] branch TIKA-3288 updated: TIKA-3288 -- WIP do not merge.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-3288 by this push:
     new aa9ecd0  TIKA-3288 -- WIP do not merge.
aa9ecd0 is described below

commit aa9ecd049e4779ed9fbce995a953f50b969ba5ab
Author: tballison <ta...@apache.org>
AuthorDate: Mon Feb 1 21:57:21 2021 -0500

    TIKA-3288 -- WIP do not merge.
---
 .../tika/pipes/emitter/solr/SolrEmitter.java       |  78 +++++++++++-----
 .../apache/tika/pipes/emitter/solr/TestBasic.java  |  48 ++++++++--
 .../org/apache/tika/client/HttpClientUtil.java     |  28 ++++++
 .../org/apache/tika/server/core/TikaServerCli.java |   2 -
 .../apache/tika/server/core/TikaServerProcess.java |  16 ++--
 .../tika/server/core/resource/EmitterResource.java | 102 ++++++++++++++-------
 .../core/resource/RecursiveMetadataResource.java   |  19 ++--
 .../server/core/TikaServerIntegrationTest.java     |  73 +++++++++------
 8 files changed, 250 insertions(+), 116 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 4c15889..09ec607 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -34,11 +34,18 @@ import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.io.StringWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.zip.GZIPOutputStream;
 
 public class SolrEmitter extends AbstractEmitter implements Initializable {
 
@@ -51,6 +58,8 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     private static final String ATTACHMENTS = "attachments";
     private static final String UPDATE_PATH = "/update";
     private static final Logger LOG = LoggerFactory.getLogger(SolrEmitter.class);
+    //one day this will be allowed?
+    private final boolean gzipJson = false;
 
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
     private String url;
@@ -71,16 +80,24 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             LOG.warn("metadataList is null or empty");
             return;
         }
-        StringWriter writer = new StringWriter();
-        JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer);
-        jsonGenerator.writeStartArray();
-        jsonify(jsonGenerator, emitKey, metadataList);
-        jsonGenerator.writeEndArray();
-        String json = writer.toString();
-        LOG.debug("emitting json:"+json);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        Writer writer = gzipJson ?
+                new BufferedWriter(
+                        new OutputStreamWriter(
+                                new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+                new BufferedWriter(
+                        new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+        try (
+        JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+            jsonGenerator.writeStartArray();
+            jsonify(jsonGenerator, emitKey, metadataList);
+            jsonGenerator.writeEndArray();
+        }
+        LOG.debug("emitting json ({})",
+                new String(bos.toByteArray(), StandardCharsets.UTF_8));
         try {
             HttpClientUtil.postJson(httpClient,
-                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), json);
+                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), bos.toByteArray(), gzipJson);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("can't post", e);
         }
@@ -93,18 +110,26 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             LOG.warn("batch is null or empty");
             return;
         }
-        StringWriter writer = new StringWriter();
-        JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer);
-        jsonGenerator.writeStartArray();
-        for (EmitData d : batch) {
-            jsonify(jsonGenerator, d.getKey(), d.getMetadataList());
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        Writer writer = gzipJson ?
+                new BufferedWriter(
+                        new OutputStreamWriter(
+                                new GZIPOutputStream(bos), StandardCharsets.UTF_8)) :
+            new BufferedWriter(
+                new OutputStreamWriter(bos, StandardCharsets.UTF_8));
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+            jsonGenerator.writeStartArray();
+            for (EmitData d : batch) {
+                jsonify(jsonGenerator, d.getEmitKey().getKey(), d.getMetadataList());
+            }
+            jsonGenerator.writeEndArray();
         }
-        jsonGenerator.writeEndArray();
-        String json = writer.toString();
-        LOG.debug("emitting json:"+json);
+        LOG.debug("emitting json ({})",
+                new String(bos.toByteArray(), StandardCharsets.UTF_8));
         try {
             HttpClientUtil.postJson(httpClient,
-                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(), json);
+                    url+UPDATE_PATH+"?commitWithin="+getCommitWithin(),
+                    bos.toByteArray(), gzipJson);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("can't post", e);
         }
@@ -128,7 +153,7 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             parent.set(getContentField(), sb.toString());
             jsonify(parent, jsonGenerator);
         } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
-            jsonify(metadataList.get(0), jsonGenerator);
+            jsonify(metadataList.get(0), jsonGenerator, false);
             jsonGenerator.writeArrayFieldStart(ATTACHMENTS);
 
             for (int i = 1; i < metadataList.size(); i++) {
@@ -137,17 +162,16 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
                 jsonify(m, jsonGenerator);
             }
             jsonGenerator.writeEndArray();
+            jsonGenerator.writeEndObject();
         } else {
             throw new IllegalArgumentException("I don't yet support this attachment strategy: "
                     + attachmentStrategy);
         }
     }
 
-
-    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator) throws IOException {
+    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, boolean writeEndObject) throws IOException {
         jsonGenerator.writeStartObject();
         for (String n : metadata.names()) {
-
             String[] vals = metadata.getValues(n);
             if (vals.length == 0) {
                 continue;
@@ -155,11 +179,19 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
                 jsonGenerator.writeStringField(n, vals[0]);
             } else if (vals.length > 1) {
                 jsonGenerator.writeArrayFieldStart(n);
-                jsonGenerator.writeArray(vals, 0, vals.length);
+                for (String val : vals) {
+                    jsonGenerator.writeString(val);
+                }
                 jsonGenerator.writeEndArray();
             }
         }
-        jsonGenerator.writeEndObject();
+        if (writeEndObject) {
+            jsonGenerator.writeEndObject();
+        }
+    }
+
+    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator) throws IOException {
+        jsonify(metadata, jsonGenerator, true);
     }
 
     /**
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
index 14a091a..069e7a2 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
@@ -18,6 +18,9 @@ package org.apache.tika.pipes.emitter.solr;
 
 
 import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
@@ -36,23 +39,50 @@ public class TestBasic {
         TikaConfig tikaConfig = new TikaConfig(
                 TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
         Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr1");
+        List<Metadata> metadataList = getParentChild(tikaConfig,
+                "id1", 2);
+
+        emitter.emit("1", metadataList);
+    }
+
+    @Test
+    public void testBatch() throws Exception {
+        TikaConfig tikaConfig = new TikaConfig(
+                TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
+        Emitter emitter = tikaConfig.getEmitterManager().getEmitter("solr2");
+        List<EmitData> emitData = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            List<Metadata> metadataList = getParentChild(tikaConfig,
+                    "batch_"+i, 4);
+            emitData.add(new EmitData(
+                    new EmitKey(emitter.getName(),  "batch_"+i),
+                    metadataList));
+        }
+        emitter.emit(emitData);
+    }
+
+    private List<Metadata> getParentChild(TikaConfig tikaConfig,
+                                          String id, int numChildren) throws TikaException {
         List<Metadata> metadataList = new ArrayList<>();
+        MetadataFilter filter = tikaConfig.getMetadataFilter();
+
         Metadata m1 = new Metadata();
+        m1.set("id", id);
         m1.set(Metadata.CONTENT_LENGTH, "314159");
         m1.set(TikaCoreProperties.TIKA_CONTENT, "the quick brown");
         m1.set(TikaCoreProperties.TITLE, "this is the first title");
         m1.add(TikaCoreProperties.CREATOR, "firstAuthor");
         m1.add(TikaCoreProperties.CREATOR, "secondAuthor");
-
-        Metadata m2 = new Metadata();
-        m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH, "/path_to_this.txt");
-        m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy");
-        MetadataFilter filter = tikaConfig.getMetadataFilter();
         filter.filter(m1);
-        filter.filter(m2);
         metadataList.add(m1);
-        metadataList.add(m2);
-
-        emitter.emit("1", metadataList);
+        for (int i = 1; i < numChildren; i++ ) {
+            Metadata m2 = new Metadata();
+            m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH, "/path_to_this.txt");
+            m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy " + i);
+            filter.filter(m2);
+            metadataList.add(m2);
+        }
+        return metadataList;
     }
+
 }
diff --git a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
index 780179c..11ebb3c 100644
--- a/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
+++ b/tika-pipes/tika-httpclient-commons/src/main/java/org/apache/tika/client/HttpClientUtil.java
@@ -20,6 +20,7 @@ import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.util.EntityUtils;
 
 import java.io.IOException;
@@ -30,7 +31,9 @@ public class HttpClientUtil {
     public static boolean postJson(HttpClient client, String url, String json) throws IOException,
             TikaClientException {
         HttpPost post = new HttpPost(url);
+        post.setHeader("Content-Encoding", "gzip");
         ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
+
         post.setEntity(entity);
         post.setHeader("Content-Type", "application/json");
         HttpResponse response = client.execute(post);
@@ -48,4 +51,29 @@ public class HttpClientUtil {
         return true;
     }
 
+    public static boolean postJson(HttpClient client, String url,
+                                   byte[] bytes, boolean gzipped) throws IOException,
+            TikaClientException {
+        HttpPost post = new HttpPost(url);
+        if (gzipped) {
+            post.setHeader("Content-Encoding", "gzip");
+        }
+        ByteArrayEntity entity = new ByteArrayEntity(bytes);
+
+        post.setEntity(entity);
+        post.setHeader("Content-Type", "application/json");
+        HttpResponse response = client.execute(post);
+
+
+        if (response.getStatusLine().getStatusCode() != 200) {
+            String msg = EntityUtils.toString(response.getEntity());
+            throw new TikaClientException("Bad status: " +
+                    response.getStatusLine().getStatusCode() + " : "+
+                    msg);
+        } else {
+            String msg = EntityUtils.toString(response.getEntity());
+            System.out.println("httputil: " + msg);
+        }
+        return true;
+    }
 }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index 4ca9bb4..14a2ff4 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -142,8 +142,6 @@ public class TikaServerCli {
             } catch (InterruptedException e) {
                 e.printStackTrace();
                 //swallow
-            } catch (Exception e) {
-                e.printStackTrace();
             }
         }
     }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 94f2190..1bacbc5 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -80,6 +80,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class TikaServerProcess {
 
@@ -137,7 +139,6 @@ public class TikaServerProcess {
         LOG.info("Starting {} server", new Tika());
         try {
             Options options = getOptions();
-
             CommandLineParser cliParser = new DefaultParser();
             CommandLine line = cliParser.parse(options, args);
             mainLoop(line, options);
@@ -152,7 +153,7 @@ public class TikaServerProcess {
         AsyncResource asyncResource = null;
         ArrayBlockingQueue asyncQueue = null;
         int numAsyncThreads = 10;
-        if (commandLine.hasOption("unsecureFeatures")) {
+        if (commandLine.hasOption(ENABLE_UNSECURE_FEATURES)) {
             asyncResource = new AsyncResource();
             asyncQueue = asyncResource.getQueue(numAsyncThreads);
         }
@@ -168,7 +169,10 @@ public class TikaServerProcess {
             }
         }
         while (true) {
-
+            Future<Integer> future = executorCompletionService.poll(1, TimeUnit.MINUTES);
+            if (future != null) {
+                System.out.println("future val: " + future.get());
+            }
         }
     }
 
@@ -465,12 +469,10 @@ public class TikaServerProcess {
         public Integer call() throws Exception {
 
             Server server = serverDetails.sf.create();
-            System.err.println("started : "+serverDetails.serverId);
-                LOG.info("Started Apache Tika server {} at {}",
+            LOG.info("Started Apache Tika server {} at {}",
                         serverDetails.serverId,
                         serverDetails.url);
-            System.err.println("returning : "+serverDetails.serverId);
-                return 2;
+            return 2;
         }
     }
 
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 9689ba8..3e5263d 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -17,15 +17,18 @@
 
 package org.apache.tika.server.core.resource;
 
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +99,7 @@ public class EmitterResource {
                             httpHeaders.getRequestHeaders(), info, "text");
         }
         emitKey = StringUtils.isBlank(emitKey) ? fetchKey : emitKey;
-        return emit(emitterName, emitKey, metadataList);
+        return emit(new EmitKey(emitterName, emitKey), metadataList);
     }
 
     /**
@@ -129,7 +133,7 @@ public class EmitterResource {
                 RecursiveMetadataResource.parseMetadata(is,
                         metadata,
                         httpHeaders.getRequestHeaders(), info, "text");
-        return emit(emitterName, emitKey, metadataList);
+        return emit(new EmitKey(emitterName, emitKey), metadataList);
     }
 
     /**
@@ -154,52 +158,84 @@ public class EmitterResource {
                                          @Context HttpHeaders httpHeaders,
                                          @Context UriInfo info
     ) throws Exception {
-        JsonObject root = null;
-        try (Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
-            root = JsonParser.parseReader(reader).getAsJsonObject();
-        }
-        String fetcherName = root.get("fetcher").getAsString();
-        String fetchKey = root.get("fetchKey").getAsString();
-        String emitKey = (root.has("emitKey")) ?
-                root.get("emitKey").getAsString() : fetchKey;
-        String emitterName = root.get("emitter").getAsString();
+
+        JsonFactory jfactory = new JsonFactory();
+        JsonParser jParser = jfactory.createParser(is);
+        FetchEmitTuple t = deserializeTuple(jParser);
         Metadata metadata = new Metadata();
 
 
         List<Metadata> metadataList = null;
         try (InputStream stream =
                      TikaResource.getConfig().getFetcherManager()
-                             .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
+                             .getFetcher(t.getFetchKey().getFetcherName())
+                             .fetch(t.getFetchKey().getKey(), metadata)) {
 
             metadataList = RecursiveMetadataResource.parseMetadata(
                     stream,
                     metadata,
                     httpHeaders.getRequestHeaders(), info, "text");
         } catch (Error error) {
-            return returnError(emitterName, error);
+            return returnError(t.getEmitKey().getEmitterName(), error);
         }
 
-        injectUserMetadata(metadataList.get(0), root.getAsJsonObject());
+        injectUserMetadata(t.getMetadata(), metadataList.get(0));
 
         for (String n : metadataList.get(0).names()) {
             LOG.debug("post parse/pre emit metadata {}: {}",
                     n, metadataList.get(0).get(n));
         }
-        return emit(emitterName, emitKey, metadataList);
+        return emit(t.getEmitKey(), metadataList);
+    }
+
+    private FetchEmitTuple deserializeTuple(JsonParser jParser) throws IOException {
+        String fetcherName = null;
+        String fetchKey = null;
+        String emitterName = null;
+        String emitKey = null;
+
+        Metadata metadata = null;
+        while (jParser.nextToken() != JsonToken.END_OBJECT) {
+            String currentName = jParser.getCurrentName();
+            if ("fetcherName".equals(currentName)) {
+                fetcherName = currentName;
+            } else if ("fetchKey".equals(currentName)) {
+                fetchKey = currentName;
+            } else if ("emitterName".equals(currentName)) {
+                emitterName = currentName;
+            } else if ("emitKey".equals(currentName)) {
+                emitKey = currentName;
+            } else if ("metadata".equals(currentName)) {
+                metadata = deserializeMetadata(jParser);
+            }
+        }
+        return new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+                new EmitKey(emitterName, emitKey), metadata);
     }
 
-    private void injectUserMetadata(Metadata metadata, JsonObject root) {
-        if (root.has("metadata")) {
-            JsonObject meta = root.getAsJsonObject("metadata");
-            for (String k : meta.keySet()) {
-                JsonElement val = meta.get(k);
-                if (val.isJsonArray()) {
-                    for (JsonElement v : val.getAsJsonArray()) {
-                        metadata.add(k, v.getAsString());
-                    }
-                } else {
-                    metadata.set(k, val.getAsString());
+    private Metadata deserializeMetadata(JsonParser jParser) throws IOException {
+        Metadata metadata = new Metadata();
+
+        while (jParser.nextToken() != JsonToken.END_OBJECT) {
+            String key = jParser.getCurrentName();
+            JsonToken token = jParser.nextToken();
+            if (jParser.isExpectedStartArrayToken()) {
+                List<String> vals = new ArrayList<>();
+                while (jParser.nextToken() != JsonToken.END_ARRAY) {
+                    metadata.add(key, jParser.getText());
                 }
+            } else {
+                metadata.set(key, token.asString());
+            }
+        }
+        return metadata;
+
+    }
+    private void injectUserMetadata(Metadata userMetadata, Metadata metadata) {
+        for (String n : userMetadata.names()) {
+            metadata.set(n, null);
+            for (String v : userMetadata.getValues(n)) {
+                metadata.add(n, v);
             }
         }
     }
@@ -213,12 +249,12 @@ public class EmitterResource {
         return statusMap;
     }
 
-    private Map<String, String> emit(String emitterName, String emitKey, List<Metadata> metadataList) throws TikaException {
-        Emitter emitter = TikaResource.getConfig().getEmitterManager().getEmitter(emitterName);
+    private Map<String, String> emit(EmitKey emitKey, List<Metadata> metadataList) throws TikaException {
+        Emitter emitter = TikaResource.getConfig().getEmitterManager().getEmitter(emitKey.getEmitterName());
         String status = "ok";
         String exceptionMsg = "";
         try {
-            emitter.emit(emitKey, metadataList);
+            emitter.emit(emitKey.getKey(), metadataList);
         } catch (IOException | TikaEmitterException e) {
             LOG.warn("problem with emitting", e);
             status = "emitter_exception";
@@ -226,7 +262,7 @@ public class EmitterResource {
         }
         Map<String, String> statusMap = new HashMap<>();
         statusMap.put("status", status);
-        statusMap.put("emitter", emitterName);
+        statusMap.put("emitter", emitKey.getEmitterName());
         if (exceptionMsg.length() > 0) {
             statusMap.put("emitter_exception", exceptionMsg);
         }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index 298d848..505a5e2 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -23,6 +23,7 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedMap;
@@ -62,7 +63,7 @@ public class RecursiveMetadataResource {
      * just the immediate children.
      * <p>
      * The extracted text content is stored with the key
-     * {@link RecursiveParserWrapper#TIKA_CONTENT}.
+     * {@link org.apache.tika.metadata.TikaCoreProperties#TIKA_CONTENT}.
      * <p>
      * Specify the handler for the content (xml, html, text, ignore)
      * in the path:<br/>
@@ -98,7 +99,7 @@ public class RecursiveMetadataResource {
      * just the immediate children.
      * <p>
      * The extracted text content is stored with the key
-     * {@link RecursiveParserWrapper#TIKA_CONTENT}.
+     * {@link org.apache.tika.metadata.TikaCoreProperties#TIKA_CONTENT}.
      * <p>
      * Specify the handler for the content (xml, html, text, ignore)
      * in the path:<br/>
@@ -161,19 +162,13 @@ public class RecursiveMetadataResource {
                 TikaResource.getConfig().getMetadataFilter());
 		try {
             TikaResource.parse(wrapper, LOG, info.getPath(), is, handler, metadata, context);
-        } catch (SecurityException e) {
+        } catch (SecurityException|WebApplicationException e) {
 		    throw e;
         } catch (Exception e) {
-		    //swallow it and report it via the metadata list
+		    //we shouldn't get here?
+		    e.printStackTrace();
         }
-		/*
-		    We used to have this non-functional bit of code...refactor to add it back and make it work?
-						new LanguageHandler() {
-					public void endDocument() {
-						metadata.set("language", getLanguage().getLanguage());
-					}
-				},
-		 */
+
 		return handler.getMetadataList();
 	}
 
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
index 16699ed..1e3d4cc 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.tika.server.core;
 
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import org.apache.commons.io.IOUtils;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.tika.TikaTest;
@@ -32,10 +33,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.ProcessingException;
 import javax.ws.rs.core.Response;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.net.ConnectException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -55,7 +60,6 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
     private static final Logger LOG = LoggerFactory.getLogger(TikaServerIntegrationTest.class);
 
 
-
     @Test
     public void testBasic() throws Exception {
 
@@ -72,12 +76,9 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
         };
         serverThread.start();
         try {
-            for (int i = 0; i < 500; i++) {
-                System.out.println("base "+i);
-                testBaseline();
-            }
+            testBaseline();
         } finally {
-            //serverThread.interrupt();
+            serverThread.interrupt();
         }
     }
 
@@ -206,9 +207,9 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
 
     private String getServerId() throws Exception {
         Response response = WebClient
-                    .create(endPoint + STATUS_PATH)
-                    .accept("application/json")
-                    .get();
+                .create(endPoint + STATUS_PATH)
+                .accept("application/json")
+                .get();
         String jsonString =
                 CXFTestBase.getStringFromInputStream((InputStream) response.getEntity());
         JsonObject root = JsonParser.parseString(jsonString).getAsJsonObject();
@@ -379,10 +380,10 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
             awaitServerStartup();
 
             Response response = WebClient
-                .create(endPoint + META_PATH)
-                .accept("application/json")
-                .put(ClassLoader
-                        .getSystemResourceAsStream(TEST_STDOUT_STDERR));
+                    .create(endPoint + META_PATH)
+                    .accept("application/json")
+                    .put(ClassLoader
+                            .getSystemResourceAsStream(TEST_STDOUT_STDERR));
             Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
             List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
             assertEquals(1, metadataList.size());
@@ -439,7 +440,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
                                 "-p", INTEGRATION_TEST_PORT,
                                 "-taskTimeoutMillis", "10000", "-taskPulseMillis", "500",
                                 "-pingPulseMillis", "100", "-maxRestarts", "0",
-                                "-JDlog4j.configuration=file:"+ LOG_FILE.toAbsolutePath(),
+                                "-JDlog4j.configuration=file:" + LOG_FILE.toAbsolutePath(),
                                 "-tmpFilePrefix", "tika-server-stderrlogging"
                         });
             }
@@ -472,14 +473,14 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
     private void awaitServerStartup() throws Exception {
         Instant started = Instant.now();
         long elapsed = Duration.between(started, Instant.now()).toMillis();
-        WebClient client = WebClient.create(endPoint+"/tika").accept("text/plain");
+        WebClient client = WebClient.create(endPoint + "/tika").accept("text/plain");
         while (elapsed < MAX_WAIT_MS) {
             try {
                 Response response = client.get();
                 if (response.getStatus() == 200) {
                     elapsed = Duration.between(started, Instant.now()).toMillis();
                     LOG.info("client observes server successfully started after " +
-                            elapsed+ " ms");
+                            elapsed + " ms");
                     return;
                 }
                 LOG.debug("tika test client failed to connect to server with status: {}", response.getStatus());
@@ -516,7 +517,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
         awaitServerStartup();
         Random r = new Random();
         for (int i = 0; i < 100; i++) {
-            System.out.println("FILE # "+i);
+            System.out.println("FILE # " + i);
             boolean ex = false;
             Response response = null;
             String file = TEST_HELLO_WORLD;
@@ -528,7 +529,7 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
                 } else if (r.nextFloat() < 0.02) {
                     file = TEST_HEAVY_HANG;
                 }
-                System.out.println("about to process: "+file);
+                System.out.println("about to process: " + file);
                 response = WebClient
                         .create(endPoint + META_PATH)
                         .accept("application/json")
@@ -559,17 +560,37 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
     }
 
     private void testBaseline() throws Exception {
-        awaitServerStartup();
-        Response response = WebClient
-                .create(endPoint + META_PATH)
-                .accept("application/json")
-                .put(ClassLoader
-                        .getSystemResourceAsStream(TEST_HELLO_WORLD));
-        Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
-        System.out.println(response.getStatus());
-        List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
-        assertEquals(1, metadataList.size());
-        assertEquals("Nikolai Lobachevsky", metadataList.get(0).get("author"));
-        assertContains("hello world", metadataList.get(0).get("X-TIKA:content"));
+        // Make up to maxTries attempts; retry when the request throws a
+        // ProcessingException or the server answers 503. Never pass vacuously.
+        int maxTries = 3;
+        ProcessingException lastFailure = null;
+        for (int tries = 1; tries <= maxTries; tries++) {
+            awaitServerStartup();
+            Response response;
+            try {
+                response = WebClient
+                        .create(endPoint + META_PATH)
+                        .accept("application/json")
+                        .put(ClassLoader
+                                .getSystemResourceAsStream(TEST_HELLO_WORLD));
+            } catch (ProcessingException e) {
+                // remember the failure so it can be reported if all tries fail
+                lastFailure = e;
+                continue;
+            }
+            if (response.getStatus() == 503) {
+                // 503 response: wait for startup again and retry
+                continue;
+            }
+            Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
+            List<Metadata> metadataList = JsonMetadataList.fromJson(reader);
+            assertEquals(1, metadataList.size());
+            assertEquals("Nikolai Lobachevsky", metadataList.get(0).get("author"));
+            assertContains("hello world", metadataList.get(0).get("X-TIKA:content"));
+            // verified once; do not repeat the request on the remaining tries
+            return;
+        }
+        throw new AssertionError(
+                "baseline request did not succeed after " + maxTries + " tries", lastFailure);
     }
 }