You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/04/20 20:29:25 UTC

[tika] branch main updated: TIKA-3362 -- enable configuration of content type, writelimit and max embedded resources for async, FetchEmitTuple

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new cf2febc  TIKA-3362 -- enable configuration of content type, writelimit and max embedded resources for async, FetchEmitTuple
cf2febc is described below

commit cf2febcd2f931f77c3bf181b7972a083967eec85
Author: tallison <ta...@apache.org>
AuthorDate: Tue Apr 20 16:29:04 2021 -0400

    TIKA-3362 -- enable configuration of content type, writelimit and max embedded resources for async, FetchEmitTuple
---
 .../pipes/{fetchiterator => }/FetchEmitTuple.java  | 57 ++++++++-------
 .../java/org/apache/tika/pipes/HandlerConfig.java  | 75 ++++++++++++++++++++
 .../tika/pipes/fetchiterator/FetchIterator.java    | 31 ++++++++-
 .../fetchiterator/FileSystemFetchIterator.java     |  1 +
 .../fetchiterator/FileSystemFetchIteratorTest.java |  2 +
 .../pipes/fetchiterator/csv/CSVFetchIterator.java  |  9 ++-
 .../src/test/java/TestCSVFetchIterator.java        |  2 +-
 .../fetchiterator/jdbc/JDBCFetchIterator.java      | 13 ++--
 .../fetchiterator/jdbc/TestJDBCFetchIterator.java  |  2 +-
 .../pipes/fetchiterator/s3/S3FetchIterator.java    |  6 +-
 .../fetchiterator/s3/TestS3FetchIterator.java      |  2 +-
 .../java/org/apache/tika/pipes/async/AsyncCli.java |  2 +-
 .../org/apache/tika/pipes/async/AsyncData.java     |  2 +-
 .../apache/tika/pipes/async/AsyncProcessor.java    |  2 +-
 .../org/apache/tika/pipes/async/AsyncTask.java     |  2 +-
 .../org/apache/tika/pipes/async/AsyncWorker.java   |  2 +-
 .../tika/pipes/async/AsyncWorkerProcess.java       |  2 +-
 .../tika/pipes/async/AsyncProcessorTest.java       |  2 +-
 .../apache/tika/pipes/PipeIntegrationTests.java    |  1 -
 .../metadata/serialization/JsonFetchEmitTuple.java | 81 ++++++++++++++++------
 .../serialization/JsonFetchEmitTupleList.java      |  7 +-
 .../serialization/JsonFetchEmitTupleListTest.java  |  3 +-
 .../serialization/JsonFetchEmitTupleTest.java      | 14 ++--
 .../org/apache/tika/server/client/TikaClient.java  |  2 +-
 .../apache/tika/server/client/TikaClientCLI.java   |  2 +-
 .../apache/tika/server/client/TikaHttpClient.java  |  1 -
 .../apache/tika/server/core/TikaServerProcess.java |  2 +-
 .../tika/server/core/resource/AsyncParser.java     |  4 +-
 .../tika/server/core/resource/AsyncRequest.java    |  2 +-
 .../tika/server/core/resource/AsyncResource.java   |  2 +-
 .../tika/server/core/resource/EmitterResource.java | 60 ++++++++++------
 .../core/resource/RecursiveMetadataResource.java   | 54 ++++++++-------
 .../apache/tika/server/core/TikaEmitterTest.java   | 75 +++++++++++++++++++-
 .../core/TikaServerAsyncIntegrationTest.java       |  6 +-
 .../core/TikaServerEmitterIntegrationTest.java     |  7 +-
 .../server/core/TikaServerIntegrationTest.java     |  1 -
 36 files changed, 398 insertions(+), 140 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
similarity index 70%
rename from tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
rename to tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 974aa3b..35621e9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes;
+
+import java.util.Objects;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitKey;
@@ -23,19 +25,28 @@ import org.apache.tika.pipes.fetcher.FetchKey;
 public class FetchEmitTuple {
 
     public static final ON_PARSE_EXCEPTION DEFAULT_ON_PARSE_EXCEPTION = ON_PARSE_EXCEPTION.EMIT;
+
+    public enum ON_PARSE_EXCEPTION {
+        SKIP, EMIT
+    }
+
     private final FetchKey fetchKey;
     private EmitKey emitKey;
     private final Metadata metadata;
     private final ON_PARSE_EXCEPTION onParseException;
+    private HandlerConfig handlerConfig;
+
     public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata metadata) {
-        this(fetchKey, emitKey, metadata, DEFAULT_ON_PARSE_EXCEPTION);
+        this(fetchKey, emitKey, metadata, HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                DEFAULT_ON_PARSE_EXCEPTION);
     }
 
     public FetchEmitTuple(FetchKey fetchKey, EmitKey emitKey, Metadata metadata,
-                          ON_PARSE_EXCEPTION onParseException) {
+                          HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException) {
         this.fetchKey = fetchKey;
         this.emitKey = emitKey;
         this.metadata = metadata;
+        this.handlerConfig = handlerConfig;
         this.onParseException = onParseException;
     }
 
