You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/28 20:03:35 UTC

[tika] branch main updated (163dd47 -> ead2e75)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git.


    from 163dd47  TIKA-3499 -- tika-server
     new f181fc4  TIKA-3499 -- tika-pipes
     new ead2e75  TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/tika/pipes/PipesClient.java    | 13 +---
 .../org/apache/tika/pipes/PipesConfigBase.java     | 75 +++++++++++++++++++---
 .../java/org/apache/tika/pipes/PipesServer.java    | 24 +++----
 .../apache/tika/pipes/async/AsyncClientConfig.java | 34 ----------
 .../org/apache/tika/pipes/async/AsyncConfig.java   | 17 -----
 tika-pipes/pom.xml                                 |  9 ---
 .../emitter/opensearch/OpenSearchClientTest.java   |  2 +-
 .../pipes/emitter/solr/SolrEmitterDevTest.java     |  6 +-
 .../tika/pipes/fetcher/http/HttpFetcherTest.java   |  8 +--
 .../tika/pipes/fetcher/s3/TestS3Fetcher.java       |  6 +-
 .../src/test/java/TestCSVPipesIterator.java        | 15 +++--
 .../pipesiterator/jdbc/TestJDBCPipesIterator.java  | 16 ++---
 .../pipesiterator/s3/TestS3PipesIterator.java      |  8 +--
 13 files changed, 113 insertions(+), 120 deletions(-)
 delete mode 100644 tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java

[tika] 01/02: TIKA-3499 -- tika-pipes

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit f181fc4bb5b9e3daf73107ad5c29dc0660fed905
Author: tallison <ta...@apache.org>
AuthorDate: Wed Jul 28 11:42:46 2021 -0400

    TIKA-3499 -- tika-pipes
---
 tika-pipes/pom.xml                                       |  9 ---------
 .../pipes/emitter/opensearch/OpenSearchClientTest.java   |  2 +-
 .../tika/pipes/emitter/solr/SolrEmitterDevTest.java      |  6 +++---
 .../apache/tika/pipes/fetcher/http/HttpFetcherTest.java  |  8 ++++----
 .../org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java  |  6 +++---
 .../src/test/java/TestCSVPipesIterator.java              | 15 +++++++++------
 .../pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java  | 16 ++++++++--------
 .../tika/pipes/pipesiterator/s3/TestS3PipesIterator.java |  8 ++++----
 8 files changed, 32 insertions(+), 38 deletions(-)

diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 3bb13ea..6f3bc52 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,15 +36,6 @@
     <module>tika-pipes-iterators</module>
   </modules>
 
