You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/06/13 14:48:31 UTC
[tika] 02/02: TIKA-3790 -- actually implement tika server client via pipes (not yet async)
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 7877f9b33dbf242bc2fbc030f23309c025d5c71a
Author: tallison <ta...@apache.org>
AuthorDate: Mon Jun 13 10:48:19 2022 -0400
TIKA-3790 -- actually implement tika server client via pipes (not yet async)
---
CHANGES.txt | 4 +
.../java/org/apache/tika/pipes/PipesClient.java | 5 +-
.../java/org/apache/tika/pipes/PipesConfig.java | 10 +-
.../pipesiterator/fs/FileSystemPipesIterator.java | 2 -
tika-server/tika-server-client/pom.xml | 5 +
.../tika/server/client/TikaAsyncHttpClient.java} | 29 +++--
.../org/apache/tika/server/client/TikaClient.java | 28 ++---
.../apache/tika/server/client/TikaClientCLI.java | 121 +++++++--------------
...ikaHttpClient.java => TikaPipesHttpClient.java} | 68 ++++++------
.../tika/server/client/TikaServerClientConfig.java | 118 ++++++++++++++++++++
.../org/apache/tika/server/client/TestBasic.java | 19 +++-
.../resources/tika-config-simple-fs-emitter.xml | 85 ++++++++++-----
12 files changed, 308 insertions(+), 186 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index bc7c68f4d..40a8827f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,9 @@
Release 2.4.1 - ???
+ * Implement bulk upload in the OpenSearch emitter (TIKA-3791).
+
+ * Implement tika-server client via pipes mode (TIKA-3790).
+
* Custom embedded parsers and EmbeddedDocumentHandlers
can now add metadata to the container file's
metadata (TIKA-3789).
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 9bcd4f755..94cd40a5f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -167,7 +167,10 @@ public class PipesClient implements Closeable {
throw new InterruptedException("thread interrupt");
}
PipesResult result = readResults(t, start);
- LOG.info("finished reading result ");
+ if (LOG.isDebugEnabled()) {
+ long elapsed = System.currentTimeMillis() - readStart;
+ LOG.debug("finished reading result in {} ms", elapsed);
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("pipesClientId={}: timer -- read result: {} ms",
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
index 94ae135b8..06783d67c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -22,10 +22,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.tika.exception.TikaConfigException;
public class PipesConfig extends PipesConfigBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PipesClient.class);
+
private long maxWaitForClientMillis = 60000;
public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigException {
@@ -34,8 +39,9 @@ public class PipesConfig extends PipesConfigBase {
Set<String> settings = pipesConfig.configure("pipes", is);
}
if (pipesConfig.getTikaConfig() == null) {
- throw new TikaConfigException("must specify at least a <tikaConfig> element in the " +
- "<params> of <pipes>");
+ LOG.debug("A separate tikaConfig was not specified in the <pipes/> element in the " +
+ "config file; will use {} for pipes", tikaConfig);
+ pipesConfig.setTikaConfig(tikaConfig);
}
return pipesConfig;
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
index 35f424a37..88f29c5aa 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
@@ -124,6 +124,4 @@ public class FileSystemPipesIterator extends PipesIterator implements Initializa
return FileVisitResult.CONTINUE;
}
}
-
-
}
diff --git a/tika-server/tika-server-client/pom.xml b/tika-server/tika-server-client/pom.xml
index 27c59598c..7e21a1361 100644
--- a/tika-server/tika-server-client/pom.xml
+++ b/tika-server/tika-server-client/pom.xml
@@ -37,6 +37,11 @@
<artifactId>tika-serialization</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-httpclient-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
diff --git a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaAsyncHttpClient.java
similarity index 57%
copy from tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
copy to tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaAsyncHttpClient.java
index d2b844edd..8da79c736 100644
--- a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaAsyncHttpClient.java
@@ -16,24 +16,23 @@
*/
package org.apache.tika.server.client;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.exception.TikaConfigException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+/**
+ * Low-level class to handle the http layer.
+ */
+class TikaAsyncHttpClient extends TikaPipesHttpClient {
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
+ private TikaAsyncHttpClient(String baseUrl, HttpClientFactory httpClientFactory)
+ throws TikaConfigException {
+ super(baseUrl, httpClientFactory);
+ }
-@Disabled("turn into actual unit test")
-public class TestBasic {
+ private final String endPoint = "async";
- @Test
- public void testBasic() throws Exception {
- Path p = Paths.get(
- TestBasic.class.getResource("/tika-config-simple-fs-emitter.xml").toURI());
- assertTrue(Files.isRegularFile(p));
- String[] args = new String[]{p.toAbsolutePath().toString(), "http://localhost:9998/", "fs"};
- TikaClientCLI.main(args);
+ String getEndpoint() {
+ return endPoint;
}
+
}
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index 1ebbcfd63..99e4f18e5 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -22,28 +22,28 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import org.apache.tika.config.TikaConfig;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
import org.apache.tika.pipes.FetchEmitTuple;
public class TikaClient {
private final Random random = new Random();
- private final List<TikaHttpClient> clients;
+ private final List<TikaPipesHttpClient> clients;
- private TikaClient(List<TikaHttpClient> clients) {
-
+ private TikaClient(List<TikaPipesHttpClient> clients) {
this.clients = clients;
}
- public static TikaClient get(TikaConfig tikaConfig, List<String> tikaServers)
- throws TikaClientConfigException {
+ public static TikaClient get(HttpClientFactory httpClientFactory, List<String> tikaServers)
+ throws TikaConfigException {
List clients = new ArrayList<>();
for (String url : tikaServers) {
- clients.add(TikaHttpClient.get(url));
+ //client factory is not thread safe, create a copy per client
+ clients.add(new TikaPipesHttpClient(url, httpClientFactory.copy()));
}
return new TikaClient(clients);
}
@@ -54,21 +54,13 @@ public class TikaClient {
}*/
public TikaEmitterResult parse(FetchEmitTuple fetchEmit) throws IOException, TikaException {
- TikaHttpClient client = getHttpClient();
+ TikaPipesHttpClient client = getHttpClient();
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(fetchEmit, writer);
return client.postJson(writer.toString());
}
- public TikaEmitterResult parseAsync(List<FetchEmitTuple> tuples)
- throws IOException, TikaException {
- StringWriter writer = new StringWriter();
- JsonFetchEmitTupleList.toJson(tuples, writer);
- TikaHttpClient client = getHttpClient();
- return client.postJsonAsync(writer.toString());
- }
-
- private TikaHttpClient getHttpClient() {
+ private TikaPipesHttpClient getHttpClient() {
if (clients.size() == 1) {
return clients.get(0);
}
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 8c5a1531a..ee54a9421 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -19,10 +19,7 @@ package org.apache.tika.server.client;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -37,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
-import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
@@ -47,49 +43,50 @@ public class TikaClientCLI {
private static final Logger LOGGER = LoggerFactory.getLogger(TikaClientCLI.class);
private static final int QUEUE_SIZE = 10000;
- private final long maxWaitMs = 300000;
-
public static void main(String[] args) throws Exception {
- //TODO -- add an actual commandline,
Path tikaConfigPath = Paths.get(args[0]);
- int numThreads = Integer.parseInt(args[1]);
- List<String> tikaServerUrls = Arrays.asList(args[2].split(","));
TikaClientCLI cli = new TikaClientCLI();
- cli.execute(tikaConfigPath, tikaServerUrls, numThreads);
+ cli.execute(tikaConfigPath);
}
- private void execute(Path tikaConfigPath, List<String> tikaServerUrls, int numThreads)
+ private void execute(Path tikaConfigPath)
throws TikaException, IOException, SAXException {
- TikaConfig config = new TikaConfig(tikaConfigPath);
-
- ExecutorService executorService = Executors.newFixedThreadPool(numThreads + 1);
- ExecutorCompletionService<Integer> completionService =
- new ExecutorCompletionService<>(executorService);
- final PipesIterator pipesIterator =
- PipesIterator.build(tikaConfigPath);
- final ArrayBlockingQueue<FetchEmitTuple> queue =
- new ArrayBlockingQueue<>(QUEUE_SIZE);
-
- completionService.submit(new PipesIteratorWrapper(pipesIterator, queue, numThreads));
- if (tikaServerUrls.size() == numThreads) {
- logDiffSizes(tikaServerUrls.size(), numThreads);
- for (int i = 0; i < numThreads; i++) {
+ TikaServerClientConfig clientConfig = TikaServerClientConfig.build(tikaConfigPath);
+
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(clientConfig.getNumThreads() + 1);
+
+ ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
+
+ final PipesIterator pipesIterator = PipesIterator.build(tikaConfigPath);
+
+ final ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+
+ completionService.submit(new PipesIteratorWrapper(pipesIterator, queue));
+
+ if (clientConfig.getTikaEndpoints().size() == clientConfig.getNumThreads()) {
+ logDiffSizes(clientConfig.getTikaEndpoints().size(), clientConfig.getNumThreads());
+ for (int i = 0; i < clientConfig.getNumThreads(); i++) {
TikaClient client =
- TikaClient.get(config, Collections.singletonList(tikaServerUrls.get(i)));
- completionService.submit(new FetchWorker(queue, client));
+ TikaClient.get(clientConfig.getHttpClientFactory(),
+ Collections.singletonList(clientConfig.getTikaEndpoints().get(i)));
+ completionService.submit(new FetchWorker(queue, client,
+ clientConfig.getMaxWaitMillis()));
}
} else {
- for (int i = 0; i < numThreads; i++) {
- TikaClient client = TikaClient.get(config, tikaServerUrls);
- completionService.submit(new FetchWorker(queue, client));
+ for (int i = 0; i < clientConfig.getNumThreads(); i++) {
+ TikaClient client = TikaClient.get(clientConfig.getHttpClientFactory(),
+ clientConfig.getTikaEndpoints());
+ completionService.submit(new FetchWorker(queue, client,
+ clientConfig.getMaxWaitMillis()));
}
}
int finished = 0;
- while (finished < numThreads + 1) {
+ while (finished < clientConfig.getNumThreads() + 1) {
Future<Integer> future = null;
try {
- future = completionService.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+ future = completionService.poll(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//stop the world
LOGGER.error("", e);
@@ -101,7 +98,7 @@ public class TikaClientCLI {
future.get();
} catch (InterruptedException | ExecutionException e) {
//stop the world
- LOGGER.error("", e);
+ LOGGER.error("critical main loop failure", e);
throw new RuntimeException(e);
}
}
@@ -113,55 +110,21 @@ public class TikaClientCLI {
"Each client will randomly select a server from this list", servers, numThreads);
}
- private class AsyncFetchWorker implements Callable<Integer> {
- private final ArrayBlockingQueue<FetchEmitTuple> queue;
- private final TikaClient client;
-
- public AsyncFetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
- this.queue = queue;
- this.client = client;
- }
-
- @Override
- public Integer call() throws Exception {
- List<FetchEmitTuple> localCache = new ArrayList<>();
- while (true) {
-
- FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
- if (t == null) {
- send(localCache);
- throw new TimeoutException("exceeded maxWaitMs");
- }
- if (t == PipesIterator.COMPLETED_SEMAPHORE) {
- send(localCache);
- return 1;
- }
- if (localCache.size() > 20) {
- LOGGER.debug("about to send: {}", localCache.size());
- send(localCache);
- localCache.clear();
- }
- localCache.add(t);
- }
- }
-
- private void send(List<FetchEmitTuple> localCache) {
-
- }
- }
-
private class FetchWorker implements Callable<Integer> {
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final TikaClient client;
- public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
+ private final long maxWaitMs;
+
+ public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue,
+ TikaClient client, long maxWaitMs) {
this.queue = queue;
this.client = client;
+ this.maxWaitMs = maxWaitMs;
}
@Override
public Integer call() throws Exception {
-
while (true) {
FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
@@ -169,6 +132,8 @@ public class TikaClientCLI {
throw new TimeoutException("exceeded maxWaitMs");
}
if (t == PipesIterator.COMPLETED_SEMAPHORE) {
+ //potentially blocks forever
+ queue.put(PipesIterator.COMPLETED_SEMAPHORE);
return 1;
}
try {
@@ -184,15 +149,10 @@ public class TikaClientCLI {
private static class PipesIteratorWrapper implements Callable<Integer> {
private final PipesIterator pipesIterator;
private final ArrayBlockingQueue<FetchEmitTuple> queue;
- private final int numThreads;
-
public PipesIteratorWrapper(PipesIterator pipesIterator,
- ArrayBlockingQueue<FetchEmitTuple> queue,
- int numThreads) {
+ ArrayBlockingQueue<FetchEmitTuple> queue) {
this.pipesIterator = pipesIterator;
this.queue = queue;
- this.numThreads = numThreads;
-
}
@Override
@@ -201,9 +161,8 @@ public class TikaClientCLI {
//potentially blocks forever
queue.put(t);
}
- for (int i = 0; i < numThreads; i ++) {
- queue.put(PipesIterator.COMPLETED_SEMAPHORE);
- }
+ //potentially blocks forever
+ queue.put(PipesIterator.COMPLETED_SEMAPHORE);
return 1;
}
}
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaPipesHttpClient.java
similarity index 74%
rename from tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
rename to tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaPipesHttpClient.java
index 2c366a64d..4e712646d 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaPipesHttpClient.java
@@ -17,36 +17,34 @@
package org.apache.tika.server.client;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
-import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
/**
* Low-level class to handle the http layer.
*/
-class TikaHttpClient {
+class TikaPipesHttpClient {
- private static final String EMIT_ENDPOINT = "emit";
private static final String TIKA_ENDPOINT = "tika";
- private static final String ASYNC_ENDPOINT = "async";
- private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClient.class);
- private final HttpHost httpHost;
- private final HttpClient httpClient;
- private final String emitEndPointUrl;
- private final String asyncEndPointUrl;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TikaPipesHttpClient.class);
+ private final String endPoint = "pipes";
+
+ private final HttpClientFactory httpClientFactory;
+ private HttpClient httpClient;
+ private final String endPointUrl;
private final String tikaUrl;
private final int maxRetries = 3;
//if can't make contact with Tika server, max wait time in ms
@@ -56,39 +54,37 @@ class TikaHttpClient {
/**
* @param baseUrl url to base endpoint
- * @param httpHost
- * @param httpClient
*/
- private TikaHttpClient(String baseUrl, HttpHost httpHost, HttpClient httpClient) {
+ TikaPipesHttpClient(String baseUrl, HttpClientFactory httpClientFactory)
+ throws TikaConfigException {
if (!baseUrl.endsWith("/")) {
baseUrl += "/";
}
- this.emitEndPointUrl = baseUrl + EMIT_ENDPOINT;
- this.asyncEndPointUrl = baseUrl + ASYNC_ENDPOINT;
+ this.endPointUrl = baseUrl + getEndpoint();
this.tikaUrl = baseUrl + TIKA_ENDPOINT;
- this.httpHost = httpHost;
- this.httpClient = httpClient;
+ this.httpClientFactory = httpClientFactory;
+ this.httpClient = getNewClient(baseUrl);
}
- static TikaHttpClient get(String baseUrl) throws TikaClientConfigException {
- URI uri;
- try {
- uri = new URI(baseUrl);
- } catch (URISyntaxException e) {
- throw new TikaClientConfigException("bad URI", e);
- }
- HttpHost httpHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
- //TODO: need to add other configuration stuff? proxy, username, password, timeouts...
- HttpClient client = HttpClients.createDefault();
- return new TikaHttpClient(baseUrl, httpHost, client);
+ String getEndpoint() {
+ return endPoint;
}
- public TikaEmitterResult postJsonAsync(String jsonRequest) {
- return postJson(asyncEndPointUrl, jsonRequest);
+ private HttpClient getNewClient(String baseUrl)
+ throws TikaConfigException {
+ if (httpClient instanceof CloseableHttpClient) {
+ try {
+ ((CloseableHttpClient) httpClient).close();
+ } catch (IOException e) {
+ LOGGER.warn("exception closing client", e);
+ }
+ }
+ return httpClientFactory.build();
}
+
public TikaEmitterResult postJson(String jsonRequest) {
- return postJson(emitEndPointUrl, jsonRequest);
+ return postJson(endPointUrl, jsonRequest);
}
private TikaEmitterResult postJson(String endPoint, String jsonRequest) {
@@ -103,7 +99,7 @@ class TikaHttpClient {
while (tries++ < maxRetries) {
HttpResponse response = null;
try {
- response = httpClient.execute(httpHost, post);
+ response = httpClient.execute(post);
} catch (IOException e) {
LOGGER.warn("Exception trying to parse", e);
waitForServer();
@@ -147,7 +143,7 @@ class TikaHttpClient {
HttpGet get = new HttpGet(tikaUrl);
try {
- HttpResponse response = httpClient.execute(httpHost, get);
+ HttpResponse response = httpClient.execute(get);
if (response.getStatusLine().getStatusCode() == 200) {
LOGGER.debug("server back up");
return;
@@ -164,7 +160,7 @@ class TikaHttpClient {
throw new TimeoutWaitingForTikaException("");
}
- private static class TimeoutWaitingForTikaException extends TikaException {
+ static class TimeoutWaitingForTikaException extends TikaException {
public TimeoutWaitingForTikaException(String msg) {
super(msg);
}
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaServerClientConfig.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaServerClientConfig.java
new file mode 100644
index 000000000..6fe64e973
--- /dev/null
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaServerClientConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.config.ConfigBase;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+
+public class TikaServerClientConfig extends ConfigBase implements Initializable {
+
+
+ public static TikaServerClientConfig build(Path configFile)
+ throws IOException, TikaConfigException {
+ try (InputStream is = Files.newInputStream(configFile)) {
+ return buildSingle("serverClientConfig", TikaServerClientConfig.class, is);
+ }
+ }
+
+ enum MODE {
+ PIPES, ASYNC
+ }
+
+ private HttpClientFactory httpClientFactory;
+ private int numThreads = 1;
+ private MODE mode = MODE.PIPES;
+ private List<String> tikaEndpoints = new ArrayList<>();
+
+ private long maxWaitMillis = 60000;
+
+ public long getMaxWaitMillis() {
+ return maxWaitMillis;
+ }
+
+ /**
+ * Maximum time in milliseconds to wait for a new FetchEmitTuple to become
+ * available from the queue. The client will end if no tuple is available
+ * within this amount of time.
+ *
+ * @param maxWaitMs maximum wait, in milliseconds
+ */
+ public void setMaxWaitMillis(long maxWaitMs) {
+ this.maxWaitMillis = maxWaitMs;
+ }
+
+ public void setMode(String mode) {
+ if ("pipes".equals(mode)) {
+ this.mode = MODE.PIPES;
+ return;
+ }
+ throw new IllegalArgumentException("I regret that we have not yet implemented: '" + mode + "'");
+ }
+
+ public HttpClientFactory getHttpClientFactory() {
+ return httpClientFactory;
+ }
+
+ public void setHttpClientFactory(HttpClientFactory httpClientFactory) {
+ this.httpClientFactory = httpClientFactory;
+ }
+
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ public void setNumThreads(int numThreads) {
+ this.numThreads = numThreads;
+ }
+
+ public MODE getMode() {
+ return mode;
+ }
+
+ public List<String> getTikaEndpoints() {
+ return tikaEndpoints;
+ }
+
+ public void setTikaEndpoints(List<String> tikaEndpoints) {
+ this.tikaEndpoints = tikaEndpoints;
+ }
+
+ @Override
+ public void initialize(Map<String, Param> params) throws TikaConfigException {
+
+ }
+
+ @Override
+ public void checkInitialization(InitializableProblemHandler problemHandler)
+ throws TikaConfigException {
+ if (tikaEndpoints.size() == 0) {
+ throw new TikaConfigException("tikaEndpoints must not be empty");
+ }
+ }
+}
diff --git a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java b/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
index d2b844edd..4800f7ea6 100644
--- a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
+++ b/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
@@ -16,6 +16,7 @@
*/
package org.apache.tika.server.client;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Files;
@@ -25,15 +26,29 @@ import java.nio.file.Paths;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-@Disabled("turn into actual unit test")
public class TestBasic {
@Test
+ public void testConfig() throws Exception {
+ Path p = Paths.get(
+ TestBasic.class.getResource("/tika-config-simple-fs-emitter.xml").toURI());
+ assertTrue(Files.isRegularFile(p));
+
+ TikaServerClientConfig clientConfig = TikaServerClientConfig.build(p);
+ assertEquals(1, clientConfig.getNumThreads());
+ assertEquals(5, clientConfig.getHttpClientFactory().getMaxConnections());
+ }
+
+ @Test
+ @Disabled("turn this into an actual test in tika-integration-tests?")
public void testBasic() throws Exception {
Path p = Paths.get(
TestBasic.class.getResource("/tika-config-simple-fs-emitter.xml").toURI());
assertTrue(Files.isRegularFile(p));
- String[] args = new String[]{p.toAbsolutePath().toString(), "http://localhost:9998/", "fs"};
+ String[] args = new String[]{p.toAbsolutePath().toString()};
+ long start = System.currentTimeMillis();
TikaClientCLI.main(args);
+ long elapsed = System.currentTimeMillis() - start;
+ System.out.println("elapsed " + elapsed);
}
}
diff --git a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 3938a6941..f25a1cda2 100644
--- a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -19,19 +19,23 @@
-->
<properties>
<service-loader initializableProblemHandler="throw"/>
- <pipesIterators>
- <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
- <params>
- <fetcherName>fs</fetcherName>
- <basePath>fix</basePath>
- </params>
- </pipesIterator>
- </pipesIterators>
+ <parsers>
+ <parser class="org.apache.tika.parser.DefaultParser">
+ <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/>
+ </parser>
+ </parsers>
+ <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
+ <params>
+ <fetcherName>fs</fetcherName>
+ <emitterName>fs</emitterName>
+ <basePath>UPDATE</basePath>
+ </params>
+ </pipesIterator>
<fetchers>
<fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
<params>
<name>fs</name>
- <basePath>fix</basePath>
+ <basePath>UPDATE</basePath>
</params>
</fetcher>
</fetchers>
@@ -53,27 +57,50 @@
<emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
<params>
<name>fs</name>
- <basePath>fix</basePath>
+ <basePath>UPDATE</basePath>
</params>
</emitter>
- <!--
- <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
- <params>
- <param name="name" type="string">solr1</param>
- <param name="url" type="string">http://localhost:8983/solr/tika-test</param>
- <param name="attachmentStrategy" type="string">concatenate-content</param>
- <param name="contentField" type="string">content</param>
- <param name="commitWithin" type="int">10</param>
- </params>
- </emitter>
- <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
- <params>
- <param name="name" type="string">solr2</param>
- <param name="url" type="string">http://localhost:8983/solr/tika-test</param>
- <param name="attachmentStrategy" type="string">parent-child</param>
- <param name="contentField" type="string">content</param>
- <param name="commitWithin" type="int">10</param>
- </params>
- </emitter>-->
</emitters>
+ <pipes>
+ <params>
+ <numClients>6</numClients>
+ <forkedJvmArgs>
+ <arg>-Xmx1g</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ </params>
+ </pipes>
+ <server>
+ <params>
+ <port>9998</port>
+ <host>localhost</host>
+ <digest>sha256</digest>
+ <digestMarkLimit>1000000</digestMarkLimit>
+ <logLevel>debug</logLevel>
+ <returnStackTrace>true</returnStackTrace>
+ <noFork>true</noFork>
+ <endpoints>
+ <endpoint>tika</endpoint>
+ <endpoint>pipes</endpoint>
+ </endpoints>
+ </params>
+ </server>
+ <serverClientConfig>
+ <params>
+ <!-- this is the only option so far -->
+ <mode>pipes</mode>
+ <!-- this should be the same number as that in the pipes' numClients -->
+ <numThreads>6</numThreads>
+ <maxWaitMillis>600000</maxWaitMillis>
+ <tikaEndpoints>
+ <tikaEndpoint>http://localhost:9998</tikaEndpoint>
+ </tikaEndpoints>
+ </params>
+ <httpClientFactory class="org.apache.tika.client.HttpClientFactory">
+ <params>
+ <maxConnections>5</maxConnections>
+ </params>
+ </httpClientFactory>
+ </serverClientConfig>
</properties>
\ No newline at end of file