You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/10 07:13:47 UTC
[incubator-inlong] 01/02: [INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 10e328485a19e80611194521e17b63196ecff0bd
Author: zk1510 <59...@qq.com>
AuthorDate: Fri Jun 10 15:02:46 2022 +0800
[INLONG-4483][Agent] Many ConnectException logs in unit test of Kafka source (#4590)
---
.../agent/plugin/sources/TestKafkaReader.java | 64 +++++++++-------------
1 file changed, 25 insertions(+), 39 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
index 9e28f6586..f9f118533 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestKafkaReader.java
@@ -17,51 +17,37 @@
package org.apache.inlong.agent.plugin.sources;
-import java.util.List;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.Reader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class TestKafkaReader {
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
- private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaReader.class);
+public class TestKafkaReader {
@Test
- public void testKafkaReader() {
- KafkaSource kafkaSource = new KafkaSource();
- JobProfile conf = JobProfile.parseJsonStr("{}");
- conf.set("job.kafkaJob.topic", "test3");
- conf.set("job.kafkaJob.bootstrap.servers", "127.0.0.1:9092");
- conf.set("job.kafkaJob.group.id", "test_group1");
- conf.set("job.kafkaJob.recordspeed.limit", "1");
- conf.set("job.kafkaJob.bytespeed.limit", "1");
- conf.set("job.kafkaJob.partition.offset", "0#0");
- conf.set("job.kafkaJob.autoOffsetReset", "latest");
- conf.set("proxy.inlongGroupId", "");
- conf.set("proxy.inlongStreamId", "");
+ public void testKafkaConsumerInit() {
+ MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ final String topic = "my_topic";
- try {
- List<Reader> readers = kafkaSource.split(conf);
- LOGGER.info("total readers by split after:{}", readers.size());
- readers.forEach(reader -> {
- reader.init(conf);
- Runnable runnable = () -> {
- while (!reader.isFinished()) {
- Message msg = reader.read();
- if (msg != null) {
- LOGGER.info(new String(msg.getBody()));
- }
- }
- LOGGER.info("reader is finished!");
- };
- // start thread
- new Thread(runnable).start();
- });
- } catch (Exception e) {
- LOGGER.error("get record failed:", e);
+ mockConsumer.assign(Collections.singletonList(new TopicPartition(topic, 0)));
+ HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(new TopicPartition(topic, 0), 0L);
+ mockConsumer.updateBeginningOffsets(beginningOffsets);
+
+ mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0L, "test_key", "test_value"));
+ ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record : records) {
+ byte[] recordValue = record.value().getBytes(StandardCharsets.UTF_8);
+ Assert.assertArrayEquals("test_value".getBytes(StandardCharsets.UTF_8), recordValue);
}
}
+
}