-  <dependencies>
-    <!-- after we migrate everything to junit5, we can get rid of this -->
-    <dependency>
-      <groupId>org.junit.vintage</groupId>
-      <artifactId>junit-vintage-engine</artifactId>
-      <version>${junit5.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
   <build>
     <plugins>
       <plugin>
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClientTest.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClientTest.java
index e92607f..34653dc 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClientTest.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClientTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.tika.pipes.emitter.opensearch;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.TikaTest;
 import org.apache.tika.metadata.Metadata;
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/SolrEmitterDevTest.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/SolrEmitterDevTest.java
index 8e60daf..cb72665 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/SolrEmitterDevTest.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/SolrEmitterDevTest.java
@@ -21,8 +21,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
@@ -33,7 +33,7 @@ import org.apache.tika.metadata.filter.FieldNameMappingFilter;
  * running instance of Solr.  Please add unit tests to the
  * tika-integration-tests/tika-pipes-solr-integration-tests
  */
-@Ignore
+@Disabled
 public class SolrEmitterDevTest {
 
     @Test
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
index 82f14af..718dde3 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/test/java/org/apache/tika/pipes/fetcher/http/HttpFetcherTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.tika.pipes.fetcher.http;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -25,14 +25,14 @@ import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.zip.GZIPInputStream;
 
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.io.TemporaryResources;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 
-@Ignore("requires network connectivity")
+@Disabled("requires network connectivity")
 public class HttpFetcherTest {
 
     @Test
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
index a00f5c3..0a05ac2 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/test/java/org/apache/tika/pipes/fetcher/s3/TestS3Fetcher.java
@@ -23,14 +23,14 @@ import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collections;
 
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 
-@Ignore("write actual unit tests")
+@Disabled("write actual unit tests")
 public class TestS3Fetcher {
     private static final String FETCH_STRING = "";
     private final Path outputFile = Paths.get("");
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
index 1293ea6..173524f 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
@@ -16,7 +16,8 @@
  */
 
 import static org.apache.tika.pipes.pipesiterator.PipesIterator.COMPLETED_SEMAPHORE;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -30,7 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.pipesiterator.csv.CSVPipesIterator;
@@ -83,16 +84,18 @@ public class TestCSVPipesIterator {
         }
     }
 
-    @Test(expected = RuntimeException.class)
+    @Test
     public void testBadFetchKeyCol() throws Exception {
         Path p = get("test-simple.csv");
         CSVPipesIterator it = new CSVPipesIterator();
         it.setFetcherName("fs");
         it.setCsvPath(p);
-        it.setFetchKeyColumn("fetchKeyDoesntExist");
-        for (FetchEmitTuple t : it) {
+        assertThrows(RuntimeException.class, () -> {
+            it.setFetchKeyColumn("fetchKeyDoesntExist");
+            for (FetchEmitTuple t : it) {
 
-        }
+            }
+        });
     }
 
     private Path get(String testFileName) throws Exception {
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
index 177950d..08bfbeb 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
@@ -16,9 +16,9 @@
  */
 package org.apache.tika.pipes.pipesiterator.jdbc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -39,9 +39,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
@@ -54,7 +54,7 @@ public class TestJDBCPipesIterator {
     static Connection CONNECTION;
     static Path DB_DIR;
 
-    @BeforeClass
+    @BeforeAll
     public static void setUp() throws Exception {
         DB_DIR = Files.createTempDirectory("tika-jdbc-pipesiterator-test-");
 
@@ -79,7 +79,7 @@ public class TestJDBCPipesIterator {
         }
     }
 
-    @AfterClass
+    @AfterAll
     public static void tearDown() throws Exception {
         CONNECTION.close();
         FileUtils.deleteDirectory(DB_DIR.toFile());
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
index c647ea6..704622e 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
@@ -16,7 +16,7 @@
  */
 package org.apache.tika.pipes.pipesiterator.s3;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,13 +29,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
-@Ignore("turn into an actual unit test")
+@Disabled("turn into an actual unit test")
 public class TestS3PipesIterator {
 
 

[tika] 02/02: TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit ead2e752cee9cefaa4ec44aed70b2c77f5732bbe
Author: tallison <ta...@apache.org>
AuthorDate: Wed Jul 28 16:02:42 2021 -0400

    TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java
---
 .../java/org/apache/tika/pipes/PipesClient.java    | 13 +---
 .../org/apache/tika/pipes/PipesConfigBase.java     | 75 +++++++++++++++++++---
 .../java/org/apache/tika/pipes/PipesServer.java    | 24 +++----
 .../apache/tika/pipes/async/AsyncClientConfig.java | 34 ----------
 .../org/apache/tika/pipes/async/AsyncConfig.java   | 17 -----
 5 files changed, 81 insertions(+), 82 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index cd070c7..46248f0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.tika.pipes.async.AsyncConfig;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.utils.ProcessUtils;
 
@@ -101,8 +100,8 @@ public class PipesClient implements Closeable {
         if (! ping()) {
             restart();
         }
-        if (pipesConfig.getMaxFilesProcessed() > 0 &&
-                filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
+        if (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
+                filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess()) {
             LOG.info("restarting server after hitting max files: " + filesProcessed);
             restart();
         }
@@ -356,13 +355,7 @@ public class PipesClient implements Closeable {
         commandLine.add(
                 ProcessUtils.escapeCommandLine(pipesConfig.getTikaConfig().toAbsolutePath().toString()));
 
-        //turn off emit batching
-        String maxForEmitBatchBytes = "0";
-        if (pipesConfig instanceof AsyncConfig) {
-            maxForEmitBatchBytes =
-                    Long.toString(((AsyncConfig)pipesConfig).getMaxForEmitBatchBytes());
-        }
-        commandLine.add(maxForEmitBatchBytes);
+        commandLine.add(Long.toString(pipesConfig.getMaxForEmitBatchBytes()));
         commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
         commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
         LOG.debug("commandline: " + commandLine);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index e0ad8d9..f48e7fd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -25,12 +25,35 @@ import org.apache.tika.config.ConfigBase;
 
 public class PipesConfigBase extends ConfigBase {
 
-    private long timeoutMillis = 30000;
-    private long startupTimeoutMillis = 240000;
-    private long shutdownClientAfterMillis = 300000;
-    private int numClients = 10;
+    /**
+     * default size to send back to the PipesClient for batch
+     * emitting.  If an extract is larger than this, it will be emitted
+     * directly from the forked PipesServer.
+     */
+    public static final long DEFAULT_MAX_FOR_EMIT_BATCH = 100000;
+
+    public static final long DEFAULT_TIMEOUT_MILLIS = 60000;
+
+    public static final long DEFAULT_STARTUP_TIMEOUT_MILLIS = 240000;
+
+    public static final long DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS = 300000;
+
+    public static final int DEFAULT_NUM_CLIENTS = 4;
+
+    public static final int DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS = 10000;
+
+    //if an extract is larger than this, the forked PipesServer should
+    //emit the extract directly and not send the contents back to the PipesClient
+    private long maxForEmitBatchBytes = DEFAULT_MAX_FOR_EMIT_BATCH;
+    private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
+    private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
+
+    private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
+    private int numClients = DEFAULT_NUM_CLIENTS;
+
+    private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;
+
     private List<String> forkedJvmArgs = new ArrayList<>();
-    private int maxFilesProcessed = 10000;
     private Path tikaConfig;
     private String javaPath = "java";
 
@@ -38,6 +61,10 @@ public class PipesConfigBase extends ConfigBase {
         return timeoutMillis;
     }
 
+    /**
+     * How long to wait in milliseconds before timing out the forked process.
+     * @param timeoutMillis
+     */
     public void setTimeoutMillis(long timeoutMillis) {
         this.timeoutMillis = timeoutMillis;
     }
@@ -46,6 +73,12 @@ public class PipesConfigBase extends ConfigBase {
         return shutdownClientAfterMillis;
     }
 
+    /**
+     * If the client has been inactive for this many milliseconds,
+     * shut it down.
+     *
+     * @param shutdownClientAfterMillis inactivity threshold in milliseconds
+     */
     public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
         this.shutdownClientAfterMillis = shutdownClientAfterMillis;
     }
@@ -66,12 +99,17 @@ public class PipesConfigBase extends ConfigBase {
         this.forkedJvmArgs = jvmArgs;
     }
 
-    public int getMaxFilesProcessed() {
-        return maxFilesProcessed;
+    /**
+     * Restart the forked PipesServer after it has processed this many files to avoid
+     * slow-building memory leaks.
+     * @return the number of files after which the forked PipesServer is restarted
+     */
+    public int getMaxFilesProcessedPerProcess() {
+        return maxFilesProcessedPerProcess;
     }
 
-    public void setMaxFilesProcessed(int maxFilesProcessed) {
-        this.maxFilesProcessed = maxFilesProcessed;
+    public void setMaxFilesProcessedPerProcess(int maxFilesProcessedPerProcess) {
+        this.maxFilesProcessedPerProcess = maxFilesProcessedPerProcess;
     }
 
     public Path getTikaConfig() {
@@ -97,4 +135,23 @@ public class PipesConfigBase extends ConfigBase {
     public long getStartupTimeoutMillis() {
         return startupTimeoutMillis;
     }
+
+    /**
+     *  The maximum size in bytes of an extract that
+     *  will be allowed to be shipped back to the emit queue in the forking process.
+     *  If an extract is too big, skip the emit queue and forward it directly from the
+     *  forked PipesServer.
+     *  If set to <code>0</code>, this will never send an extract back for batch emitting,
+     *  but will always emit the extract directly from the forked PipesServer.
+     *  If set to <code>-1</code>, this will always send the extract back for batch emitting.
+     *
+     * @return the threshold extract size at which to emit directly from the forked PipesServer
+     */
+    public long getMaxForEmitBatchBytes() {
+        return maxForEmitBatchBytes;
+    }
+
+    public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
+        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 68b304f..ff0d41d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -117,7 +117,10 @@ public class PipesServer implements Runnable {
     private final Path tikaConfigPath;
     private final DataInputStream input;
     private final DataOutputStream output;
-    private final long maxExtractSizeToReturn;
+    //if this value is non-negative and an extract is at least this large, emit it directly;
+    //otherwise, write it back to the
+    //PipesClient so that it can cache the extracts and then batch emit.
+    private final long maxForEmitBatchBytes;
     private final long serverParseTimeoutMillis;
     private final long serverWaitTimeoutMillis;
     private Parser autoDetectParser;
@@ -128,17 +131,15 @@ public class PipesServer implements Runnable {
     private volatile boolean parsing;
     private volatile long since;
 
-    //logging is fussy...the logging frameworks grab stderr and stdout
-    //before we can redirect.  slf4j complains on stderr, log4j2 unconfigured writes to stdout
-    //We can add logging later but it has to be done carefully...
+
     public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
-                       long maxExtractSizeToReturn,
+                       long maxForEmitBatchBytes,
                        long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
             throws IOException, TikaException, SAXException {
         this.tikaConfigPath = tikaConfigPath;
         this.input = new DataInputStream(in);
         this.output = new DataOutputStream(out);
-        this.maxExtractSizeToReturn = maxExtractSizeToReturn;
+        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
         this.serverParseTimeoutMillis = serverParseTimeoutMillis;
         this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
         this.parsing = false;
@@ -334,14 +335,14 @@ public class PipesServer implements Runnable {
                 t.setEmitKey(emitKey);
             }
             EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
-            if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
+            if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
                 emit(t.getId(), emitData, stack);
             } else {
-                write(emitData, stack);
+                //ignore the stack, it is stored in the emit data
+                write(emitData);
             }
         } else {
-            write(STATUS.PARSE_EXCEPTION_NO_EMIT,
-                    stack.getBytes(StandardCharsets.UTF_8));
+            write(STATUS.PARSE_EXCEPTION_NO_EMIT, stack);
         }
 
     }
@@ -509,8 +510,7 @@ public class PipesServer implements Runnable {
     }
 
 
-    private void write(EmitData emitData, String stack) {
-        //TODO -- what do we do with the stack?
+    private void write(EmitData emitData) {
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
deleted file mode 100644
index 1fe5159..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-class AsyncClientConfig {
-
-    private int fetchQueueSize = 20000;
-    private long waitTimeoutMs;
-    private long maxFilesProcessed;
-
-    public static AsyncClientConfig load(Path p) throws IOException {
-
-        return new AsyncClientConfig();
-    }
-
-
-}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index fa4a1b0..c2d5ed8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -29,7 +29,6 @@ public class AsyncConfig extends PipesConfigBase {
     private long emitWithinMillis = 10000;
     private long emitMaxEstimatedBytes = 100000;
 
-    private long maxForEmitBatchBytes = 0;
     private int queueSize = 10000;
     private int numEmitters = 1;
 
@@ -73,22 +72,6 @@ public class AsyncConfig extends PipesConfigBase {
     }
 
 
-    /**
-     *  What is the maximum bytes size per extract that
-     *  will be allowed in the emit queue.  If an extract is too
-     *  big, skip the emit queue and forward it directly from the processor.  If
-     *  set to <code>0</code>, this will never send an extract back for batch emitting,
-     *  but will emit the extract directly from the processor.
-     * @return
-     */
-    public long getMaxForEmitBatchBytes() {
-        return maxForEmitBatchBytes;
-    }
-
-    public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
-        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
-    }
-
     public void setNumEmitters(int numEmitters) {
         this.numEmitters = numEmitters;
     }