You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 06:47:22 UTC

[incubator-inlong] branch master updated: [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager (#2929)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 43cfef7  [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager (#2929)
43cfef7 is described below

commit 43cfef77b432971bc1ee7ec5ebe96b7fe8e9c9fa
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sun Mar 6 14:47:14 2022 +0800

    [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager (#2929)
    
    * [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager
    
    * [INLONG-2928][Agent] Fix Kafka consumer multiple thread modification bugs
---
 .../org/apache/inlong/agent/conf/JobProfile.java   |  3 +-
 .../apache/inlong/agent/conf/TriggerProfile.java   |  2 +-
 .../org/apache/inlong/agent/db/JobProfileDb.java   | 12 +++---
 .../org/apache/inlong/agent/plugin/Reader.java     |  5 +++
 .../org/apache/inlong/agent/pojo/BinlogJob.java    |  6 +--
 .../apache/inlong/agent/pojo/JobProfileDto.java    | 15 +++----
 .../org/apache/inlong/agent/utils/AgentUtils.java  |  6 +--
 .../apache/inlong/agent/db/TestBerkeleyDBImp.java  |  2 +-
 .../org/apache/inlong/agent/core/AgentManager.java |  2 +
 .../apache/inlong/agent/core/HeartbeatManager.java | 46 ++++++++++------------
 .../apache/inlong/agent/core/job/JobManager.java   |  4 +-
 .../apache/inlong/agent/core/task/TaskWrapper.java |  3 +-
 .../apache/inlong/agent/task/TestTaskWrapper.java  |  5 +++
 .../agent/plugin/fetcher/ManagerFetcher.java       | 12 +++---
 .../agent/plugin/sources/reader/BinlogReader.java  | 24 ++++++++---
 .../agent/plugin/sources/reader/KafkaReader.java   | 17 ++++++--
 .../agent/plugin/sources/reader/SqlReader.java     |  5 +++
 .../plugin/sources/reader/TextFileReader.java      |  5 +++
 18 files changed, 109 insertions(+), 65 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
index 8448284..3d9d24b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
@@ -71,8 +71,7 @@ public class JobProfile extends AbstractConfiguration {
     @Override
     public boolean allRequiredKeyExist() {
         return hasKey(JobConstants.JOB_ID) && hasKey(JobConstants.JOB_SOURCE_CLASS)
-                && hasKey(JobConstants.JOB_SINK) && hasKey(JobConstants.JOB_CHANNEL) && hasKey(
-            JobConstants.JOB_NAME);
+                && hasKey(JobConstants.JOB_SINK) && hasKey(JobConstants.JOB_CHANNEL);
     }
 
     public String toJsonStr() {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
index 6d1aa79..6c09e82 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
@@ -56,7 +56,7 @@ public class TriggerProfile extends JobProfile {
 
     @Override
     public boolean allRequiredKeyExist() {
-        return hasKey(JobConstants.JOB_TRIGGER) && super.allRequiredKeyExist();
+        return super.allRequiredKeyExist();
     }
 
     public String getTriggerId() {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
index 6469efc..93769ec 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
@@ -36,8 +36,11 @@ public class JobProfileDb {
         this.db = db;
     }
 
-    public List<JobProfile> getAcceptedJobs() {
-        return getJobsByState(StateSearchKey.ACCEPTED);
+    public List<JobProfile> getRestartJobs() {
+        List<JobProfile> jobsByState = getJobsByState(StateSearchKey.ACCEPTED);
+        jobsByState.addAll(getJobsByState(StateSearchKey.RUNNING));
+        LOGGER.info("try to get restart jobs from db {}", jobsByState);
+        return jobsByState;
     }
 
     /**
@@ -64,6 +67,7 @@ public class JobProfileDb {
             KeyValueEntity entity = new KeyValueEntity(keyName,
                 jobProfile.toJsonStr(), jobProfile.get(JobConstants.JOB_DIR_FILTER_PATTERN, ""));
             entity.setStateSearchKey(StateSearchKey.ACCEPTED);
+            LOGGER.info("store job {} to db", jobProfile.toJsonStr());
             db.put(entity);
         }
     }
@@ -164,9 +168,7 @@ public class JobProfileDb {
         List<KeyValueEntity> entityList = db.search(stateSearchKey);
         List<JobProfile> profileList = new ArrayList<>();
         for (KeyValueEntity entity : entityList) {
-            if (entity.getKey().startsWith(JobConstants.JOB_ID_PREFIX)) {
-                profileList.add(entity.getAsJobProfile());
-            }
+            profileList.add(entity.getAsJobProfile());
         }
         return profileList;
     }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
index cec8285..4206b6b 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
@@ -58,4 +58,9 @@ public interface Reader extends Stage {
      * @return
      */
     String getSnapshot();
+
+    /**
+     * finish read
+     */
+    void finishRead();
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java
index 8edaa53..8c8ae69 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java
@@ -61,7 +61,7 @@ public class BinlogJob {
         private  String password;
         private  String hostname;
         private  String port;
-        private  String schema;
+        private  String includeSchema;
 
         private  String databaseWhiteList;
         private  String tableWhiteList;
@@ -69,8 +69,8 @@ public class BinlogJob {
         private  String intervalMs;
         private  String offsetFilename;
         private  String historyFilename;
-        private  String mode;
-        private  String ddl;
+        private  String snapshotMode;
+        private  String monitoredDdl;
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index ffca852..2b02ab2 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.agent.pojo;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+
 import com.google.gson.Gson;
 import lombok.Data;
 import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -24,10 +28,6 @@ import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
-import static java.util.Objects.requireNonNull;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-
 @Data
 public class JobProfileDto {
 
@@ -64,10 +64,10 @@ public class JobProfileDto {
         binlogJob.setUser(binlogJobTaskConfig.getUser());
         binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList());
         binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList());
-        binlogJob.setSchema(binlogJobTaskConfig.getSchema());
+        binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema());
         binlogJob.setPort(binlogJobTaskConfig.getPort());
         binlogJob.setOffsets(dataConfigs.getSnapshot());
-        binlogJob.setDdl(binlogJobTaskConfig.getDdl());
+        binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl());
         binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone());
 
         BinlogJob.Offset offset = new BinlogJob.Offset();
@@ -77,7 +77,7 @@ public class JobProfileDto {
         binlogJob.setOffset(offset);
 
         BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot();
-        snapshot.setMode(binlogJobTaskConfig.getMode());
+        snapshot.setMode(binlogJobTaskConfig.getSnapshotMode());
 
         binlogJob.setSnapshot(snapshot);
 
@@ -174,6 +174,7 @@ public class JobProfileDto {
         job.setOp(dataConfigs.getOp());
         job.setDeliveryTime(dataConfigs.getDeliveryTime());
         job.setUuid(dataConfigs.getUuid());
+        job.setSink(DEFAULT_DATAPROXY_SINK);
         TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
         switch (requireNonNull(taskType)) {
             case SQL:
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index c60bdcc..5d22f4c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -337,8 +337,7 @@ public class AgentUtils {
      * check agent ip from manager
      */
     public static String fetchLocalIp() {
-        String localIp = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP, DEFAULT_LOCAL_IP);
-        return localIp;
+        return AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP, DEFAULT_LOCAL_IP);
     }
 
     /**
@@ -346,8 +345,7 @@ public class AgentUtils {
      */
     public static String  fetchLocalUuid() {
         String result = ExcuteLinux.exeCmd("dmidecode | grep UUID");
-        String  uuid = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID, result);
-        return uuid;
+        return AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID, result);
     }
 
     /**
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
index 2fda763..4458f0c 100755
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
@@ -80,7 +80,7 @@ public class TestBerkeleyDBImp {
 
     @Test
     public void testCommandDb() {
-        CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "2022-03-05 12:10:10");
+        CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "1");
         db.putCommand(commandEntity);
         CommandEntity command = db.getCommand("1");
         Assert.assertEquals("1", command.getId());
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index df0d6a2..64b1cff 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -168,6 +168,7 @@ public class AgentManager extends AbstractDaemon {
         triggerManager.start();
         jobManager.start();
         taskManager.start();
+        heartbeatManager.start();
         taskPositionManager.start();
         // read job profiles from local
         List<JobProfile> profileList = localProfile.readFromLocal();
@@ -207,6 +208,7 @@ public class AgentManager extends AbstractDaemon {
         triggerManager.stop();
         jobManager.stop();
         taskManager.stop();
+        heartbeatManager.stop();
         taskPositionManager.stop();
         this.db.close();
     }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index fd129d1..783168f 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.core;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.StringUtils;
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -26,10 +27,8 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -42,8 +41,8 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VI
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
+import static org.apache.inlong.agent.core.task.TaskPositionManager.DEFAULT_FLUSH_TIMEOUT;
 
-@Component
 public class HeartbeatManager  extends AbstractDaemon {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);
@@ -72,14 +71,10 @@ public class HeartbeatManager  extends AbstractDaemon {
      * @return
      */
     private TaskSnapshotRequest getHeartBeat() {
-        AgentManager agentManager = new AgentManager();
-        HeartbeatManager heartbeatManager = new HeartbeatManager(agentManager);
-        JobManager jobManager = agentManager.getJobManager();
-        Map<String, JobWrapper> jobWrapperMap = jobManager.getJobs();
+        Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
 
         List<TaskSnapshotMessage> taskSnapshotMessageList = new ArrayList<>();
         TaskSnapshotRequest taskSnapshotRequest = new TaskSnapshotRequest();
-        TaskSnapshotMessage snapshotMessage = new TaskSnapshotMessage();
 
         Date date = new Date(System.currentTimeMillis());
 
@@ -90,10 +85,10 @@ public class HeartbeatManager  extends AbstractDaemon {
             }
             String offset = entry.getValue().getSnapshot();
             String jobId = entry.getKey();
+            TaskSnapshotMessage snapshotMessage = new TaskSnapshotMessage();
             snapshotMessage.setSnapshot(offset);
             snapshotMessage.setJobId(Integer.valueOf(jobId));
             taskSnapshotMessageList.add(snapshotMessage);
-
         }
         taskSnapshotRequest.setSnapshotList(taskSnapshotMessageList);
         taskSnapshotRequest.setReportTime(date);
@@ -103,21 +98,6 @@ public class HeartbeatManager  extends AbstractDaemon {
     }
 
     /**
-     * report heartbeat (default : per minute)
-     */
-    @Scheduled(cron = "${agent.scheduled.snapshotreport:0 0/1 * * * ? *}")
-    private  void sendHeartBeat() {
-        TaskSnapshotRequest taskSnapshotRequest = getHeartBeat();
-        try {
-            String returnStr = httpManager.doSentPost(reportSnapshotUrl,taskSnapshotRequest);
-            LOGGER.info(" {} report to manager",taskSnapshotRequest);
-        } catch (Throwable e) {
-            LOGGER.error(" sendHeartBeat to" + reportSnapshotUrl
-                    + " exception {}, {}", e.toString(), e.getStackTrace());
-        }
-    }
-
-    /**
      * build base url for manager according to config
      *
      * @example - http://127.0.0.1:8080/api/inlong/manager/openapi
@@ -135,11 +115,27 @@ public class HeartbeatManager  extends AbstractDaemon {
 
     @Override
     public void start() throws Exception {
+        submitWorker(heartBeatReportThread());
+    }
 
+    private Runnable heartBeatReportThread() {
+        return () -> {
+            while (isRunnable()) {
+                try {
+                    TaskSnapshotRequest taskSnapshotRequest = getHeartBeat();
+                    httpManager.doSentPost(reportSnapshotUrl,taskSnapshotRequest);
+                    LOGGER.info(" {} report to manager",taskSnapshotRequest);
+                    TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
+                } catch (Exception ex) {
+                    LOGGER.error("error caught", ex);
+                }
+            }
+        };
     }
 
     @Override
     public void stop() throws Exception {
-
+        waitForTerminate();
     }
+
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 80e4181..1735a1c 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -178,8 +178,8 @@ public class JobManager extends AbstractDaemon {
      * @param jobInstancId
      */
     public boolean deleteJob(String jobInstancId) {
-        JobWrapper jobWrapper = jobs.remove(jobInstancId);
         LOGGER.info("start to delete job, job id set {}", jobs.keySet());
+        JobWrapper jobWrapper = jobs.remove(jobInstancId);
         if (jobWrapper != null) {
             LOGGER.info("delete job instance with job id {}", jobInstancId);
             jobWrapper.cleanup();
@@ -193,7 +193,7 @@ public class JobManager extends AbstractDaemon {
      * start all accepted jobs.
      */
     private void startJobs() {
-        List<JobProfile> profileList = getJobConfDb().getAcceptedJobs();
+        List<JobProfile> profileList = getJobConfDb().getRestartJobs();
         for (JobProfile profile : profileList) {
             LOGGER.info("init starting job from db {}", profile.toJsonStr());
             addJob(new Job(profile));
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index 4e8c029..7bdfc7b 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -91,6 +91,7 @@ public class TaskWrapper extends AbstractStateWrapper {
                     isException(), task.isReadFinished());
             // write end message
             task.getChannel().push(new EndMessage());
+            task.getReader().destroy();
         }, executorService);
     }
 
@@ -149,7 +150,7 @@ public class TaskWrapper extends AbstractStateWrapper {
      */
     void destroyTask() {
         LOGGER.info("destroy task id is {}", task.getTaskId());
-        task.getReader().destroy();
+        task.getReader().finishRead();
     }
 
     /**
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 11a3400..8a9131b 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -149,6 +149,11 @@ public class TestTaskWrapper {
             return null;
         }
 
+        @Override
+        public void finishRead() {
+
+        }
+
         public int getCount() {
             return count;
         }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 4ce560c..84063da 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.plugin.fetcher;
 
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
@@ -96,7 +97,8 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
 
     public static final String AGENT = "agent";
     private static final Logger LOGGER = LoggerFactory.getLogger(ManagerFetcher.class);
-    private static final Gson GSON = new Gson();
+    private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
+    private static final Gson GSON = gsonBuilder.create();
     private static final int MAX_RETRY = 2;
     private final String managerVipUrl;
     private final String baseManagerUrl;
@@ -261,7 +263,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         }
         JobProfile profile = taskResult.getJobProfile();
         if (profile == null) {
-            LOGGER.error("profile is null");
             return;
         }
         agentManager.getJobManager().submitSqlJobProfile(profile);
@@ -272,10 +273,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
      */
     private void dealWithFileTaskResult(TaskResult taskResult) {
         LOGGER.info("deal with fetch result {}", taskResult);
-
         for (DataConfig dataConfig : taskResult.getDataConfigs()) {
             TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
-            LOGGER.info("the triggerProfile: {}", profile);
+            LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
             if (profile.hasKey(JOB_TRIGGER)) {
                 dealWithTdmTriggerProfile(profile);
             } else {
@@ -497,8 +497,8 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     @Override
     public void start() throws Exception {
         // when agent start, check local ip and fetch manager ip list;
-        fetchLocalIp();
-        fetchLocalUuid();
+        localIp = fetchLocalIp();
+        uuid = fetchLocalUuid();
         fetchTdmList(true, 0);
         submitWorker(profileFetchThread());
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 052695b..6a3bd06 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -98,7 +98,7 @@ public class BinlogReader implements Reader {
     private BinlogSnapshotBase binlogSnapshot;
     private JobProfile jobProfile;
     private static final Gson gson = new Gson();
-
+    private boolean destroyed = false;
     private boolean enableReportConfigLog;
     private StreamConfigLogMetric streamConfigLogMetric;
 
@@ -231,6 +231,7 @@ public class BinlogReader implements Reader {
         props.setProperty("database.history.file.filename", databaseStoreHistoryName);
         props.setProperty("database.snapshot.mode", snapshotMode);
         props.setProperty("database.history.store.only.monitored.tables.ddl", historyMonitorDdl);
+        props.setProperty("database.allowPublicKeyRetrieval", "true");
         props.setProperty("key.converter.schemas.enable", "false");
         props.setProperty("value.converter.schemas.enable", "false");
         props.setProperty("include.schema.changes", includeSchemaChanges);
@@ -242,9 +243,13 @@ public class BinlogReader implements Reader {
 
     @Override
     public void destroy() {
-        finished = true;
-        executor.shutdownNow();
-        binlogSnapshot.close();
+        synchronized (this) {
+            if (!destroyed) {
+                executor.shutdownNow();
+                binlogSnapshot.close();
+                destroyed = true;
+            }
+        }
     }
 
     @Override
@@ -269,7 +274,16 @@ public class BinlogReader implements Reader {
 
     @Override
     public String getSnapshot() {
-        return binlogSnapshot.getSnapshot();
+        if (binlogSnapshot != null) {
+            return binlogSnapshot.getSnapshot();
+        } else {
+            return "";
+        }
+    }
+
+    @Override
+    public void finishRead() {
+        finished = true;
     }
 
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 8374617..4ec53ae 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -23,6 +23,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROU
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_OFFSET;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_RECORD_SPEED_LIMIT;
 
 import java.nio.charset.StandardCharsets;
@@ -89,6 +90,7 @@ public class KafkaReader<K, V> implements Reader {
     private String inlongStreamId;
     private String snapshot;
     private boolean isFinished = false;
+    private boolean destroyed = false;
 
     /**
      * init attribute
@@ -137,7 +139,7 @@ public class KafkaReader<K, V> implements Reader {
                 // commit offset
                 consumer.commitAsync();
                 // commit succeed,then record current offset
-                snapshot = String.valueOf(record.offset());
+                snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
                 DefaultMessage message = new DefaultMessage(recordValue.getBytes(StandardCharsets.UTF_8), headerMap);
                 recordReadLimit(1L, message.getBody().length);
                 return message;
@@ -190,8 +192,12 @@ public class KafkaReader<K, V> implements Reader {
 
     @Override
     public void destroy() {
-        isFinished = true;
-        consumer.close();
+        synchronized (this) {
+            if (!destroyed) {
+                consumer.close();
+                destroyed = true;
+            }
+        }
     }
 
     private void initReadTimeout(JobProfile jobConf) {
@@ -222,6 +228,11 @@ public class KafkaReader<K, V> implements Reader {
         return snapshot;
     }
 
+    @Override
+    public void finishRead() {
+        isFinished = true;
+    }
+
     private boolean fetchData(long fetchDataTimeout) {
         // cosume data
         ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(fetchDataTimeout));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index fea6908..70b4328 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -163,6 +163,11 @@ public class SqlReader extends AbstractReader {
         return StringUtils.EMPTY;
     }
 
+    @Override
+    public void finishRead() {
+        destroy();
+    }
+
     /**
      * Init column meta data.
      *
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index d987d33..3223dc2 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -145,6 +145,11 @@ public class TextFileReader extends AbstractReader {
         return StringUtils.EMPTY;
     }
 
+    @Override
+    public void finishRead() {
+        destroy();
+    }
+
     public void addPatternValidator(String pattern) {
         if (pattern.isEmpty()) {
             return;