Posted to commits@tika.apache.org by nd...@apache.org on 2022/07/11 21:17:49 UTC

[tika] branch kafka updated: wip - add kafka support

This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch kafka
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/kafka by this push:
     new ac32a7e84 wip - add kafka support
ac32a7e84 is described below

commit ac32a7e84d6075835dd5097c2078ea7c96d2b8de
Author: Nicholas DiPiazza <ni...@lucidworks.com>
AuthorDate: Mon Jul 11 16:17:40 2022 -0500

    wip - add kafka support
---
 .../tika-pipes-kafka-integration-tests/README.md   | 212 +++++++++++++++
 .../tika-pipes-kafka-integration-tests/pom.xml     |  11 +
 .../tika/pipes/kafka/tests/TikaPipesKafkaTest.java | 193 ++++++++++++++
 .../tika/pipes/s3/tests/PipeIntegrationTests.java  | 246 -----------------
 .../pipes/s3/tests/TikaPipesKafkaTestBase.java     | 228 ----------------
 .../src/test/resources/embedded/embedded.docx      | Bin 0 -> 99389 bytes
 .../kafka-docker/zk-single-kafka-single.yml        |  36 +++
 .../{tika-config-s3ToFs.xml => log4j2.xml}         |  36 +--
 .../src/test/resources/tika-config-kafka.xml       | 119 +++++++++
 .../src/test/resources/tika-config-s3Tos3.xml      |  52 ----
 .../pipes/solr/tests/TikaPipesSolrTestBase.java    |   1 -
 .../tika/pipes/emitter/kafka/KafkaEmitter.java     | 295 +++++++++++++--------
 .../tika-pipes-iterator-kafka/pom.xml              |   2 -
 .../pipesiterator/kafka/KafkaPipesIterator.java    | 129 ++++-----
 .../kafka/TestKafkaPipesIterator.java              |   4 +-
 15 files changed, 846 insertions(+), 718 deletions(-)

diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/README.md b/tika-integration-tests/tika-pipes-kafka-integration-tests/README.md
new file mode 100644
index 000000000..ca0ce84a4
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/README.md
@@ -0,0 +1,212 @@
+[![Docker Pulls](https://img.shields.io/docker/pulls/wurstmeister/kafka.svg)](https://hub.docker.com/r/wurstmeister/kafka/)
+[![Docker Stars](https://img.shields.io/docker/stars/wurstmeister/kafka.svg)](https://hub.docker.com/r/wurstmeister/kafka/)
+[![](https://images.microbadger.com/badges/version/wurstmeister/kafka.svg)](https://microbadger.com/images/wurstmeister/kafka "Get your own version badge on microbadger.com")
+[![](https://images.microbadger.com/badges/image/wurstmeister/kafka.svg)](https://microbadger.com/images/wurstmeister/kafka "Get your own image badge on microbadger.com")
+[![Build Status](https://app.travis-ci.com/wurstmeister/kafka-docker.svg?branch=master)](https://app.travis-ci.com/wurstmeister/kafka-docker)
+
+
+kafka-docker
+============
+
+Dockerfile for [Apache Kafka](http://kafka.apache.org/)
+
+The image is available directly from [Docker Hub](https://hub.docker.com/r/wurstmeister/kafka/)
+
+Tags and releases
+-----------------
+
+All versions of the image are built from the same set of scripts with only minor variations (i.e. certain features are not supported on older versions). The version format mirrors the Kafka format, `<scala version>-<kafka version>`. Initially, all images are built with the recommended version of Scala documented at [http://kafka.apache.org/downloads](http://kafka.apache.org/downloads). To list all available tags:
+
+```
+curl -s https://registry.hub.docker.com/v2/repositories/wurstmeister/kafka/tags\?page_size\=1024 | jq -r '.results[].name' | sort -u | egrep '\d.\d{2}-.*'
+```
+
+Every time the image is updated, all tags are pushed with the latest updates. This should allow for greater consistency across tags, and pick up any security updates that have been made to the base image.
+
+---
+
+## Announcements
+
+* **04-Jun-2019** - Update base image to OpenJDK 8u212 ([Release notes](https://www.oracle.com/technetwork/java/javase/8u212-relnotes-5292913.html)). Please force pull to get the latest updates, including security patches.
+
+---
+
+## Pre-Requisites
+
+- install docker-compose [https://docs.docker.com/compose/install/](https://docs.docker.com/compose/install/)
+- modify the ```KAFKA_ADVERTISED_HOST_NAME``` in [docker-compose.yml](https://raw.githubusercontent.com/wurstmeister/kafka-docker/master/docker-compose.yml) to match your Docker host IP (Note: do not use localhost or 127.0.0.1 as the host IP if you want to run multiple brokers.)
+- if you want to customize any Kafka parameters, add them as environment variables in ```docker-compose.yml```; for example, to increase the ```message.max.bytes``` parameter, set ```KAFKA_MESSAGE_MAX_BYTES: 2000000```. To turn off automatic topic creation, set ```KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'``` (see the combined example below)
+- Kafka's log4j usage can be customized by adding environment variables prefixed with ```LOG4J_```. These will be mapped to ```log4j.properties```. For example: ```LOG4J_LOGGER_KAFKA_AUTHORIZER_LOGGER=DEBUG, authorizerAppender```
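+
+A minimal ```environment``` sketch combining the overrides above (the host IP and values are illustrative only, not recommendations):
+
+```
+environment:
+  KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
+  KAFKA_MESSAGE_MAX_BYTES: 2000000
+  KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  LOG4J_LOGGER_KAFKA_AUTHORIZER_LOGGER: "DEBUG, authorizerAppender"
+```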
+
+**NOTE:** There are several 'gotchas' with configuring networking. If you are not sure about what the requirements are, please check out the [Connectivity Guide](https://github.com/wurstmeister/kafka-docker/wiki/Connectivity) in the [Wiki](https://github.com/wurstmeister/kafka-docker/wiki)
+
+## Usage
+
+Start a cluster:
+
+- ```docker-compose up -d ```
+
+Add more brokers:
+
+- ```docker-compose scale kafka=3```
+
+Destroy a cluster:
+
+- ```docker-compose stop```
+
+## Note
+
+The default ```docker-compose.yml``` should be seen as a starting point. By default each broker will get a new port number and broker id on restart. Depending on your use case this might not be desirable. If you need to use specific ports and broker ids, modify the docker-compose configuration accordingly, e.g. [docker-compose-single-broker.yml](https://github.com/wurstmeister/kafka-docker/blob/master/docker-compose-single-broker.yml):
+
+- ```docker-compose -f docker-compose-single-broker.yml up```
+
+## Broker IDs
+
+You can configure the broker ID in different ways:
+
+1. explicitly, using ```KAFKA_BROKER_ID```
+2. via a command, using ```BROKER_ID_COMMAND```, e.g. ```BROKER_ID_COMMAND: "hostname | awk -F'-' '{print $$2}'"```
+
+If you don't specify a broker ID in your docker-compose file, it will automatically be generated (see [https://issues.apache.org/jira/browse/KAFKA-1070](https://issues.apache.org/jira/browse/KAFKA-1070)). This allows scaling up and down. In this case it is recommended to use the ```--no-recreate``` option of docker-compose to ensure that containers are not re-created and thus keep their names and IDs.
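+
+For example, to pin the broker ID explicitly (option 1 above), an illustrative ```docker-compose.yml``` fragment:
+
+```
+environment:
+  KAFKA_BROKER_ID: 1
+```
+
+When relying on generated IDs instead, scale with ```docker-compose up -d --no-recreate``` so that existing containers keep their names and IDs.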
+
+
+## Automatically create topics
+
+If you want kafka-docker to automatically create topics in Kafka when the broker container is created, add a ```KAFKA_CREATE_TOPICS``` environment variable to ```docker-compose.yml```.
+
+Here is an example snippet from ```docker-compose.yml```:
+
+        environment:
+          KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact"
+
+```Topic1``` will have 1 partition and 3 replicas, ```Topic2``` will have 1 partition, 1 replica, and a `cleanup.policy` set to `compact`. Also, see FAQ: [Topic compaction does not work](https://github.com/wurstmeister/kafka-docker/wiki#topic-compaction-does-not-work)
+
+If you wish to use multi-line YAML or some other delimiter between your topic definitions, override the default `,` separator by specifying the `KAFKA_CREATE_TOPICS_SEPARATOR` environment variable.
+
+For example, `KAFKA_CREATE_TOPICS_SEPARATOR: "$$'\n'"` would use a newline to split the topic definitions. Syntax has to follow docker-compose escaping rules, and [ANSI-C](https://www.gnu.org/software/bash/manual/html_node/ANSI_002dC-Quoting.html) quoting.
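+
+A sketch combining the two variables with the topic definitions from the example above (illustrative only):
+
+```
+environment:
+  KAFKA_CREATE_TOPICS_SEPARATOR: "$$'\n'"
+  KAFKA_CREATE_TOPICS: |-
+    Topic1:1:3
+    Topic2:1:1:compact
+```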
+
+## Advertised hostname
+
+You can configure the advertised hostname in different ways:
+
+1. explicitly, using ```KAFKA_ADVERTISED_HOST_NAME```
+2. via a command, using ```HOSTNAME_COMMAND```, e.g. ```HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"```
+
+When using commands, make sure you review the "Variable Substitution" section in [https://docs.docker.com/compose/compose-file/](https://docs.docker.com/compose/compose-file/#variable-substitution).
+
+If ```KAFKA_ADVERTISED_HOST_NAME``` is specified, it takes precedence over ```HOSTNAME_COMMAND```.
+
+For AWS deployment, you can use the Metadata service to get the container host's IP:
+```
+HOSTNAME_COMMAND=wget -t3 -T2 -qO-  http://169.254.169.254/latest/meta-data/local-ipv4
+```
+Reference: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
+
+### Injecting HOSTNAME_COMMAND into configuration
+
+If you require the value of `HOSTNAME_COMMAND` in any of your other `KAFKA_XXX` variables, use the `_{HOSTNAME_COMMAND}` string in your variable value, for example:
+
+```
+KAFKA_ADVERTISED_LISTENERS=SSL://_{HOSTNAME_COMMAND}:9093,PLAINTEXT://9092
+```
+
+## Advertised port
+
+If the required advertised port is not static, it may be necessary to determine it programmatically. This can be done with the `PORT_COMMAND` environment variable.
+
+```
+PORT_COMMAND: "docker port $$(hostname) 9092/tcp | cut -d: -f2"
+```
+
+This can then be interpolated into any other `KAFKA_XXX` config using the `_{PORT_COMMAND}` string, for example:
+
+```
+KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://1.2.3.4:_{PORT_COMMAND}
+```
+
+## Listener Configuration
+
+It may be useful to have the [Kafka Documentation](https://kafka.apache.org/documentation/) open, to understand the various broker listener configuration options.
+
+Since 0.9.0, Kafka has supported [multiple listener configurations](https://issues.apache.org/jira/browse/KAFKA-1809) for brokers to help support different protocols and discriminate between internal and external traffic. Later versions of Kafka have deprecated ```advertised.host.name``` and ```advertised.port```.
+
+**NOTE:** ```advertised.host.name``` and ```advertised.port``` still work as expected, but should not be used if configuring the listeners.
+
+### Example
+
+The example environment below:
+
+```
+HOSTNAME_COMMAND: curl http://169.254.169.254/latest/meta-data/public-hostname
+KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
+KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
+KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+```
+
+Will result in the following broker config:
+
+```
+advertised.listeners = OUTSIDE://ec2-xx-xx-xxx-xx.us-west-2.compute.amazonaws.com:9094,INSIDE://:9092
+listeners = OUTSIDE://:9094,INSIDE://:9092
+inter.broker.listener.name = INSIDE
+```
+
+### Rules
+
+* No listeners may share a port number.
+* An advertised.listener must be present by protocol name and port number in the list of listeners.
+
+## Broker Rack
+
+You can configure the broker rack affinity in different ways:
+
+1. explicitly, using ```KAFKA_BROKER_RACK```
+2. via a command, using ```RACK_COMMAND```, e.g. ```RACK_COMMAND: "curl http://169.254.169.254/latest/meta-data/placement/availability-zone"```
+
+In the above example the AWS metadata service is used to put the instance's availability zone in the ```broker.rack``` property.
+
+## JMX
+
+For monitoring purposes you may wish to configure JMX. In addition to the standard JMX parameters, problems can arise from the underlying RMI protocol used to connect:
+
+* java.rmi.server.hostname - the interface to bind the listening port to
+* com.sun.management.jmxremote.rmi.port - the port used to service RMI requests
+
+For example, to connect to a Kafka broker running locally (assuming it exposes port 1099):
+
+      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099"
+      JMX_PORT: 1099
+
+JConsole can now connect with ```jconsole 192.168.99.100:1099```.
+
+## Docker Swarm Mode
+
+The listener configuration above is necessary when deploying Kafka in a Docker Swarm using an overlay network. By separating OUTSIDE and INSIDE listeners, a host can communicate with clients outside the overlay network while still benefiting from it from within the swarm.
+
+In addition to the multiple-listener configuration, additional best practices for operating Kafka in a Docker Swarm include:
+
+* Use "deploy: global" in a compose file to launch one and only one Kafka broker per swarm node.
+* Use compose file version '3.2' (minimum Docker version 16.04) and the "long" port definition with the port in "host" mode instead of the default "ingress" load-balanced port binding. This ensures that outside requests are always routed to the correct broker. For example:
+
+```
+ports:
+   - target: 9094
+     published: 9094
+     protocol: tcp
+     mode: host
+```
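+
+For the first recommendation above, a minimal sketch of the service-level stanza (compose file version 3.x syntax; the rest of the service definition is omitted):
+
+```
+deploy:
+  mode: global
+```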
+
+Older compose files using the short version of port mapping may encounter Kafka client issues if their connection to individual brokers cannot be guaranteed.
+
+See the included sample compose file ```docker-compose-swarm.yml```.
+
+## Release process
+
+See the [wiki](https://github.com/wurstmeister/kafka-docker/wiki/ReleaseProcess) for information on adding or updating versions to release to Docker Hub.
+
+## Tutorial
+
+[http://wurstmeister.github.io/kafka-docker/](http://wurstmeister.github.io/kafka-docker/)
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
index 4a2094d2b..9b65cbb4a 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
@@ -53,6 +53,17 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-app</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <scm>
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
new file mode 100644
index 000000000..b8af1e2bb
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
@@ -0,0 +1,193 @@
+package org.apache.tika.pipes.kafka.tests;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Stopwatch;
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.pipes.HandlerConfig;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
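+/**
+ * End-to-end test of the Kafka pipes integration: test documents are written to a local
+ * folder, fetch requests for them are produced to the pipe iterator topic, Tika async is
+ * started via TikaCLI with a generated config, and the emitted metadata is then consumed
+ * back from the emitter topic and verified.
+ */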
+public class TikaPipesKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafkaTest.class);
+
+    public static final int ZK_PORT = 2181;
+    public static final int KAFKA_PORT = 9092;
+    public static final String KAFKA = "kafka1";
+    public static final String ZOOKEEPER = "zoo1";
+    public static final String PIPE_ITERATOR_TOPIC = "pipe_iterator_topic";
+    public static final String EMITTER_TOPIC = "emitter_topic";
+    private final int numDocs = 42;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final File testFileFolder = new File("target", "test-files");
+
+    private final Set<String> waitingFor = new HashSet<>();
+
+    private void createTestFiles(String bodyContent) throws Exception {
+        testFileFolder.mkdirs();
+        for (int i = 0; i < numDocs; ++i) {
+            String nextFileName = "test-" + i + ".html";
+            FileUtils.writeStringToFile(new File(testFileFolder, nextFileName),
+                    "<html><body>" + bodyContent + "</body></html>", StandardCharsets.UTF_8);
+            waitingFor.add(nextFileName);
+        }
+        FileUtils.copyInputStreamToFile(this.getClass().getResourceAsStream("/embedded/embedded.docx"),
+                new File(testFileFolder, "test-embedded.docx"));
+        waitingFor.add("test-embedded.docx");
+    }
+
+    @ClassRule
+    public static DockerComposeContainer environment =
+            new DockerComposeContainer(new File("src/test/resources/kafka-docker/zk-single-kafka-single.yml"))
+                    .withExposedService(KAFKA, KAFKA_PORT)
+                    .withExposedService(ZOOKEEPER, ZK_PORT)
+                    .withLogConsumer(ZOOKEEPER, new Slf4jLogConsumer(LOG))
+                    .withLogConsumer(KAFKA, new Slf4jLogConsumer(LOG));
+
+    @Test
+    public void testKafkaPipeIterator()
+            throws Exception {
+        createTestFiles("initial");
+        File tikaConfigFile = new File("target", "ta.xml");
+        File log4jPropFile = new File("target", "tmp-log4j2.xml");
+        try (InputStream is = this.getClass()
+                .getResourceAsStream("/pipes-fork-server-custom-log4j2.xml")) {
+            FileUtils.copyInputStreamToFile(is, log4jPropFile);
+        }
+        String tikaConfigTemplateXml;
+        try (InputStream is = this.getClass()
+                .getResourceAsStream("/tika-config-kafka.xml")) {
+            tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
+        }
+
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "grp");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+        LOG.info("Listening to EMITTER_TOPIC={}", EMITTER_TOPIC);
+        consumer.subscribe(Collections.singletonList(EMITTER_TOPIC));
+
+        ExecutorService es = Executors.newCachedThreadPool();
+
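+        // Run Tika async in a background thread: write the generated config and start TikaCLI with it.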
+        es.execute(() -> {
+            try {
+                String tikaConfigXml =
+                        createTikaConfigXml(tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
+                                HandlerConfig.PARSE_MODE.RMETA);
+
+                FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
+                TikaCLI.main(new String[] {"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        // Send the initial messages to the pipe iterator.
+        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
+        int numSent = 0;
+        for (int i = 0; i < numDocs; ++i) {
+            File nextFile = new File(testFileFolder, "test-" + i + ".html");
+            Map<String, Object> meta = new HashMap<>();
+            meta.put("name", nextFile.getName());
+            meta.put("path", nextFile.getAbsolutePath());
+            meta.put("totalSpace", nextFile.getTotalSpace());
+            try {
+                producer.send(new ProducerRecord<>(PIPE_ITERATOR_TOPIC,
+                        nextFile.getAbsolutePath(),
+                        objectMapper.writeValueAsString(meta))).get();
+                LOG.info("Sent fetch request : {}", nextFile.getAbsolutePath());
+                ++numSent;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        LOG.info("Producer is now complete - sent {}.", numSent);
+
+        LOG.info("Tika pipes have been run. See if we can pull the response messages from the EMITTER_TOPIC={}", EMITTER_TOPIC);
+
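+        // Poll the emitter topic until every expected file has been seen, failing after 600 seconds.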
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (!waitingFor.isEmpty()) {
+            Assert.assertFalse(stopwatch.elapsed(TimeUnit.SECONDS) > 600);
+            try {
+                ConsumerRecords<String, String> records = consumer.poll(12000);
+                for (ConsumerRecord<String, String> record : records) {
+                    String val = record.value();
+                    Map<String, Object> valMap = objectMapper.readValue(val, Map.class);
+                    waitingFor.remove(record.key());
+                    Assert.assertNotNull(valMap.get("content_s"));
+                    Assert.assertNotNull(valMap.get("mime_s"));
+                    Assert.assertNotNull(valMap.get("length_i"));
+                    LOG.info("Received message key={}, offset={}", record.key(), record.offset());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        LOG.info("Done");
+    }
+
+    @NotNull
+    private String bootstrapServers() {
+        return environment.getServiceHost(KAFKA, KAFKA_PORT) + ":" + environment.getServicePort(KAFKA, KAFKA_PORT);
+    }
+
+    @NotNull
+    private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile,
+                                       String tikaConfigTemplateXml,
+                                       HandlerConfig.PARSE_MODE parseMode) {
+        String res =
+                tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
+                        .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
+                        .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath())
+                        .replace("{PARSE_MODE}", parseMode.name())
+                        .replace("{PIPE_ITERATOR_TOPIC}", PIPE_ITERATOR_TOPIC)
+                        .replace("{EMITTER_TOPIC}", EMITTER_TOPIC)
+                        .replace("{BOOTSTRAP_SERVERS}", bootstrapServers());
+        return res;
+    }
+}
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java
deleted file mode 100644
index 28d82dc80..000000000
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tika.pipes.s3.tests;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.iterable.S3Objects;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.s3.S3Emitter;
-import org.apache.tika.pipes.fetcher.Fetcher;
-import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.pipesiterator.PipesIterator;
-
-@Ignore("turn these into actual tests with mock s3")
-public class PipeIntegrationTests {
-
-    private static final Path OUTDIR = Paths.get("");
-
-    @Test
-    public void testBruteForce() throws Exception {
-        String region = "";
-        String profile = "";
-        String bucket = "";
-        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region)
-                .withCredentials(new ProfileCredentialsProvider(profile)).build();
-        s3Client.listObjects(bucket);
-        int cnt = 0;
-        long sz = 0;
-
-        for (S3ObjectSummary summary : S3Objects.withPrefix(s3Client, bucket, "")) {
-            Path targ = OUTDIR.resolve(summary.getKey());
-            if (Files.isRegularFile(targ)) {
-                continue;
-            }
-            if (!Files.isDirectory(targ.getParent())) {
-                Files.createDirectories(targ.getParent());
-            }
-            System.out
-                    .println("id: " + cnt + " :: " + summary.getKey() + " : " + summary.getSize());
-            S3Object s3Object = s3Client.getObject(bucket, summary.getKey());
-            Files.copy(s3Object.getObjectContent(), targ);
-            summary.getSize();
-            cnt++;
-            sz += summary.getSize();
-        }
-        System.out.println("iterated: " + cnt + " sz: " + sz);
-    }
-
-    @Test
-    public void testS3ToFS() throws Exception {
-        Fetcher fetcher = getFetcher("tika-config-s3ToFs.xml", "s3f");
-        PipesIterator pipesIterator = getPipesIterator("tika-config-s3ToFs.xml");
-
-        int numConsumers = 1;
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
-        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
-        for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new FSFetcherEmitter(queue, fetcher, null));
-        }
-        for (FetchEmitTuple t : pipesIterator) {
-            queue.offer(t);
-        }
-        for (int i = 0; i < numConsumers; i++) {
-            queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
-        }
-        int finished = 0;
-        try {
-            while (finished++ < numConsumers + 1) {
-                Future<Integer> future = completionService.take();
-                future.get();
-            }
-        } finally {
-            es.shutdownNow();
-        }
-    }
-
-    @Test
-    public void testS3ToS3() throws Exception {
-        Fetcher fetcher = getFetcher("tika-config-s3Tos3.xml", "s3f");
-        Emitter emitter = getEmitter("tika-config-s3Tos3.xml", "s3e");
-        PipesIterator pipesIterator = getPipesIterator("tika-config-s3Tos3.xml");
-        int numConsumers = 20;
-        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
-        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(es);
-        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(1000);
-        for (int i = 0; i < numConsumers; i++) {
-            completionService.submit(new S3FetcherEmitter(queue, fetcher, (S3Emitter) emitter));
-        }
-        for (FetchEmitTuple t : pipesIterator) {
-            queue.offer(t);
-        }
-        for (int i = 0; i < numConsumers; i++) {
-            queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
-        }
-        int finished = 0;
-        try {
-            while (finished++ < numConsumers + 1) {
-                Future<Integer> future = completionService.take();
-                future.get();
-            }
-        } finally {
-            es.shutdownNow();
-        }
-    }
-
-    private Fetcher getFetcher(String fileName, String fetcherName) throws Exception {
-        FetcherManager manager = FetcherManager.load(getPath(fileName));
-        return manager.getFetcher(fetcherName);
-    }
-
-    private Emitter getEmitter(String fileName, String emitterName) throws Exception {
-        EmitterManager manager = EmitterManager.load(getPath(fileName));
-        return manager.getEmitter(emitterName);
-    }
-
-    private PipesIterator getPipesIterator(String fileName) throws Exception {
-        return PipesIterator.build(getPath(fileName));
-    }
-
-    private Path getPath(String fileName) throws Exception {
-        return Paths.get(PipeIntegrationTests.class.getResource("/" + fileName).toURI());
-    }
-
-
-    private static class FSFetcherEmitter implements Callable<Integer> {
-        private static final AtomicInteger counter = new AtomicInteger(0);
-
-        private final Fetcher fetcher;
-        private final Emitter emitter;
-        private final ArrayBlockingQueue<FetchEmitTuple> queue;
-
-        FSFetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
-                         Emitter emitter) {
-            this.queue = queue;
-            this.fetcher = fetcher;
-            this.emitter = emitter;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-
-            while (true) {
-                FetchEmitTuple t = queue.poll(5, TimeUnit.MINUTES);
-                if (t == null) {
-                    throw new TimeoutException("");
-                }
-                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
-                    return 1;
-                }
-                process(t);
-            }
-        }
-
-        private void process(FetchEmitTuple t) throws IOException, TikaException {
-            Path targ = OUTDIR.resolve(t.getFetchKey().getFetchKey());
-            if (Files.isRegularFile(targ)) {
-                return;
-            }
-            try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
-                System.out.println(counter.getAndIncrement() + " : " + t);
-                Files.createDirectories(targ.getParent());
-                Files.copy(is, targ);
-            }
-        }
-    }
-
-    private static class S3FetcherEmitter implements Callable<Integer> {
-        private static final AtomicInteger counter = new AtomicInteger(0);
-
-        private final Fetcher fetcher;
-        private final S3Emitter emitter;
-        private final ArrayBlockingQueue<FetchEmitTuple> queue;
-
-        S3FetcherEmitter(ArrayBlockingQueue<FetchEmitTuple> queue, Fetcher fetcher,
-                         S3Emitter emitter) {
-            this.queue = queue;
-            this.fetcher = fetcher;
-            this.emitter = emitter;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-
-            while (true) {
-                FetchEmitTuple t = queue.poll(5, TimeUnit.MINUTES);
-                if (t == null) {
-                    throw new TimeoutException("");
-                }
-                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
-                    return 1;
-                }
-                process(t);
-            }
-        }
-
-        private void process(FetchEmitTuple t) throws IOException, TikaException {
-            Metadata userMetadata = new Metadata();
-            userMetadata.set("project", "my-project");
-
-            try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(), t.getMetadata())) {
-                emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
-            }
-        }
-    }
-}
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/TikaPipesKafkaTestBase.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/TikaPipesKafkaTestBase.java
deleted file mode 100644
index ee14cc07f..000000000
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/TikaPipesKafkaTestBase.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.s3.tests;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.tika.cli.TikaCLI;
-import org.apache.tika.pipes.HandlerConfig;
-import org.apache.tika.pipes.emitter.solr.SolrEmitter;
-import org.jetbrains.annotations.NotNull;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.jupiter.api.AfterEach;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
-import org.testcontainers.utility.DockerImageName;
-
-public abstract class TikaPipesKafkaTestBase {
-
-    private final String topic = "testtopic";
-    private final int numDocs = 42;
-    private final File testFileFolder = new File("target", "test-files");
-    protected GenericContainer<?> solr;
-    private int zkPort;
-    private int kafkaPort;
-
-    public abstract String getKafkaStandaloneImageName();
-
-    public boolean handlesParentChild() {
-        return true;
-    }
-
-    @Rule
-    public GenericContainer<?> kafkaContainer =
-            new GenericContainer<>(DockerImageName.parse(getKafkaStandaloneImageName()))
-                    .withExposedPorts(9092, 2181);
-
-    @AfterEach
-    public void tearDown() throws Exception {
-        FileUtils.deleteDirectory(testFileFolder);
-    }
-
-    @Test
-    public void testPipesIteratorWithSolrUrls() throws Exception {
-        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter();
-    }
-
-    private void createTestFiles(String bodyContent) throws Exception {
-        testFileFolder.mkdirs();
-        for (int i = 0; i < numDocs; ++i) {
-            FileUtils.writeStringToFile(new File(testFileFolder, "test-" + i + ".html"),
-                    "<html><body>" + bodyContent + "</body></html>", StandardCharsets.UTF_8);
-        }
-        FileUtils.copyInputStreamToFile(this.getClass().getResourceAsStream("/embedded/embedded.docx"),
-                new File(testFileFolder, "test-embedded.docx"));
-    }
-
-    protected void setupKafka(GenericContainer<?> solr) throws Exception {
-        createTestFiles("initial");
-
-        // create instance for properties to access producer configs
-        Properties props = new Properties();
-
-        kafkaPort = kafkaContainer.getMappedPort(9092);
-        zkPort = kafkaContainer.getMappedPort(2181);
-
-        //Assign localhost id
-        props.put("bootstrap.servers", kafkaContainer.getHost() + ":" + kafkaPort;
-
-        //Set acknowledgements for producer requests.
-        props.put("acks", "all");
-
-        //If the request fails, the producer can automatically retry,
-        props.put("retries", 0);
-
-        //Specify buffer size in config
-        props.put("batch.size", 16384);
-
-        //Reduce the no of requests less than 0
-        props.put("linger.ms", 1);
-
-        //The buffer.memory controls the total amount of memory available to the producer for buffering.
-        props.put("buffer.memory", 33554432);
-
-        KafkaProducer producer = new KafkaProducer<>(props);
-
-        for (int i = 0; i < numDocs; ++i) {
-            Map<String, Object> fields = new HashMap<>();
-            String filename = "test-" + i + ".html";
-            fields.put("id", filename);
-            fields.put("path", filename);
-            producer.send(new ProducerRecord<String, Map<String, Object>>(topic,
-                    Integer.toString(i), fields));
-        }
-        producer.close();
-    }
-
-    /**
-     * Runs a test using Solr Pipe Iterator, File Fetcher and Solr Emitter.
-     */
-    protected void runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter()
-            throws Exception {
-        File tikaConfigFile = new File("target", "ta.xml");
-        File log4jPropFile = new File("target", "tmp-log4j2.xml");
-        try (InputStream is = this.getClass()
-                .getResourceAsStream("/pipes-fork-server-custom-log4j2.xml")) {
-            FileUtils.copyInputStreamToFile(is, log4jPropFile);
-        }
-        String tikaConfigTemplateXml;
-        try (InputStream is = this.getClass()
-                .getResourceAsStream("/tika-config-solr-urls.xml")) {
-            tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
-        }
-
-        String tikaConfigXml =
-                createTikaConfigXml(useZk(), tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
-                        SolrEmitter.UpdateStrategy.ADD,
-                        SolrEmitter.AttachmentStrategy.PARENT_CHILD,
-                        HandlerConfig.PARSE_MODE.RMETA);
-        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
-        TikaCLI.main(new String[] {"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
-
-        try (SolrClient solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(solrEndpoint)
-                .build()) {
-            solrClient.commit(collection, true, true);
-            Assert.assertEquals(numDocs, solrClient
-                    .query(collection, new SolrQuery("mime_s:\"text/html; charset=ISO-8859-1\""))
-                    .getResults().getNumFound());
-            Assert.assertEquals(numDocs,
-                    solrClient.query(collection, new SolrQuery("content_s:*initial*")).getResults()
-                            .getNumFound());
-            if(handlesParentChild()) {
-                Assert.assertEquals(3,
-                        solrClient.query(collection, new SolrQuery("_root_:\"test-embedded.docx\""))
-                                .getResults().getNumFound());
-            }
-            //clean up test-embedded.docx so that the iterator won't try to update its children
-            //in the next test
-
-            solrClient.deleteByQuery(collection, "_root_:\"test-embedded.docx\"");
-
-            solrClient.commit(collection, true, true);
-        }
-
-
-        // update the documents with "update must exist" and run tika async again with "UPDATE_MUST_EXIST".
-        // It should not fail, and docs should be updated.
-        createTestFiles("updated");
-        tikaConfigXml =
-                createTikaConfigXml(useZk(), tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
-                        SolrEmitter.UpdateStrategy.UPDATE_MUST_EXIST,
-                        SolrEmitter.AttachmentStrategy.PARENT_CHILD,
-                        HandlerConfig.PARSE_MODE.RMETA);
-        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
-
-        TikaCLI.main(new String[] {"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
-
-        try (SolrClient solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(solrEndpoint)
-                .build()) {
-            solrClient.commit(collection, true, true);
-            Assert.assertEquals(numDocs, solrClient
-                    .query(collection, new SolrQuery("mime_s:\"text/html; charset=ISO-8859-1\""))
-                    .getResults().getNumFound());
-            Assert.assertEquals(numDocs,
-                    solrClient.query(collection, new SolrQuery("content_s:*updated*")).getResults()
-                            .getNumFound());
-        }
-    }
-
-    @NotNull
-    private String createTikaConfigXml(boolean useZk, File tikaConfigFile, File log4jPropFile,
-                                       String tikaConfigTemplateXml,
-                                       SolrEmitter.UpdateStrategy updateStrategy,
-                                       SolrEmitter.AttachmentStrategy attachmentStrategy,
-                                       HandlerConfig.PARSE_MODE parseMode) {
-        String res =
-                tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
-                        .replace("{UPDATE_STRATEGY}", updateStrategy.toString())
-                        .replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
-                        .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
-                        .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath())
-                        .replace("{PARSE_MODE}", parseMode.name());
-        if (useZk) {
-            res = res.replace("{SOLR_CONNECTION}",
-                    "<solrZkHosts>\n" + "        <solrZkHost>" + solrHost + ":" + zkPort +
-                            "</solrZkHost>\n" + "      </solrZkHosts>\n");
-        } else {
-            res = res.replace("{SOLR_CONNECTION}",
-                    "<solrUrls>\n" + "        <solrUrl>http://" + solrHost + ":" + solrPort +
-                            "/solr</solrUrl>\n" + "      </solrUrls>\n");
-        }
-        return res;
-    }
-}
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/embedded/embedded.docx b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/embedded/embedded.docx
new file mode 100644
index 000000000..255da0f96
Binary files /dev/null and b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/embedded/embedded.docx differ
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka-docker/zk-single-kafka-single.yml b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka-docker/zk-single-kafka-single.yml
new file mode 100644
index 000000000..a812a9f48
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka-docker/zk-single-kafka-single.yml
@@ -0,0 +1,36 @@
+version: '2.1'
+
+services:
+  zoo1:
+    image: confluentinc/cp-zookeeper:7.2.0
+    hostname: zoo1
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_SERVER_ID: 1
+      ZOOKEEPER_SERVERS: zoo1:2888:3888
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.2.0
+    hostname: kafka1
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+      - "9999:9999"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_JMX_PORT: 9999
+      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
+    depends_on:
+      - zoo1
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-s3ToFs.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/log4j2.xml
similarity index 50%
rename from tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-s3ToFs.xml
rename to tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/log4j2.xml
index 779ef53f2..c88e66e99 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-s3ToFs.xml
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/log4j2.xml
@@ -1,4 +1,5 @@
-<?xml version="1.0" encoding="UTF-8" ?>
+<?xml version="1.0" encoding="UTF-8" standalone="no" ?>
+
 <!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
@@ -17,24 +18,15 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<properties>
-    <fetchers>
-        <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
-            <params>
-                <name>s3</name>
-                <region>us-east-1</region>
-                <profile><!-- fill in here --></profile>
-            </params>
-        </fetcher>
-    </fetchers>
-    <pipesIterators>
-        <pipesIterator class="org.apache.tika.pipes.pipesiterator.s3.KafkaPipesIterator">
-            <params>
-                <fetcherName>s3</fetcherName>
-                <bucket><!-- fill in here --></bucket>
-                <region>us-east-1</region>
-                <profile><!-- fill in here --></profile>
-            </params>
-        </pipesIterator>
-    </pipesIterators>
-</properties>
\ No newline at end of file
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_ERR">
+      <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
new file mode 100644
index 000000000..5a9b4aea6
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+    <parsers>
+        <parser class="org.apache.tika.parser.DefaultParser">
+            <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/>
+            <parser-exclude class="org.apache.tika.parser.pdf.PDFParser"/>
+            <parser-exclude class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser"/>
+            <parser-exclude class="org.apache.tika.parser.microsoft.OfficeParser"/>
+        </parser>
+        <parser class="org.apache.tika.parser.pdf.PDFParser">
+            <params>
+                <param name="extractActions" type="bool">true</param>
+                <param name="checkExtractAccessPermissions" type="bool">true</param>
+            </params>
+        </parser>
+        <parser class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser">
+            <params>
+                <param name="includeDeletedContent" type="bool">true</param>
+                <param name="includeMoveFromContent" type="bool">true</param>
+                <param name="extractMacros" type="bool">true</param>
+            </params>
+        </parser>
+        <parser class="org.apache.tika.parser.microsoft.OfficeParser">
+            <params>
+                <param name="extractMacros" type="bool">true</param>
+            </params>
+        </parser>
+    </parsers>
+    <metadataFilters>
+        <!-- depending on the file format, some dates do not have a timezone. This
+             filter arbitrarily assumes dates have a UTC timezone and will format all
+             dates as yyyy-MM-dd'T'HH:mm:ss'Z' whether or not they actually have a timezone.
+             -->
+        <metadataFilter class="org.apache.tika.metadata.filter.DateNormalizingMetadataFilter"/>
+        <metadataFilter class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
+            <params>
+                <excludeUnmapped>true</excludeUnmapped>
+                <mappings>
+                    <mapping from="X-TIKA:content" to="content_s"/>
+                    <mapping from="Content-Length" to="length_i"/>
+                    <mapping from="dc:creator" to="creators_ss"/>
+                    <mapping from="dc:title" to="title_s"/>
+                    <mapping from="Content-Type" to="mime_s"/>
+                    <mapping from="X-TIKA:EXCEPTION:container_exception" to="tika_exception_s"/>
+                </mappings>
+            </params>
+        </metadataFilter>
+    </metadataFilters>
+    <async>
+        <params>
+            <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+            <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+            <emitWithinMillis>10</emitWithinMillis>
+            <numEmitters>1</numEmitters>
+            <numClients>1</numClients>
+            <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+            <forkedJvmArgs>
+                <arg>-Xmx1g</arg>
+                <arg>-XX:ParallelGCThreads=2</arg>
+                <arg>-XX:+ExitOnOutOfMemoryError</arg>
+                <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+            </forkedJvmArgs>
+            <timeoutMillis>60000</timeoutMillis>
+        </params>
+    </async>
+    <fetchers>
+        <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+            <params>
+                <name>fsf</name>
+                <basePath>{PATH_TO_DOCS}</basePath>
+            </params>
+        </fetcher>
+    </fetchers>
+    <emitters>
+        <emitter class="org.apache.tika.pipes.emitter.kafka.KafkaEmitter">
+            <params>
+                <name>ke</name>
+                <topic>{EMITTER_TOPIC}</topic>
+                <bootstrapServers>{BOOTSTRAP_SERVERS}</bootstrapServers>
+                <groupId>grpid</groupId>
+            </params>
+        </emitter>
+        <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+            <params>
+                <name>fse</name>
+                <basePath>/path/to/extracts</basePath>
+            </params>
+        </emitter>
+    </emitters>
+    <pipesIterator class="org.apache.tika.pipes.pipesiterator.kafka.KafkaPipesIterator">
+        <params>
+            <topic>{PIPE_ITERATOR_TOPIC}</topic>
+            <bootstrapServers>{BOOTSTRAP_SERVERS}</bootstrapServers>
+            <groupId>grpid</groupId>
+            <autoOffsetReset>earliest</autoOffsetReset>
+            <pollDelayMs>50000</pollDelayMs>
+            <fetcherName>fsf</fetcherName>
+            <emitterName>ke</emitterName>
+        </params>
+    </pipesIterator>
+</properties>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-s3Tos3.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-s3Tos3.xml
deleted file mode 100644
index d53d649be..000000000
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-s3Tos3.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<properties>
-    <fetchers>
-        <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher">
-            <params>
-                <name>s3f</name>
-                <region>us-east-1</region>
-                <bucket><!-- fill in here --></bucket>
-                <profile><!-- fill in here --></profile>
-            </params>
-        </fetcher>
-    </fetchers>
-    <pipesIterators>
-        <pipesIterator class="org.apache.tika.pipes.pipesiterator.s3.KafkaPipesIterator">
-            <params>
-                <fetcherName>s3f</fetcherName>
-                <region>us-east-1</region>
-                <bucket><!-- fill in here --></bucket>
-                <profile><!-- fill in here --></profile>
-            </params>
-        </pipesIterator>
-    </pipesIterators>
-    <emitters>
-        <emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter">
-            <params>
-                <name>s3e</name>
-                <region>us-east-1</region>
-                <bucket><!-- fill in here -->></bucket>
-                <profile><!-- fill in here --></profile>
-                <fileExtension></fileExtension>
-            </params>
-        </emitter>
-    </emitters>
-</properties>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
index 49e51c9ff..c82323bd9 100644
--- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
+++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
@@ -30,7 +30,6 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
-import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.tika.cli.TikaCLI;
 import org.apache.tika.pipes.HandlerConfig;
diff --git a/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java b/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
index 2a03bdd00..6ca25c477 100644
--- a/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
@@ -18,54 +18,173 @@ package org.apache.tika.pipes.emitter.kafka;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonMetadataList;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.StreamEmitter;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Emits to kafka
  */
-public class KafkaEmitter extends AbstractEmitter implements Initializable, StreamEmitter {
+public class KafkaEmitter extends AbstractEmitter implements Initializable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEmitter.class);
-    private String server;
-    private String topic;
-    private String groupId;
-    private String prefix = null;
-    private String fileExtension = null;
-    private Producer<String, Map<String, Object>> producer;
+
+    private static final ObjectMapper OM = new ObjectMapper();
+
+    String topic;
+    String bootstrapServers;
+    String groupId;
+
+    String acks = "all";
+    int lingerMs = 5000;
+    int batchSize = 16384;
+    int bufferMemory = 32 * 1024 * 1024;
+    String compressionType = "none";
+    int connectionsMaxIdleMs = 9 * 60 * 1000;
+    int deliveryTimeoutMs = 120 * 1000;
+    boolean enableIdempotence = false;
+    String interceptorClasses;
+    int maxBlockMs = 60 * 1000;
+    int maxInFlightRequestsPerConnection = 5;
+    int maxRequestSize = 1024 * 1024;
+    int metadataMaxAgeMs = 5 * 60 * 1000;
+    int requestTimeoutMs = 30 * 1000;
+    int retries = Integer.MAX_VALUE;
+    int retryBackoffMs = 100;
+    int transactionTimeoutMs = 60000;
+    String transactionalId;
+    String clientId;
+    String keySerializer;
+    String valueSerializer;
+
+    private Producer<String, String> producer;
+
+    @Field
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    @Field
+    public void setAcks(String acks) {
+        this.acks = acks;
+    }
+
+    @Field
+    public void setLingerMs(int lingerMs) {
+        this.lingerMs = lingerMs;
+    }
+
+    @Field
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    @Field
+    public void setBufferMemory(int bufferMemory) {
+        this.bufferMemory = bufferMemory;
+    }
+
+    @Field
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Field
+    public void setCompressionType(String compressionType) {
+        this.compressionType = compressionType;
+    }
+
+    @Field
+    public void setConnectionsMaxIdleMs(int connectionsMaxIdleMs) {
+        this.connectionsMaxIdleMs = connectionsMaxIdleMs;
+    }
+
+    @Field
+    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
+        this.deliveryTimeoutMs = deliveryTimeoutMs;
+    }
+
+    @Field
+    public void setEnableIdempotence(boolean enableIdempotence) {
+        this.enableIdempotence = enableIdempotence;
+    }
+
+    @Field
+    public void setInterceptorClasses(String interceptorClasses) {
+        this.interceptorClasses = interceptorClasses;
+    }
+
+    @Field
+    public void setMaxBlockMs(int maxBlockMs) {
+        this.maxBlockMs = maxBlockMs;
+    }
+
+    @Field
+    public void setMaxInFlightRequestsPerConnection(int maxInFlightRequestsPerConnection) {
+        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
+    }
+
+    @Field
+    public void setMaxRequestSize(int maxRequestSize) {
+        this.maxRequestSize = maxRequestSize;
+    }
 
     @Field
-    public void setServer(String server) {
-        this.server = server;
+    public void setMetadataMaxAgeMs(int metadataMaxAgeMs) {
+        this.metadataMaxAgeMs = metadataMaxAgeMs;
+    }
+
+    @Field
+    public void setRequestTimeoutMs(int requestTimeoutMs) {
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
+    @Field
+    public void setRetries(int retries) {
+        this.retries = retries;
+    }
+
+    @Field
+    public void setRetryBackoffMs(int retryBackoffMs) {
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    @Field
+    public void setTransactionTimeoutMs(int transactionTimeoutMs) {
+        this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+
+    @Field
+    public void setTransactionalId(String transactionalId) {
+        this.transactionalId = transactionalId;
+    }
+
+    @Field
+    public void setKeySerializer(String keySerializer) {
+        this.keySerializer = keySerializer;
+    }
+
+    @Field
+    public void setValueSerializer(String valueSerializer) {
+        this.valueSerializer = valueSerializer;
     }
 
     @Field
@@ -78,95 +197,36 @@ public class KafkaEmitter extends AbstractEmitter implements Initializable, Stre
         this.groupId = groupId;
     }
 
-    /**
-     * Requires the src-bucket/path/to/my/file.txt in the {@link TikaCoreProperties#SOURCE_PATH}.
-     *
-     * @param metadataList
-     * @throws IOException
-     * @throws TikaException
-     */
     @Override
     public void emit(String emitKey, List<Metadata> metadataList)
             throws IOException, TikaEmitterException {
-        if (metadataList == null || metadataList.size() == 0) {
+        if (metadataList == null || metadataList.isEmpty()) {
             throw new TikaEmitterException("metadata list must not be null or of size 0");
         }
+        for (Metadata metadata : metadataList) {
+            LOGGER.debug("about to emit to topic ({}) with emit key ({})", topic, emitKey);
 
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try (Writer writer = new BufferedWriter(
-                new OutputStreamWriter(bos, StandardCharsets.UTF_8))) {
-            JsonMetadataList.toJson(metadataList, writer);
-        } catch (IOException e) {
-            throw new TikaEmitterException("can't jsonify", e);
-        }
-        byte[] bytes = bos.toByteArray();
-        try (InputStream is = TikaInputStream.get(bytes)) {
-            emit(emitKey, is, new Metadata());
-        }
-    }
-
-    /**
-     * @param path         -- object path, not including the bucket
-     * @param is           inputStream to copy
-     * @param userMetadata this will be written to the s3 ObjectMetadata's userMetadata
-     * @throws TikaEmitterException or IOexception if there is a Runtime s3 client exception
-     */
-    @Override
-    public void emit(String path, InputStream is, Metadata userMetadata)
-            throws IOException, TikaEmitterException {
-
-        if (!StringUtils.isBlank(prefix)) {
-            path = prefix + "/" + path;
-        }
-
-        if (!StringUtils.isBlank(fileExtension)) {
-            path += "." + fileExtension;
-        }
-
-        LOGGER.debug("about to emit to target topic: ({}) path:({})", topic, path);
-
-        Map<String, Object> fields = new HashMap<>();
-        for (String n : userMetadata.names()) {
-            String[] vals = userMetadata.getValues(n);
-            if (vals.length > 1) {
-                LOGGER.warn("Can only write the first value for key {}. I see {} values.",
-                        n,
-                        vals.length);
+            Map<String, Object> fields = new HashMap<>();
+            for (String n : metadata.names()) {
+                String[] vals = metadata.getValues(n);
+                if (vals.length > 1) {
+                    LOGGER.warn("Can only write the first value for key {}. I see {} values.",
+                            n,
+                            vals.length);
+                }
+                fields.put(n, vals[0]);
             }
-            fields.put(n, vals[0]);
-        }
 
-        producer.send(new ProducerRecord<>(topic, path, fields));
-    }
-
-    @Field
-    public void setPrefix(String prefix) {
-        //strip final "/" if it exists
-        if (prefix.endsWith("/")) {
-            this.prefix = prefix.substring(0, prefix.length() - 1);
-        } else {
-            this.prefix = prefix;
+            producer.send(new ProducerRecord<>(topic, emitKey, OM.writeValueAsString(fields)));
         }
     }
 
-    /**
-     * If you want to customize the output file's file extension.
-     * Do not include the "."
-     *
-     * @param fileExtension
-     */
-    @Field
-    public void setFileExtension(String fileExtension) {
-        this.fileExtension = fileExtension;
+    private void safePut(Properties props, String key, Object val) {
+        if (val != null) {
+            props.put(key, val);
+        }
     }
 
-    /**
-     * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
-     * e.g. AmazonClientException in a TikaConfigException.
-     *
-     * @param params params to use for initialization
-     * @throws TikaConfigException
-     */
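+    /**
+     * Builds the Kafka producer from the configured fields. Options that are
+     * left unset (null) are omitted so the Kafka client defaults apply.
+     *
+     * @param params params to use for initialization
+     * @throws TikaConfigException
+     */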
     @Override
     public void initialize(Map<String, Param> params) throws TikaConfigException {
 
@@ -174,30 +234,57 @@ public class KafkaEmitter extends AbstractEmitter implements Initializable, Stre
         Properties props = new Properties();
 
         //Point the producer at the configured bootstrap servers
-        props.put("bootstrap.servers", server);
+        safePut(props, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 
         //Set acknowledgements for producer requests.      
-        props.put("acks", "all");
+        safePut(props, ProducerConfig.ACKS_CONFIG, acks);
 
         //If a request fails, the producer can retry automatically
-        props.put("retries", 0);
+        safePut(props, ProducerConfig.RETRIES_CONFIG, retries);
 
         //Batch size, in bytes, for records sent to the same partition
-        props.put("batch.size", 16384);
+        safePut(props, ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
 
         //Linger briefly so records can be batched before they are sent
-        props.put("linger.ms", 1);
+        safePut(props, ProducerConfig.LINGER_MS_CONFIG, lingerMs);
 
         //The buffer.memory controls the total amount of memory available to the producer for buffering.   
-        props.put("buffer.memory", 33554432);
+        safePut(props, ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+
+        safePut(props, ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        safePut(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
+        safePut(props, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
+        safePut(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
+        safePut(props, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClasses);
+        safePut(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
+        safePut(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
+        safePut(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
+        safePut(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAgeMs);
+        safePut(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
+        safePut(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
+        safePut(props, ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        safePut(props, ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+
+        safePut(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerClass(keySerializer, StringSerializer.class));
+        safePut(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerClass(valueSerializer, StringSerializer.class));
 
         producer = new KafkaProducer<>(props);
     }
 
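+    /**
+     * Resolves the configured serializer class name, falling back to the
+     * supplied default class when none is configured.
+     */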
+    private Object serializerClass(String className, Class<?> defaultClass) {
+        try {
+            return className == null ? defaultClass : Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            LOGGER.error("Could not find serializer class: {}", className);
+            return null;
+        }
+    }
+
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
         mustNotBeEmpty("topic", this.topic);
-        mustNotBeEmpty("server", this.server);
+        mustNotBeEmpty("bootstrapServers", this.bootstrapServers);
     }
+
 }
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/pom.xml
index bfe552a32..f9a0d49a7 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/pom.xml
@@ -47,12 +47,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <version>${log4j2.version}</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
-      <scope>provided</scope>
     </dependency>
   </dependencies>
   <build>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
index 74024c80b..4910a87dd 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
@@ -16,14 +16,11 @@
  */
 package org.apache.tika.pipes.pipesiterator.kafka;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,7 +33,6 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.io.FilenameUtils;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
@@ -49,19 +45,17 @@ import org.slf4j.LoggerFactory;
 public class KafkaPipesIterator extends PipesIterator implements Initializable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPipesIterator.class);
-    private String server;
-    private String topic;
-    private String groupId;
-    private Pattern fileNamePattern = null;
+    String topic;
+    String bootstrapServers;
+    String keySerializer;
+    String valueSerializer;
+    String groupId;
+    String autoOffsetReset = "earliest";
+    int pollDelayMs = 100;
 
-    private Properties properties;
+    private Properties props;
     private KafkaConsumer<String, String> consumer;
 
-    @Field
-    public void setServer(String server) {
-        this.server = server;
-    }
-
     @Field
     public void setTopic(String topic) {
         this.topic = topic;
@@ -73,74 +67,89 @@ public class KafkaPipesIterator extends PipesIterator implements Initializable {
     }
 
     @Field
-    public void setFileNamePattern(String fileNamePattern) {
-        this.fileNamePattern = Pattern.compile(fileNamePattern);
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    @Field
+    public void setKeySerializer(String keySerializer) {
+        this.keySerializer = keySerializer;
+    }
+
+    @Field
+    public void setAutoOffsetReset(String autoOffsetReset) {
+        this.autoOffsetReset = autoOffsetReset;
+    }
+
+    @Field
+    public void setValueSerializer(String valueSerializer) {
+        this.valueSerializer = valueSerializer;
+    }
+
+    @Field
+    public void setPollDelayMs(int pollDelayMs) {
+        this.pollDelayMs = pollDelayMs;
+    }
+
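+    /**
+     * Adds the property only when a value has been configured, so unset
+     * options are left to the Kafka client defaults.
+     */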
+    private void safePut(Properties props, String key, Object val) {
+        if (val != null) {
+            props.put(key, val);
+        }
     }
 
-    /**
-     * This initializes the s3 client. Note, we wrap S3's RuntimeExceptions,
-     * e.g. AmazonClientException in a TikaConfigException.
-     *
-     * @param params params to use for initialization
-     * @throws TikaConfigException
-     */
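+    /**
+     * Creates the Kafka consumer from the configured fields and subscribes
+     * it to the configured topic.
+     */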
     @Override
     public void initialize(Map<String, Param> params) {
-        properties = new Properties();
-        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
-        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer .class.getName());
-        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumer = new KafkaConsumer<>(properties);
+        props = new Properties();
+        safePut(props, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        safePut(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serializerClass(keySerializer, StringDeserializer.class));
+        safePut(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serializerClass(valueSerializer, StringDeserializer.class));
+        safePut(props, ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        safePut(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+        consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
     }
 
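+    /**
+     * Resolves the configured deserializer class name, falling back to the
+     * supplied default class when none is configured.
+     */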
+    private Object serializerClass(String className, Class<?> defaultClass) {
+        try {
+            return className == null ? defaultClass : Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            LOGGER.error("Could not find deserializer class: {}", className);
+            return null;
+        }
+    }
+
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
         super.checkInitialization(problemHandler);
-        TikaConfig.mustNotBeEmpty("server", this.server);
+        TikaConfig.mustNotBeEmpty("bootstrapServers", this.bootstrapServers);
         TikaConfig.mustNotBeEmpty("topic", this.topic);
     }
 
     @Override
-    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+    protected void enqueue() throws InterruptedException, TimeoutException {
         String fetcherName = getFetcherName();
         String emitterName = getEmitterName();
         long start = System.currentTimeMillis();
         int count = 0;
         HandlerConfig handlerConfig = getHandlerConfig();
-        ConsumerRecords<String, String> records =
-                consumer.poll(Duration.ofMillis(100));
-        Matcher fileNameMatcher = null;
-        if (fileNamePattern != null) {
-            fileNameMatcher = fileNamePattern.matcher("");
-        }
+        ConsumerRecords<String, String> records;
 
-        // process the dataset received
-        for (ConsumerRecord<String, String> record : records) {
-            if (fileNameMatcher != null && !accept(fileNameMatcher, record.key())) {
-                continue;
+        do {
+            records = consumer.poll(Duration.ofMillis(pollDelayMs));
+            for (ConsumerRecord<String, String> r : records) {
+                long elapsed = System.currentTimeMillis() - start;
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("adding ({}) {} in {} ms", count, r.key(), elapsed);
+                }
+                tryToAdd(new FetchEmitTuple(r.key(), new FetchKey(fetcherName,
+                        r.key()),
+                        new EmitKey(emitterName, r.key()), new Metadata(), handlerConfig,
+                        getOnParseException()));
+                count++;
             }
-            long elapsed = System.currentTimeMillis() - start;
-            LOGGER.debug("adding ({}) {} in {} ms", count, record.key(), elapsed);
-            //TODO -- allow user specified metadata as the "id"?
-            tryToAdd(new FetchEmitTuple(record.key(), new FetchKey(fetcherName,
-                    record.key()),
-                    new EmitKey(emitterName, record.key()), new Metadata(), handlerConfig,
-                    getOnParseException()));
-            count++;
-        }
+        } while (!records.isEmpty());
         long elapsed = System.currentTimeMillis() - start;
-        LOGGER.info("finished enqueuing {} files in {} ms", count, elapsed);
-    }
-
-    private boolean accept(Matcher fileNameMatcher, String key) {
-        String fName = FilenameUtils.getName(key);
-        if (fileNameMatcher.reset(fName).find()) {
-            return true;
-        }
-        return false;
+        LOGGER.info("Finished enqueuing {} files in {} ms", count, elapsed);
     }
 }
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/test/java/org/apache/tika/pipes/pipesiterator/kafka/TestKafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/test/java/org/apache/tika/pipes/pipesiterator/kafka/TestKafkaPipesIterator.java
index a2ace7af6..152c5101b 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/test/java/org/apache/tika/pipes/pipesiterator/kafka/TestKafkaPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/test/java/org/apache/tika/pipes/pipesiterator/kafka/TestKafkaPipesIterator.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
-import org.apache.tika.pipes.pipesiterator.kafka.KafkaPipesIterator;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
@@ -43,9 +42,8 @@ public class TestKafkaPipesIterator {
         KafkaPipesIterator it = new KafkaPipesIterator();
         it.setFetcherName("kafka");
         it.setGroupId("");//find one
-        it.setServer("");//use one
+        it.setBootstrapServers("");//use one
         it.setTopic("");//select one
-        it.setFileNamePattern("");
         it.initialize(Collections.EMPTY_MAP);
         int numConsumers = 6;
         ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);