You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/06/13 14:48:31 UTC

[tika] 02/02: TIKA-3790 -- actually implement tika server client via pipes (not yet async)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 7877f9b33dbf242bc2fbc030f23309c025d5c71a
Author: tallison <ta...@apache.org>
AuthorDate: Mon Jun 13 10:48:19 2022 -0400

    TIKA-3790 -- actually implement tika server client via pipes (not yet async)
---
 CHANGES.txt                                        |   4 +
 .../java/org/apache/tika/pipes/PipesClient.java    |   5 +-
 .../java/org/apache/tika/pipes/PipesConfig.java    |  10 +-
 .../pipesiterator/fs/FileSystemPipesIterator.java  |   2 -
 tika-server/tika-server-client/pom.xml             |   5 +
 .../tika/server/client/TikaAsyncHttpClient.java}   |  29 +++--
 .../org/apache/tika/server/client/TikaClient.java  |  28 ++---
 .../apache/tika/server/client/TikaClientCLI.java   | 121 +++++++--------------
 ...ikaHttpClient.java => TikaPipesHttpClient.java} |  68 ++++++------
 .../tika/server/client/TikaServerClientConfig.java | 118 ++++++++++++++++++++
 .../org/apache/tika/server/client/TestBasic.java   |  19 +++-
 .../resources/tika-config-simple-fs-emitter.xml    |  85 ++++++++++-----
 12 files changed, 308 insertions(+), 186 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index bc7c68f4d..40a8827f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,9 @@
 Release 2.4.1 - ???
 
+   * Implement bulk upload in the OpenSearch emitter (TIKA-3791).
+
+   * Implement tika-server client via pipes mode (TIKA-3790).
+
    * Custom embedded parsers and EmbeddedDocumentHandlers
      can now add metadata to the container file's
      metadata (TIKA-3789).
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 9bcd4f755..94cd40a5f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -167,7 +167,10 @@ public class PipesClient implements Closeable {
                 throw new InterruptedException("thread interrupt");
             }
             PipesResult result = readResults(t, start);
-            LOG.info("finished reading result ");
+            if (LOG.isDebugEnabled()) {
+                long elapsed = System.currentTimeMillis() - readStart;
+                LOG.debug("finished reading result in {} ms", elapsed);
+            }
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("pipesClientId={}: timer -- read result: {} ms",
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
index 94ae135b8..06783d67c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -22,10 +22,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.tika.exception.TikaConfigException;
 
 public class PipesConfig extends PipesConfigBase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PipesConfig.class);
+
     private long maxWaitForClientMillis = 60000;
 
     public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigException {
@@ -34,8 +39,9 @@ public class PipesConfig extends PipesConfigBase {
             Set<String> settings = pipesConfig.configure("pipes", is);
         }
         if (pipesConfig.getTikaConfig() == null) {
-            throw new TikaConfigException("must specify at least a <tikaConfig> element in the " +
-                    "<params> of <pipes>");
+            LOG.debug("A separate tikaConfig was not specified in the <pipes/> element in the  " +
+                    "config file; will use {} for pipes", tikaConfig);
+            pipesConfig.setTikaConfig(tikaConfig);
         }
         return pipesConfig;
     }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
index 35f424a37..88f29c5aa 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
@@ -124,6 +124,4 @@ public class FileSystemPipesIterator extends PipesIterator implements Initializa
             return FileVisitResult.CONTINUE;
         }
     }
-
-
 }
diff --git a/tika-server/tika-server-client/pom.xml b/tika-server/tika-server-client/pom.xml
index 27c59598c..7e21a1361 100644
--- a/tika-server/tika-server-client/pom.xml
+++ b/tika-server/tika-server-client/pom.xml
@@ -37,6 +37,11 @@
       <artifactId>tika-serialization</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-httpclient-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
