You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/12 09:42:37 UTC

[incubator-inlong] branch master updated: [INLONG-3083][Agent] Agent should update scala version in kafka client, and commit offset when read finish (#3084)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b6910b2  [INLONG-3083][Agent] Agent should update scala version in kafka client, and commit offset when read finish (#3084)
b6910b2 is described below

commit b6910b2b753e55f203fc68c9395cd79a4a86fa57
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sat Mar 12 17:42:33 2022 +0800

    [INLONG-3083][Agent] Agent should update scala version in kafka client, and commit offset when read finish (#3084)
---
 .../org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java    | 4 ++--
 inlong-agent/pom.xml                                                  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 4ec53ae..f2c267b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -136,8 +136,6 @@ public class KafkaReader<K, V> implements Reader {
                         "partition:" + record.partition() + "value:" + recordValue + ", offset:" + record.offset());
                 // control speed
                 kafkaMetric.incReadNum();
-                // commit offset
-                consumer.commitAsync();
                 // commit succeed,then record current offset
                 snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
                 DefaultMessage message = new DefaultMessage(recordValue.getBytes(StandardCharsets.UTF_8), headerMap);
@@ -145,6 +143,8 @@ public class KafkaReader<K, V> implements Reader {
                 return message;
             }
         } else {
+            // commit offset
+            consumer.commitAsync();
             fetchData(5000);
         }
         AgentUtils.silenceSleepInMs(waitTimeout);
diff --git a/inlong-agent/pom.xml b/inlong-agent/pom.xml
index af104a4..feaeca6 100644
--- a/inlong-agent/pom.xml
+++ b/inlong-agent/pom.xml
@@ -71,7 +71,7 @@
         <prometheus.simpleclient.version>0.9.0</prometheus.simpleclient.version>
         <kafka.version>3.0.0</kafka.version>
         <jackson.version>2.13.1</jackson.version>
-        <flink.scala.binary.version>2.12</flink.scala.binary.version>
+        <flink.scala.binary.version>2.13</flink.scala.binary.version>
         <akka.version>2.5.21</akka.version>
         <springcontext.version>5.3.13</springcontext.version>
     </properties>