You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/22 11:53:50 UTC

[incubator-inlong] branch master updated: [INLONG-2545][Agent] agent pull task (binlog, file) from manager (#2547)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad1625  [INLONG-2545][Agent] agent pull task (binlog,file) from manager (#2547)
4ad1625 is described below

commit 4ad16253a48cf796095c0ae618d941ca0b8e6fa6
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Tue Feb 22 19:53:45 2022 +0800

    [INLONG-2545][Agent] agent pull task (binlog,file) from manager (#2547)
    
    * [Agent] agent pull task (binlog,file) from manager
    
    * [INLONG-2545][Agent] pull task from manager ;expand type of binlog
    
    Co-authored-by: jianchenglv <6>
---
 .../inlong/agent/constants/AgentConstants.java     |   2 +
 .../inlong/agent/constants/JobConstants.java       |  50 ++++--
 .../org/apache/inlong/agent/db/BerkeleyDbImp.java  |   1 +
 .../java/org/apache/inlong/agent/db/CommandDb.java |   1 +
 .../main/java/org/apache/inlong/agent/db/Db.java   |   2 +
 .../org/apache/inlong/agent/db/RocksDbImp.java     |   1 +
 .../apache/inlong/agent/db/TestBerkeleyDBImp.java  |   1 +
 .../agent/plugin/fetcher/ManagerFetcher.java       |  21 ++-
 .../dtos/{TaskResult.java => BinlogJob.java}       |  42 ++---
 .../fetcher/dtos/{DataConfig.java => FileJob.java} |  55 +++++--
 .../fetcher/dtos/{CmdConfig.java => Job.java}      |  15 +-
 .../agent/plugin/fetcher/dtos/JobProfileDto.java   | 172 ++++++++++++++-------
 .../dtos/{TaskResult.java => KafkaJob.java}        |  41 ++---
 .../agent/plugin/fetcher/dtos/TaskRequestDto.java  |   2 +-
 .../agent/plugin/fetcher/dtos/TaskResult.java      |   2 +
 .../TaskResult.java => utils/ExcuteLinux.java}     |  40 ++---
 .../apache/inlong/agent/plugin/TestFileAgent.java  |   3 +-
 .../inlong/agent/plugin/utils/TestUtils.java       |  52 ++++---
 .../src/test/resources/fileAgentJob.json           |  10 +-
 inlong-common/pom.xml                              |   6 +-
 .../apache/inlong/commons}/db/CommandEntity.java   |   2 +-
 .../org/apache/inlong/commons/dto}/CmdConfig.java  |   2 +-
 .../org/apache/inlong/commons/dto}/DataConfig.java |  16 +-
 .../apache/inlong/commons/dto}/TaskRequestDto.java |   5 +-
 .../inlong/commons}/enums/ManagerOpEnum.java       |   2 +-
 .../apache/inlong/commons/enums/TaskTypeEnum.java  |  36 ++---
 .../sdk/sort/impl/InLongPulsarFetcherImplTest.java |   2 +
 pom.xml                                            |  17 ++
 28 files changed, 383 insertions(+), 218 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
