You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/02/02 22:44:47 UTC
[tika] branch TIKA-3288 updated: TIKA-3288 -- tests pass. More work
remains with unit tests and documentation, but I think we're good.
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3288
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-3288 by this push:
new 24a37e8 TIKA-3288 -- tests pass. More work remains with unit tests and documentation, but I think we're good.
24a37e8 is described below
commit 24a37e8eb4fcd8ed377f90c08ea91f408e6fdfe9
Author: tballison <ta...@apache.org>
AuthorDate: Tue Feb 2 17:44:33 2021 -0500
TIKA-3288 -- tests pass. More work remains with unit tests and documentation, but I think we're good.
---
tika-serialization/pom.xml | 1 +
.../tika/metadata/serialization/JsonEmitData.java | 45 ++++++
.../metadata/serialization/JsonFetchEmitTuple.java | 45 ++++--
.../serialization/JsonFetchEmitTupleList.java | 68 +++++++++
.../tika/metadata/serialization/JsonMetadata.java | 2 +-
.../metadata/serialization/JsonMetadataList.java | 2 +-
.../serialization/JsonFetchEmitTupleListTest.java | 62 ++++++++
.../serialization/JsonMetadataListTest.java | 4 +-
tika-server/tika-server-client/pom.xml | 10 +-
.../org/apache/tika/server/client/TikaClient.java | 55 ++-----
.../apache/tika/server/client/TikaClientCLI.java | 39 ++++-
.../apache/tika/server/client/TikaHttpClient.java | 38 +++--
.../apache/tika/server/core/TikaServerProcess.java | 50 +++---
.../tika/server/core/resource/AsyncEmitter.java | 170 ++++++++++++---------
.../tika/server/core/resource/AsyncParser.java | 108 +++++++++++++
.../tika/server/core/resource/AsyncRequest.java | 24 ++-
.../tika/server/core/resource/AsyncResource.java | 94 +++++++++---
.../server/core/resource/MetadataResource.java | 2 +-
.../core/resource/RecursiveMetadataResource.java | 4 +-
.../tika/server/core/resource/TikaResource.java | 13 +-
.../server/core/resource/UnpackerResource.java | 2 +-
...st.java => TikaServerAsyncIntegrationTest.java} | 158 +++++++------------
.../core/TikaServerEmitterIntegrationTest.java | 90 +++++------
23 files changed, 711 insertions(+), 375 deletions(-)
diff --git a/tika-serialization/pom.xml b/tika-serialization/pom.xml
index a8a7e09..64e3aa2 100644
--- a/tika-serialization/pom.xml
+++ b/tika-serialization/pom.xml
@@ -48,6 +48,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>tika-core</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
new file mode 100644
index 0000000..21b6377
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonEmitData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+
+import java.io.IOException;
+import java.io.Writer;
+
+public class JsonEmitData {
+
+ public static void toJson(EmitData emitData, Writer writer) throws IOException {
+ try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+ jsonGenerator.writeStartObject();
+ EmitKey key = emitData.getEmitKey();
+ jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITTER, key.getEmitterName());
+ jsonGenerator.writeStringField(JsonFetchEmitTuple.EMITKEY, key.getKey());
+ jsonGenerator.writeFieldName("data");
+ jsonGenerator.writeStartArray();
+ for (Metadata m : emitData.getMetadataList()) {
+ JsonMetadata.writeMetadataObject(m, jsonGenerator, false);
+ }
+ jsonGenerator.writeEndArray();
+ jsonGenerator.writeEndObject();
+ }
+ }
+}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 2dfcd3b..345873f 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -28,14 +28,15 @@ import org.apache.tika.utils.StringUtils;
import java.io.IOException;
import java.io.Reader;
+import java.io.StringWriter;
import java.io.Writer;
public class JsonFetchEmitTuple {
private static final String FETCHER = "fetcher";
private static final String FETCHKEY = "fetchKey";
- private static final String EMITTER = "emitter";
- private static final String EMITKEY = "emitKey";
+ public static final String EMITTER = "emitter";
+ public static final String EMITKEY = "emitKey";
private static final String METADATAKEY = "metadata";
public static FetchEmitTuple fromJson(Reader reader) throws IOException {
@@ -48,8 +49,11 @@ public class JsonFetchEmitTuple {
}
- private static FetchEmitTuple parseFetchEmitTuple(JsonParser jParser) throws IOException {
+ static FetchEmitTuple parseFetchEmitTuple(JsonParser jParser) throws IOException {
JsonToken token = jParser.nextToken();
+ if (token == JsonToken.START_OBJECT) {
+ token = jParser.nextToken();
+ }
String fetcherName = null;
String fetchKey = null;
String emitterName = null;
@@ -92,21 +96,32 @@ public class JsonFetchEmitTuple {
return jParser.getValueAsString();
}
+ public static String toJson(FetchEmitTuple t) throws IOException {
+ StringWriter writer = new StringWriter();
+ toJson(t, writer);
+ return writer.toString();
+ }
+
public static void toJson(FetchEmitTuple t, Writer writer) throws IOException {
try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
- jsonGenerator.writeStartObject();
- jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
- jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
- jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
- if (!StringUtils.isBlank(t.getEmitKey().getKey())) {
- jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getKey());
- }
- if (t.getMetadata().size() > 0) {
- jsonGenerator.writeFieldName(METADATAKEY);
- JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator, false);
- }
- jsonGenerator.writeEndObject();
+ writeTuple(t, jsonGenerator);
}
}
+
+ static void writeTuple(FetchEmitTuple t, JsonGenerator jsonGenerator) throws IOException {
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField(FETCHER, t.getFetchKey().getFetcherName());
+ jsonGenerator.writeStringField(FETCHKEY, t.getFetchKey().getKey());
+ jsonGenerator.writeStringField(EMITTER, t.getEmitKey().getEmitterName());
+ if (!StringUtils.isBlank(t.getEmitKey().getKey())) {
+ jsonGenerator.writeStringField(EMITKEY, t.getEmitKey().getKey());
+ }
+ if (t.getMetadata().size() > 0) {
+ jsonGenerator.writeFieldName(METADATAKEY);
+ JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator, false);
+ }
+ jsonGenerator.writeEndObject();
+
+ }
}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
new file mode 100644
index 0000000..4791aee
--- /dev/null
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.StringUtils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JsonFetchEmitTupleList {
+
+ public static List<FetchEmitTuple> fromJson(Reader reader) throws IOException {
+ JsonParser jParser = new JsonFactory().createParser(reader);
+ JsonToken token = jParser.nextToken();
+ if (token != JsonToken.START_ARRAY) {
+ throw new IOException("require start array, but see: "+token.name());
+ }
+ List<FetchEmitTuple> list = new ArrayList<>();
+ while (token != JsonToken.END_ARRAY) {
+ list.add(JsonFetchEmitTuple.parseFetchEmitTuple(jParser));
+ token = jParser.nextToken();
+ }
+ return list;
+ }
+
+ public static String toJson(List<FetchEmitTuple> list) throws IOException {
+ StringWriter writer = new StringWriter();
+ toJson(list, writer);
+ return writer.toString();
+ }
+
+ public static void toJson(List<FetchEmitTuple> list, Writer writer) throws IOException {
+
+ try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+ jsonGenerator.writeStartArray();
+ for (FetchEmitTuple t : list) {
+ JsonFetchEmitTuple.writeTuple(t, jsonGenerator);
+ }
+ jsonGenerator.writeEndArray();
+ }
+ }
+}
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
index 63de2a5..b33c2a9 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadata.java
@@ -82,7 +82,7 @@ public class JsonMetadata {
*
* @param reader reader to read from
* @return Metadata or null if nothing could be read from the reader
- * @throws TikaException in case of parse failure by Gson or IO failure with Reader
+ * @throws IOException in case of parse failure or IO failure with Reader
*/
public static Metadata fromJson(Reader reader) throws IOException {
Metadata m = null;
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
index 9aa9be3..85f444e 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonMetadataList.java
@@ -65,7 +65,7 @@ public class JsonMetadataList {
*
* @param reader
* @return Metadata or null if nothing could be read from the reader
- * @throws org.apache.tika.exception.TikaException in case of parse failure by Gson or IO failure with Reader
+ * @throws IOException in case of parse failure or IO failure with Reader
*/
public static List<Metadata> fromJson(Reader reader) throws IOException {
List<Metadata> ms = null;
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
new file mode 100644
index 0000000..80c37f4
--- /dev/null
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.metadata.serialization;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.junit.Test;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class JsonFetchEmitTupleListTest {
+
+ @Test
+ public void testBasic() throws Exception {
+ List<FetchEmitTuple> list = new ArrayList<>();
+ for (int i = 0; i < 57; i++) {
+ list.add(getFetchEmitTuple(i));
+ }
+ StringWriter writer = new StringWriter();
+ JsonFetchEmitTupleList.toJson(list, writer);
+
+ Reader reader = new StringReader(writer.toString());
+ List<FetchEmitTuple> deserialized = JsonFetchEmitTupleList.fromJson(reader);
+ assertEquals(list, deserialized);
+ }
+
+ private FetchEmitTuple getFetchEmitTuple(int i) {
+ Metadata m = new Metadata();
+ m.add("m1", "v1-"+i);
+ m.add("m1", "v1-"+i);
+ m.add("m2", "v2-"+i);
+ m.add("m2", "v3-"+i);
+ m.add("m3", "v4-"+i);
+
+ return new FetchEmitTuple(
+ new FetchKey("fetcher-"+i, "fetchkey-"+i),
+ new EmitKey("emitter-"+i, "emitKey-"+i),
+ m);
+ }
+}
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
index 768cd35..543bbcd 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonMetadataListTest.java
@@ -93,7 +93,7 @@ public class JsonMetadataListTest {
@Test
public void testPrettyPrint() throws Exception {
Metadata m1 = new Metadata();
- m1.add("tika:content", "this is the content");
+ m1.add(TikaCoreProperties.TIKA_CONTENT, "this is the content");
m1.add("zk1", "v1");
m1.add("zk1", "v2");
m1.add("zk1", "v3");
@@ -121,7 +121,7 @@ public class JsonMetadataListTest {
assertTrue(writer.toString().startsWith("[ {\n" +
" \"zk1\" : [ \"v1\", \"v2\", \"v3\", \"v4\", \"v4\" ],\n" +
" \"zk2\" : \"v1\",\n" +
- " \"tika:content\" : \"this is the content\"\n" +
+ " \"X-TIKA:content\" : \"this is the content\"\n" +
"},"));
diff --git a/tika-server/tika-server-client/pom.xml b/tika-server/tika-server-client/pom.xml
index d3daf68..920b644 100644
--- a/tika-server/tika-server-client/pom.xml
+++ b/tika-server/tika-server-client/pom.xml
@@ -32,16 +32,16 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-serialization</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpcomponents.version}</version>
</dependency>
<dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>${gson.version}</version>
- </dependency>
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index 73ff3f4..336cb7d 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -16,27 +16,21 @@
*/
package org.apache.tika.server.client;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import java.io.IOException;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TikaClient {
- private static final Gson GSON = new GsonBuilder().create();
-
private final Random random = new Random();
- private final TikaConfig tikaConfig;
private List<TikaHttpClient> clients;
@@ -45,11 +39,11 @@ public class TikaClient {
for (String url : tikaServers) {
clients.add(TikaHttpClient.get(url));
}
- return new TikaClient(tikaConfig, clients);
+ return new TikaClient(clients);
}
- private TikaClient(TikaConfig tikaConfig, List<TikaHttpClient> clients) {
- this.tikaConfig = tikaConfig;
+ private TikaClient(List<TikaHttpClient> clients) {
+
this.clients = clients;
}
@@ -57,37 +51,20 @@ public class TikaClient {
}*/
- public TikaEmitterResult parse(FetchEmitTuple fetchEmit, Metadata metadata)
+ public TikaEmitterResult parse(FetchEmitTuple fetchEmit)
throws IOException, TikaException {
TikaHttpClient client = getHttpClient();
- String jsonRequest = jsonifyRequest(fetchEmit, metadata);
- return client.postJson(jsonRequest);
-
+ StringWriter writer = new StringWriter();
+ JsonFetchEmitTuple.toJson(fetchEmit, writer);
+ return client.postJson(writer.toString());
}
- private String jsonifyRequest(FetchEmitTuple fetchEmit, Metadata metadata) {
- JsonObject root = new JsonObject();
- root.add("fetcher", new JsonPrimitive(fetchEmit.getFetchKey().getFetcherName()));
- root.add("fetchKey", new JsonPrimitive(fetchEmit.getFetchKey().getKey()));
- root.add("emitter", new JsonPrimitive(fetchEmit.getEmitKey().getEmitterName()));
- root.add("emitKey", new JsonPrimitive(fetchEmit.getEmitKey().getKey()));
- if (metadata.size() > 0) {
- JsonObject m = new JsonObject();
- for (String n : metadata.names()) {
- String[] vals = metadata.getValues(n);
- if (vals.length == 1) {
- m.add(n, new JsonPrimitive(vals[0]));
- } else if (vals.length > 1) {
- JsonArray arr = new JsonArray();
- for (int i = 0; i < vals.length; i++) {
- arr.add(vals[i]);
- }
- m.add(n, arr);
- }
- }
- root.add("metadata", m);
- }
- return GSON.toJson(root);
+ public TikaEmitterResult parseAsync(List<FetchEmitTuple> tuples)
+ throws IOException, TikaException {
+ StringWriter writer = new StringWriter();
+ JsonFetchEmitTupleList.toJson(tuples, writer);
+ TikaHttpClient client = getHttpClient();
+ return client.postJsonAsync(writer.toString());
}
private TikaHttpClient getHttpClient() {
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 4b438e2..d760ab9 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -27,6 +27,7 @@ import org.xml.sax.SAXException;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -108,6 +109,42 @@ public class TikaClientCLI {
servers, numThreads);
}
+ private class AsyncFetchWorker implements Callable<Integer> {
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final TikaClient client;
+ public AsyncFetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
+ this.queue = queue;
+ this.client = client;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ List<FetchEmitTuple> localCache = new ArrayList<>();
+ while (true) {
+
+ FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+ if (t == null) {
+ send(localCache);
+ throw new TimeoutException("exceeded maxWaitMs");
+ }
+ if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ send(localCache);
+ return 1;
+ }
+ if (localCache.size() > 20) {
+ LOGGER.debug("about to send: {}", localCache.size());
+ send(localCache);
+ localCache.clear();
+ }
+ localCache.add(t);
+ }
+ }
+
+ private void send(List<FetchEmitTuple> localCache) {
+
+ }
+ }
+
private class FetchWorker implements Callable<Integer> {
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final TikaClient client;
@@ -130,7 +167,7 @@ public class TikaClientCLI {
}
try {
LOGGER.debug("about to parse: {}", t.getFetchKey());
- client.parse(t, t.getMetadata());
+ client.parse(t);
} catch (IOException e) {
LOGGER.warn(t.getFetchKey().toString(), e);
} catch (TikaException e) {
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
index 621f5db..591064a 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
@@ -18,7 +18,6 @@ package org.apache.tika.server.client;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
-import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
@@ -26,6 +25,7 @@ import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,12 +39,14 @@ import java.nio.charset.StandardCharsets;
*/
class TikaHttpClient {
- private static final String ENDPOINT = "emit";
+ private static final String EMIT_ENDPOINT = "emit";
private static final String TIKA_ENDPOINT = "tika";
+ private static final String ASYNC_ENDPOINT = "async";
private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClient.class);
private final HttpHost httpHost;
private final HttpClient httpClient;
- private final String endPointUrl;
+ private final String emitEndPointUrl;
+ private final String asyncEndPointUrl;
private final String tikaUrl;
private int maxRetries = 3;
//if can't make contact with Tika server, max wait time in ms
@@ -53,38 +55,46 @@ class TikaHttpClient {
private long pulseWaitForTikaMs = 1000;
static TikaHttpClient get(String baseUrl) throws TikaClientConfigException {
- String endPointUrl = baseUrl.endsWith("/") ? baseUrl+ENDPOINT : baseUrl+"/"+ENDPOINT;
- String tikaUrl = baseUrl.endsWith("/") ? baseUrl+TIKA_ENDPOINT : baseUrl+"/"+TIKA_ENDPOINT;
URI uri;
try {
- uri = new URI(endPointUrl);
+ uri = new URI(baseUrl);
} catch (URISyntaxException e) {
throw new TikaClientConfigException("bad URI", e);
}
HttpHost httpHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
//TODO: need to add other configuration stuff? proxy, username, password, timeouts...
HttpClient client = HttpClients.createDefault();
- return new TikaHttpClient(endPointUrl, tikaUrl, httpHost, client);
+ return new TikaHttpClient(baseUrl, httpHost, client);
}
/**
*
- * @param endPointUrl full url to the tika-server including endpoint
- * @param tikaUrl url to /tika endpoint to use to check on server status
+ * @param baseUrl url to base endpoint
* @param httpHost
* @param httpClient
*/
- private TikaHttpClient(String endPointUrl, String tikaUrl, HttpHost httpHost, HttpClient httpClient) {
- this.endPointUrl = endPointUrl;
- this.tikaUrl = tikaUrl;
+ private TikaHttpClient(String baseUrl, HttpHost httpHost, HttpClient httpClient) {
+ if (! baseUrl.endsWith("/")) {
+ baseUrl += "/";
+ }
+ this.emitEndPointUrl = baseUrl+EMIT_ENDPOINT;
+ this.asyncEndPointUrl = baseUrl+ASYNC_ENDPOINT;
+ this.tikaUrl = baseUrl+TIKA_ENDPOINT;
this.httpHost = httpHost;
this.httpClient = httpClient;
}
+ public TikaEmitterResult postJsonAsync(String jsonRequest) {
+ return postJson(asyncEndPointUrl, jsonRequest);
+ }
public TikaEmitterResult postJson(String jsonRequest) {
- System.out.println("NED:"+endPointUrl);
- HttpPost post = new HttpPost(endPointUrl);
+ return postJson(emitEndPointUrl, jsonRequest);
+ }
+
+ private TikaEmitterResult postJson(String endPoint, String jsonRequest) {
+ System.out.println("NED:"+endPoint);
+ HttpPost post = new HttpPost(endPoint);
ByteArrayEntity entity = new ByteArrayEntity(jsonRequest.getBytes(StandardCharsets.UTF_8));
post.setEntity(entity);
post.setHeader("Content-Type", "application/json");
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 1bacbc5..5135f55 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -37,7 +37,10 @@ import org.apache.tika.config.TikaConfig;
import org.apache.tika.parser.DigestingParser;
import org.apache.tika.parser.digestutils.BouncyCastleDigester;
import org.apache.tika.parser.digestutils.CommonsDigester;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.server.core.resource.AsyncEmitter;
+import org.apache.tika.server.core.resource.AsyncParser;
import org.apache.tika.server.core.resource.AsyncResource;
import org.apache.tika.server.core.resource.DetectorResource;
import org.apache.tika.server.core.resource.EmitterResource;
@@ -151,37 +154,42 @@ public class TikaServerProcess {
private static void mainLoop(CommandLine commandLine, Options options) throws Exception {
AsyncResource asyncResource = null;
- ArrayBlockingQueue asyncQueue = null;
- int numAsyncThreads = 10;
+ ArrayBlockingQueue<FetchEmitTuple> asyncFetchEmitQueue = null;
+ ArrayBlockingQueue<EmitData> asyncEmitData = null;
+ int numAsyncParserThreads = 10;
if (commandLine.hasOption(ENABLE_UNSECURE_FEATURES)) {
asyncResource = new AsyncResource();
- asyncQueue = asyncResource.getQueue(numAsyncThreads);
+ asyncFetchEmitQueue = asyncResource.getFetchEmitQueue(10000);
+ asyncEmitData = asyncResource.getEmitDataQueue(1000);
}
ServerDetails serverDetails = initServer(commandLine, asyncResource);
- ExecutorService executorService = Executors.newFixedThreadPool(numAsyncThreads+1);
+ ExecutorService executorService = Executors.newFixedThreadPool(numAsyncParserThreads+1);
ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
- executorCompletionService.submit(new ServerThread(serverDetails));
- if (asyncQueue != null) {
- for (int i = 0; i < numAsyncThreads; i++) {
- executorCompletionService.submit(new AsyncEmitter(asyncQueue));
+ if (asyncFetchEmitQueue != null) {
+ executorCompletionService.submit(new AsyncEmitter(asyncEmitData));
+ for (int i = 0; i < numAsyncParserThreads; i++) {
+ executorCompletionService.submit(new AsyncParser(asyncFetchEmitQueue, asyncEmitData));
}
}
+ //start the server
+ Server server = serverDetails.sf.create();
+ LOG.info("Started Apache Tika server {} at {}",
+ serverDetails.serverId,
+ serverDetails.url);
+
while (true) {
Future<Integer> future = executorCompletionService.poll(1, TimeUnit.MINUTES);
if (future != null) {
- System.out.println("future val: " + future.get());
+ LOG.warn("future val: " + future.get());
}
}
}
- //This starts the server in this process. This can
- //be either a direct call from -noFork or the process that is forked
- //in the 2.0 default mode.
+ //This returns the server, configured and ready to be started.
private static ServerDetails initServer(CommandLine line,
AsyncResource asyncResource) throws Exception {
-
String host = null;
if (line.hasOption("host")) {
@@ -459,22 +467,6 @@ public class TikaServerProcess {
return serverTimeouts;
}
- private static class ServerThread implements Callable<Integer> {
- private final ServerDetails serverDetails;
- public ServerThread(ServerDetails serverDetails) {
- this.serverDetails = serverDetails;
- }
-
- @Override
- public Integer call() throws Exception {
-
- Server server = serverDetails.sf.create();
- LOG.info("Started Apache Tika server {} at {}",
- serverDetails.serverId,
- serverDetails.url);
- return 2;
- }
- }
private static class ServerDetails {
JAXRSServerFactoryBean sf;
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
index f473699..315927a 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
@@ -1,122 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.server.core.resource;
-import org.apache.cxf.jaxrs.impl.UriInfoImpl;
-import org.apache.cxf.message.MessageImpl;
-import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.apache.tika.utils.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.core.MultivaluedHashMap;
import java.io.IOException;
-import java.io.InputStream;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
- * Worker thread that takes ASyncRequests off the queue and
- * processes them.
+ * Worker thread that takes {@link EmitData} off the queue, caches it
+ * by emitter name, and emits each batch when a size or time threshold is reached.
*/
public class AsyncEmitter implements Callable<Integer> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
+ //TODO -- make emitWithinMs and maxEstimatedBytes configurable
+ private final long emitWithinMs = 1000;
+
+ private long maxEstimatedBytes = 10_000_000;
- private long maxCacheSizeBytes = 10_000_000;
+ private final ArrayBlockingQueue<EmitData> emitDataQueue;
- private final ArrayBlockingQueue<AsyncRequest> queue;
+ Instant lastEmitted = Instant.now();
- public AsyncEmitter(ArrayBlockingQueue<AsyncRequest> queue) {
- this.queue = queue;
+ public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData) {
+ this.emitDataQueue = emitData;
}
@Override
public Integer call() throws Exception {
+ EmitDataCache cache = new EmitDataCache(maxEstimatedBytes);
+
while (true) {
- AsyncRequest request = queue.poll(1, TimeUnit.MINUTES);
- if (request != null) {
- processTuple(request);
+ EmitData emitData = emitDataQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (emitData != null) {
+ //this can block on emitAll
+ cache.add(emitData);
} else {
LOG.trace("Nothing on the async queue");
}
+ LOG.debug("cache size: ({}) bytes and count: {}",
+ cache.estimatedSize, cache.size);
+ long elapsed = ChronoUnit.MILLIS.between(lastEmitted, Instant.now());
+ if (elapsed > emitWithinMs) {
+ LOG.debug("{} elapsed > {}, going to emitAll",
+ elapsed, emitWithinMs);
+ //this can block
+ cache.emitAll();
+ }
}
}
- private void processTuple(AsyncRequest request) {
- LOG.debug("Starting request id ({}) of size ({})",
- request.getId(), request.getTuples().size());
- List<EmitData> cachedEmitData = new ArrayList<>();
- Emitter emitter = TikaResource.getConfig()
- .getEmitterManager()
- .getEmitter(
- request.getTuples().get(0).getEmitKey().getEmitterName());
- long currSize = 0;
- for (FetchEmitTuple t : request.getTuples()) {
- EmitData emitData = processTuple(t);
- long estimated = AbstractEmitter.estimateSizeInBytes(
- emitData.getEmitKey().getKey(), emitData.getMetadataList());
- if (estimated + currSize > maxCacheSizeBytes) {
- tryToEmit(emitter, cachedEmitData, request);
- cachedEmitData.clear();
- }
- cachedEmitData.add(emitData);
- currSize += estimated;
+ private class EmitDataCache {
+ private final long maxBytes;
+
+ long estimatedSize = 0;
+ int size = 0;
+ Map<String, List<EmitData>> map = new HashMap<>();
+
+ public EmitDataCache(long maxBytes) {
+ this.maxBytes = maxBytes;
}
- tryToEmit(emitter, cachedEmitData, request);
- cachedEmitData.clear();
- LOG.debug("Completed request id ({})",
- request.getId(), request.getTuples().size());
- }
- private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData,
- AsyncRequest request) {
- try {
- emitter.emit(cachedEmitData);
- } catch (IOException|TikaEmitterException e) {
- LOG.warn("async id ({}) emitter class ({}): {}",
- request.getId(), emitter.getClass(),
- ExceptionUtils.getStackTrace(e));
+ void updateEstimatedSize(long newBytes) {
+ estimatedSize += newBytes;
}
- }
- private EmitData processTuple(FetchEmitTuple t) {
- Metadata userMetadata = t.getMetadata();
- Metadata metadata = new Metadata();
- String fetcherName = t.getFetchKey().getFetcherName();
- String fetchKey = t.getFetchKey().getKey();
- List<Metadata> metadataList = null;
- try (InputStream stream =
- TikaResource.getConfig().getFetcherManager()
- .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
-
- metadataList = RecursiveMetadataResource.parseMetadata(
- stream,
- metadata,
- new MultivaluedHashMap<>(),
- new UriInfoImpl(new MessageImpl()), "text");
- } catch (SecurityException e) {
- throw e;
- } catch (Exception e) {
- LOG.warn(t.toString(), e);
+ void add(EmitData data) {
+ size++;
+ long sz = AbstractEmitter.estimateSizeInBytes(data.getEmitKey().getKey(), data.getMetadataList());
+ if (estimatedSize + sz > maxBytes) {
+ LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
+ (estimatedSize+sz), maxBytes);
+ emitAll();
+ }
+ List<EmitData> cached = map.get(data.getEmitKey().getEmitterName());
+ if (cached == null) {
+ cached = new ArrayList<>();
+ map.put(data.getEmitKey().getEmitterName(), cached);
+ }
+ updateEstimatedSize(sz);
+ cached.add(data);
}
- injectUserMetadata(userMetadata, metadataList);
- return new EmitData(t.getEmitKey(), metadataList);
- }
- private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
- for (String n : userMetadata.names()) {
- //overwrite whatever was there
- metadataList.get(0).set(n, null);
- for (String val : userMetadata.getValues(n)) {
- metadataList.get(0).add(n, val);
+ private void emitAll() {
+ int emitted = 0;
+ LOG.debug("about to emit {}", size);
+ for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
+ Emitter emitter = TikaResource.getConfig()
+ .getEmitterManager().getEmitter(e.getKey());
+ tryToEmit(emitter, e.getValue());
+ emitted += e.getValue().size();
+ }
+ LOG.debug("emitted: {}", emitted);
+ estimatedSize = 0;
+ size = 0;
+ map.clear();
+ lastEmitted = Instant.now();
+ }
+
+ private void tryToEmit(Emitter emitter, List<EmitData> cachedEmitData) {
+
+ try {
+ emitter.emit(cachedEmitData);
+ } catch (IOException|TikaEmitterException e) {
+ LOG.warn("emitter class ({}): {}", emitter.getClass(),
+ ExceptionUtils.getStackTrace(e));
}
}
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
new file mode 100644
index 0000000..46098e9
--- /dev/null
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server.core.resource;
+
+import org.apache.cxf.jaxrs.impl.UriInfoImpl;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedHashMap;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Worker thread that takes {@link FetchEmitTuple} off the queue, parses
+ * the file and puts the {@link EmitData} on the queue for the {@link AsyncEmitter}.
+ *
+ */
+public class AsyncParser implements Callable<Integer> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
+
+ private final ArrayBlockingQueue<FetchEmitTuple> queue;
+ private final ArrayBlockingQueue<EmitData> emitDataQueue;
+
+ public AsyncParser(ArrayBlockingQueue<FetchEmitTuple> queue,
+ ArrayBlockingQueue<EmitData> emitData) {
+ this.queue = queue;
+ this.emitDataQueue = emitData;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ int parsed = 0;
+ while (true) {
+ FetchEmitTuple request = queue.poll(1, TimeUnit.MINUTES);
+ if (request != null) {
+ EmitData emitData = processTuple(request);
+ boolean offered = emitDataQueue.offer(emitData, 10, TimeUnit.MINUTES);
+ parsed++;
+ if (! offered) {
+ //TODO: handle offer timeout -- this EmitData is currently dropped silently
+ }
+ } else {
+ LOG.trace("Nothing on the async queue");
+ }
+ }
+ }
+
+ private EmitData processTuple(FetchEmitTuple t) {
+ Metadata userMetadata = t.getMetadata();
+ Metadata metadata = new Metadata();
+ String fetcherName = t.getFetchKey().getFetcherName();
+ String fetchKey = t.getFetchKey().getKey();
+ List<Metadata> metadataList = null;
+ try (InputStream stream =
+ TikaResource.getConfig().getFetcherManager()
+ .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
+ metadataList = RecursiveMetadataResource.parseMetadata(
+ stream,
+ metadata,
+ new MultivaluedHashMap<>(),
+ new UriInfoImpl(new MessageImpl()), "text");
+ } catch (SecurityException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn(t.toString(), e);
+ }
+ injectUserMetadata(userMetadata, metadataList);
+ EmitKey emitKey = t.getEmitKey();
+ if (StringUtils.isBlank(emitKey.getKey())) {
+ emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+ }
+ return new EmitData(emitKey, metadataList);
+ }
+
+ private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
+ for (String n : userMetadata.names()) {
+ //overwrite whatever was there
+ metadataList.get(0).set(n, null);
+ for (String val : userMetadata.getValues(n)) {
+ metadataList.get(0).add(n, val);
+ }
+ }
+ }
+}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
index f46105c..9e3566c 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.tika.server.core.resource;
import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
@@ -5,18 +21,12 @@ import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import java.util.List;
public class AsyncRequest {
- private final String id;
private final List<FetchEmitTuple> tuples;
- public AsyncRequest(String id, List<FetchEmitTuple> tuples) {
- this.id = id;
+ public AsyncRequest(List<FetchEmitTuple> tuples) {
this.tuples = tuples;
}
- public String getId() {
- return id;
- }
-
public List<FetchEmitTuple> getTuples() {
return tuples;
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 240933f..248ebb0 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -17,9 +17,10 @@
package org.apache.tika.server.core.resource;
-import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
+import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetcher.FetchKey;
@@ -28,6 +29,7 @@ import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.BadRequestException;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -35,10 +37,17 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
import java.io.InputStream;
-import java.util.Collections;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -48,14 +57,18 @@ public class AsyncResource {
private static final Logger LOG = LoggerFactory.getLogger(AsyncResource.class);
- private static final int DEFAULT_QUEUE_SIZE = 100;
- private int queueSize = DEFAULT_QUEUE_SIZE;
- private ArrayBlockingQueue<AsyncRequest> queue;
+ private static final int DEFAULT_FETCH_EMIT_QUEUE_SIZE = 10000;
+ long maxQueuePauseMs = 60000;
+ private ArrayBlockingQueue<FetchEmitTuple> queue;
- public ArrayBlockingQueue<AsyncRequest> getQueue(int numThreads) {
- this.queue = new ArrayBlockingQueue<>(queueSize+numThreads);
+ public ArrayBlockingQueue<FetchEmitTuple> getFetchEmitQueue(int queueSize) {
+ this.queue = new ArrayBlockingQueue<>(queueSize);
return queue;
}
+
+ public ArrayBlockingQueue<EmitData> getEmitDataQueue(int size) {
+ return new ArrayBlockingQueue<>(size);
+ }
/**
* The client posts a json request. At a minimum, this must be a
* json object that contains an emitter and a fetcherString key with
@@ -74,14 +87,18 @@ public class AsyncResource {
*/
@POST
@Produces("application/json")
- public Map<String, String> post(InputStream is,
+ public Map<String, Object> post(InputStream is,
@Context HttpHeaders httpHeaders,
@Context UriInfo info
) throws Exception {
AsyncRequest request = deserializeASyncRequest(is);
- FetcherManager fetcherManager = TikaConfig.getDefaultConfig().getFetcherManager();
- EmitterManager emitterManager = TikaConfig.getDefaultConfig().getEmitterManager();
+
+ //validate that every requested fetcher and emitter
+ //actually exists; fail the whole request early
+ //rather than partway through processing
+ FetcherManager fetcherManager = TikaResource.getConfig().getFetcherManager();
+ EmitterManager emitterManager = TikaResource.getConfig().getEmitterManager();
for (FetchEmitTuple t : request.getTuples()) {
if (! fetcherManager.getSupported().contains(t.getFetchKey().getFetcherName())) {
return badFetcher(t.getFetchKey());
@@ -90,34 +107,61 @@ public class AsyncResource {
return badEmitter(t.getEmitKey());
}
}
+ Instant start = Instant.now();
+ long elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
+ List<FetchEmitTuple> notAdded = new ArrayList<>();
+ int addedCount = 0;
+ for (FetchEmitTuple t : request.getTuples()) {
+ boolean offered = false;
+ while (!offered && elapsed < maxQueuePauseMs) {
+ offered = queue.offer(t, 10, TimeUnit.MILLISECONDS);
+ elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
+ }
+ if (! offered) {
+ notAdded.add(t);
+ } else {
+ addedCount++;
+ }
+ }
- //parameterize
- boolean offered = queue.offer(request, 60, TimeUnit.SECONDS);
- if (! offered) {
- return throttleResponse();
+ if (notAdded.size() > 0) {
+ return throttle(notAdded, addedCount);
}
- return ok(request.getId(), request.getTuples().size());
+ return ok(request.getTuples().size());
}
- private Map<String, String> ok(String id, int size) {
- return null;
+ private Map<String, Object> ok(int size) {
+ Map<String, Object> map = new HashMap<>();
+ map.put("status", "ok");
+ map.put("added", size);
+ return map;
}
- private Map<String, String> throttleResponse() {
- Map<String, String> map = new HashMap<>();
+ private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
+ List<String> fetchKeys = new ArrayList<>();
+ for (FetchEmitTuple t : notAdded) {
+ fetchKeys.add(t.getFetchKey().getKey());
+ }
+ Map<String, Object> map = new HashMap<>();
+ map.put("status", "throttled");
+ map.put("added", added);
+ map.put("skipped", notAdded.size());
+ map.put("skippedFetchKeys", fetchKeys);
return map;
}
- private Map<String, String> badEmitter(EmitKey emitKey) {
- return null;
+ private Map<String, Object> badEmitter(EmitKey emitKey) {
+ throw new BadRequestException("can't find emitter for "+emitKey.getEmitterName());
}
- private Map<String, String> badFetcher(FetchKey fetchKey) {
- return null;
+ private Map<String, Object> badFetcher(FetchKey fetchKey) {
+ throw new BadRequestException("can't find fetcher for " + fetchKey.getFetcherName());
}
- private AsyncRequest deserializeASyncRequest(InputStream is) {
- return new AsyncRequest("", Collections.EMPTY_LIST);
+ private AsyncRequest deserializeASyncRequest(InputStream is) throws IOException {
+ try (Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
+ return new AsyncRequest(JsonFetchEmitTupleList.fromJson(reader));
+ }
}
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java
index 8668b16..7a634fd 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/MetadataResource.java
@@ -132,7 +132,7 @@ public class MetadataResource {
//no need to parse embedded docs
context.set(DocumentSelector.class, metadata1 -> false);
- TikaResource.logRequest(LOG, info, metadata);
+ TikaResource.logRequest(LOG, "/meta", metadata);
TikaResource.parse(parser, LOG, info.getPath(), is,
new LanguageHandler() {
public void endDocument() {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index e01a4fa..f6f340c 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -144,7 +144,7 @@ public class RecursiveMetadataResource {
fillMetadata(parser, metadata, httpHeaders);
fillParseContext(httpHeaders, metadata, context);
- TikaResource.logRequest(LOG, info, metadata);
+ TikaResource.logRequest(LOG, "/rmeta", metadata);
int writeLimit = -1;
if (httpHeaders.containsKey("writeLimit")) {
@@ -162,7 +162,7 @@ public class RecursiveMetadataResource {
new BasicContentHandlerFactory(type, writeLimit), maxEmbeddedResources,
TikaResource.getConfig().getMetadataFilter());
try {
- TikaResource.parse(wrapper, LOG, info.getPath(), is, handler, metadata, context);
+ TikaResource.parse(wrapper, LOG, "/rmeta", is, handler, metadata, context);
} catch (TikaServerParseException e) {
//do nothing
} catch (SecurityException|WebApplicationException e) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
index 46cf093..d2c33e9 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/TikaResource.java
@@ -354,11 +354,12 @@ public class TikaResource {
}
}
- public static void logRequest(Logger logger, UriInfo info, Metadata metadata) {
+ public static void logRequest(Logger logger, String endpoint, Metadata metadata) {
+
if (metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE) == null) {
- logger.info("{} (autodetecting type)", info.getPath());
+ logger.info("{} (autodetecting type)", endpoint);
} else {
- logger.info("{} ({})", info.getPath(), metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE));
+ logger.info("{} ({})", endpoint, metadata.get(org.apache.tika.metadata.HttpHeaders.CONTENT_TYPE));
}
}
@@ -403,7 +404,7 @@ public class TikaResource {
fillMetadata(parser, metadata, httpHeaders);
fillParseContext(httpHeaders, metadata, context);
- logRequest(LOG, info, metadata);
+ logRequest(LOG, "/tika", metadata);
return new StreamingOutput() {
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
@@ -432,7 +433,7 @@ public class TikaResource {
fillMetadata(parser, metadata, httpHeaders);
fillParseContext(httpHeaders, metadata, context);
- logRequest(LOG, info, metadata);
+ logRequest(LOG, "/tika", metadata);
return new StreamingOutput() {
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
@@ -489,7 +490,7 @@ public class TikaResource {
fillParseContext(httpHeaders, metadata, context);
- logRequest(LOG, info, metadata);
+ logRequest(LOG, "/tika", metadata);
return new StreamingOutput() {
public void write(OutputStream outputStream)
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java
index ed0320b..a3d733a 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/UnpackerResource.java
@@ -126,7 +126,7 @@ public class UnpackerResource {
fillMetadata(parser, metadata, httpHeaders.getRequestHeaders());
fillParseContext(httpHeaders.getRequestHeaders(), metadata, pc);
- TikaResource.logRequest(LOG, info, metadata);
+ TikaResource.logRequest(LOG, "/unpack", metadata);
//even though we aren't currently parsing embedded documents,
//we need to add this to allow for "inline" use of other parsers.
pc.set(Parser.class, parser);
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
similarity index 58%
copy from tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
copy to tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index ecf02de..2d188d9 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -17,15 +17,19 @@
package org.apache.tika.server.core;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,37 +40,35 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
+@Ignore("useful for development...need to turn it into a real unit test")
+public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(TikaServerEmitterIntegrationTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
private static Path TMP_DIR;
private static Path TMP_OUTPUT_DIR;
private static String TIKA_CONFIG_XML;
private static Path TIKA_CONFIG;
+ private static final int NUM_FILES = 8034;
private static final String EMITTER_NAME = "fse";
private static final String FETCHER_NAME = "fsf";
+ private static List<String> FILE_LIST = new ArrayList<>();
private static String[] FILES = new String[] {
"hello_world.xml",
- "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
+ // "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
};
@BeforeClass
@@ -77,10 +79,16 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
Files.createDirectories(inputDir);
Files.createDirectories(TMP_OUTPUT_DIR);
- for (String mockFile : FILES) {
- Files.copy(TikaEmitterTest.class.getResourceAsStream(
- "/test-documents/mock/"+mockFile),
- inputDir.resolve(mockFile));
+ for (int i = 0; i < NUM_FILES; i++) {
+ for (String mockFile : FILES) {
+ String targetName = i+"-"+mockFile;
+ Path target = inputDir.resolve(targetName);
+ FILE_LIST.add(targetName);
+ Files.copy(TikaEmitterTest.class.getResourceAsStream(
+ "/test-documents/mock/" + mockFile),
+ target);
+
+ }
}
TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
@@ -135,7 +143,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
TikaServerCli.main(
new String[]{
"-enableUnsecureFeatures",
- "-maxFiles", "2000",
+ "-maxFiles", "10000",
"-p", INTEGRATION_TEST_PORT,
"-tmpFilePrefix", "basic-",
"-config", TIKA_CONFIG.toAbsolutePath().toString()
@@ -143,89 +151,27 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
}
};
serverThread.start();
- try {
- testOne("hello_world.xml", true);
- } finally {
- serverThread.interrupt();
- }
- }
-
- @Test(expected = ProcessingException.class)
- public void testSystemExit() throws Exception {
- Thread serverThread = new Thread() {
- @Override
- public void run() {
- TikaServerCli.main(
- new String[]{
- "-enableUnsecureFeatures",
- "-maxFiles", "2000",
- "-p", INTEGRATION_TEST_PORT,
- "-tmpFilePrefix", "basic-",
- "-config", TIKA_CONFIG.toAbsolutePath().toString()
- });
- }
- };
- serverThread.start();
try {
- testOne("system_exit.xml", false);
- } finally {
- serverThread.interrupt();
- }
- }
-
- @Test
- public void testOOM() throws Exception {
-
- Thread serverThread = new Thread() {
- @Override
- public void run() {
- TikaServerCli.main(
- new String[]{
- "-enableUnsecureFeatures",
- "-JXmx128m",
- "-maxFiles", "2000",
- "-p", INTEGRATION_TEST_PORT,
- "-tmpFilePrefix", "basic-",
- "-config", TIKA_CONFIG.toAbsolutePath().toString()
- });
+ JsonNode response = sendAsync(FILE_LIST);
+ System.out.println(response);
+ int targets = 0;
+ while (targets < NUM_FILES) {
+ System.out.println("targets "+targets);
+ targets = countTargets();
+ Thread.sleep(1000);
}
- };
- serverThread.start();
- try {
- JsonNode response = testOne("real_oom.xml", false);
- assertContains("heap space", response.get("parse_error").asText());
+ System.out.println("TARGETS: "+targets);
} finally {
serverThread.interrupt();
}
}
- @Test(expected = ProcessingException.class)
- public void testTimeout() throws Exception {
-
- Thread serverThread = new Thread() {
- @Override
- public void run() {
- TikaServerCli.main(
- new String[]{
- "-enableUnsecureFeatures",
- "-JXmx128m",
- "-taskTimeoutMillis", "2000", "-taskPulseMillis", "100",
- "-p", INTEGRATION_TEST_PORT,
- "-tmpFilePrefix", "basic-",
- "-config", TIKA_CONFIG.toAbsolutePath().toString()
- });
- }
- };
- serverThread.start();
- try {
- JsonNode response = testOne("heavy_hang_30000.xml", false);
- assertContains("heap space", response.get("parse_error").asText());
- } finally {
- serverThread.interrupt();
- }
+ private int countTargets() {
+ return TMP_OUTPUT_DIR.toFile().listFiles().length;
}
+
private void awaitServerStartup() throws Exception {
Instant started = Instant.now();
long elapsed = Duration.between(started, Instant.now()).toMillis();
@@ -241,7 +187,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
}
LOG.debug("tika test client failed to connect to server with status: {}", response.getStatus());
- } catch (javax.ws.rs.ProcessingException e) {
+ } catch (ProcessingException e) {
LOG.debug("tika test client failed to connect to server", e);
}
@@ -252,28 +198,28 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
elapsed + " ms");
}
- private JsonNode testOne(String fileName, boolean shouldFileExist) throws Exception {
+ private JsonNode sendAsync(List<String> fileNames) throws Exception {
awaitServerStartup();
+ List<FetchEmitTuple> tuples = new ArrayList<>();
+ for (String f : fileNames) {
+ tuples.add(getFetchEmitTuple(f));
+ }
+ String json = JsonFetchEmitTupleList.toJson(tuples);
+
Response response = WebClient
- .create(endPoint + "/emit")
+ .create(endPoint + "/async")
.accept("application/json")
- .post(getJsonString(fileName));
- if (shouldFileExist) {
- Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
- assertTrue(Files.size(targFile) > 1);
- }
+ .post(json);
+ System.out.println("status: " + response.getStatus());
Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
return new ObjectMapper().readTree(reader);
}
- private String getJsonString(String fileName) throws IOException {
- StringWriter writer = new StringWriter();
- try (JsonGenerator generator = new JsonFactory().createGenerator(writer)) {
- generator.writeStartObject();
- generator.writeStringField("fetcher", FETCHER_NAME);
- generator.writeStringField("fetchKey", fileName);
- generator.writeStringField("emitter", EMITTER_NAME);
- }
- return writer.toString();
+ private FetchEmitTuple getFetchEmitTuple(String fileName) throws IOException {
+ return new FetchEmitTuple(
+ new FetchKey(FETCHER_NAME, fileName),
+ new EmitKey(EMITTER_NAME, ""),
+ new Metadata()
+ );
}
}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index ecf02de..04488d1 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -17,12 +17,15 @@
package org.apache.tika.server.core;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -36,21 +39,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
-import java.util.List;
-import java.util.Random;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
@@ -64,7 +63,7 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
private static final String EMITTER_NAME = "fse";
private static final String FETCHER_NAME = "fsf";
- private static String[] FILES = new String[] {
+ private static String[] FILES = new String[]{
"hello_world.xml",
"heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
};
@@ -79,30 +78,30 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
for (String mockFile : FILES) {
Files.copy(TikaEmitterTest.class.getResourceAsStream(
- "/test-documents/mock/"+mockFile),
+ "/test-documents/mock/" + mockFile),
inputDir.resolve(mockFile));
}
TIKA_CONFIG = TMP_DIR.resolve("tika-config.xml");
- TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"+
- "<properties>"+
- "<fetchers>"+
- "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">"+
- "<params>"+
- "<param name=\"name\" type=\"string\">"+FETCHER_NAME+"</param>"+
- "<param name=\"basePath\" type=\"string\">"+inputDir.toAbsolutePath()+"</param>"+
- "</params>"+
- "</fetcher>"+
- "</fetchers>"+
- "<emitters>"+
- "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">"+
- "<params>"+
- "<param name=\"name\" type=\"string\">"+EMITTER_NAME+"</param>"+
+ TIKA_CONFIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
+ "<properties>" +
+ "<fetchers>" +
+ "<fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+ "<params>" +
+ "<param name=\"name\" type=\"string\">" + FETCHER_NAME + "</param>" +
+ "<param name=\"basePath\" type=\"string\">" + inputDir.toAbsolutePath() + "</param>" +
+ "</params>" +
+ "</fetcher>" +
+ "</fetchers>" +
+ "<emitters>" +
+ "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" +
+ "<params>" +
+ "<param name=\"name\" type=\"string\">" + EMITTER_NAME + "</param>" +
- "<param name=\"basePath\" type=\"string\">"+ TMP_OUTPUT_DIR.toAbsolutePath()+"</param>"+
- "</params>"+
- "</emitter>"+
- "</emitters>"+
+ "<param name=\"basePath\" type=\"string\">" + TMP_OUTPUT_DIR.toAbsolutePath() + "</param>" +
+ "</params>" +
+ "</emitter>" +
+ "</emitters>" +
"</properties>";
FileUtils.write(TIKA_CONFIG.toFile(), TIKA_CONFIG_XML, UTF_8);
@@ -144,7 +143,10 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
};
serverThread.start();
try {
- testOne("hello_world.xml", true);
+ JsonNode node = testOne("hello_world.xml", true);
+ assertEquals("ok", node.get("status").asText());
+ } catch (Exception e) {
+ fail("shouldn't have an exception" + e.getMessage());
} finally {
serverThread.interrupt();
}
@@ -220,7 +222,6 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
serverThread.start();
try {
JsonNode response = testOne("heavy_hang_30000.xml", false);
- assertContains("heap space", response.get("parse_error").asText());
} finally {
serverThread.interrupt();
}
@@ -229,14 +230,14 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
private void awaitServerStartup() throws Exception {
Instant started = Instant.now();
long elapsed = Duration.between(started, Instant.now()).toMillis();
- WebClient client = WebClient.create(endPoint+"/tika").accept("text/plain");
+ WebClient client = WebClient.create(endPoint + "/tika").accept("text/plain");
while (elapsed < MAX_WAIT_MS) {
try {
Response response = client.get();
if (response.getStatus() == 200) {
elapsed = Duration.between(started, Instant.now()).toMillis();
LOG.info("client observes server successfully started after " +
- elapsed+ " ms");
+ elapsed + " ms");
return;
}
LOG.debug("tika test client failed to connect to server with status: {}", response.getStatus());
@@ -258,22 +259,23 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
.create(endPoint + "/emit")
.accept("application/json")
.post(getJsonString(fileName));
- if (shouldFileExist) {
- Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
- assertTrue(Files.size(targFile) > 1);
+ if (response.getStatus() == 200) {
+ if (shouldFileExist) {
+ Path targFile = TMP_OUTPUT_DIR.resolve(fileName + ".json");
+ assertTrue(Files.size(targFile) > 1);
+ }
+ Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
+ return new ObjectMapper().readTree(reader);
}
- Reader reader = new InputStreamReader((InputStream) response.getEntity(), UTF_8);
- return new ObjectMapper().readTree(reader);
+ return null;
}
private String getJsonString(String fileName) throws IOException {
- StringWriter writer = new StringWriter();
- try (JsonGenerator generator = new JsonFactory().createGenerator(writer)) {
- generator.writeStartObject();
- generator.writeStringField("fetcher", FETCHER_NAME);
- generator.writeStringField("fetchKey", fileName);
- generator.writeStringField("emitter", EMITTER_NAME);
- }
- return writer.toString();
+ FetchEmitTuple t = new FetchEmitTuple(
+ new FetchKey(FETCHER_NAME, fileName),
+ new EmitKey(EMITTER_NAME, ""),
+ new Metadata()
+ );
+ return JsonFetchEmitTuple.toJson(t);
}
}