You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/12 09:42:37 UTC
[incubator-inlong] branch master updated: [INLONG-3083][Agent] Agent should update scala version in kafka client, and commit offset when read finish (#3084)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b6910b2 [INLONG-3083][Agent] Agent should update scala version in kafka client, and commit offset when read finish (#3084)
b6910b2 is described below
commit b6910b2b753e55f203fc68c9395cd79a4a86fa57
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sat Mar 12 17:42:33 2022 +0800
[INLONG-3083][Agent] Agent should update scala version in kafka client, and commit offset when read finish (#3084)
---
.../org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java | 4 ++--
inlong-agent/pom.xml | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 4ec53ae..f2c267b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -136,8 +136,6 @@ public class KafkaReader<K, V> implements Reader {
"partition:" + record.partition() + "value:" + recordValue + ", offset:" + record.offset());
// control speed
kafkaMetric.incReadNum();
- // commit offset
- consumer.commitAsync();
// commit succeed,then record current offset
snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
DefaultMessage message = new DefaultMessage(recordValue.getBytes(StandardCharsets.UTF_8), headerMap);
@@ -145,6 +143,8 @@ public class KafkaReader<K, V> implements Reader {
return message;
}
} else {
+ // commit offset
+ consumer.commitAsync();
fetchData(5000);
}
AgentUtils.silenceSleepInMs(waitTimeout);
diff --git a/inlong-agent/pom.xml b/inlong-agent/pom.xml
index af104a4..feaeca6 100644
--- a/inlong-agent/pom.xml
+++ b/inlong-agent/pom.xml
@@ -71,7 +71,7 @@
<prometheus.simpleclient.version>0.9.0</prometheus.simpleclient.version>
<kafka.version>3.0.0</kafka.version>
<jackson.version>2.13.1</jackson.version>
- <flink.scala.binary.version>2.12</flink.scala.binary.version>
+ <flink.scala.binary.version>2.13</flink.scala.binary.version>
<akka.version>2.5.21</akka.version>
<springcontext.version>5.3.13</springcontext.version>
</properties>