You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2022/07/18 20:27:03 UTC
[tika] branch kafka updated: finish setting up initial kafka commit - tests pass.
This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch kafka
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/kafka by this push:
new 026aa00bb finish setting up initial kafka commit - tests pass.
026aa00bb is described below
commit 026aa00bb99892478708f32eef396eb8fc35ae6b
Author: Nicholas DiPiazza <ni...@lucidworks.com>
AuthorDate: Mon Jul 18 15:26:55 2022 -0500
finish setting up initial kafka commit - tests pass.
---
.../tika-pipes-kafka-integration-tests/pom.xml | 6 +
.../pipes/kafka/tests/TikaPipesKafka2Test.java | 116 ++++++++++++++++++
.../tika/pipes/kafka/tests/TikaPipesKafkaTest.java | 132 ++++++++++++---------
.../src/test/resources/tika-config-kafka.xml | 3 +-
.../tika/pipes/emitter/kafka/KafkaEmitter.java | 6 -
.../pipesiterator/kafka/KafkaPipesIterator.java | 1 +
6 files changed, 197 insertions(+), 67 deletions(-)
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
index 9b65cbb4a..493444321 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
@@ -35,6 +35,12 @@
<version>${test.containers.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${test.containers.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tika-core</artifactId>
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java
new file mode 100644
index 000000000..50bc81029
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java
@@ -0,0 +1,116 @@
+package org.apache.tika.pipes.kafka.tests;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesKafka2Test {
+ private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafka2Test.class);
+ public static final String MY_GROUP_ID = "my-group-id";
+ public static final String TOPIC = "topic";
+ public static final int NUM_DOCS = 100;
+
+ KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
+
+ @Before
+ public void before() {
+ kafka.start();
+ }
+
+ @After
+ public void after() {
+ kafka.close();
+ }
+
+ @Test
+ public void testPipes() throws ExecutionException, InterruptedException {
+
+ Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ consumerProps.put("group.id", MY_GROUP_ID);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ ExecutorService es = Executors.newCachedThreadPool();
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+ AtomicInteger remaining = new AtomicInteger();
+ Future producerFuture = es.submit(() -> {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ while (remaining.get() <= NUM_DOCS) {
+ String msg = "Message " + remaining.getAndIncrement();
+ producer.send(new ProducerRecord<>(TOPIC, msg));
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ producerFuture.get();
+
+ Future consumerFuture = es.submit(() -> {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+
+ while (remaining.get() > 0) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ remaining.decrementAndGet();
+ LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ consumerFuture.get();
+
+ try {
+ consumerFuture.get(3, TimeUnit.MINUTES);
+ } catch (TimeoutException e) {
+ throw new AssertionError("Could not get the consumers from the queue in 30 minutes", e);
+ }
+
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(NUM_DOCS));
+
+ Assert.assertTrue(records.isEmpty());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
index b8af1e2bb..cc5afc546 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
@@ -15,7 +15,12 @@ import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -23,29 +28,30 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.tika.cli.TikaCLI;
import org.apache.tika.pipes.HandlerConfig;
import org.jetbrains.annotations.NotNull;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.ClassRule;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.DockerComposeContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.utility.DockerImageName;
public class TikaPipesKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafkaTest.class);
- public static final int ZK_PORT = 2181;
- public static final int KAFKA_PORT = 9092;
- public static final String KAFKA = "kafka1";
- public static final String ZOOKEEPER = "zoo1";
public static final String PIPE_ITERATOR_TOPIC = "pipe_iterator_topic";
public static final String EMITTER_TOPIC = "emitter_topic";
+ public static final String MY_GROUP_ID = "grpid";
private final int numDocs = 42;
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -62,18 +68,19 @@ public class TikaPipesKafkaTest {
"<html><body>" + bodyContent + "</body></html>", StandardCharsets.UTF_8);
waitingFor.add(nextFileName);
}
- FileUtils.copyInputStreamToFile(this.getClass().getResourceAsStream("/embedded/embedded.docx"),
- new File(testFileFolder, "test-embedded.docx"));
- waitingFor.add("test-embedded.docx");
}
- @ClassRule
- public static DockerComposeContainer environment =
- new DockerComposeContainer(new File("src/test/resources/kafka-docker/zk-single-kafka-single.yml"))
- .withExposedService(KAFKA, KAFKA_PORT)
- .withExposedService(ZOOKEEPER, ZK_PORT)
- .withLogConsumer(ZOOKEEPER, new Slf4jLogConsumer(LOG))
- .withLogConsumer(KAFKA, new Slf4jLogConsumer(LOG));
+ KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
+
+ @Before
+ public void before() {
+ kafka.start();
+ }
+
+ @After
+ public void after() {
+ kafka.close();
+ }
@Test
public void testKafkaPipeIterator()
@@ -91,21 +98,32 @@ public class TikaPipesKafkaTest {
tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
}
- Properties producerProps = new Properties();
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
- producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ Properties adminProperties = new Properties();
+ adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+
+ Admin admin = Admin.create(adminProperties);
+
+ NewTopic newTopic = new NewTopic(PIPE_ITERATOR_TOPIC, 1, (short) 1);
+ newTopic.configs(ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, "1680000"));
+
+ CreateTopicsResult result = admin.createTopics(
+ Collections.singleton(newTopic)
+ );
+
+ KafkaFuture<Void> future = result.values().get(PIPE_ITERATOR_TOPIC);
+ future.get();
Properties consumerProps = new Properties();
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer");
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "grp");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ consumerProps.put("group.id", MY_GROUP_ID);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
LOG.info("Listening to EMITTER_TOPIC={}", EMITTER_TOPIC);
@@ -113,6 +131,27 @@ public class TikaPipesKafkaTest {
ExecutorService es = Executors.newCachedThreadPool();
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ int numSent = 0;
+ for (int i = 0; i < numDocs; ++i) {
+ File nextFile = new File(testFileFolder, "test-" + i + ".html");
+ Map<String, Object> meta = new HashMap<>();
+ meta.put("name", nextFile.getName());
+ meta.put("path", nextFile.getAbsolutePath());
+ meta.put("totalSpace", nextFile.getTotalSpace());
+ try {
+ producer.send(new ProducerRecord<>(PIPE_ITERATOR_TOPIC,
+ nextFile.getAbsolutePath(),
+ objectMapper.writeValueAsString(meta))).get();
+ LOG.info("Sent fetch request : {}", nextFile.getAbsolutePath());
+ ++numSent;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ LOG.info("Producer is now complete - sent {}.", numSent);
+ }
+
es.execute(() -> {
try {
String tikaConfigXml =
@@ -126,38 +165,18 @@ public class TikaPipesKafkaTest {
}
});
- // Send the initial messages to the pipe iterator.
- KafkaProducer producer = new KafkaProducer(producerProps);
- int numSent = 0;
- for (int i = 0; i < numDocs; ++i) {
- File nextFile = new File(testFileFolder, "test-" + i + ".html");
- Map<String, Object> meta = new HashMap<>();
- meta.put("name", nextFile.getName());
- meta.put("path", nextFile.getAbsolutePath());
- meta.put("totalSpace", nextFile.getTotalSpace());
- try {
- producer.send(new ProducerRecord<>(PIPE_ITERATOR_TOPIC,
- nextFile.getAbsolutePath(),
- objectMapper.writeValueAsString(meta))).get();
- LOG.info("Sent fetch request : {}", nextFile.getAbsolutePath());
- ++numSent;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- LOG.info("Producer is now complete - sent {}.", numSent);
-
- LOG.info("Tika pipes have been run. See if we can pull the response messages from the EMITTER_TOPIC={}", EMITTER_TOPIC);
+ LOG.info("Tika pipes have been started. See if we can pull the response messages from the EMITTER_TOPIC={}", EMITTER_TOPIC);
Stopwatch stopwatch = Stopwatch.createStarted();
while (!waitingFor.isEmpty()) {
- Assert.assertFalse(stopwatch.elapsed(TimeUnit.SECONDS) > 600);
+ Assert.assertFalse(stopwatch.elapsed(TimeUnit.MINUTES) > 2);
try {
- ConsumerRecords<String, String> records = consumer.poll(12000);
+ consumer.seekToBeginning(consumer.assignment());
+ ConsumerRecords<String, String> records = consumer.poll(2000);
for (ConsumerRecord<String, String> record : records) {
String val = record.value();
Map<String, Object> valMap = objectMapper.readValue(val, Map.class);
- waitingFor.remove(record.key());
+ waitingFor.remove(FilenameUtils.getName(record.key()));
Assert.assertNotNull(valMap.get("content_s"));
Assert.assertNotNull(valMap.get("mime_s"));
Assert.assertNotNull(valMap.get("length_i"));
@@ -171,11 +190,6 @@ public class TikaPipesKafkaTest {
LOG.info("Done");
}
- @NotNull
- private String bootstrapServers() {
- return environment.getServiceHost(KAFKA, KAFKA_PORT) + ":" + environment.getServicePort(KAFKA, KAFKA_PORT);
- }
-
@NotNull
private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile,
String tikaConfigTemplateXml,
@@ -187,7 +201,7 @@ public class TikaPipesKafkaTest {
.replace("{PARSE_MODE}", parseMode.name())
.replace("{PIPE_ITERATOR_TOPIC}", PIPE_ITERATOR_TOPIC)
.replace("{EMITTER_TOPIC}", EMITTER_TOPIC)
- .replace("{BOOTSTRAP_SERVERS}", bootstrapServers());
+ .replace("{BOOTSTRAP_SERVERS}", kafka.getBootstrapServers());
return res;
}
}
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
index 5a9b4aea6..f15faf262 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
@@ -95,7 +95,6 @@
<name>ke</name>
<topic>{EMITTER_TOPIC}</topic>
<bootstrapServers>{BOOTSTRAP_SERVERS}</bootstrapServers>
- <groupId>grpid</groupId>
</params>
</emitter>
<emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
@@ -111,7 +110,7 @@
<bootstrapServers>{BOOTSTRAP_SERVERS}</bootstrapServers>
<groupId>grpid</groupId>
<autoOffsetReset>earliest</autoOffsetReset>
- <pollDelayMs>50000</pollDelayMs>
+ <pollDelayMs>1000</pollDelayMs>
<fetcherName>fsf</fetcherName>
<emitterName>ke</emitterName>
</params>
diff --git a/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java b/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
index 6ca25c477..627876b4d 100644
--- a/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
@@ -52,7 +52,6 @@ public class KafkaEmitter extends AbstractEmitter implements Initializable {
String topic;
String bootstrapServers;
- String groupId;
String acks = "all";
int lingerMs = 5000;
@@ -192,11 +191,6 @@ public class KafkaEmitter extends AbstractEmitter implements Initializable {
this.topic = topic;
}
- @Field
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
@Override
public void emit(String emitKey, List<Metadata> metadataList)
throws IOException, TikaEmitterException {
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
index 4910a87dd..e6cd553c6 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
@@ -104,6 +104,7 @@ public class KafkaPipesIterator extends PipesIterator implements Initializable {
safePut(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serializerClass(keySerializer, StringDeserializer.class));
safePut(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serializerClass(valueSerializer, StringDeserializer.class));
safePut(props, ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ safePut(props, "group.inital.rebalance.delay.ms", 4000);
safePut(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));