@@ -58,10 +69,13 @@ public class FetchEmitTuple {
     public void setEmitKey(EmitKey emitKey) {
         this.emitKey = emitKey;
     }
-    @Override
-    public String toString() {
-        return "FetchEmitTuple{" + "fetchKey=" + fetchKey + ", emitKey=" + emitKey + ", metadata=" +
-                metadata + ", onParseException=" + onParseException + '}';
+
+    public void setHandlerConfig(HandlerConfig handlerConfig) {
+        this.handlerConfig = handlerConfig;
+    }
+
+    public HandlerConfig getHandlerConfig() {
+        return handlerConfig == null ? HandlerConfig.DEFAULT_HANDLER_CONFIG : handlerConfig;
     }
 
     @Override
@@ -72,31 +86,22 @@ public class FetchEmitTuple {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-
         FetchEmitTuple that = (FetchEmitTuple) o;
-
-        if (fetchKey != null ? !fetchKey.equals(that.fetchKey) : that.fetchKey != null) {
-            return false;
-        }
-        if (emitKey != null ? !emitKey.equals(that.emitKey) : that.emitKey != null) {
-            return false;
-        }
-        if (metadata != null ? !metadata.equals(that.metadata) : that.metadata != null) {
-            return false;
-        }
-        return onParseException == that.onParseException;
+        return Objects.equals(fetchKey, that.fetchKey) && Objects.equals(emitKey, that.emitKey) &&
+                Objects.equals(metadata, that.metadata) &&
+                onParseException == that.onParseException &&
+                Objects.equals(handlerConfig, that.handlerConfig);
     }
 
     @Override
     public int hashCode() {
-        int result = fetchKey != null ? fetchKey.hashCode() : 0;
-        result = 31 * result + (emitKey != null ? emitKey.hashCode() : 0);
-        result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
-        result = 31 * result + (onParseException != null ? onParseException.hashCode() : 0);
-        return result;
+        return Objects.hash(fetchKey, emitKey, metadata, onParseException, handlerConfig);
     }
 
-    public enum ON_PARSE_EXCEPTION {
-        SKIP, EMIT
+    @Override
+    public String toString() {
+        return "FetchEmitTuple{" + "fetchKey=" + fetchKey + ", emitKey=" + emitKey + ", metadata=" +
+                metadata + ", onParseException=" + onParseException + ", handlerConfig=" +
+                handlerConfig + '}';
     }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
new file mode 100644
index 0000000..93e7a98
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.util.Objects;
+
+import org.apache.tika.sax.BasicContentHandlerFactory;
+
+public class HandlerConfig {
+
+    public static HandlerConfig DEFAULT_HANDLER_CONFIG =
+            new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1, -1);
+
+    private BasicContentHandlerFactory.HANDLER_TYPE type =
+            BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+    int writeLimit = -1;
+    int maxEmbeddedResources = -1;
+
+    public HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE type, int writeLimit,
+                         int maxEmbeddedResources) {
+        this.type = type;
+        this.writeLimit = writeLimit;
+        this.maxEmbeddedResources = maxEmbeddedResources;
+    }
+
+    public BasicContentHandlerFactory.HANDLER_TYPE getType() {
+        return type;
+    }
+
+    public int getWriteLimit() {
+        return writeLimit;
+    }
+
+    public int getMaxEmbeddedResources() {
+        return maxEmbeddedResources;
+    }
+
+    @Override
+    public String toString() {
+        return "HandlerConfig{" + "type=" + type + ", writeLimit=" + writeLimit +
+                ", maxEmbeddedResources=" + maxEmbeddedResources + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        HandlerConfig that = (HandlerConfig) o;
+        return writeLimit == that.writeLimit && maxEmbeddedResources == that.maxEmbeddedResources &&
+                type == that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, writeLimit, maxEmbeddedResources);
+    }
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
index 2b9273d..82732ff 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
@@ -35,6 +35,10 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaTimeoutException;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+
 /**
  * Abstract class that handles the testing for timeouts/thread safety
  * issues.  Concrete classes implement the blocking {@link #enqueue()}.
@@ -57,9 +61,14 @@ public abstract class FetchIterator
     private int queueSize = DEFAULT_QUEUE_SIZE;
     private String fetcherName;
     private String emitterName;
-    private int added = 0;
     private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException =
             FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
+    private BasicContentHandlerFactory.HANDLER_TYPE handlerType =
+            BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+    private int writeLimit = -1;
+    private int maxEmbeddedResources = -1;
+
+    private int added = 0;
     private FutureTask<Integer> futureTask;
 
     public String getFetcherName() {
@@ -109,12 +118,32 @@ public abstract class FetchIterator
         this.onParseException = onParseException;
     }
 
+    @Field
+    public void setHandlerType(String handlerType) {
+        this.handlerType = BasicContentHandlerFactory
+                .parseHandlerType(handlerType, BasicContentHandlerFactory.HANDLER_TYPE.TEXT);
+    }
+
+    @Field
+    public void setWriteLimit(int writeLimit) {
+        this.writeLimit = writeLimit;
+    }
+
+    @Field
+    void setMaxEmbeddedResources(int maxEmbeddedResources) {
+        this.maxEmbeddedResources = maxEmbeddedResources;
+    }
+
     public Integer call() throws Exception {
         enqueue();
         tryToAdd(COMPLETED_SEMAPHORE);
         return added;
     }
 
+    protected HandlerConfig getHandlerConfig() {
+        return new HandlerConfig(handlerType, writeLimit, maxEmbeddedResources);
+    }
+
     protected abstract void enqueue() throws IOException, TimeoutException, InterruptedException;
 
     protected void tryToAdd(FetchEmitTuple p) throws InterruptedException, TimeoutException {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
index 3487b63..2f8224e 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
@@ -31,6 +31,7 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
index 4e314bc..c87e3d0 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
@@ -30,6 +30,8 @@ import java.util.stream.Stream;
 
 import org.junit.Test;
 
+import org.apache.tika.pipes.FetchEmitTuple;
+
 
 public class FileSystemFetchIteratorTest {
 
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
index 3063124..6f65140 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
@@ -40,9 +40,10 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.apache.tika.utils.StringUtils;
 
@@ -92,6 +93,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
         this.emitKeyColumn = emitKeyColumn;
     }
 
+    @Field
     public void setCsvPath(Path csvPath) {
         this.csvPath = csvPath;
     }
@@ -110,7 +112,7 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
             }
 
             checkFetchEmitValidity(fetcherName, emitterName, fetchEmitKeyIndices, headers);
-
+            HandlerConfig handlerConfig = getHandlerConfig();
             for (CSVRecord record : records) {
                 String fetchKey = getFetchKey(fetchEmitKeyIndices, record);
                 String emitKey = getEmitKey(fetchEmitKeyIndices, record);
@@ -123,7 +125,8 @@ public class CSVFetchIterator extends FetchIterator implements Initializable {
                 }
                 Metadata metadata = loadMetadata(fetchEmitKeyIndices, headers, record);
                 tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
-                        new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
+                        new EmitKey(emitterName, emitKey), metadata, handlerConfig,
+                        getOnParseException()));
             }
         }
     }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
index fd86a05..31ff496 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
 
 public class TestCSVFetchIterator {
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
index f1ca52c..a12cd06 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
@@ -39,9 +39,10 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 import org.apache.tika.utils.StringUtils;
 
@@ -99,6 +100,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         this.select = select;
     }
 
+
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherName = getFetcherName();
@@ -106,6 +108,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         FetchEmitKeyIndices fetchEmitKeyIndices = null;
         List<String> headers = new ArrayList<>();
         int rowCount = 0;
+        HandlerConfig handlerConfig = getHandlerConfig();
         LOGGER.debug("select: {}", select);
         try (Statement st = db.createStatement()) {
             try (ResultSet rs = st.executeQuery(select)) {
@@ -116,7 +119,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
                                 headers);
                     }
                     try {
-                        processRow(fetcherName, emitterName, headers, fetchEmitKeyIndices, rs);
+                        processRow(fetcherName, emitterName, headers, fetchEmitKeyIndices, rs,
+                                handlerConfig);
                     } catch (SQLException e) {
                         LOGGER.warn("Failed to insert: " + rs, e);
                     }
@@ -153,7 +157,8 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
     }
 
     private void processRow(String fetcherName, String emitterName, List<String> headers,
-                            FetchEmitKeyIndices fetchEmitKeyIndices, ResultSet rs)
+                            FetchEmitKeyIndices fetchEmitKeyIndices, ResultSet rs,
+                            HandlerConfig handlerConfig)
             throws SQLException, TimeoutException, InterruptedException {
         Metadata metadata = new Metadata();
         String fetchKey = "";
@@ -182,7 +187,7 @@ public class JDBCFetchIterator extends FetchIterator implements Initializable {
         }
 
         tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
-                new EmitKey(emitterName, emitKey), metadata, getOnParseException()));
+                new EmitKey(emitterName, emitKey), metadata, handlerConfig, getOnParseException()));
     }
 
     private String toString(ResultSet rs) throws SQLException {
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
index 6e549de..2473560 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
@@ -45,7 +45,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.tika.config.TikaConfig;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class TestJDBCFetchIterator {
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
index 988f149..c8f8663 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
@@ -39,9 +39,10 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class S3FetchIterator extends FetchIterator implements Initializable {
@@ -126,12 +127,13 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
         String emitterName = getEmitterName();
         long start = System.currentTimeMillis();
         int count = 0;
+        HandlerConfig handlerConfig = getHandlerConfig();
         for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, prefix)) {
 
             long elapsed = System.currentTimeMillis() - start;
             LOGGER.debug("adding ({}) {} in {} ms", count, summary.getKey(), elapsed);
             tryToAdd(new FetchEmitTuple(new FetchKey(fetcherName, summary.getKey()),
-                    new EmitKey(emitterName, summary.getKey()), new Metadata(),
+                    new EmitKey(emitterName, summary.getKey()), new Metadata(), handlerConfig,
                     getOnParseException()));
             count++;
         }
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
index 50cb819..debed19 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ b/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 @Ignore("turn into an actual unit test")
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
index cb8347f..4321c2c 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class AsyncCli {
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
index df80929..b34f872 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
@@ -21,10 +21,10 @@ import java.util.List;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncData extends EmitData {
 
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index efd6fec..c718a48 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -46,7 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class AsyncProcessor implements Closeable {
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
index e0c214a..d9cca42 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
@@ -18,7 +18,7 @@ package org.apache.tika.pipes.async;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 
 public class AsyncTask extends FetchEmitTuple {
 
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
index c26ee0f..ee3882b 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 
 /**
  * This controls monitoring of the AsyncWorkerProcess
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
index 988748f..43f96df 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
@@ -59,9 +59,9 @@ import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
 import org.apache.tika.utils.StringUtils;
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 04890b3..2408a63 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -37,9 +37,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 public class AsyncProcessorTest {
 
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 7fbba83..ef2e816 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -47,7 +47,6 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.s3.S3Emitter;
 import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 @Ignore("turn these into actual tests")
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 78ba9d2..9cd7011 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -16,22 +16,25 @@
  */
 package org.apache.tika.metadata.serialization;
 
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Locale;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.utils.StringUtils;
 
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.util.Locale;
-
 public class JsonFetchEmitTuple {
 
     public static final String FETCHER = "fetcher";
@@ -39,7 +42,11 @@ public class JsonFetchEmitTuple {
     public static final String EMITTER = "emitter";
     public static final String EMITKEY = "emitKey";
     public static final String METADATAKEY = "metadata";
+    public static final String HANDLER_CONFIG = "handlerConfig";
     public static final String ON_PARSE_EXCEPTION = "onParseException";
+    private static final String HANDLER_CONFIG_TYPE = "type";
+    private static final String HANDLER_CONFIG_WRITE_LIMIT = "writeLimit";
+    private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
 
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
@@ -61,7 +68,9 @@ public class JsonFetchEmitTuple {
         String fetchKey = null;
         String emitterName = null;
         String emitKey = null;
-        FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = null;
+        FetchEmitTuple.ON_PARSE_EXCEPTION onParseException =
+                FetchEmitTuple.DEFAULT_ON_PARSE_EXCEPTION;
+        HandlerConfig handlerConfig = HandlerConfig.DEFAULT_HANDLER_CONFIG;
         Metadata metadata = new Metadata();
         while (token != JsonToken.END_OBJECT) {
             if (token != JsonToken.FIELD_NAME) {
@@ -89,24 +98,45 @@ public class JsonFetchEmitTuple {
                 } else if ("emit".equalsIgnoreCase(value)) {
                     onParseException = FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
                 } else {
-                    throw new IOException(ON_PARSE_EXCEPTION +
-                            " must be either 'skip' or 'emit'");
+                    throw new IOException(ON_PARSE_EXCEPTION + " must be either 'skip' or 'emit'");
                 }
+            } else if (HANDLER_CONFIG.equals(name)) {
+                handlerConfig = getHandlerConfig(jParser);
             }
             token = jParser.nextToken();
         }
 
-        if (onParseException == null) {
-            return new FetchEmitTuple(
-                    new FetchKey(fetcherName, fetchKey),
-                    new EmitKey(emitterName, emitKey), metadata
-            );
-        } else {
-            return new FetchEmitTuple(
-                    new FetchKey(fetcherName, fetchKey),
-                    new EmitKey(emitterName, emitKey), metadata, onParseException
-            );
+        return new FetchEmitTuple(new FetchKey(fetcherName, fetchKey),
+                new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException);
+    }
+
+    private static HandlerConfig getHandlerConfig(JsonParser jParser) throws IOException {
+
+        JsonToken token = jParser.nextToken();
+        if (token != JsonToken.START_OBJECT) {
+            throw new IOException("required start object, but see: " + token.name());
         }
+        BasicContentHandlerFactory.HANDLER_TYPE handlerType =
+                BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+        int writeLimit = -1;
+        int maxEmbeddedResources = -1;
+        String fieldName = jParser.nextFieldName();
+        while (fieldName != null) {
+            if (HANDLER_CONFIG_TYPE.equals(fieldName)) {
+                String value = jParser.nextTextValue();
+                handlerType = BasicContentHandlerFactory
+                        .parseHandlerType(value, HandlerConfig.DEFAULT_HANDLER_CONFIG.getType());
+            } else if (HANDLER_CONFIG_WRITE_LIMIT.equals(fieldName)) {
+                writeLimit = jParser.nextIntValue(-1);
+            } else if (HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES.equals(fieldName)) {
+                maxEmbeddedResources = jParser.nextIntValue(-1);
+            } else {
+                throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
+                        "' in the context of a handler config");
+            }
+            fieldName = jParser.nextFieldName();
+        }
+        return new HandlerConfig(handlerType, writeLimit, maxEmbeddedResources);
     }
 
     private static String getValue(JsonParser jParser) throws IOException {
@@ -142,6 +172,17 @@ public class JsonFetchEmitTuple {
             jsonGenerator.writeFieldName(METADATAKEY);
             JsonMetadata.writeMetadataObject(t.getMetadata(), jsonGenerator, false);
         }
+        if (t.getHandlerConfig() != HandlerConfig.DEFAULT_HANDLER_CONFIG) {
+            jsonGenerator.writeFieldName(HANDLER_CONFIG);
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeStringField(HANDLER_CONFIG_TYPE,
+                    t.getHandlerConfig().getType().name().toLowerCase(Locale.ROOT));
+            jsonGenerator.writeNumberField(HANDLER_CONFIG_WRITE_LIMIT,
+                    t.getHandlerConfig().getWriteLimit());
+            jsonGenerator.writeNumberField(HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES,
+                    t.getHandlerConfig().getMaxEmbeddedResources());
+            jsonGenerator.writeEndObject();
+        }
         jsonGenerator.writeStringField(ON_PARSE_EXCEPTION,
                 t.getOnParseException().name().toLowerCase(Locale.US));
         jsonGenerator.writeEndObject();
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
index 4791aee..6b8bdcb 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleList.java
@@ -20,11 +20,8 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
-import org.apache.tika.utils.StringUtils;
+
+import org.apache.tika.pipes.FetchEmitTuple;
 
 import java.io.IOException;
 import java.io.Reader;
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
index 80c37f4..6eba588 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleListTest.java
@@ -19,7 +19,8 @@ package org.apache.tika.metadata.serialization;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+
 import org.junit.Test;
 
 import java.io.Reader;
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
index 2c19afb..7b3bbfe 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
@@ -17,9 +17,12 @@
 package org.apache.tika.metadata.serialization;
 
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+
 import org.junit.Test;
 
 import java.io.Reader;
@@ -39,13 +42,12 @@ public class JsonFetchEmitTupleTest {
         m.add("m2", "v3");
         m.add("m3", "v4");
 
-        FetchEmitTuple t = new FetchEmitTuple(
-                new FetchKey("my_fetcher", "fetchKey1"),
-                        new EmitKey("my_emitter", "emitKey1"),
-                        m);
+        FetchEmitTuple t = new FetchEmitTuple(new FetchKey("my_fetcher", "fetchKey1"),
+                new EmitKey("my_emitter", "emitKey1"), m,
+                new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, 10000, 10),
+                FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);
-        System.out.println(writer.toString());
         Reader reader = new StringReader(writer.toString());
         FetchEmitTuple deserialized = JsonFetchEmitTuple.fromJson(reader);
         assertEquals(t, deserialized);
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
index a9f7319..1ebbcfd 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClient.java
@@ -26,7 +26,7 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 
 public class TikaClient {
 
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 5e583db..83d2128 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -39,7 +39,7 @@ import org.xml.sax.SAXException;
 
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.fetchiterator.FetchIterator;
 
 public class TikaClientCLI {
diff --git a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
index 9a5ff7f..e0e1585 100644
--- a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
+++ b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaHttpClient.java
@@ -92,7 +92,6 @@ class TikaHttpClient {
     }
 
     private TikaEmitterResult postJson(String endPoint, String jsonRequest) {
-        System.out.println("NED:" + endPoint);
         HttpPost post = new HttpPost(endPoint);
         ByteArrayEntity entity = new ByteArrayEntity(jsonRequest.getBytes(StandardCharsets.UTF_8));
         post.setEntity(entity);
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 3b29bfd..390d731 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -57,8 +57,8 @@ import org.apache.tika.config.TikaConfig;
 import org.apache.tika.parser.DigestingParser;
 import org.apache.tika.parser.digestutils.BouncyCastleDigester;
 import org.apache.tika.parser.digestutils.CommonsDigester;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.server.core.resource.AsyncEmitter;
 import org.apache.tika.server.core.resource.AsyncParser;
 import org.apache.tika.server.core.resource.AsyncResource;
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
index bf1019e..773297d 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
@@ -30,9 +30,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.utils.StringUtils;
 
 /**
@@ -112,7 +112,7 @@ public class AsyncParser implements Callable<Integer> {
                 .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
             metadataList = RecursiveMetadataResource
                     .parseMetadata(stream, metadata, new MultivaluedHashMap<>(),
-                            new UriInfoImpl(new MessageImpl()), "text");
+                            new UriInfoImpl(new MessageImpl()), t.getHandlerConfig());
         } catch (SecurityException e) {
             throw e;
         } catch (Exception e) {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
index d61c7b7..5636e1d 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncRequest.java
@@ -18,7 +18,7 @@ package org.apache.tika.server.core.resource;
 
 import java.util.List;
 
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
 
 public class AsyncRequest {
     private final List<FetchEmitTuple> tuples;
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 001926b..dea21c4 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -44,12 +44,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
+import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.FetchKey;
 import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 @Path("/async")
 public class AsyncResource {
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
index 16aa76e..54fe296 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/EmitterResource.java
@@ -25,6 +25,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -43,11 +45,12 @@ import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.Emitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.StringUtils;
 
@@ -60,6 +63,7 @@ public class EmitterResource {
      */
     public static final String EMIT_KEY_FOR_HTTP_HEADER = "emit-key";
     private static final String EMITTER_PARAM = "emitter";
+    private static final String HANDLER_PARAM = "type";
     private static final String FETCHER_NAME_ABBREV = "fn";
     private static final String FETCH_KEY_ABBREV = "fk";
     private static final String EMIT_KEY_ABBREV = "ek";
@@ -77,12 +81,13 @@ public class EmitterResource {
     }
 
     /**
-     * @param is          input stream is ignored in 'get'
+     * @param is              input stream is ignored in 'get'
      * @param httpHeaders
      * @param info
      * @param emitterName
-     * @param fetcherName specify the fetcherName in the url's query section
-     * @param fetchKey    specify the fetch key in the url's query section
+     * @param fetcherName     specify the fetcherName in the url's query section
+     * @param fetchKey        specify the fetch key in the url's query section
+     * @param handlerTypeName text, html, xml, body, ignore; default is text
      * @return
      * @throws Exception
      */
@@ -94,20 +99,24 @@ public class EmitterResource {
                                         @PathParam(EMITTER_PARAM) String emitterName,
                                         @QueryParam(FETCHER_NAME_ABBREV) String fetcherName,
                                         @QueryParam(FETCH_KEY_ABBREV) String fetchKey,
-                                        @QueryParam(EMIT_KEY_ABBREV) String emitKey)
+                                        @QueryParam(EMIT_KEY_ABBREV) String emitKey,
+                                        @QueryParam(HANDLER_PARAM) String handlerTypeName)
             throws Exception {
         Metadata metadata = new Metadata();
         Fetcher fetcher = TikaResource.getConfig().getFetcherManager().getFetcher(fetcherName);
         List<Metadata> metadataList;
         try (InputStream fetchedIs = fetcher.fetch(fetchKey, metadata)) {
+            HandlerConfig handlerConfig = RecursiveMetadataResource
+                    .buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName);
             metadataList = RecursiveMetadataResource
                     .parseMetadata(fetchedIs, metadata, httpHeaders.getRequestHeaders(), info,
-                            "text");
+                            handlerConfig);
         }
         emitKey = StringUtils.isBlank(emitKey) ? fetchKey : emitKey;
         return emit(new EmitKey(emitterName, emitKey), metadataList);
     }
 
+
     /**
      * The user puts the raw bytes of the file and specifies the emitter
      * as elsewhere.  This will not trigger a fetcher.  If you want a
@@ -117,28 +126,38 @@ public class EmitterResource {
      * {@link TikaCoreProperties#TIKA_CONTENT}
      * <p>
      * Must specify an emitter in the path, e.g. /emit/solr
+     * <p>
+     * Optionally, may specify handler, e.g. /emit/solr/xml
      *
-     * @param info        uri info
-     * @param emitterName which emitter to use; emitters must be configured in
-     *                    the TikaConfig file.
+     * @param info      uri info
+     * @param fullParam emitter name, optionally followed by /handlerType (e.g. solr/xml);
+     *                  emitters must be configured in the TikaConfig file.
      * @return InputStream that can be deserialized as a list of {@link Metadata} objects
      * @throws Exception
      */
     @PUT
     @Produces("application/json")
-    @Path("{" + EMITTER_PARAM + " : (\\w+)?}")
-    public Map<String, String> putRmeta(InputStream is,
-                                        @Context HttpHeaders httpHeaders,
+    @Path("{" + EMITTER_PARAM + " : (\\w+(/(text|body|xml|ignore))?)}")
+    public Map<String, String> putRmeta(InputStream is, @Context HttpHeaders httpHeaders,
                                         @Context UriInfo info,
-                                        @PathParam(EMITTER_PARAM) String emitterName
-    ) throws Exception {
+                                        @PathParam(EMITTER_PARAM) String fullParam)
+            throws Exception {
 
+        Matcher m = Pattern.compile("(\\w+)(?:/(\\w+))?").matcher(fullParam);
+        String emitterName = fullParam;
+        String handlerTypeName = "text";
+        if (m.find()) {
+            emitterName = m.group(1);
+            if (m.group(2) != null) { // null when no "/handler" suffix; keep "text"
+                handlerTypeName = m.group(2);
+            }
+        }
         Metadata metadata = new Metadata();
         String emitKey = httpHeaders.getHeaderString(EMIT_KEY_FOR_HTTP_HEADER);
-        List<Metadata> metadataList =
-                RecursiveMetadataResource.parseMetadata(is,
-                        metadata,
-                        httpHeaders.getRequestHeaders(), info, "text");
+        HandlerConfig handlerConfig = RecursiveMetadataResource
+                .buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName);
+        List<Metadata> metadataList = RecursiveMetadataResource
+                .parseMetadata(is, metadata, httpHeaders.getRequestHeaders(), info, handlerConfig);
         return emit(new EmitKey(emitterName, emitKey), metadataList);
     }
 
@@ -147,7 +166,7 @@ public class EmitterResource {
      * json object that contains an emitter and a fetcherString key with
      * the key to fetch the inputStream. Optionally, it may contain a metadata
      * object that will be used to populate the metadata key for pass
-     * through of metadata from the client.
+     * through of metadata from the client. It may also include a handler config.
      * <p>
      * The extracted text content is stored with the key
      * {@link TikaCoreProperties#TIKA_CONTENT}
@@ -174,7 +193,8 @@ public class EmitterResource {
                 .fetch(t.getFetchKey().getFetchKey(), metadata)) {
 
             metadataList = RecursiveMetadataResource
-                    .parseMetadata(stream, metadata, httpHeaders.getRequestHeaders(), info, "text");
+                    .parseMetadata(stream, metadata, httpHeaders.getRequestHeaders(), info,
+                            t.getHandlerConfig());
         } catch (Error error) {
             return returnError(t.getEmitKey().getEmitterName(), error);
         }
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index ae55b52..cd43f1b 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -43,6 +43,7 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
 import org.apache.tika.server.core.MetadataList;
@@ -58,34 +59,21 @@ public class RecursiveMetadataResource {
 
     public static List<Metadata> parseMetadata(InputStream is, Metadata metadata,
                                                MultivaluedMap<String, String> httpHeaders,
-                                               UriInfo info, String handlerTypeName)
+                                               UriInfo info, HandlerConfig handlerConfig)
             throws Exception {
 
         final ParseContext context = new ParseContext();
         Parser parser = TikaResource.createParser();
-        // TODO: parameterize choice of max chars/max embedded attachments
-        RecursiveParserWrapper wrapper = new RecursiveParserWrapper(parser);
-
 
+        RecursiveParserWrapper wrapper = new RecursiveParserWrapper(parser);
         fillMetadata(parser, metadata, httpHeaders);
         fillParseContext(httpHeaders, metadata, context);
         TikaResource.logRequest(LOG, "/rmeta", metadata);
-
-        int writeLimit = -1;
-        if (httpHeaders.containsKey("writeLimit")) {
-            writeLimit = Integer.parseInt(httpHeaders.getFirst("writeLimit"));
-        }
-
-        int maxEmbeddedResources = -1;
-        if (httpHeaders.containsKey("maxEmbeddedResources")) {
-            maxEmbeddedResources = Integer.parseInt(httpHeaders.getFirst("maxEmbeddedResources"));
-        }
-
-        BasicContentHandlerFactory.HANDLER_TYPE type =
-                BasicContentHandlerFactory.parseHandlerType(handlerTypeName, DEFAULT_HANDLER_TYPE);
-        RecursiveParserWrapperHandler handler =
-                new RecursiveParserWrapperHandler(new BasicContentHandlerFactory(type, writeLimit),
-                        maxEmbeddedResources, TikaResource.getConfig().getMetadataFilter());
+        BasicContentHandlerFactory.HANDLER_TYPE type = handlerConfig.getType();
+        RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
+                new BasicContentHandlerFactory(type, handlerConfig.getWriteLimit()),
+                handlerConfig.getMaxEmbeddedResources(),
+                TikaResource.getConfig().getMetadataFilter());
         try {
             TikaResource.parse(wrapper, LOG, "/rmeta", is, handler, metadata, context);
         } catch (TikaServerParseException e) {
@@ -133,7 +121,24 @@ public class RecursiveMetadataResource {
             throws Exception {
         return Response
                 .ok(parseMetadataToMetadataList(att.getObject(InputStream.class), new Metadata(),
-                        att.getHeaders(), info, handlerTypeName)).build();
+                        att.getHeaders(), info,
+                        buildHandlerConfig(att.getHeaders(), handlerTypeName))).build();
+    }
+
+    static HandlerConfig buildHandlerConfig(MultivaluedMap<String, String> httpHeaders,
+                                            String handlerTypeName) {
+        int writeLimit = -1;
+        if (httpHeaders.containsKey("writeLimit")) {
+            writeLimit = Integer.parseInt(httpHeaders.getFirst("writeLimit"));
+        }
+
+        int maxEmbeddedResources = -1;
+        if (httpHeaders.containsKey("maxEmbeddedResources")) {
+            maxEmbeddedResources = Integer.parseInt(httpHeaders.getFirst("maxEmbeddedResources"));
+        }
+        return new HandlerConfig(
+                BasicContentHandlerFactory.parseHandlerType(handlerTypeName, DEFAULT_HANDLER_TYPE),
+                writeLimit, maxEmbeddedResources);
     }
 
     /**
@@ -170,13 +175,14 @@ public class RecursiveMetadataResource {
         Metadata metadata = new Metadata();
         return Response.ok(parseMetadataToMetadataList(
                 TikaResource.getInputStream(is, metadata, httpHeaders), metadata,
-                httpHeaders.getRequestHeaders(), info, handlerTypeName)).build();
+                httpHeaders.getRequestHeaders(), info,
+                buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName))).build();
     }
 
     private MetadataList parseMetadataToMetadataList(InputStream is, Metadata metadata,
                                                      MultivaluedMap<String, String> httpHeaders,
-                                                     UriInfo info, String handlerTypeName)
+                                                     UriInfo info, HandlerConfig handlerConfig)
             throws Exception {
-        return new MetadataList(parseMetadata(is, metadata, httpHeaders, info, handlerTypeName));
+        return new MetadataList(parseMetadata(is, metadata, httpHeaders, info, handlerConfig));
     }
 }
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
index 3560b7d..e39b0ed 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaEmitterTest.java
@@ -50,9 +50,11 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
 import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
+import org.apache.tika.sax.BasicContentHandlerFactory;
 import org.apache.tika.server.core.resource.EmitterResource;
 import org.apache.tika.server.core.writer.JSONObjWriter;
 
@@ -142,7 +144,7 @@ public class TikaEmitterTest extends CXFTestBase {
     @Test
     public void testGet() throws Exception {
 
-        String q = "?fn=fsf&fk=hello_world.xml";
+        String q = "?fn=fsf&fk=hello_world.xml&type=text";
         String getUrl = endPoint + EMITTER_PATH_AND_FS + q;
         Response response = WebClient.create(getUrl).accept("application/json").get();
         assertEquals(200, response.getStatus());
@@ -159,6 +161,23 @@ public class TikaEmitterTest extends CXFTestBase {
     }
 
     @Test
+    public void testGetXML() throws Exception {
+
+        String q = "?fn=fsf&fk=hello_world.xml&type=xml";
+        String getUrl = endPoint + EMITTER_PATH_AND_FS + q;
+        Response response = WebClient.create(getUrl).accept("application/json").get();
+        assertEquals(200, response.getStatus());
+        List<Metadata> metadataList = null;
+        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
+            metadataList = JsonMetadataList.fromJson(reader);
+        }
+        assertEquals(1, metadataList.size());
+        Metadata metadata = metadataList.get(0);
+        String xml = metadata.get(TikaCoreProperties.TIKA_CONTENT);
+        assertContains("<p>hello world</p>", xml);
+    }
+
+    @Test
     public void testPost() throws Exception {
 
         Metadata userMetadata = new Metadata();
@@ -193,9 +212,40 @@ public class TikaEmitterTest extends CXFTestBase {
     }
 
     @Test
+    public void testPostXML() throws Exception {
+
+        Metadata userMetadata = new Metadata();
+        userMetadata.set("my-key", "my-value");
+        for (int i = 0; i < VALUE_ARRAY.length; i++) {
+            userMetadata.add("my-key-multi", VALUE_ARRAY[i]);
+        }
+
+        FetchEmitTuple t =
+                new FetchEmitTuple(new FetchKey("fsf", "hello_world.xml"), new EmitKey("fse", ""),
+                        userMetadata,
+                        new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, -1, -1),
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
+        StringWriter writer = new StringWriter();
+        JsonFetchEmitTuple.toJson(t, writer);
+
+        String getUrl = endPoint + EMITTER_PATH;
+        Response response =
+                WebClient.create(getUrl).accept("application/json").post(writer.toString());
+        assertEquals(200, response.getStatus());
+
+        List<Metadata> metadataList = null;
+        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
+            metadataList = JsonMetadataList.fromJson(reader);
+        }
+        assertEquals(1, metadataList.size());
+        Metadata metadata = metadataList.get(0);
+        assertContains("<p>hello world</p>", metadata.get(TikaCoreProperties.TIKA_CONTENT).trim());
+    }
+
+    @Test
     public void testPut() throws Exception {
 
-        String getUrl = endPoint + EMITTER_PATH_AND_FS;
+        String getUrl = endPoint + EMITTER_PATH_AND_FS + "/text";
         String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
 
         Response response = WebClient.create(getUrl).accept("application/json")
@@ -215,6 +265,25 @@ public class TikaEmitterTest extends CXFTestBase {
     }
 
     @Test
+    public void testPutXML() throws Exception {
+
+        String putUrl = endPoint + EMITTER_PATH_AND_FS + "/xml";
+        String metaPathKey = EmitterResource.EMIT_KEY_FOR_HTTP_HEADER;
+
+        Response response = WebClient.create(putUrl).accept("application/json")
+                .header(metaPathKey, "hello_world.xml")
+                .put(ClassLoader.getSystemResourceAsStream("test-documents/mock/hello_world.xml"));
+        assertEquals(200, response.getStatus());
+        List<Metadata> metadataList = null;
+        try (Reader reader = Files.newBufferedReader(TMP_OUTPUT_FILE)) {
+            metadataList = JsonMetadataList.fromJson(reader);
+        }
+        assertEquals(1, metadataList.size());
+        Metadata metadata = metadataList.get(0);
+        assertContains("<p>hello world</p>", metadata.get(TikaCoreProperties.TIKA_CONTENT));
+    }
+
+    @Test
     public void testPostNPE() throws Exception {
         Metadata userMetadata = new Metadata();
         userMetadata.set("my-key", "my-value");
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 7178e2a..44547c5 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -44,9 +44,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 
 @Ignore("useful for development...need to turn it into a real unit test")
 public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
@@ -170,6 +171,7 @@ public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
 
     private FetchEmitTuple getFetchEmitTuple(String fileName) throws IOException {
         return new FetchEmitTuple(new FetchKey(FETCHER_NAME, fileName),
-                new EmitKey(EMITTER_NAME, ""), new Metadata(), ON_PARSE_EXCEPTION);
+                new EmitKey(EMITTER_NAME, ""), new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                ON_PARSE_EXCEPTION);
     }
 }
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
index a99cea1..d5f480f 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerEmitterIntegrationTest.java
@@ -46,9 +46,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchEmitTuple;
 import org.apache.tika.utils.ProcessUtils;
 
 public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
@@ -242,7 +243,6 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
                              FetchEmitTuple.ON_PARSE_EXCEPTION onParseException) throws Exception {
 
         awaitServerStartup();
-        System.out.println(getJsonString(fileName, onParseException));
         Response response = WebClient
                 .create(endPoint + "/emit")
                 .accept("application/json")
@@ -264,7 +264,8 @@ public class TikaServerEmitterIntegrationTest extends IntegrationTestBase {
                                  FetchEmitTuple.ON_PARSE_EXCEPTION onParseException)
             throws IOException {
         FetchEmitTuple t = new FetchEmitTuple(new FetchKey(FETCHER_NAME, fileName),
-                new EmitKey(EMITTER_NAME, ""), new Metadata(), onParseException);
+                new EmitKey(EMITTER_NAME, ""), new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                onParseException);
         return JsonFetchEmitTuple.toJson(t);
     }
 }
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
index f86c44b..deec735 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerIntegrationTest.java
@@ -379,7 +379,6 @@ public class TikaServerIntegrationTest extends IntegrationTestBase {
                     } else if (r.nextFloat() < 0.02) {
                         file = TEST_HEAVY_HANG;
                     }
-                    System.out.println("writing file " + i + " : " + file);
                     response = WebClient.create(endPoint + META_PATH).accept("application/json")
                             .put(ClassLoader.getSystemResourceAsStream(file));
                 } catch (Exception e) {