index 2c5bc81..7ace2c9 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
@@ -162,6 +162,8 @@ public class AgentConstants {
 
     public static final String AGENT_LOCAL_IP = "agent.local.ip";
 
+    public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
+
     public static final String PROMETHEUS_ENABLE = "agent.prometheus.enable";
     public static final boolean DEFAULT_PROMETHEUS_ENABLE = false;
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
index 7b092a5..eb6f5a7 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
@@ -29,27 +29,57 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_RETRY = "job.retry";
 
     public static final String JOB_SOURCE = "job.source";
+
     public static final String JOB_SINK = "job.sink";
     public static final String JOB_CHANNEL = "job.channel";
-    public static final String JOB_TRIGGER = "job.trigger";
     public static final String JOB_NAME = "job.name";
-    public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
+
     public static final String DEFAULT_JOB_NAME = "default";
     public static final String JOB_DESCRIPTION = "job.description";
     public static final String DEFAULT_JOB_DESCRIPTION = "default job description";
     public static final String DEFAULT_JOB_LINE_FILTER = "";
 
+    //File job
+    public static final String JOB_TRIGGER = "job.filejob.trigger";
+    public static final String JOB_LINE_FILTER_PATTERN = "job.filejob.dir.pattern";
+    public static final String JOB_DIR_FILTER_PATTERN = "job.filejob.dir.pattern";
+    public static final String JOB_FILE_TIME_OFFSET = "job.filejob.timeOffset";
+    public static final String JOB_FILE_MAX_WAIT = "job.filejob.file.max.wait";
+    public static final String JOB_ADDITION_STR = "job.filejob.additionStr";
+    public static final String JOB_CYCLE_UNIT = "job.filejob.cycleUnit";
+
+    public static final String JOB_DIR_FILTER_PATH = "job.filejob.dir.path";
+
+    //Binlog job
+    private static final String JOB_DATABASE_USER = "job.binlogjob.user";
+    private static final String JOB_DATABASE_PASSWORD = "job.binlogjob.password";
+    private static final String JOB_DATABASE_HOSTNAME = "job.binlogjob.hostname";
+    private static final String JOB_DATABASE_WHITELIST = "job.binlogjob.tableWhiteList";
+    private static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogjob.database.serverTimezone";
+    private static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "offset.binlogjob.offset.flush.interval.ms";
+    private static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogjob.database.history.file.filename";
+    private static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogjob.database.snapshot.mode";
+    private static final  String JOB_DATABASE_OFFSET = "job.binlogjob.database.offset";
+
+    //Kafka job
+    private static final  String SOURCE_KAFKA_TOPIC = "job.kafkajob.topic";
+    private static final  String SOURCE_KAFKA_KEY_DESERIALIZER = "job.kafkajob.key.deserializer";
+    private static final  String SOURCE_KAFKA_VALUE_DESERIALIZER = "job.kafkajob.value.Deserializer";
+    private static final  String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "job.kafkajob.bootstrap.servers";
+    private static final  String SOURCE_KAFKA_GROUP_ID = "job.kafkajob.group.Id";
+    private static final  String SOURCE_KAFKA_RECORD_SPEED = "job.kafkajob.record.speed";
+    private static final  String SOURCE_KAFKA_BYTE_SPEED_LIMIT = "job.kafkajob.byte.speed.limit";
+    private static final  String SOURCE_KAFKA_MIN_INTERVAL = "job.kafkajob.min.interval";
+    private static final  String SOURCE_KAFKA_OFFSET = "job.kafkajob.offset";
+    private static final  String SOURCE_KAFKA_READ_TIMEOUT = "job.kafkajob.read.timeout";
+
     // job type, delete/add
     public static final String JOB_TYPE = "job.type";
 
     public static final String JOB_CHECKPOINT = "job.checkpoint";
 
-    // offset for time
-    public static final String JOB_FILE_TIME_OFFSET = "job.timeOffset";
-
     public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
 
-    public static final String JOB_FILE_MAX_WAIT = "job.file.max.wait";
     // time in min
     public static final int DEFAULT_JOB_FILE_MAX_WAIT = 1;
 
@@ -57,10 +87,6 @@ public class JobConstants extends CommonConstants {
 
     public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 100;
 
-    public static final String JOB_DIR_FILTER_PATTERN = "job.dir.pattern";
-
-    public static final String JOB_DIR_FILTER_PATH = "job.dir.path";
-
     public static final String JOB_ID_PREFIX = "job_";
 
     public static final String SQL_JOB_ID = "sql_job_id";
@@ -74,16 +100,12 @@ public class JobConstants extends CommonConstants {
     // field splitter
     public static final String JOB_FIELD_SPLITTER = "job.splitter";
 
-    public static final String JOB_ADDITION_STR = "job.additionStr";
-
     // job delivery time
     public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
 
     // job time reading file
     public static final String JOB_DATA_TIME = "job.dataTime";
 
-    public static final String JOB_CYCLE_UNIT = "job.cycleUnit";
-
     /**
      * when job is retried, the retry time should be provided
      */
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
index c96d395..59df4ee 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.constants.AgentConstants;
 import org.apache.inlong.agent.constants.CommonConstants;
+import org.apache.inlong.commons.db.CommandEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
index e5a663e..7986669 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.db;
 
 import java.util.List;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.commons.db.CommandEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
index b8ef6dc..ca05e17 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.agent.db;
 
+import org.apache.inlong.commons.db.CommandEntity;
+
 import java.io.Closeable;
 import java.util.List;
 import javax.management.openmbean.KeyAlreadyExistsException;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index 8babb0e..5268a2f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.constants.AgentConstants;
+import org.apache.inlong.commons.db.CommandEntity;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
index 26fb905..3e9b8f1 100755
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.db;
 import java.util.List;
 
 import org.apache.inlong.agent.AgentBaseTestsHelper;
+import org.apache.inlong.commons.db.CommandEntity;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 29355d7..e2452e4 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -27,6 +27,7 @@ import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_HOM
 import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE;
 import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT;
 import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
+import static org.apache.inlong.agent.constants.AgentConstants.AGENT_LOCAL_UUID;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_OP;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_RETRY_TIME;
 import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
@@ -73,15 +74,16 @@ import org.apache.inlong.agent.conf.ProfileFetcher;
 import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.db.CommandDb;
-import org.apache.inlong.agent.db.CommandEntity;
+import org.apache.inlong.commons.db.CommandEntity;
+import org.apache.inlong.commons.dto.CmdConfig;
+import org.apache.inlong.commons.dto.TaskRequestDto;
+import org.apache.inlong.commons.enums.ManagerOpEnum;
 import org.apache.inlong.agent.plugin.Trigger;
-import org.apache.inlong.agent.plugin.fetcher.dtos.CmdConfig;
 import org.apache.inlong.agent.plugin.fetcher.dtos.ConfirmAgentIpRequest;
 import org.apache.inlong.agent.plugin.fetcher.dtos.DbCollectorTaskRequestDto;
 import org.apache.inlong.agent.plugin.fetcher.dtos.DbCollectorTaskResult;
-import org.apache.inlong.agent.plugin.fetcher.dtos.TaskRequestDto;
 import org.apache.inlong.agent.plugin.fetcher.dtos.TaskResult;
-import org.apache.inlong.agent.plugin.fetcher.enums.ManagerOpEnum;
+import org.apache.inlong.agent.plugin.utils.ExcuteLinux;
 import org.apache.inlong.agent.plugin.utils.HttpManager;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -110,6 +112,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     private final AgentManager agentManager;
     private final HttpManager httpManager;
     private String localIp;
+    private String uuid;
 
     private CommandDb commandDb;
 
@@ -289,6 +292,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
             List<CommandEntity> unackedCommands) {
         TaskRequestDto requset = new TaskRequestDto();
         requset.setAgentIp(localIp);
+        requset.setUuid(uuid);
         requset.setCommandInfo(unackedCommands);
         return requset;
     }
@@ -418,6 +422,14 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     }
 
     /**
+     * check agent uuid from manager
+     */
+    private void fetchLocalUuid() {
+        String result = ExcuteLinux.exeCmd("dmidecode | grep UUID");
+        localIp = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID, result);
+    }
+
+    /**
      * confirm local ips from manager
      *
      * @param localIps
@@ -499,6 +511,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     public void start() throws Exception {
         // when agent start, check local ip and fetch manager ip list;
         fetchLocalIp();
+        fetchLocalUuid();
         fetchTdmList(true, 0);
         submitWorker(profileFetchThread());
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/BinlogJob.java
similarity index 55%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/BinlogJob.java
index 5432f65..8e92fd0 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/BinlogJob.java
@@ -18,26 +18,32 @@
 package org.apache.inlong.agent.plugin.fetcher.dtos;
 
 import lombok.Data;
-import org.apache.inlong.agent.conf.TriggerProfile;
-
-import java.util.ArrayList;
-import java.util.List;
 
 @Data
-public class TaskResult {
+public class BinlogJob extends Job {
+
+    private  String user;
+    private  String password;
+    private  String hostname;
+    private  String whitelist;
+    private  String timeZone;
+    private  String intervalMs;
+    private  String storeHistoryFilename;
+    private  String snapshotMode;
+    private  String offset;
 
-    private List<CmdConfig> cmdConfigs;
-    private List<DataConfig> dataConfigs;
+    @Data
+    public static class BinlogJobTaskConfig {
 
-    public List<TriggerProfile> getTriggerProfiles() {
-        List<TriggerProfile> triggerProfiles = new ArrayList<>();
-        if (dataConfigs == null || dataConfigs.isEmpty()) {
-            return triggerProfiles;
-        }
-        dataConfigs.forEach(
-                dataConfig -> triggerProfiles.add(JobProfileDto
-                        .convertToTriggerProfile(dataConfig))
-        );
-        return triggerProfiles;
+        private  String user;
+        private  String password;
+        private  String hostname;
+        private  String whitelist;
+        private  String timeZone;
+        private  String intervalMs;
+        private  String storeHistoryFilename;
+        private  String snapshotMode;
+        private  String offset;
     }
-}
\ No newline at end of file
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/FileJob.java
similarity index 55%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/FileJob.java
index 8950199..6670f60 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/FileJob.java
@@ -20,24 +20,47 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
 import lombok.Data;
 
 @Data
-public class DataConfig {
-    private String additionalAttr;
-    private String inlongGroupId;
-    private String dataName;
-    private String inlongStreamId;
-    private String deliveryTime;
-    private String fieldSplitter;
+public class FileJob extends Job {
+
+    private String trigger;
+
+    private Dir dir;
+    private Thread thread;
+    private int id;
+    private String pattern;
     private String cycleUnit;
-    private String ip;
-    private String middlewareType;
-    private String mqMasterAddress;
-    private String op;
-    private String scheduleTime;
-    private Integer taskId;
     private String timeOffset;
-    private String topic;
+    private String addictiveString;
 
-    public boolean isValid() {
-        return true;
+    @Data
+    public static class Dir {
+
+        private String path;
+        private String pattern;
     }
+
+    @Data
+    public static class Running {
+
+        private String core;
+    }
+
+    @Data
+    public static class Thread {
+
+        private Running running;
+    }
+
+    @Data
+    public static class FileJobTaskConfig {
+
+        private String dataName;
+        private String path;
+        private int taskId;
+        private String pattern;
+        private String cycleUnit;
+        private String timeOffset;
+        private String additionalAttr;
+    }
+
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/Job.java
similarity index 82%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/Job.java
index 6cb4eca..8ebf364 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/Job.java
@@ -20,9 +20,14 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
 import lombok.Data;
 
 @Data
-public class CmdConfig {
-    private String dataTime;
-    private Integer id;
-    private Integer op;
-    private Integer taskId;
+public class Job {
+
+    private String deliveryTime;
+    private String op;
+
+    private String name;
+    private String source;
+    private String sink;
+    private String channel;
+
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index cbc1b25..cfaa4a4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin.fetcher.dtos;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.inlong.agent.plugin.fetcher.constants.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
 import static org.apache.inlong.agent.plugin.fetcher.constants.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
 
@@ -24,56 +25,36 @@ import com.google.gson.Gson;
 import lombok.Data;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.commons.dto.DataConfig;
+import org.apache.inlong.commons.enums.TaskTypeEnum;
 
 @Data
 public class JobProfileDto {
 
     private static final Gson GSON = new Gson();
+
     private Job job;
     private Proxy proxy;
 
     public static final String DEFAULT_TRIGGER = "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger";
+
     public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
-    public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.TextFileSource";
-
-    @Data
-    public static class Dir {
-
-        private String path;
-        private String pattern;
-    }
 
-    @Data
-    public static class Running {
-
-        private String core;
-    }
-
-    @Data
-    public static class Thread {
-
-        private Running running;
-    }
+    //filesource
+    public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.TextFileSource";
+    //binlogsource
+    public static final String BINLOG_SOURCE = "org.apache.inlong.agent.plugin.sources.BinlogSource";
+    //kafkasource
+    public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource";
 
     @Data
     public static class Job {
 
-        private Dir dir;
-        private String trigger;
-        private int id;
-        private Thread thread;
-        private String name;
-        private String source;
-        private String sink;
-        private String channel;
-        private String pattern;
-        private String op;
-        private String cycleUnit;
-        private String timeOffset;
-        private String deliveryTime;
-        private String addictiveString;
+        private FileJob fileJob;
+        private BinlogJob binlogJob;
+        private KafkaJob kafkaJob;
     }
 
     @Data
@@ -91,27 +72,91 @@ public class JobProfileDto {
         private Manager manager;
     }
 
-    private static Job getJob(DataConfig dataConfigs) {
-        Job job = new Job();
-        Dir dir = new Dir();
-        dir.setPattern(dataConfigs.getDataName());
-        job.setDir(dir);
-        job.setTrigger(DEFAULT_TRIGGER);
-        job.setChannel(DEFAULT_CHANNEL);
-        job.setName(MANAGER_JOB);
-        job.setSource(DEFAULT_SOURCE);
-        job.setSink(DEFAULT_DATAPROXY_SINK);
-        job.setId(dataConfigs.getTaskId());
-        job.setTimeOffset(dataConfigs.getTimeOffset());
-        job.setOp(dataConfigs.getOp());
-        job.setDeliveryTime(dataConfigs.getDeliveryTime());
-        if (!dataConfigs.getAdditionalAttr().isEmpty()) {
-            job.setAddictiveString(dataConfigs.getAdditionalAttr());
+    private static BinlogJob getBinlogJob(DataConfig dataConfigs) {
+
+        BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = new BinlogJob.BinlogJobTaskConfig();
+        Gson gson = new Gson();
+        binlogJobTaskConfig = gson.fromJson(dataConfigs.getTaskConfig(), BinlogJob.BinlogJobTaskConfig.class);
+        BinlogJob binlogJob = new BinlogJob();
+
+        binlogJob.setHostname(binlogJobTaskConfig.getHostname());
+        binlogJob.setPassword(binlogJobTaskConfig.getPassword());
+        binlogJob.setTimeZone(binlogJobTaskConfig.getTimeZone());
+        binlogJob.setSnapshotMode(binlogJobTaskConfig.getSnapshotMode());
+        binlogJob.setUser(binlogJobTaskConfig.getUser());
+        binlogJob.setStoreHistoryFilename(binlogJobTaskConfig.getStoreHistoryFilename());
+        binlogJob.setIntervalMs(binlogJobTaskConfig.getIntervalMs());
+        binlogJob.setSnapshotMode(binlogJobTaskConfig.getSnapshotMode());
+        binlogJob.setOffset(binlogJobTaskConfig.getOffset());
+
+        binlogJob.setChannel(DEFAULT_CHANNEL);
+        binlogJob.setName(MANAGER_JOB);
+        binlogJob.setSource(BINLOG_SOURCE);
+        binlogJob.setSink(DEFAULT_DATAPROXY_SINK);
+        binlogJob.setDeliveryTime(dataConfigs.getDeliveryTime());
+        binlogJob.setOp(dataConfigs.getOp());
+
+        return binlogJob;
+    }
+
+    private static FileJob getFileJob(DataConfig dataConfigs) {
+
+        FileJob fileJob = new FileJob();
+        fileJob.setTrigger(DEFAULT_TRIGGER);
+        fileJob.setChannel(DEFAULT_CHANNEL);
+        fileJob.setName(MANAGER_JOB);
+        fileJob.setSource(DEFAULT_SOURCE);
+        fileJob.setSink(DEFAULT_DATAPROXY_SINK);
+
+        FileJob.FileJobTaskConfig fileJobTaskConfig = new FileJob.FileJobTaskConfig();
+        Gson gson = new Gson();
+        fileJobTaskConfig = gson.fromJson(dataConfigs.getTaskConfig(), FileJob.FileJobTaskConfig.class);
+
+        FileJob.Dir dir = new FileJob.Dir();
+        dir.setPattern(fileJobTaskConfig.getDataName());
+        dir.setPath(fileJobTaskConfig.getPath());
+        fileJob.setDir(dir);
+
+        fileJob.setId(fileJobTaskConfig.getTaskId());
+        fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
+
+        if (!fileJobTaskConfig.getAdditionalAttr().isEmpty()) {
+            fileJob.setAddictiveString(fileJobTaskConfig.getAdditionalAttr());
         }
-        if (dataConfigs.getCycleUnit() != null) {
-            job.setCycleUnit(dataConfigs.getCycleUnit());
+        if (fileJobTaskConfig.getCycleUnit() != null) {
+            fileJob.setCycleUnit(fileJobTaskConfig.getCycleUnit());
         }
-        return job;
+        fileJob.setDeliveryTime(dataConfigs.getDeliveryTime());
+        fileJob.setOp(dataConfigs.getOp());
+
+        return fileJob;
+    }
+
+    private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
+
+        KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = new KafkaJob.KafkaJobTaskConfig();
+        Gson gson = new Gson();
+        kafkaJobTaskConfig = gson.fromJson(dataConfigs.getTaskConfig(), KafkaJob.KafkaJobTaskConfig.class);
+        KafkaJob kafkaJob = new KafkaJob();
+
+        kafkaJob.setTopic(kafkaJobTaskConfig.getTopic());
+        kafkaJob.setKeyDeserializer(kafkaJobTaskConfig.getValueDeserializer());
+        kafkaJob.setValueDeserializer(kafkaJobTaskConfig.getKeyDeserializer());
+        kafkaJob.setBootstrapServers(kafkaJobTaskConfig.getBootstrapServers());
+        kafkaJob.setGroupId(kafkaJobTaskConfig.getGroupId());
+        kafkaJob.setRecordSpeed(kafkaJobTaskConfig.getRecordSpeed());
+        kafkaJob.setByteSpeedLimit(kafkaJobTaskConfig.getByteSpeedLimit());
+        kafkaJob.setMinInterval(kafkaJobTaskConfig.getMinInterval());
+        kafkaJob.setOffset(kafkaJobTaskConfig.getOffset());
+
+        kafkaJob.setChannel(DEFAULT_CHANNEL);
+        kafkaJob.setName(MANAGER_JOB);
+        kafkaJob.setSource(KAFKA_SOURCE);
+        kafkaJob.setSink(DEFAULT_DATAPROXY_SINK);
+        kafkaJob.setDeliveryTime(dataConfigs.getDeliveryTime());
+        kafkaJob.setOp(dataConfigs.getOp());
+
+        return kafkaJob;
     }
 
     private static Proxy getProxy(DataConfig dataConfigs) {
@@ -130,11 +175,30 @@ public class JobProfileDto {
         if (!dataConfigs.isValid()) {
             throw new IllegalArgumentException("input dataConfig" + dataConfigs + "is invalid please check");
         }
+        TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
         JobProfileDto profileDto = new JobProfileDto();
         Proxy proxy = getProxy(dataConfigs);
-        Job job = getJob(dataConfigs);
         profileDto.setProxy(proxy);
-        profileDto.setJob(job);
+        Job job = new Job();
+        switch (requireNonNull(taskType)) {
+            case SQL:
+            case BINLOG:
+                BinlogJob binlogJob = getBinlogJob(dataConfigs);
+                job.setBinlogJob(binlogJob);
+                profileDto.setJob(job);
+                break;
+            case FILE:
+                FileJob fileJob = getFileJob(dataConfigs);
+                job.setFileJob(fileJob);
+                profileDto.setJob(job);
+                break;
+            case KAFKA:
+                KafkaJob kafkaJob = getKafkaJob(dataConfigs);
+                job.setKafkaJob(kafkaJob);
+                profileDto.setJob(job);
+                break;
+            default:
+        }
         return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
     }
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/KafkaJob.java
similarity index 54%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/KafkaJob.java
index 5432f65..174a497 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/KafkaJob.java
@@ -18,26 +18,31 @@
 package org.apache.inlong.agent.plugin.fetcher.dtos;
 
 import lombok.Data;
-import org.apache.inlong.agent.conf.TriggerProfile;
-
-import java.util.ArrayList;
-import java.util.List;
 
 @Data
-public class TaskResult {
+public class KafkaJob extends Job {
+
+    private  String topic;
+    private  String keyDeserializer;
+    private  String valueDeserializer;
+    private  String bootstrapServers;
+    private  String groupId;
+    private  String recordSpeed;
+    private  String byteSpeedLimit;
+    private  String minInterval;
+    private  String offset;
 
-    private List<CmdConfig> cmdConfigs;
-    private List<DataConfig> dataConfigs;
+    @Data
+    public static class KafkaJobTaskConfig {
 
-    public List<TriggerProfile> getTriggerProfiles() {
-        List<TriggerProfile> triggerProfiles = new ArrayList<>();
-        if (dataConfigs == null || dataConfigs.isEmpty()) {
-            return triggerProfiles;
-        }
-        dataConfigs.forEach(
-                dataConfig -> triggerProfiles.add(JobProfileDto
-                        .convertToTriggerProfile(dataConfig))
-        );
-        return triggerProfiles;
+        private  String topic;
+        private  String keyDeserializer;
+        private  String valueDeserializer;
+        private  String bootstrapServers;
+        private  String groupId;
+        private  String recordSpeed;
+        private  String byteSpeedLimit;
+        private  String minInterval;
+        private  String offset;
     }
-}
\ No newline at end of file
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
index 1c3105e..3fd8d5e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
 import java.util.ArrayList;
 import java.util.List;
 import lombok.Data;
-import org.apache.inlong.agent.db.CommandEntity;
+import org.apache.inlong.commons.db.CommandEntity;
 
 @Data
 public class TaskRequestDto {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
index 5432f65..eb9bcf1 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
@@ -19,6 +19,8 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
 
 import lombok.Data;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.commons.dto.CmdConfig;
+import org.apache.inlong.commons.dto.DataConfig;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/ExcuteLinux.java
similarity index 51%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/ExcuteLinux.java
index 5432f65..cb12dea 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/ExcuteLinux.java
@@ -15,29 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.agent.plugin.utils;
 
-import lombok.Data;
-import org.apache.inlong.agent.conf.TriggerProfile;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 
-import java.util.ArrayList;
-import java.util.List;
+public class ExcuteLinux {
 
-@Data
-public class TaskResult {
+    public static String exeCmd(String commandStr) {
 
-    private List<CmdConfig> cmdConfigs;
-    private List<DataConfig> dataConfigs;
+        String result = null;
+        try {
+            String[] cmd = new String[]{"/bin/sh", "-c",commandStr};
+            Process ps = Runtime.getRuntime().exec(cmd);
 
-    public List<TriggerProfile> getTriggerProfiles() {
-        List<TriggerProfile> triggerProfiles = new ArrayList<>();
-        if (dataConfigs == null || dataConfigs.isEmpty()) {
-            return triggerProfiles;
+            BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
+            StringBuffer sb = new StringBuffer();
+            String line;
+            while ((line = br.readLine()) != null) {
+                sb.append(line).append("\n");
+            }
+            result = sb.toString();
+
+        } catch (Exception e) {
+            e.printStackTrace();
         }
-        dataConfigs.forEach(
-                dataConfig -> triggerProfiles.add(JobProfileDto
-                        .convertToTriggerProfile(dataConfig))
-        );
-        return triggerProfiles;
+
+        return result;
+
     }
 }
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index c548f5e..971775d 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -120,7 +120,8 @@ public class TestFileAgent {
 
     @Test
     public void testOneJobOnly() throws Exception {
-        TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(TestUtils.getTestTriggerProfile());
+        String jsonString = TestUtils.getTestTriggerProfile();
+        TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(jsonString);
         triggerProfile.set(JOB_DIR_FILTER_PATTERN, helper.getParentPath() + triggerProfile.get(JOB_DIR_FILTER_PATTERN));
         triggerProfile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
             "test[0-9].dat").toString());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index bbafd0f..db0d600 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -29,31 +29,33 @@ public class TestUtils {
 
     public static String getTestTriggerProfile() {
         return "{\n"
-            + "  \"job\": {\n"
-            + "    \"dir\": {\n"
-            + "      \"path\": \"\",\n"
-            + "      \"pattern\": \"/AgentBaseTestsHelper/"
-            + "org.apache.tubemq.inlong.plugin.fetcher.TestTdmFetcher/test[0-9].dat\"\n"
-            + "    },\n"
-            + "    \"trigger\": \"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger\",\n"
-            + "    \"id\": 1,\n"
-            + "    \"thread\" : {\n"
-            + "\"running\": {\n"
-            + "\"core\": \"4\"\n"
-            + "}\n"
-            + "},  \n"
-            + "    \"op\": \"0\",\n"
-            + "    \"ip\": \"127.0.0.1\",\n"
-            + "    \"name\": \"fileAgentTest\",\n"
-            + "    \"source\": \"org.apache.inlong.agent.plugin.sources.TextFileSource\",\n"
-            + "    \"sink\": \"org.apache.inlong.agent.plugin.sinks.MockSink\",\n"
-            + "    \"channel\": \"org.apache.inlong.agent.plugin.channel.MemoryChannel\",\n"
-            + "    \"standalone\": true,\n"
-            + "    \"additionStr\": \"m=15&file=test\",\n"
-            + "    \"deliveryTime\": \"1231313\",\n"
-            + "    \"splitter\": \"&\"\n"
-            + "  }\n"
-            + "  }";
+                + "  \"job\": {\n"
+                + "    \"filejob\": {\n"
+                + "      \"additionStr\": \"m=15&file=test\",\n"
+                + "      \"trigger\": \"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger\",\n"
+                + "      \"dir\": {\n"
+                + "        \"path\": \"\",\n"
+                + "        \"pattern\": \"/AgentBaseTestsHelper/"
+                + "org.apache.tubemq.inlong.plugin.fetcher.TestTdmFetcher/test[0-9].dat\"\n"
+                + "      },\n"
+                + "      \"thread\" : {\n"
+                + "\"running\": {\n"
+                + "\"core\": \"4\"\n"
+                + "}\n"
+                + "} \n"
+                + "    },\n"
+                + "    \"id\": 1,\n"
+                + "    \"op\": \"0\",\n"
+                + "    \"ip\": \"127.0.0.1\",\n"
+                + "    \"name\": \"fileAgentTest\",\n"
+                + "    \"source\": \"org.apache.inlong.agent.plugin.sources.TextFileSource\",\n"
+                + "    \"sink\": \"org.apache.inlong.agent.plugin.sinks.MockSink\",\n"
+                + "    \"channel\": \"org.apache.inlong.agent.plugin.channel.MemoryChannel\",\n"
+                + "    \"standalone\": true,\n"
+                + "    \"deliveryTime\": \"1231313\",\n"
+                + "    \"splitter\": \"&\"\n"
+                + "  }\n"
+                + "  }";
     }
 
     public static void createHugeFiles(String fileName, String rootDir, String record) throws Exception {
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
index 33f1b07..8a21ba4 100755
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
+++ b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
@@ -5,10 +5,12 @@
         "wait": 1
       }
     },
-    "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger",
-    "dir": {
-      "path": "",
-      "pattern": "/test.[0-9]"
+    "filejob": {
+      "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger",
+      "dir": {
+        "path": "",
+        "pattern": "/test.[0-9]"
+      }
     },
     "id": 1,
     "name": "fileAgentTest",
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index d014f95..33c5eee 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -63,7 +63,11 @@
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
-            <version>1.18.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.sleepycat</groupId>
+            <artifactId>je</artifactId>
             <scope>compile</scope>
         </dependency>
     </dependencies>
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandEntity.java b/inlong-common/src/main/java/org/apache/inlong/commons/db/CommandEntity.java
similarity index 97%
rename from inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandEntity.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/db/CommandEntity.java
index eae9f87..35bb877 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandEntity.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/db/CommandEntity.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.commons.db;
 
 import com.sleepycat.persist.model.Entity;
 import com.sleepycat.persist.model.PrimaryKey;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java b/inlong-common/src/main/java/org/apache/inlong/commons/dto/CmdConfig.java
similarity index 94%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/dto/CmdConfig.java
index 6cb4eca..eab12df 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/dto/CmdConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.commons.dto;
 
 import lombok.Data;
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/commons/dto/DataConfig.java
similarity index 75%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/dto/DataConfig.java
index 8950199..ea0704a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/dto/DataConfig.java
@@ -15,29 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.commons.dto;
 
 import lombok.Data;
 
 @Data
 public class DataConfig {
-    private String additionalAttr;
     private String inlongGroupId;
-    private String dataName;
     private String inlongStreamId;
     private String deliveryTime;
-    private String fieldSplitter;
-    private String cycleUnit;
+    private String uuid;
     private String ip;
-    private String middlewareType;
-    private String mqMasterAddress;
     private String op;
-    private String scheduleTime;
     private Integer taskId;
-    private String timeOffset;
-    private String topic;
+    private Integer taskType;
+    private String taskConfig;
 
     public boolean isValid() {
         return true;
     }
-}
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java b/inlong-common/src/main/java/org/apache/inlong/commons/dto/TaskRequestDto.java
similarity index 89%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
copy to inlong-common/src/main/java/org/apache/inlong/commons/dto/TaskRequestDto.java
index 1c3105e..bb3ce75 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/dto/TaskRequestDto.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.commons.dto;
 
 import java.util.ArrayList;
 import java.util.List;
 import lombok.Data;
-import org.apache.inlong.agent.db.CommandEntity;
+import org.apache.inlong.commons.db.CommandEntity;
 
 @Data
 public class TaskRequestDto {
     private String agentIp;
+    private String uuid;
     private List<CommandEntity> commandInfo = new ArrayList<>();
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/commons/enums/ManagerOpEnum.java
similarity index 97%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
copy to inlong-common/src/main/java/org/apache/inlong/commons/enums/ManagerOpEnum.java
index 799c474..1501404 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/enums/ManagerOpEnum.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.fetcher.enums;
+package org.apache.inlong.commons.enums;
 
 import static java.util.Objects.requireNonNull;
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/commons/enums/TaskTypeEnum.java
similarity index 57%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/enums/TaskTypeEnum.java
index 799c474..e43e5c3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/enums/TaskTypeEnum.java
@@ -15,46 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.fetcher.enums;
+package org.apache.inlong.commons.enums;
 
 import static java.util.Objects.requireNonNull;
 
-public enum ManagerOpEnum {
-    ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4), ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
+public enum TaskTypeEnum {
+    SQL(1), BINLOG(2), FILE(3), KAFKA(4);
 
     private int type;
 
-    ManagerOpEnum(int type) {
+    TaskTypeEnum(int type) {
         this.type = type;
     }
 
-    public static ManagerOpEnum getOpType(int opType) {
-        requireNonNull(opType);
-        switch (opType) {
-            case 0:
-                return ADD;
+    public static TaskTypeEnum getTaskType(int taskType) {
+        requireNonNull(taskType);
+        switch (taskType) {
             case 1:
-                return DEL;
+                return SQL;
             case 2:
-                return RETRY;
+                return BINLOG;
             case 3:
-                return BACKTRACK;
+                return FILE;
             case 4:
-                return FROZEN;
-            case 5:
-                return ACTIVE;
-            case 6:
-                return CHECK;
-            case 7:
-                return REDOMETRIC;
-            case 8:
-                return MAKEUP;
+                return KAFKA;
             default:
-                throw new RuntimeException("such op type doesn't exist");
+                throw new RuntimeException("such task type doesn't exist");
         }
     }
 
     public int getType() {
         return type;
     }
-}
+}
\ No newline at end of file
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
index ca17f58..746f2af 100644
--- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
@@ -41,9 +41,11 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+@PowerMockIgnore("javax.management.*")
 @RunWith(PowerMockRunner.class)
 public class InLongPulsarFetcherImplTest {
 
diff --git a/pom.xml b/pom.xml
index f73742d..d895699 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,7 @@
         <docker.organization>inlong</docker.organization>
         <lombok.version>1.18.22</lombok.version>
         <logback.version>1.2.10</logback.version>
+        <je.version>7.3.7</je.version>
     </properties>
 
     <dependencyManagement>
@@ -132,6 +133,11 @@
                 <artifactId>logback-core</artifactId>
                 <version>${logback.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.sleepycat</groupId>
+                <artifactId>je</artifactId>
+                <version>${je.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -293,6 +299,17 @@
     </build>
 
     <repositories>
+        <repository>
+            <id>berkeleydb-je</id>
+            <name>berkeleydb-je</name>
+            <url>https://download.dcache.org/nexus/repository/berkeleydb-je/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
     </repositories>
 
 </project>