You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 07:13:47 UTC

[incubator-inlong] 01/02: [INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 10e328485a19e80611194521e17b63196ecff0bd
Author: zk1510 <59...@qq.com>
AuthorDate: Fri Jun 10 15:02:46 2022 +0800

    [INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)
---
 .../agent/plugin/sources/TestKafkaReader.java      | 64 +++++++++-------------
 1 file changed, 25 insertions(+), 39 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
index 9e28f6586..f9f118533 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
@@ -17,51 +17,37 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
-import java.util.List;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class TestKafkaReader {
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaReader.class);
+public class TestKafkaReader {
 
     @Test
-    public void testKafkaReader() {
-        KafkaSource kafkaSource = new KafkaSource();
-        JobProfile conf = JobProfile.parseJsonStr("{}");
-        conf.set("job.kafkaJob.topic", "test3");
-        conf.set("job.kafkaJob.bootstrap.servers", "127.0.0.1:9092");
-        conf.set("job.kafkaJob.group.id", "test_group1");
-        conf.set("job.kafkaJob.recordspeed.limit", "1");
-        conf.set("job.kafkaJob.bytespeed.limit", "1");
-        conf.set("job.kafkaJob.partition.offset", "0#0");
-        conf.set("job.kafkaJob.autoOffsetReset", "latest");
-        conf.set("proxy.inlongGroupId", "");
-        conf.set("proxy.inlongStreamId", "");
+    public void testKafkaConsumerInit() {
+        MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final String topic = "my_topic";
 
-        try {
-            List<Reader> readers = kafkaSource.split(conf);
-            LOGGER.info("total readers by split after:{}", readers.size());
-            readers.forEach(reader -> {
-                reader.init(conf);
-                Runnable runnable = () -> {
-                    while (!reader.isFinished()) {
-                        Message msg = reader.read();
-                        if (msg != null) {
-                            LOGGER.info(new String(msg.getBody()));
-                        }
-                    }
-                    LOGGER.info("reader is finished!");
-                };
-                // start thread
-                new Thread(runnable).start();
-            });
-        } catch (Exception e) {
-            LOGGER.error("get record failed:", e);
+        mockConsumer.assign(Collections.singletonList(new TopicPartition(topic, 0)));
+        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(new TopicPartition(topic, 0), 0L);
+        mockConsumer.updateBeginningOffsets(beginningOffsets);
+
+        mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, "test_key", "test_value"));
+        ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(1000));
+        for (ConsumerRecord<String, String> record : records) {
+            byte[] recordValue = record.value().getBytes(StandardCharsets.UTF_8);
+            Assert.assertArrayEquals("test_value".getBytes(StandardCharsets.UTF_8), recordValue);
         }
     }
+
 }