diff --git a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaAsyncHttpClient.java
similarity index 57%
copy from tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
copy to tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaAsyncHttpClient.java
index d2b844edd..8da79c736 100644
--- a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaAsyncHttpClient.java
@@ -16,24 +16,23 @@
  */
 package org.apache.tika.server.client;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.exception.TikaConfigException;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+/**
+ * Low-level class to handle the http layer for the async endpoint.
+ */
+class TikaAsyncHttpClient extends TikaPipesHttpClient {
 
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
+    private TikaAsyncHttpClient(String baseUrl, HttpClientFactory httpClientFactory)
+            throws TikaConfigException {
+        super(baseUrl, httpClientFactory);
+    }
 
-@Disabled("turn into actual unit test")
-public class TestBasic {
+    private final String endPoint = "async";
 
-    @Test
-    public void testBasic() throws Exception {
-        Path p = Paths.get(
-                TestBasic.class.getResource("/tika-config-simple-fs-emitter.xml").toURI());
-        assertTrue(Files.isRegularFile(p));
-        String[] args = new String[]{p.toAbsolutePath().toString(), "http://localhost:9998/", "fs"};
-        TikaClientCLI.main(args);
+    String getEndpoint() {
+        return endPoint;
     }
+
 }
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index 1ebbcfd63..99e4f18e5 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -22,28 +22,28 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.tika.config.TikaConfig;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
 import org.apache.tika.pipes.FetchEmitTuple;
 
 public class TikaClient {
 
     private final Random random = new Random();
-    private final List<TikaHttpClient> clients;
+    private final List<TikaPipesHttpClient> clients;
 
 
-    private TikaClient(List<TikaHttpClient> clients) {
-
+    private TikaClient(List<TikaPipesHttpClient> clients) {
         this.clients = clients;
     }
 
-    public static TikaClient get(TikaConfig tikaConfig, List<String> tikaServers)
-            throws TikaClientConfigException {
+    public static TikaClient get(HttpClientFactory httpClientFactory, List<String> tikaServers)
+            throws TikaConfigException {
         List clients = new ArrayList<>();
         for (String url : tikaServers) {
-            clients.add(TikaHttpClient.get(url));
+            //client factory is not thread safe, create a copy per client
+            clients.add(new TikaPipesHttpClient(url, httpClientFactory.copy()));
         }
         return new TikaClient(clients);
     }
@@ -54,21 +54,13 @@ public class TikaClient {
     }*/
 
     public TikaEmitterResult parse(FetchEmitTuple fetchEmit) throws IOException, TikaException {
-        TikaHttpClient client = getHttpClient();
+        TikaPipesHttpClient client = getHttpClient();
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(fetchEmit, writer);
         return client.postJson(writer.toString());
     }
 
-    public TikaEmitterResult parseAsync(List<FetchEmitTuple> tuples)
-            throws IOException, TikaException {
-        StringWriter writer = new StringWriter();
-        JsonFetchEmitTupleList.toJson(tuples, writer);
-        TikaHttpClient client = getHttpClient();
-        return client.postJsonAsync(writer.toString());
-    }
-
-    private TikaHttpClient getHttpClient() {
+    private TikaPipesHttpClient getHttpClient() {
         if (clients.size() == 1) {
             return clients.get(0);
         }
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 8c5a1531a..ee54a9421 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -19,10 +19,7 @@ package org.apache.tika.server.client;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -37,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
-import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
@@ -47,49 +43,50 @@ public class TikaClientCLI {
     private static final Logger LOGGER = LoggerFactory.getLogger(TikaClientCLI.class);
     private static final int QUEUE_SIZE = 10000;
 
-    private final long maxWaitMs = 300000;
-
     public static void main(String[] args) throws Exception {
-        //TODO -- add an actual commandline,
         Path tikaConfigPath = Paths.get(args[0]);
-        int numThreads = Integer.parseInt(args[1]);
-        List<String> tikaServerUrls = Arrays.asList(args[2].split(","));
         TikaClientCLI cli = new TikaClientCLI();
-        cli.execute(tikaConfigPath, tikaServerUrls, numThreads);
+        cli.execute(tikaConfigPath);
     }
 
-    private void execute(Path tikaConfigPath, List<String> tikaServerUrls, int numThreads)
+    private void execute(Path tikaConfigPath)
             throws TikaException, IOException, SAXException {
-        TikaConfig config = new TikaConfig(tikaConfigPath);
-
-        ExecutorService executorService = Executors.newFixedThreadPool(numThreads + 1);
-        ExecutorCompletionService<Integer> completionService =
-                new ExecutorCompletionService<>(executorService);
-        final PipesIterator pipesIterator =
-                PipesIterator.build(tikaConfigPath);
-        final ArrayBlockingQueue<FetchEmitTuple> queue =
-                new ArrayBlockingQueue<>(QUEUE_SIZE);
-
-        completionService.submit(new PipesIteratorWrapper(pipesIterator, queue, numThreads));
-        if (tikaServerUrls.size() == numThreads) {
-            logDiffSizes(tikaServerUrls.size(), numThreads);
-            for (int i = 0; i < numThreads; i++) {
+        TikaServerClientConfig clientConfig = TikaServerClientConfig.build(tikaConfigPath);
+
+        ExecutorService executorService =
+                Executors.newFixedThreadPool(clientConfig.getNumThreads() + 1);
+
+        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
+
+        final PipesIterator pipesIterator = PipesIterator.build(tikaConfigPath);
+
+        final ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+
+        completionService.submit(new PipesIteratorWrapper(pipesIterator, queue));
+
+        if (clientConfig.getTikaEndpoints().size() == clientConfig.getNumThreads()) {
+            logDiffSizes(clientConfig.getTikaEndpoints().size(), clientConfig.getNumThreads());
+            for (int i = 0; i < clientConfig.getNumThreads(); i++) {
                 TikaClient client =
-                        TikaClient.get(config, Collections.singletonList(tikaServerUrls.get(i)));
-                completionService.submit(new FetchWorker(queue, client));
+                        TikaClient.get(clientConfig.getHttpClientFactory(),
+                                Collections.singletonList(clientConfig.getTikaEndpoints().get(i)));
+                completionService.submit(new FetchWorker(queue, client,
+                        clientConfig.getMaxWaitMillis()));
             }
         } else {
-            for (int i = 0; i < numThreads; i++) {
-                TikaClient client = TikaClient.get(config, tikaServerUrls);
-                completionService.submit(new FetchWorker(queue, client));
+            for (int i = 0; i < clientConfig.getNumThreads(); i++) {
+                TikaClient client = TikaClient.get(clientConfig.getHttpClientFactory(),
+                        clientConfig.getTikaEndpoints());
+                completionService.submit(new FetchWorker(queue, client,
+                        clientConfig.getMaxWaitMillis()));
             }
         }
 
         int finished = 0;
-        while (finished < numThreads + 1) {
+        while (finished < clientConfig.getNumThreads() + 1) {
             Future<Integer> future = null;
             try {
-                future = completionService.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+                future = completionService.poll(30, TimeUnit.SECONDS);
             } catch (InterruptedException e) {
                 //stop the world
                 LOGGER.error("", e);
@@ -101,7 +98,7 @@ public class TikaClientCLI {
                     future.get();
                 } catch (InterruptedException | ExecutionException e) {
                     //stop the world
-                    LOGGER.error("", e);
+                    LOGGER.error("critical main loop failure", e);
                     throw new RuntimeException(e);
                 }
             }
@@ -113,55 +110,21 @@ public class TikaClientCLI {
                 "Each client will randomly select a server from this list", servers, numThreads);
     }
 
-    private class AsyncFetchWorker implements Callable<Integer> {
-        private final ArrayBlockingQueue<FetchEmitTuple> queue;
-        private final TikaClient client;
-
-        public AsyncFetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
-            this.queue = queue;
-            this.client = client;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            List<FetchEmitTuple> localCache = new ArrayList<>();
-            while (true) {
-
-                FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
-                if (t == null) {
-                    send(localCache);
-                    throw new TimeoutException("exceeded maxWaitMs");
-                }
-                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
-                    send(localCache);
-                    return 1;
-                }
-                if (localCache.size() > 20) {
-                    LOGGER.debug("about to send: {}", localCache.size());
-                    send(localCache);
-                    localCache.clear();
-                }
-                localCache.add(t);
-            }
-        }
-
-        private void send(List<FetchEmitTuple> localCache) {
-
-        }
-    }
-
     private class FetchWorker implements Callable<Integer> {
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final TikaClient client;
 
-        public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
+        private final long maxWaitMs;
+
+        public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue,
+                           TikaClient client, long maxWaitMs) {
             this.queue = queue;
             this.client = client;
+            this.maxWaitMs = maxWaitMs;
         }
 
         @Override
         public Integer call() throws Exception {
-
             while (true) {
 
                 FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
@@ -169,6 +132,8 @@ public class TikaClientCLI {
                     throw new TimeoutException("exceeded maxWaitMs");
                 }
                 if (t == PipesIterator.COMPLETED_SEMAPHORE) {
+                    //pass the semaphore on to the next worker; put() potentially blocks forever
+                    queue.put(PipesIterator.COMPLETED_SEMAPHORE);
                     return 1;
                 }
                 try {
@@ -184,15 +149,10 @@ public class TikaClientCLI {
     private static class PipesIteratorWrapper implements Callable<Integer> {
         private final PipesIterator pipesIterator;
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
-        private final int numThreads;
-
         public PipesIteratorWrapper(PipesIterator pipesIterator,
-                                    ArrayBlockingQueue<FetchEmitTuple> queue,
-                                    int numThreads) {
+                                    ArrayBlockingQueue<FetchEmitTuple> queue) {
             this.pipesIterator = pipesIterator;
             this.queue = queue;
-            this.numThreads = numThreads;
-
         }
 
         @Override
@@ -201,9 +161,8 @@ public class TikaClientCLI {
                 //potentially blocks forever
                 queue.put(t);
             }
-            for (int i = 0; i < numThreads; i ++) {
-                queue.put(PipesIterator.COMPLETED_SEMAPHORE);
-            }
+            //potentially blocks forever
+            queue.put(PipesIterator.COMPLETED_SEMAPHORE);
             return 1;
         }
     }
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaPipesHttpClient.java
similarity index 74%
rename from tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
rename to tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaPipesHttpClient.java
index 2c366a64d..4e712646d 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaPipesHttpClient.java
@@ -17,36 +17,34 @@
 package org.apache.tika.server.client;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 
-import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 
 /**
  * Low-level class to handle the http layer.
  */
-class TikaHttpClient {
+class TikaPipesHttpClient {
 
-    private static final String EMIT_ENDPOINT = "emit";
     private static final String TIKA_ENDPOINT = "tika";
-    private static final String ASYNC_ENDPOINT = "async";
-    private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClient.class);
-    private final HttpHost httpHost;
-    private final HttpClient httpClient;
-    private final String emitEndPointUrl;
-    private final String asyncEndPointUrl;
+    private static final Logger LOGGER = LoggerFactory.getLogger(TikaPipesHttpClient.class);
+    private final String endPoint = "pipes";
+
+    private final HttpClientFactory httpClientFactory;
+    private HttpClient httpClient;
+    private final String endPointUrl;
     private final String tikaUrl;
     private final int maxRetries = 3;
     //if can't make contact with Tika server, max wait time in ms
@@ -56,39 +54,37 @@ class TikaHttpClient {
 
     /**
      * @param baseUrl    url to base endpoint
-     * @param httpHost
-     * @param httpClient
      */
-    private TikaHttpClient(String baseUrl, HttpHost httpHost, HttpClient httpClient) {
+    TikaPipesHttpClient(String baseUrl, HttpClientFactory httpClientFactory)
+            throws TikaConfigException {
         if (!baseUrl.endsWith("/")) {
             baseUrl += "/";
         }
-        this.emitEndPointUrl = baseUrl + EMIT_ENDPOINT;
-        this.asyncEndPointUrl = baseUrl + ASYNC_ENDPOINT;
+        this.endPointUrl = baseUrl + getEndpoint();
         this.tikaUrl = baseUrl + TIKA_ENDPOINT;
-        this.httpHost = httpHost;
-        this.httpClient = httpClient;
+        this.httpClientFactory = httpClientFactory;
+        this.httpClient = getNewClient(baseUrl);
     }
 
-    static TikaHttpClient get(String baseUrl) throws TikaClientConfigException {
-        URI uri;
-        try {
-            uri = new URI(baseUrl);
-        } catch (URISyntaxException e) {
-            throw new TikaClientConfigException("bad URI", e);
-        }
-        HttpHost httpHost = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
-        //TODO: need to add other configuration stuff? proxy, username, password, timeouts...
-        HttpClient client = HttpClients.createDefault();
-        return new TikaHttpClient(baseUrl, httpHost, client);
+    String getEndpoint() {
+        return endPoint;
     }
 
-    public TikaEmitterResult postJsonAsync(String jsonRequest) {
-        return postJson(asyncEndPointUrl, jsonRequest);
+    private HttpClient getNewClient(String baseUrl)
+            throws TikaConfigException {
+        if (httpClient instanceof CloseableHttpClient) {
+            try {
+                ((CloseableHttpClient) httpClient).close();
+            } catch (IOException e) {
+                LOGGER.warn("exception closing client", e);
+            }
+        }
+        return httpClientFactory.build();
     }
 
+
     public TikaEmitterResult postJson(String jsonRequest) {
-        return postJson(emitEndPointUrl, jsonRequest);
+        return postJson(endPointUrl, jsonRequest);
     }
 
     private TikaEmitterResult postJson(String endPoint, String jsonRequest) {
@@ -103,7 +99,7 @@ class TikaHttpClient {
             while (tries++ < maxRetries) {
                 HttpResponse response = null;
                 try {
-                    response = httpClient.execute(httpHost, post);
+                    response = httpClient.execute(post);
                 } catch (IOException e) {
                     LOGGER.warn("Exception trying to parse", e);
                     waitForServer();
@@ -147,7 +143,7 @@ class TikaHttpClient {
 
             HttpGet get = new HttpGet(tikaUrl);
             try {
-                HttpResponse response = httpClient.execute(httpHost, get);
+                HttpResponse response = httpClient.execute(get);
                 if (response.getStatusLine().getStatusCode() == 200) {
                     LOGGER.debug("server back up");
                     return;
@@ -164,7 +160,7 @@ class TikaHttpClient {
         throw new TimeoutWaitingForTikaException("");
     }
 
-    private static class TimeoutWaitingForTikaException extends TikaException {
+    static class TimeoutWaitingForTikaException extends TikaException {
         public TimeoutWaitingForTikaException(String msg) {
             super(msg);
         }
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaServerClientConfig.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaServerClientConfig.java
new file mode 100644
index 000000000..6fe64e973
--- /dev/null
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaServerClientConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.server.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.config.ConfigBase;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+
+public class TikaServerClientConfig extends ConfigBase implements Initializable {
+
+
+    public static TikaServerClientConfig build(Path configFile)
+            throws IOException, TikaConfigException {
+        try (InputStream is = Files.newInputStream(configFile)) {
+            return buildSingle("serverClientConfig", TikaServerClientConfig.class, is);
+        }
+    }
+
+    enum MODE {
+        PIPES, ASYNC
+    }
+
+    private HttpClientFactory httpClientFactory;
+    private int numThreads = 1;
+    private MODE mode = MODE.PIPES;
+    private List<String> tikaEndpoints = new ArrayList<>();
+
+    private long maxWaitMillis = 60000;
+
+    public long getMaxWaitMillis() {
+        return maxWaitMillis;
+    }
+
+    /**
+     * Maximum time in milliseconds to wait for a new FetchEmitTuple to become
+     * available from the queue.  The client will end if no tuple is available
+     * within this amount of time.
+     *
+     * @param maxWaitMs maximum wait time in milliseconds
+     */
+    public void setMaxWaitMillis(long maxWaitMs) {
+        this.maxWaitMillis = maxWaitMs;
+    }
+
+    public void setMode(String mode) {
+        if ("pipes".equals(mode)) {
+            this.mode = MODE.PIPES;
+            return;
+        }
+        throw new IllegalArgumentException("I regret that we have not yet implemented: '" + mode + "'");
+    }
+
+    public HttpClientFactory getHttpClientFactory() {
+        return httpClientFactory;
+    }
+
+    public void setHttpClientFactory(HttpClientFactory httpClientFactory) {
+        this.httpClientFactory = httpClientFactory;
+    }
+
+    public int getNumThreads() {
+        return numThreads;
+    }
+
+    public void setNumThreads(int numThreads) {
+        this.numThreads = numThreads;
+    }
+
+    public MODE getMode() {
+        return mode;
+    }
+
+    public List<String> getTikaEndpoints() {
+        return tikaEndpoints;
+    }
+
+    public void setTikaEndpoints(List<String> tikaEndpoints) {
+        this.tikaEndpoints = tikaEndpoints;
+    }
+
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        if (tikaEndpoints.size() == 0) {
+            throw new TikaConfigException("tikaEndpoints must not be empty");
+        }
+    }
+}
diff --git a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java b/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
index d2b844edd..4800f7ea6 100644
--- a/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
+++ b/tika-server/tika-server-client/src/test/java/org/apache/tika/server/client/TestBasic.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.server.client;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Files;
@@ -25,15 +26,29 @@ import java.nio.file.Paths;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-@Disabled("turn into actual unit test")
 public class TestBasic {
 
     @Test
+    public void testConfig() throws Exception {
+        Path p = Paths.get(
+                TestBasic.class.getResource("/tika-config-simple-fs-emitter.xml").toURI());
+        assertTrue(Files.isRegularFile(p));
+
+        TikaServerClientConfig clientConfig = TikaServerClientConfig.build(p);
+        assertEquals(1, clientConfig.getNumThreads());
+        assertEquals(5, clientConfig.getHttpClientFactory().getMaxConnections());
+    }
+
+    @Test
+    @Disabled("turn this into an actual test in tika-integration-tests?")
     public void testBasic() throws Exception {
         Path p = Paths.get(
                 TestBasic.class.getResource("/tika-config-simple-fs-emitter.xml").toURI());
         assertTrue(Files.isRegularFile(p));
-        String[] args = new String[]{p.toAbsolutePath().toString(), "http://localhost:9998/", "fs"};
+        String[] args = new String[]{p.toAbsolutePath().toString()};
+        long start = System.currentTimeMillis();
         TikaClientCLI.main(args);
+        long elapsed = System.currentTimeMillis() - start;
+        System.out.println("elapsed " + elapsed);
     }
 }
diff --git a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 3938a6941..f25a1cda2 100644
--- a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -19,19 +19,23 @@
 -->
 <properties>
   <service-loader initializableProblemHandler="throw"/>
-  <pipesIterators>
-    <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
-      <params>
-        <fetcherName>fs</fetcherName>
-        <basePath>fix</basePath>
-      </params>
-    </pipesIterator>
-  </pipesIterators>
+  <parsers>
+    <parser class="org.apache.tika.parser.DefaultParser">
+      <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/>
+    </parser>
+  </parsers>
+  <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
+    <params>
+      <fetcherName>fs</fetcherName>
+      <emitterName>fs</emitterName>
+      <basePath>UPDATE</basePath>
+    </params>
+  </pipesIterator>
   <fetchers>
     <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
       <params>
         <name>fs</name>
-        <basePath>fix</basePath>
+        <basePath>UPDATE</basePath>
       </params>
     </fetcher>
   </fetchers>
@@ -53,27 +57,50 @@
     <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
       <params>
         <name>fs</name>
-        <basePath>fix</basePath>
+        <basePath>UPDATE</basePath>
       </params>
     </emitter>
-    <!--
-    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
-        <params>
-            <param name="name" type="string">solr1</param>
-            <param name="url" type="string">http://localhost:8983/solr/tika-test</param>
-            <param name="attachmentStrategy" type="string">concatenate-content</param>
-            <param name="contentField" type="string">content</param>
-            <param name="commitWithin" type="int">10</param>
-        </params>
-    </emitter>
-    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
-        <params>
-            <param name="name" type="string">solr2</param>
-            <param name="url" type="string">http://localhost:8983/solr/tika-test</param>
-            <param name="attachmentStrategy" type="string">parent-child</param>
-            <param name="contentField" type="string">content</param>
-            <param name="commitWithin" type="int">10</param>
-        </params>
-    </emitter>-->
   </emitters>
+  <pipes>
+    <params>
+      <numClients>6</numClients>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+    </params>
+  </pipes>
+  <server>
+    <params>
+      <port>9998</port>
+      <host>localhost</host>
+      <digest>sha256</digest>
+      <digestMarkLimit>1000000</digestMarkLimit>
+      <logLevel>debug</logLevel>
+      <returnStackTrace>true</returnStackTrace>
+      <noFork>true</noFork>
+      <endpoints>
+        <endpoint>tika</endpoint>
+        <endpoint>pipes</endpoint>
+      </endpoints>
+    </params>
+  </server>
+  <serverClientConfig>
+    <params>
+      <!-- this is the only option so far -->
+      <mode>pipes</mode>
+      <!-- this should be the same number as that in the pipes' numClients -->
+      <numThreads>6</numThreads>
+      <maxWaitMillis>600000</maxWaitMillis>
+      <tikaEndpoints>
+        <tikaEndpoint>http://localhost:9998</tikaEndpoint>
+      </tikaEndpoints>
+    </params>
+    <httpClientFactory class="org.apache.tika.client.HttpClientFactory">
+      <params>
+        <maxConnections>5</maxConnections>
+      </params>
+    </httpClientFactory>
+  </serverClientConfig>
 </properties>
\ No newline at end of file