Posted to commits@tika.apache.org by nd...@apache.org on 2022/07/18 20:27:03 UTC

[tika] branch kafka updated: finish setting up initial kafka commit - tests pass.

This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch kafka
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/kafka by this push:
     new 026aa00bb finish setting up initial kafka commit - tests pass.
026aa00bb is described below

commit 026aa00bb99892478708f32eef396eb8fc35ae6b
Author: Nicholas DiPiazza <ni...@lucidworks.com>
AuthorDate: Mon Jul 18 15:26:55 2022 -0500

    finish setting up initial kafka commit - tests pass.
---
 .../tika-pipes-kafka-integration-tests/pom.xml     |   6 +
 .../pipes/kafka/tests/TikaPipesKafka2Test.java     | 116 ++++++++++++++++++
 .../tika/pipes/kafka/tests/TikaPipesKafkaTest.java | 132 ++++++++++++---------
 .../src/test/resources/tika-config-kafka.xml       |   3 +-
 .../tika/pipes/emitter/kafka/KafkaEmitter.java     |   6 -
 .../pipesiterator/kafka/KafkaPipesIterator.java    |   1 +
 6 files changed, 197 insertions(+), 67 deletions(-)
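
For readers skimming the diff: the main change is that both integration tests now start Kafka through Testcontainers' KafkaContainer instead of a Docker Compose fixture. A minimal sketch of that lifecycle (not taken verbatim from the commit; it assumes the org.testcontainers:kafka test dependency that the pom.xml hunk below adds):

    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
    kafka.start();                                   // boots a single-broker cluster in Docker
    String bootstrap = kafka.getBootstrapServers();  // e.g. "PLAINTEXT://localhost:32768"
    // ... point producers, consumers, and the Tika pipes config at 'bootstrap' ...
    kafka.close();                                   // stops and removes the container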

diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
index 9b65cbb4a..493444321 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/pom.xml
@@ -35,6 +35,12 @@
       <version>${test.containers.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <version>${test.containers.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-core</artifactId>
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java
new file mode 100644
index 000000000..50bc81029
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafka2Test.java
@@ -0,0 +1,116 @@
+package org.apache.tika.pipes.kafka.tests;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesKafka2Test {
+    private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafka2Test.class);
+    public static final String MY_GROUP_ID = "my-group-id";
+    public static final String TOPIC = "topic";
+    public static final int NUM_DOCS = 100;
+
+    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
+
+    @Before
+    public void before() {
+        kafka.start();
+    }
+
+    @After
+    public void after() {
+        kafka.close();
+    }
+
+    @Test
+    public void testPipes() throws ExecutionException, InterruptedException {
+
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        consumerProps.put("group.id", MY_GROUP_ID);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        ExecutorService es = Executors.newCachedThreadPool();
+
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        AtomicInteger remaining = new AtomicInteger();
+        Future<?> producerFuture = es.submit(() -> {
+            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+                while (remaining.get() <= NUM_DOCS) {
+                    String msg = "Message " + remaining.getAndIncrement();
+                    producer.send(new ProducerRecord<>(TOPIC, msg));
+                }
+
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        producerFuture.get();
+
+        Future<?> consumerFuture = es.submit(() -> {
+            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+                consumer.subscribe(Collections.singletonList(TOPIC));
+
+                while (remaining.get() > 0) {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+                    for (ConsumerRecord<String, String> record : records) {
+                        remaining.decrementAndGet();
+                        LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
+                    }
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        // Bound the wait: the timed get below fails the test if the consumer stalls.
+
+        try {
+            consumerFuture.get(3, TimeUnit.MINUTES);
+        } catch (TimeoutException e) {
+            throw new AssertionError("Could not get the consumers from the queue in 30 minutes", e);
+        }
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Collections.singletonList(TOPIC));
+
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(NUM_DOCS));
+
+            Assert.assertTrue(records.isEmpty());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
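
A note on the coordination in TikaPipesKafka2Test above: the producer counts one shared AtomicInteger up to NUM_DOCS and the consumer counts it back down to zero. A CountDownLatch variant would make the hand-off and the deadline explicit — a sketch only, not part of the commit; it reuses the test's es, consumerProps, TOPIC, and NUM_DOCS and assumes a java.util.concurrent.CountDownLatch import:

    CountDownLatch consumed = new CountDownLatch(NUM_DOCS);
    es.submit(() -> {
        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(consumerProps)) {
            c.subscribe(Collections.singletonList(TOPIC));
            while (consumed.getCount() > 0) {
                for (ConsumerRecord<String, String> record : c.poll(Duration.ofSeconds(1))) {
                    consumed.countDown();   // one tick per record received
                }
            }
        }
    });
    // Fails the test instead of hanging if the topic is never drained.
    Assert.assertTrue("topic not drained in 3 minutes", consumed.await(3, TimeUnit.MINUTES));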
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
index b8af1e2bb..cc5afc546 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/java/org/apache/tika/pipes/kafka/tests/TikaPipesKafkaTest.java
@@ -15,7 +15,12 @@ import java.util.concurrent.TimeUnit;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Stopwatch;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -23,29 +28,30 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.tika.cli.TikaCLI;
 import org.apache.tika.pipes.HandlerConfig;
 import org.jetbrains.annotations.NotNull;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.ClassRule;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.DockerComposeContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.utility.DockerImageName;
 
 public class TikaPipesKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafkaTest.class);
 
-    public static final int ZK_PORT = 2181;
-    public static final int KAFKA_PORT = 9092;
-    public static final String KAFKA = "kafka1";
-    public static final String ZOOKEEPER = "zoo1";
     public static final String PIPE_ITERATOR_TOPIC = "pipe_iterator_topic";
     public static final String EMITTER_TOPIC = "emitter_topic";
+    public static final String MY_GROUP_ID = "grpid";
     private final int numDocs = 42;
 
     private final ObjectMapper objectMapper = new ObjectMapper();
@@ -62,18 +68,19 @@ public class TikaPipesKafkaTest {
                     "<html><body>" + bodyContent + "</body></html>", StandardCharsets.UTF_8);
             waitingFor.add(nextFileName);
         }
-        FileUtils.copyInputStreamToFile(this.getClass().getResourceAsStream("/embedded/embedded.docx"),
-                new File(testFileFolder, "test-embedded.docx"));
-        waitingFor.add("test-embedded.docx");
     }
 
-    @ClassRule
-    public static DockerComposeContainer environment =
-            new DockerComposeContainer(new File("src/test/resources/kafka-docker/zk-single-kafka-single.yml"))
-                    .withExposedService(KAFKA, KAFKA_PORT)
-                    .withExposedService(ZOOKEEPER, ZK_PORT)
-                    .withLogConsumer(ZOOKEEPER, new Slf4jLogConsumer(LOG))
-                    .withLogConsumer(KAFKA, new Slf4jLogConsumer(LOG));
+    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
+
+    @Before
+    public void before() {
+        kafka.start();
+    }
+
+    @After
+    public void after() {
+        kafka.close();
+    }
 
     @Test
     public void testKafkaPipeIterator()
@@ -91,21 +98,32 @@ public class TikaPipesKafkaTest {
             tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
         }
 
-        Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        Properties adminProperties = new Properties();
+        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+
+        Admin admin = Admin.create(adminProperties);
+
+        NewTopic newTopic = new NewTopic(PIPE_ITERATOR_TOPIC, 1, (short) 1);
+        newTopic.configs(ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, "1680000"));
+
+        CreateTopicsResult result = admin.createTopics(
+                Collections.singleton(newTopic)
+        );
+
+        KafkaFuture<Void> future = result.values().get(PIPE_ITERATOR_TOPIC);
+        future.get();
 
         Properties consumerProps = new Properties();
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer");
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "grp");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        consumerProps.put("group.id", MY_GROUP_ID);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
         LOG.info("Listening to EMITTER_TOPIC={}", EMITTER_TOPIC);
@@ -113,6 +131,27 @@ public class TikaPipesKafkaTest {
 
         ExecutorService es = Executors.newCachedThreadPool();
 
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            int numSent = 0;
+            for (int i = 0; i < numDocs; ++i) {
+                File nextFile = new File(testFileFolder, "test-" + i + ".html");
+                Map<String, Object> meta = new HashMap<>();
+                meta.put("name", nextFile.getName());
+                meta.put("path", nextFile.getAbsolutePath());
+                meta.put("totalSpace", nextFile.getTotalSpace());
+                try {
+                    producer.send(new ProducerRecord<>(PIPE_ITERATOR_TOPIC,
+                            nextFile.getAbsolutePath(),
+                            objectMapper.writeValueAsString(meta))).get();
+                    LOG.info("Sent fetch request : {}", nextFile.getAbsolutePath());
+                    ++numSent;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            LOG.info("Producer is now complete - sent {}.", numSent);
+        }
+
         es.execute(() -> {
             try {
                 String tikaConfigXml =
@@ -126,38 +165,18 @@ public class TikaPipesKafkaTest {
             }
         });
 
-        // Send the initial messages to the pipe iterator.
-        KafkaProducer producer = new KafkaProducer(producerProps);
-        int numSent = 0;
-        for (int i = 0; i < numDocs; ++i) {
-            File nextFile = new File(testFileFolder, "test-" + i + ".html");
-            Map<String, Object> meta = new HashMap<>();
-            meta.put("name", nextFile.getName());
-            meta.put("path", nextFile.getAbsolutePath());
-            meta.put("totalSpace", nextFile.getTotalSpace());
-            try {
-                producer.send(new ProducerRecord<>(PIPE_ITERATOR_TOPIC,
-                        nextFile.getAbsolutePath(),
-                        objectMapper.writeValueAsString(meta))).get();
-                LOG.info("Sent fetch request : {}", nextFile.getAbsolutePath());
-                ++numSent;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        LOG.info("Producer is now complete - sent {}.", numSent);
-
-        LOG.info("Tika pipes have been run. See if we can pull the response messages from the EMITTER_TOPIC={}", EMITTER_TOPIC);
+        LOG.info("Tika pipes have been started. See if we can pull the response messages from the EMITTER_TOPIC={}", EMITTER_TOPIC);
 
         Stopwatch stopwatch = Stopwatch.createStarted();
         while (!waitingFor.isEmpty()) {
-            Assert.assertFalse(stopwatch.elapsed(TimeUnit.SECONDS) > 600);
+            Assert.assertFalse(stopwatch.elapsed(TimeUnit.MINUTES) > 2);
             try {
-                ConsumerRecords<String, String> records = consumer.poll(12000);
+                consumer.seekToBeginning(consumer.assignment());
+                ConsumerRecords<String, String> records = consumer.poll(2000);
                 for (ConsumerRecord<String, String> record : records) {
                     String val = record.value();
                     Map<String, Object> valMap = objectMapper.readValue(val, Map.class);
-                    waitingFor.remove(record.key());
+                    waitingFor.remove(FilenameUtils.getName(record.key()));
                     Assert.assertNotNull(valMap.get("content_s"));
                     Assert.assertNotNull(valMap.get("mime_s"));
                     Assert.assertNotNull(valMap.get("length_i"));
@@ -171,11 +190,6 @@ public class TikaPipesKafkaTest {
         LOG.info("Done");
     }
 
-    @NotNull
-    private String bootstrapServers() {
-        return environment.getServiceHost(KAFKA, KAFKA_PORT) + ":" + environment.getServicePort(KAFKA, KAFKA_PORT);
-    }
-
     @NotNull
     private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile,
                                        String tikaConfigTemplateXml,
@@ -187,7 +201,7 @@ public class TikaPipesKafkaTest {
                         .replace("{PARSE_MODE}", parseMode.name())
                         .replace("{PIPE_ITERATOR_TOPIC}", PIPE_ITERATOR_TOPIC)
                         .replace("{EMITTER_TOPIC}", EMITTER_TOPIC)
-                        .replace("{BOOTSTRAP_SERVERS}", bootstrapServers());
+                        .replace("{BOOTSTRAP_SERVERS}", kafka.getBootstrapServers());
         return res;
     }
 }
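
One possible tightening of the topic set-up in the hunk above (a sketch under the assumption that nothing else reuses the Admin client): Admin implements AutoCloseable, so try-with-resources would release the broker connection once the topic exists. Same calls and values as the diff, with all() standing in for the per-topic KafkaFuture:

    try (Admin admin = Admin.create(adminProperties)) {
        NewTopic newTopic = new NewTopic(PIPE_ITERATOR_TOPIC, 1, (short) 1);
        newTopic.configs(ImmutableMap.of(TopicConfig.RETENTION_MS_CONFIG, "1680000"));
        // all() resolves once every requested topic has been created.
        admin.createTopics(Collections.singleton(newTopic)).all().get();
    }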
diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
index 5a9b4aea6..f15faf262 100644
--- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
+++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/tika-config-kafka.xml
@@ -95,7 +95,6 @@
                 <name>ke</name>
                 <topic>{EMITTER_TOPIC}</topic>
                 <bootstrapServers>{BOOTSTRAP_SERVERS}</bootstrapServers>
-                <groupId>grpid</groupId>
             </params>
         </emitter>
         <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
@@ -111,7 +110,7 @@
             <bootstrapServers>{BOOTSTRAP_SERVERS}</bootstrapServers>
             <groupId>grpid</groupId>
             <autoOffsetReset>earliest</autoOffsetReset>
-            <pollDelayMs>50000</pollDelayMs>
+            <pollDelayMs>1000</pollDelayMs>
             <fetcherName>fsf</fetcherName>
             <emitterName>ke</emitterName>
         </params>
diff --git a/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java b/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
index 6ca25c477..627876b4d 100644
--- a/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-kafka/src/main/java/org/apache/tika/pipes/emitter/kafka/KafkaEmitter.java
@@ -52,7 +52,6 @@ public class KafkaEmitter extends AbstractEmitter implements Initializable {
 
     String topic;
     String bootstrapServers;
-    String groupId;
 
     String acks = "all";
     int lingerMs = 5000;
@@ -192,11 +191,6 @@ public class KafkaEmitter extends AbstractEmitter implements Initializable {
         this.topic = topic;
     }
 
-    @Field
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
-    }
-
     @Override
     public void emit(String emitKey, List<Metadata> metadataList)
             throws IOException, TikaEmitterException {
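
For context on the groupId removal above: group.id is a consumer-group setting, and an emitter presumably writes through a KafkaProducer, which has no notion of a group. Illustration only (hypothetical snippet, not from the commit):

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");  // corresponds to the emitter's 'acks' field
    // ProducerConfig defines no GROUP_ID_CONFIG: consumer groups do not
    // apply to producers, which is why the setter could be dropped.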
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
index 4910a87dd..e6cd553c6 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
@@ -104,6 +104,7 @@ public class KafkaPipesIterator extends PipesIterator implements Initializable {
         safePut(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serializerClass(keySerializer, StringDeserializer.class));
         safePut(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serializerClass(valueSerializer, StringDeserializer.class));
         safePut(props, ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        safePut(props, "group.inital.rebalance.delay.ms", 4000);
         safePut(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
         consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(topic));
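
A caveat on the last hunk (the key's spelling is fixed above): group.initial.rebalance.delay.ms is a broker-side configuration, not a consumer one, so the consumer client will log it as an unknown config and ignore it. If the intent is to tune the initial rebalance delay, it has to be set on the broker; with the Confluent images these tests use, that is typically done through the container environment — a sketch, assuming the cp-kafka env-var-to-config mapping:

    // Must be called before kafka.start().
    kafka.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "4000");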