Posted to commits@tika.apache.org by nd...@apache.org on 2022/07/18 20:30:08 UTC
[tika] branch kafka updated: remove dead file, rename good test
This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch kafka
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/kafka by this push:
new cea660ed9 remove dead file, rename good test
cea660ed9 is described below
commit cea660ed9e759ddc72a2a9469a29d5030712188d
Author: Nicholas DiPiazza <ni...@lucidworks.com>
AuthorDate: Mon Jul 18 15:30:00 2022 -0500
remove dead file, rename good test
---
.../pipes/kafka/tests/TikaPipesKafka2Test.java | 116 -------------
.../tika/pipes/kafka/tests/TikaPipesKafkaTest.java | 181 +++++----------------
2 files changed, 45 insertions(+), 252 deletions(-)
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java
deleted file mode 100644
index 50bc81029..000000000
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.tika.pipes.kafka.tests;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.utility.DockerImageName;
-
-public class TikaPipesKafka2Test {
- private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafka2Test.class);
- public static final String MY_GROUP_ID = "my-group-id";
- public static final String TOPIC = "topic";
- public static final int NUM_DOCS = 100;
-
- KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
-
- @Before
- public void before() {
- kafka.start();
- }
-
- @After
- public void after() {
- kafka.close();
- }
-
- @Test
- public void testPipes() throws ExecutionException, InterruptedException {
-
- Properties consumerProps = new Properties();
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
- consumerProps.put("group.id", MY_GROUP_ID);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- ExecutorService es = Executors.newCachedThreadPool();
-
- Properties producerProps = new Properties();
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- AtomicInteger remaining = new AtomicInteger();
- Future producerFuture = es.submit(() -> {
- try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
- while (remaining.get() <= NUM_DOCS) {
- String msg = "Message " + remaining.getAndIncrement();
- producer.send(new ProducerRecord<>(TOPIC, msg));
- }
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- producerFuture.get();
-
- Future consumerFuture = es.submit(() -> {
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
- consumer.subscribe(Collections.singletonList(TOPIC));
-
- while (remaining.get() > 0) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<String, String> record : records) {
- remaining.decrementAndGet();
- LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- consumerFuture.get();
-
- try {
- consumerFuture.get(3, TimeUnit.MINUTES);
- } catch (TimeoutException e) {
- throw new AssertionError("Could not get the consumers from the queue in 3 minutes", e);
- }
-
- try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
- consumer.subscribe(Collections.singletonList(TOPIC));
-
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(NUM_DOCS));
-
- Assert.assertTrue(records.isEmpty());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
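
Note: both the removed test above and the renamed test below wait on consumerFuture twice, an untimed get() followed by get(3, TimeUnit.MINUTES). Because the untimed call blocks until the consumer loop exits, the 3-minute timeout can never actually fire. A minimal sketch of a single timed wait, assuming the consumerFuture and es names from the tests in this commit (this is a suggestion, not code from the commit):

    // A single timed wait: a hung consumer fails the test instead of
    // blocking the build forever.
    try {
        consumerFuture.get(3, TimeUnit.MINUTES);
    } catch (TimeoutException e) {
        throw new AssertionError("Consumer did not drain the topic within 3 minutes", e);
    } finally {
        es.shutdownNow(); // attempt to stop any leftover producer/consumer tasks
    }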
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
index cc5afc546..9ac62776d 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
@@ -1,26 +1,16 @@
package org.apache.tika.pipes.kafka.tests;
-import java.io.File;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Stopwatch;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,13 +18,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.tika.cli.TikaCLI;
-import org.apache.tika.pipes.HandlerConfig;
-import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -42,33 +27,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testcontainers.utility.DockerImageName;
public class TikaPipesKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafkaTest.class);
-
- public static final String PIPE_ITERATOR_TOPIC = "pipe_iterator_topic";
- public static final String EMITTER_TOPIC = "emitter_topic";
- public static final String MY_GROUP_ID = "grpid";
- private final int numDocs = 42;
-
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- private final File testFileFolder = new File("target", "test-files");
-
- private final Set<String> waitingFor = new HashSet<>();
-
- private void createTestFiles(String bodyContent) throws Exception {
- testFileFolder.mkdirs();
- for (int i = 0; i < numDocs; ++i) {
- String nextFileName = "test-" + i + ".html";
- FileUtils.writeStringToFile(new File(testFileFolder, nextFileName),
- "<html><body>" + bodyContent + "</body></html>", StandardCharsets.UTF_8);
- waitingFor.add(nextFileName);
- }
- }
+ public static final String MY_GROUP_ID = "my-group-id";
+ public static final String TOPIC = "topic";
+ public static final int NUM_DOCS = 100;
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
@@ -83,35 +48,7 @@ public class TikaPipesKafkaTest {
}
@Test
- public void testKafkaPipeIterator()
- throws Exception {
- createTestFiles("initial");
- File tikaConfigFile = new File("target", "ta.xml");
- File log4jPropFile = new File("target", "tmp-log4j2.xml");
- try (InputStream is = this.getClass()
- .getResourceAsStream("/pipes-fork-server-custom-log4j2.xml")) {
- FileUtils.copyInputStreamToFile(is, log4jPropFile);
- }
- String tikaConfigTemplateXml;
- try (InputStream is = this.getClass()
- .getResourceAsStream("/tika-config-kafka.xml")) {
- tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
- }
-
- Properties adminProperties = new Properties();
- adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
-
- Admin admin = Admin.create(adminProperties);
-
- NewTopic newTopic = new NewTopic(PIPE_ITERATOR_TOPIC, 1, (short) 1);
- newTopic.configs(ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, "1680000"));
-
- CreateTopicsResult result = admin.createTopics(
- Collections.singleton(newTopic)
- );
-
- KafkaFuture<Void> future = result.values().get(PIPE_ITERATOR_TOPIC);
- future.get();
+ public void testPipes() throws ExecutionException, InterruptedException {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
@@ -120,88 +57,60 @@ public class TikaPipesKafkaTest {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ ExecutorService es = Executors.newCachedThreadPool();
+
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
- LOG.info("Listening to EMITTER_TOPIC={}", EMITTER_TOPIC);
- consumer.subscribe(Collections.singletonList(EMITTER_TOPIC));
-
- ExecutorService es = Executors.newCachedThreadPool();
-
- try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
- int numSent = 0;
- for (int i = 0; i < numDocs; ++i) {
- File nextFile = new File(testFileFolder, "test-" + i + ".html");
- Map<String, Object> meta = new HashMap<>();
- meta.put("name", nextFile.getName());
- meta.put("path", nextFile.getAbsolutePath());
- meta.put("totalSpace", nextFile.getTotalSpace());
- try {
- producer.send(new ProducerRecord<>(PIPE_ITERATOR_TOPIC,
- nextFile.getAbsolutePath(),
- objectMapper.writeValueAsString(meta))).get();
- LOG.info("Sent fetch request : {}", nextFile.getAbsolutePath());
- ++numSent;
- } catch (Exception e) {
- throw new RuntimeException(e);
+ AtomicInteger remaining = new AtomicInteger();
+ Future producerFuture = es.submit(() -> {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+ while (remaining.get() <= NUM_DOCS) {
+ String msg = "Message " + remaining.getAndIncrement();
+ producer.send(new ProducerRecord<>(TOPIC, msg));
}
- }
- LOG.info("Producer is now complete - sent {}.", numSent);
- }
- es.execute(() -> {
- try {
- String tikaConfigXml =
- createTikaConfigXml(tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
- HandlerConfig.PARSE_MODE.RMETA);
-
- FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
- TikaCLI.main(new String[] {"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
} catch (Exception e) {
throw new RuntimeException(e);
}
});
- LOG.info("Tika pipes have been started. See if we can pull the response messages from the EMITTER_TOPIC={}", EMITTER_TOPIC);
-
- Stopwatch stopwatch = Stopwatch.createStarted();
- while (!waitingFor.isEmpty()) {
- Assert.assertFalse(stopwatch.elapsed(TimeUnit.MINUTES) > 2);
- try {
- consumer.seekToBeginning(consumer.assignment());
- ConsumerRecords<String, String> records = consumer.poll(2000);
- for (ConsumerRecord<String, String> record : records) {
- String val = record.value();
- Map<String, Object> valMap = objectMapper.readValue(val, Map.class);
- waitingFor.remove(FilenameUtils.getName(record.key()));
- Assert.assertNotNull(valMap.get("content_s"));
- Assert.assertNotNull(valMap.get("mime_s"));
- Assert.assertNotNull(valMap.get("length_i"));
- LOG.info("Received message key={}, offset={}", record.key(), record.offset());
+ producerFuture.get();
+
+ Future consumerFuture = es.submit(() -> {
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+
+ while (remaining.get() > 0) {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ remaining.decrementAndGet();
+ LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);
}
+ });
+
+ consumerFuture.get();
+
+ try {
+ consumerFuture.get(3, TimeUnit.MINUTES);
+ } catch (TimeoutException e) {
+ throw new AssertionError("Could not get the consumers from the queue in 3 minutes", e);
}
- LOG.info("Done");
- }
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(NUM_DOCS));
- @NotNull
- private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile,
- String tikaConfigTemplateXml,
- HandlerConfig.PARSE_MODE parseMode) {
- String res =
- tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
- .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
- .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath())
- .replace("{PARSE_MODE}", parseMode.name())
- .replace("{PIPE_ITERATOR_TOPIC}", PIPE_ITERATOR_TOPIC)
- .replace("{EMITTER_TOPIC}", EMITTER_TOPIC)
- .replace("{BOOTSTRAP_SERVERS}", kafka.getBootstrapServers());
- return res;
+ Assert.assertTrue(records.isEmpty());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
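
Two details of the renamed test are easy to misread. First, the producer loop condition remaining.get() <= NUM_DOCS increments remaining on every send, so it actually publishes NUM_DOCS + 1 messages ("Message 0" through "Message 100") before the consumer starts decrementing the counter back to zero. Second, the final emptiness check polls with Duration.ofMillis(NUM_DOCS), so NUM_DOCS is reused as a 100 ms poll duration rather than a document count. A sketch of the same producer step with an explicit count and a flush, reusing the constants from the test (a suggestion, not code from the commit):

    // Publish exactly NUM_DOCS messages and record how many remain unconsumed.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
        for (int i = 0; i < NUM_DOCS; i++) {
            producer.send(new ProducerRecord<>(TOPIC, "Message " + i));
            remaining.incrementAndGet();
        }
        producer.flush(); // block until every buffered record has reached the broker
    }

With the count made explicit, the closing emptiness check can poll for a named duration such as Duration.ofSeconds(1) instead of overloading NUM_DOCS.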