You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/22 11:53:50 UTC
[incubator-inlong] branch master updated: [INLONG-2545][Agent] agent pull task (binlog, file) from manager (#2547)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad1625 [INLONG-2545][Agent] agent pull task (binlog,file) from manager (#2547)
4ad1625 is described below
commit 4ad16253a48cf796095c0ae618d941ca0b8e6fa6
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Tue Feb 22 19:53:45 2022 +0800
[INLONG-2545][Agent] agent pull task (binlog,file) from manager (#2547)
* [Agent] agent pull task (binlog,file) from manager
* [INLONG-2545][Agent] pull task from manager ;expand type of binlog
Co-authored-by: jianchenglv <6>
---
.../inlong/agent/constants/AgentConstants.java | 2 +
.../inlong/agent/constants/JobConstants.java | 50 ++++--
.../org/apache/inlong/agent/db/BerkeleyDbImp.java | 1 +
.../java/org/apache/inlong/agent/db/CommandDb.java | 1 +
.../main/java/org/apache/inlong/agent/db/Db.java | 2 +
.../org/apache/inlong/agent/db/RocksDbImp.java | 1 +
.../apache/inlong/agent/db/TestBerkeleyDBImp.java | 1 +
.../agent/plugin/fetcher/ManagerFetcher.java | 21 ++-
.../dtos/{TaskResult.java => BinlogJob.java} | 42 ++---
.../fetcher/dtos/{DataConfig.java => FileJob.java} | 55 +++++--
.../fetcher/dtos/{CmdConfig.java => Job.java} | 15 +-
.../agent/plugin/fetcher/dtos/JobProfileDto.java | 172 ++++++++++++++-------
.../dtos/{TaskResult.java => KafkaJob.java} | 41 ++---
.../agent/plugin/fetcher/dtos/TaskRequestDto.java | 2 +-
.../agent/plugin/fetcher/dtos/TaskResult.java | 2 +
.../TaskResult.java => utils/ExcuteLinux.java} | 40 ++---
.../apache/inlong/agent/plugin/TestFileAgent.java | 3 +-
.../inlong/agent/plugin/utils/TestUtils.java | 52 ++++---
.../src/test/resources/fileAgentJob.json | 10 +-
inlong-common/pom.xml | 6 +-
.../apache/inlong/commons}/db/CommandEntity.java | 2 +-
.../org/apache/inlong/commons/dto}/CmdConfig.java | 2 +-
.../org/apache/inlong/commons/dto}/DataConfig.java | 16 +-
.../apache/inlong/commons/dto}/TaskRequestDto.java | 5 +-
.../inlong/commons}/enums/ManagerOpEnum.java | 2 +-
.../apache/inlong/commons/enums/TaskTypeEnum.java | 36 ++---
.../sdk/sort/impl/InLongPulsarFetcherImplTest.java | 2 +
pom.xml | 17 ++
28 files changed, 383 insertions(+), 218 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
index 2c5bc81..7ace2c9 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
@@ -162,6 +162,8 @@ public class AgentConstants {
public static final String AGENT_LOCAL_IP = "agent.local.ip";
+ public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
+
public static final String PROMETHEUS_ENABLE = "agent.prometheus.enable";
public static final boolean DEFAULT_PROMETHEUS_ENABLE = false;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
index 7b092a5..eb6f5a7 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
@@ -29,27 +29,57 @@ public class JobConstants extends CommonConstants {
public static final String JOB_RETRY = "job.retry";
public static final String JOB_SOURCE = "job.source";
+
public static final String JOB_SINK = "job.sink";
public static final String JOB_CHANNEL = "job.channel";
- public static final String JOB_TRIGGER = "job.trigger";
public static final String JOB_NAME = "job.name";
- public static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
+
public static final String DEFAULT_JOB_NAME = "default";
public static final String JOB_DESCRIPTION = "job.description";
public static final String DEFAULT_JOB_DESCRIPTION = "default job description";
public static final String DEFAULT_JOB_LINE_FILTER = "";
+ //File job
+ public static final String JOB_TRIGGER = "job.filejob.trigger";
+ public static final String JOB_LINE_FILTER_PATTERN = "job.filejob.dir.pattern";
+ public static final String JOB_DIR_FILTER_PATTERN = "job.filejob.dir.pattern";
+ public static final String JOB_FILE_TIME_OFFSET = "job.filejob.timeOffset";
+ public static final String JOB_FILE_MAX_WAIT = "job.filejob.file.max.wait";
+ public static final String JOB_ADDITION_STR = "job.filejob.additionStr";
+ public static final String JOB_CYCLE_UNIT = "job.filejob.cycleUnit";
+
+ public static final String JOB_DIR_FILTER_PATH = "job.filejob.dir.path";
+
+ //Binlog job
+ private static final String JOB_DATABASE_USER = "job.binlogjob.user";
+ private static final String JOB_DATABASE_PASSWORD = "job.binlogjob.password";
+ private static final String JOB_DATABASE_HOSTNAME = "job.binlogjob.hostname";
+ private static final String JOB_DATABASE_WHITELIST = "job.binlogjob.tableWhiteList";
+ private static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogjob.database.serverTimezone";
+ private static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "offset.binlogjob.offset.flush.interval.ms";
+ private static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogjob.database.history.file.filename";
+ private static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogjob.database.snapshot.mode";
+ private static final String JOB_DATABASE_OFFSET = "job.binlogjob.database.offset";
+
+ //Kafka job
+ private static final String SOURCE_KAFKA_TOPIC = "job.kafkajob.topic";
+ private static final String SOURCE_KAFKA_KEY_DESERIALIZER = "job.kafkajob.key.deserializer";
+ private static final String SOURCE_KAFKA_VALUE_DESERIALIZER = "job.kafkajob.value.Deserializer";
+ private static final String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "job.kafkajob.bootstrap.servers";
+ private static final String SOURCE_KAFKA_GROUP_ID = "job.kafkajob.group.Id";
+ private static final String SOURCE_KAFKA_RECORD_SPEED = "job.kafkajob.record.speed";
+ private static final String SOURCE_KAFKA_BYTE_SPEED_LIMIT = "job.kafkajob.byte.speed.limit";
+ private static final String SOURCE_KAFKA_MIN_INTERVAL = "job.kafkajob.min.interval";
+ private static final String SOURCE_KAFKA_OFFSET = "job.kafkajob.offset";
+ private static final String SOURCE_KAFKA_READ_TIMEOUT = "job.kafkajob.read.timeout";
+
// job type, delete/add
public static final String JOB_TYPE = "job.type";
public static final String JOB_CHECKPOINT = "job.checkpoint";
- // offset for time
- public static final String JOB_FILE_TIME_OFFSET = "job.timeOffset";
-
public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
- public static final String JOB_FILE_MAX_WAIT = "job.file.max.wait";
// time in min
public static final int DEFAULT_JOB_FILE_MAX_WAIT = 1;
@@ -57,10 +87,6 @@ public class JobConstants extends CommonConstants {
public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 100;
- public static final String JOB_DIR_FILTER_PATTERN = "job.dir.pattern";
-
- public static final String JOB_DIR_FILTER_PATH = "job.dir.path";
-
public static final String JOB_ID_PREFIX = "job_";
public static final String SQL_JOB_ID = "sql_job_id";
@@ -74,16 +100,12 @@ public class JobConstants extends CommonConstants {
// field splitter
public static final String JOB_FIELD_SPLITTER = "job.splitter";
- public static final String JOB_ADDITION_STR = "job.additionStr";
-
// job delivery time
public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
// job time reading file
public static final String JOB_DATA_TIME = "job.dataTime";
- public static final String JOB_CYCLE_UNIT = "job.cycleUnit";
-
/**
* when job is retried, the retry time should be provided
*/
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
index c96d395..59df4ee 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/BerkeleyDbImp.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constants.AgentConstants;
import org.apache.inlong.agent.constants.CommonConstants;
+import org.apache.inlong.commons.db.CommandEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
index e5a663e..7986669 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.db;
import java.util.List;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.commons.db.CommandEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
index b8ef6dc..ca05e17 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.db;
+import org.apache.inlong.commons.db.CommandEntity;
+
import java.io.Closeable;
import java.util.List;
import javax.management.openmbean.KeyAlreadyExistsException;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index 8babb0e..5268a2f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constants.AgentConstants;
+import org.apache.inlong.commons.db.CommandEntity;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
index 26fb905..3e9b8f1 100755
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.db;
import java.util.List;
import org.apache.inlong.agent.AgentBaseTestsHelper;
+import org.apache.inlong.commons.db.CommandEntity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 29355d7..e2452e4 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -27,6 +27,7 @@ import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_HOM
import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE;
import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT;
import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
+import static org.apache.inlong.agent.constants.AgentConstants.AGENT_LOCAL_UUID;
import static org.apache.inlong.agent.constants.JobConstants.JOB_OP;
import static org.apache.inlong.agent.constants.JobConstants.JOB_RETRY_TIME;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
@@ -73,15 +74,16 @@ import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.CommandDb;
-import org.apache.inlong.agent.db.CommandEntity;
+import org.apache.inlong.commons.db.CommandEntity;
+import org.apache.inlong.commons.dto.CmdConfig;
+import org.apache.inlong.commons.dto.TaskRequestDto;
+import org.apache.inlong.commons.enums.ManagerOpEnum;
import org.apache.inlong.agent.plugin.Trigger;
-import org.apache.inlong.agent.plugin.fetcher.dtos.CmdConfig;
import org.apache.inlong.agent.plugin.fetcher.dtos.ConfirmAgentIpRequest;
import org.apache.inlong.agent.plugin.fetcher.dtos.DbCollectorTaskRequestDto;
import org.apache.inlong.agent.plugin.fetcher.dtos.DbCollectorTaskResult;
-import org.apache.inlong.agent.plugin.fetcher.dtos.TaskRequestDto;
import org.apache.inlong.agent.plugin.fetcher.dtos.TaskResult;
-import org.apache.inlong.agent.plugin.fetcher.enums.ManagerOpEnum;
+import org.apache.inlong.agent.plugin.utils.ExcuteLinux;
import org.apache.inlong.agent.plugin.utils.HttpManager;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -110,6 +112,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
private final AgentManager agentManager;
private final HttpManager httpManager;
private String localIp;
+ private String uuid;
private CommandDb commandDb;
@@ -289,6 +292,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
List<CommandEntity> unackedCommands) {
TaskRequestDto requset = new TaskRequestDto();
requset.setAgentIp(localIp);
+ requset.setUuid(uuid);
requset.setCommandInfo(unackedCommands);
return requset;
}
@@ -418,6 +422,14 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
}
/**
+ * check agent uuid from manager
+ */
+ private void fetchLocalUuid() {
+ String result = ExcuteLinux.exeCmd("dmidecode | grep UUID");
+ localIp = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID, result);
+ }
+
+ /**
* confirm local ips from manager
*
* @param localIps
@@ -499,6 +511,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
public void start() throws Exception {
// when agent start, check local ip and fetch manager ip list;
fetchLocalIp();
+ fetchLocalUuid();
fetchTdmList(true, 0);
submitWorker(profileFetchThread());
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/BinlogJob.java
similarity index 55%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/BinlogJob.java
index 5432f65..8e92fd0 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/BinlogJob.java
@@ -18,26 +18,32 @@
package org.apache.inlong.agent.plugin.fetcher.dtos;
import lombok.Data;
-import org.apache.inlong.agent.conf.TriggerProfile;
-
-import java.util.ArrayList;
-import java.util.List;
@Data
-public class TaskResult {
+public class BinlogJob extends Job {
+
+ private String user;
+ private String password;
+ private String hostname;
+ private String whitelist;
+ private String timeZone;
+ private String intervalMs;
+ private String storeHistoryFilename;
+ private String snapshotMode;
+ private String offset;
- private List<CmdConfig> cmdConfigs;
- private List<DataConfig> dataConfigs;
+ @Data
+ public static class BinlogJobTaskConfig {
- public List<TriggerProfile> getTriggerProfiles() {
- List<TriggerProfile> triggerProfiles = new ArrayList<>();
- if (dataConfigs == null || dataConfigs.isEmpty()) {
- return triggerProfiles;
- }
- dataConfigs.forEach(
- dataConfig -> triggerProfiles.add(JobProfileDto
- .convertToTriggerProfile(dataConfig))
- );
- return triggerProfiles;
+ private String user;
+ private String password;
+ private String hostname;
+ private String whitelist;
+ private String timeZone;
+ private String intervalMs;
+ private String storeHistoryFilename;
+ private String snapshotMode;
+ private String offset;
}
-}
\ No newline at end of file
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/FileJob.java
similarity index 55%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/FileJob.java
index 8950199..6670f60 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/FileJob.java
@@ -20,24 +20,47 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
import lombok.Data;
@Data
-public class DataConfig {
- private String additionalAttr;
- private String inlongGroupId;
- private String dataName;
- private String inlongStreamId;
- private String deliveryTime;
- private String fieldSplitter;
+public class FileJob extends Job {
+
+ private String trigger;
+
+ private Dir dir;
+ private Thread thread;
+ private int id;
+ private String pattern;
private String cycleUnit;
- private String ip;
- private String middlewareType;
- private String mqMasterAddress;
- private String op;
- private String scheduleTime;
- private Integer taskId;
private String timeOffset;
- private String topic;
+ private String addictiveString;
- public boolean isValid() {
- return true;
+ @Data
+ public static class Dir {
+
+ private String path;
+ private String pattern;
}
+
+ @Data
+ public static class Running {
+
+ private String core;
+ }
+
+ @Data
+ public static class Thread {
+
+ private Running running;
+ }
+
+ @Data
+ public static class FileJobTaskConfig {
+
+ private String dataName;
+ private String path;
+ private int taskId;
+ private String pattern;
+ private String cycleUnit;
+ private String timeOffset;
+ private String additionalAttr;
+ }
+
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/Job.java
similarity index 82%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/Job.java
index 6cb4eca..8ebf364 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/Job.java
@@ -20,9 +20,14 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
import lombok.Data;
@Data
-public class CmdConfig {
- private String dataTime;
- private Integer id;
- private Integer op;
- private Integer taskId;
+public class Job {
+
+ private String deliveryTime;
+ private String op;
+
+ private String name;
+ private String source;
+ private String sink;
+ private String channel;
+
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index cbc1b25..cfaa4a4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.fetcher.dtos;
+import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.plugin.fetcher.constants.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static org.apache.inlong.agent.plugin.fetcher.constants.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
@@ -24,56 +25,36 @@ import com.google.gson.Gson;
import lombok.Data;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.commons.dto.DataConfig;
+import org.apache.inlong.commons.enums.TaskTypeEnum;
@Data
public class JobProfileDto {
private static final Gson GSON = new Gson();
+
private Job job;
private Proxy proxy;
public static final String DEFAULT_TRIGGER = "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger";
+
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
- public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.TextFileSource";
-
- @Data
- public static class Dir {
-
- private String path;
- private String pattern;
- }
- @Data
- public static class Running {
-
- private String core;
- }
-
- @Data
- public static class Thread {
-
- private Running running;
- }
+ //filesource
+ public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.TextFileSource";
+ //binlogsource
+ public static final String BINLOG_SOURCE = "org.apache.inlong.agent.plugin.sources.BinlogSource";
+ //kafkasource
+ public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource";
@Data
public static class Job {
- private Dir dir;
- private String trigger;
- private int id;
- private Thread thread;
- private String name;
- private String source;
- private String sink;
- private String channel;
- private String pattern;
- private String op;
- private String cycleUnit;
- private String timeOffset;
- private String deliveryTime;
- private String addictiveString;
+ private FileJob fileJob;
+ private BinlogJob binlogJob;
+ private KafkaJob kafkaJob;
}
@Data
@@ -91,27 +72,91 @@ public class JobProfileDto {
private Manager manager;
}
- private static Job getJob(DataConfig dataConfigs) {
- Job job = new Job();
- Dir dir = new Dir();
- dir.setPattern(dataConfigs.getDataName());
- job.setDir(dir);
- job.setTrigger(DEFAULT_TRIGGER);
- job.setChannel(DEFAULT_CHANNEL);
- job.setName(MANAGER_JOB);
- job.setSource(DEFAULT_SOURCE);
- job.setSink(DEFAULT_DATAPROXY_SINK);
- job.setId(dataConfigs.getTaskId());
- job.setTimeOffset(dataConfigs.getTimeOffset());
- job.setOp(dataConfigs.getOp());
- job.setDeliveryTime(dataConfigs.getDeliveryTime());
- if (!dataConfigs.getAdditionalAttr().isEmpty()) {
- job.setAddictiveString(dataConfigs.getAdditionalAttr());
+ private static BinlogJob getBinlogJob(DataConfig dataConfigs) {
+
+ BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = new BinlogJob.BinlogJobTaskConfig();
+ Gson gson = new Gson();
+ binlogJobTaskConfig = gson.fromJson(dataConfigs.getTaskConfig(), BinlogJob.BinlogJobTaskConfig.class);
+ BinlogJob binlogJob = new BinlogJob();
+
+ binlogJob.setHostname(binlogJobTaskConfig.getHostname());
+ binlogJob.setPassword(binlogJobTaskConfig.getPassword());
+ binlogJob.setTimeZone(binlogJobTaskConfig.getTimeZone());
+ binlogJob.setSnapshotMode(binlogJobTaskConfig.getSnapshotMode());
+ binlogJob.setUser(binlogJobTaskConfig.getUser());
+ binlogJob.setStoreHistoryFilename(binlogJobTaskConfig.getStoreHistoryFilename());
+ binlogJob.setIntervalMs(binlogJobTaskConfig.getIntervalMs());
+ binlogJob.setSnapshotMode(binlogJobTaskConfig.getSnapshotMode());
+ binlogJob.setOffset(binlogJobTaskConfig.getOffset());
+
+ binlogJob.setChannel(DEFAULT_CHANNEL);
+ binlogJob.setName(MANAGER_JOB);
+ binlogJob.setSource(BINLOG_SOURCE);
+ binlogJob.setSink(DEFAULT_DATAPROXY_SINK);
+ binlogJob.setDeliveryTime(dataConfigs.getDeliveryTime());
+ binlogJob.setOp(dataConfigs.getOp());
+
+ return binlogJob;
+ }
+
+ private static FileJob getFileJob(DataConfig dataConfigs) {
+
+ FileJob fileJob = new FileJob();
+ fileJob.setTrigger(DEFAULT_TRIGGER);
+ fileJob.setChannel(DEFAULT_CHANNEL);
+ fileJob.setName(MANAGER_JOB);
+ fileJob.setSource(DEFAULT_SOURCE);
+ fileJob.setSink(DEFAULT_DATAPROXY_SINK);
+
+ FileJob.FileJobTaskConfig fileJobTaskConfig = new FileJob.FileJobTaskConfig();
+ Gson gson = new Gson();
+ fileJobTaskConfig = gson.fromJson(dataConfigs.getTaskConfig(), FileJob.FileJobTaskConfig.class);
+
+ FileJob.Dir dir = new FileJob.Dir();
+ dir.setPattern(fileJobTaskConfig.getDataName());
+ dir.setPath(fileJobTaskConfig.getPath());
+ fileJob.setDir(dir);
+
+ fileJob.setId(fileJobTaskConfig.getTaskId());
+ fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
+
+ if (!fileJobTaskConfig.getAdditionalAttr().isEmpty()) {
+ fileJob.setAddictiveString(fileJobTaskConfig.getAdditionalAttr());
}
- if (dataConfigs.getCycleUnit() != null) {
- job.setCycleUnit(dataConfigs.getCycleUnit());
+ if (fileJobTaskConfig.getCycleUnit() != null) {
+ fileJob.setCycleUnit(fileJobTaskConfig.getCycleUnit());
}
- return job;
+ fileJob.setDeliveryTime(dataConfigs.getDeliveryTime());
+ fileJob.setOp(dataConfigs.getOp());
+
+ return fileJob;
+ }
+
+ private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
+
+ KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = new KafkaJob.KafkaJobTaskConfig();
+ Gson gson = new Gson();
+ kafkaJobTaskConfig = gson.fromJson(dataConfigs.getTaskConfig(), KafkaJob.KafkaJobTaskConfig.class);
+ KafkaJob kafkaJob = new KafkaJob();
+
+ kafkaJob.setTopic(kafkaJobTaskConfig.getTopic());
+ kafkaJob.setKeyDeserializer(kafkaJobTaskConfig.getValueDeserializer());
+ kafkaJob.setValueDeserializer(kafkaJobTaskConfig.getKeyDeserializer());
+ kafkaJob.setBootstrapServers(kafkaJobTaskConfig.getBootstrapServers());
+ kafkaJob.setGroupId(kafkaJobTaskConfig.getGroupId());
+ kafkaJob.setRecordSpeed(kafkaJobTaskConfig.getRecordSpeed());
+ kafkaJob.setByteSpeedLimit(kafkaJobTaskConfig.getByteSpeedLimit());
+ kafkaJob.setMinInterval(kafkaJobTaskConfig.getMinInterval());
+ kafkaJob.setOffset(kafkaJobTaskConfig.getOffset());
+
+ kafkaJob.setChannel(DEFAULT_CHANNEL);
+ kafkaJob.setName(MANAGER_JOB);
+ kafkaJob.setSource(KAFKA_SOURCE);
+ kafkaJob.setSink(DEFAULT_DATAPROXY_SINK);
+ kafkaJob.setDeliveryTime(dataConfigs.getDeliveryTime());
+ kafkaJob.setOp(dataConfigs.getOp());
+
+ return kafkaJob;
}
private static Proxy getProxy(DataConfig dataConfigs) {
@@ -130,11 +175,30 @@ public class JobProfileDto {
if (!dataConfigs.isValid()) {
throw new IllegalArgumentException("input dataConfig" + dataConfigs + "is invalid please check");
}
+ TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
JobProfileDto profileDto = new JobProfileDto();
Proxy proxy = getProxy(dataConfigs);
- Job job = getJob(dataConfigs);
profileDto.setProxy(proxy);
- profileDto.setJob(job);
+ Job job = new Job();
+ switch (requireNonNull(taskType)) {
+ case SQL:
+ case BINLOG:
+ BinlogJob binlogJob = getBinlogJob(dataConfigs);
+ job.setBinlogJob(binlogJob);
+ profileDto.setJob(job);
+ break;
+ case FILE:
+ FileJob fileJob = getFileJob(dataConfigs);
+ job.setFileJob(fileJob);
+ profileDto.setJob(job);
+ break;
+ case KAFKA:
+ KafkaJob kafkaJob = getKafkaJob(dataConfigs);
+ job.setKafkaJob(kafkaJob);
+ profileDto.setJob(job);
+ break;
+ default:
+ }
return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
}
}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/KafkaJob.java
similarity index 54%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/KafkaJob.java
index 5432f65..174a497 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/KafkaJob.java
@@ -18,26 +18,31 @@
package org.apache.inlong.agent.plugin.fetcher.dtos;
import lombok.Data;
-import org.apache.inlong.agent.conf.TriggerProfile;
-
-import java.util.ArrayList;
-import java.util.List;
@Data
-public class TaskResult {
+public class KafkaJob extends Job {
+
+ private String topic;
+ private String keyDeserializer;
+ private String valueDeserializer;
+ private String bootstrapServers;
+ private String groupId;
+ private String recordSpeed;
+ private String byteSpeedLimit;
+ private String minInterval;
+ private String offset;
- private List<CmdConfig> cmdConfigs;
- private List<DataConfig> dataConfigs;
+ @Data
+ public static class KafkaJobTaskConfig {
- public List<TriggerProfile> getTriggerProfiles() {
- List<TriggerProfile> triggerProfiles = new ArrayList<>();
- if (dataConfigs == null || dataConfigs.isEmpty()) {
- return triggerProfiles;
- }
- dataConfigs.forEach(
- dataConfig -> triggerProfiles.add(JobProfileDto
- .convertToTriggerProfile(dataConfig))
- );
- return triggerProfiles;
+ private String topic;
+ private String keyDeserializer;
+ private String valueDeserializer;
+ private String bootstrapServers;
+ private String groupId;
+ private String recordSpeed;
+ private String byteSpeedLimit;
+ private String minInterval;
+ private String offset;
}
-}
\ No newline at end of file
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
index 1c3105e..3fd8d5e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
@@ -20,7 +20,7 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
-import org.apache.inlong.agent.db.CommandEntity;
+import org.apache.inlong.commons.db.CommandEntity;
@Data
public class TaskRequestDto {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
index 5432f65..eb9bcf1 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
@@ -19,6 +19,8 @@ package org.apache.inlong.agent.plugin.fetcher.dtos;
import lombok.Data;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.commons.dto.CmdConfig;
+import org.apache.inlong.commons.dto.DataConfig;
import java.util.ArrayList;
import java.util.List;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/ExcuteLinux.java
similarity index 51%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/ExcuteLinux.java
index 5432f65..cb12dea 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskResult.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/ExcuteLinux.java
@@ -15,29 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.agent.plugin.utils;
-import lombok.Data;
-import org.apache.inlong.agent.conf.TriggerProfile;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
+public class ExcuteLinux {
-@Data
-public class TaskResult {
+ public static String exeCmd(String commandStr) {
- private List<CmdConfig> cmdConfigs;
- private List<DataConfig> dataConfigs;
+ String result = null;
+ try {
+ String[] cmd = new String[]{"/bin/sh", "-c",commandStr};
+ Process ps = Runtime.getRuntime().exec(cmd);
- public List<TriggerProfile> getTriggerProfiles() {
- List<TriggerProfile> triggerProfiles = new ArrayList<>();
- if (dataConfigs == null || dataConfigs.isEmpty()) {
- return triggerProfiles;
+ BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
+ StringBuffer sb = new StringBuffer();
+ String line;
+ while ((line = br.readLine()) != null) {
+ sb.append(line).append("\n");
+ }
+ result = sb.toString();
+
+ } catch (Exception e) {
+ e.printStackTrace();
}
- dataConfigs.forEach(
- dataConfig -> triggerProfiles.add(JobProfileDto
- .convertToTriggerProfile(dataConfig))
- );
- return triggerProfiles;
+
+ return result;
+
}
}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index c548f5e..971775d 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -120,7 +120,8 @@ public class TestFileAgent {
@Test
public void testOneJobOnly() throws Exception {
- TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(TestUtils.getTestTriggerProfile());
+ String jsonString = TestUtils.getTestTriggerProfile();
+ TriggerProfile triggerProfile = TriggerProfile.parseJsonStr(jsonString);
triggerProfile.set(JOB_DIR_FILTER_PATTERN, helper.getParentPath() + triggerProfile.get(JOB_DIR_FILTER_PATTERN));
triggerProfile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
"test[0-9].dat").toString());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index bbafd0f..db0d600 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -29,31 +29,33 @@ public class TestUtils {
public static String getTestTriggerProfile() {
return "{\n"
- + " \"job\": {\n"
- + " \"dir\": {\n"
- + " \"path\": \"\",\n"
- + " \"pattern\": \"/AgentBaseTestsHelper/"
- + "org.apache.tubemq.inlong.plugin.fetcher.TestTdmFetcher/test[0-9].dat\"\n"
- + " },\n"
- + " \"trigger\": \"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger\",\n"
- + " \"id\": 1,\n"
- + " \"thread\" : {\n"
- + "\"running\": {\n"
- + "\"core\": \"4\"\n"
- + "}\n"
- + "}, \n"
- + " \"op\": \"0\",\n"
- + " \"ip\": \"127.0.0.1\",\n"
- + " \"name\": \"fileAgentTest\",\n"
- + " \"source\": \"org.apache.inlong.agent.plugin.sources.TextFileSource\",\n"
- + " \"sink\": \"org.apache.inlong.agent.plugin.sinks.MockSink\",\n"
- + " \"channel\": \"org.apache.inlong.agent.plugin.channel.MemoryChannel\",\n"
- + " \"standalone\": true,\n"
- + " \"additionStr\": \"m=15&file=test\",\n"
- + " \"deliveryTime\": \"1231313\",\n"
- + " \"splitter\": \"&\"\n"
- + " }\n"
- + " }";
+ + " \"job\": {\n"
+ + " \"filejob\": {\n"
+ + " \"additionStr\": \"m=15&file=test\",\n"
+ + " \"trigger\": \"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger\",\n"
+ + " \"dir\": {\n"
+ + " \"path\": \"\",\n"
+ + " \"pattern\": \"/AgentBaseTestsHelper/"
+ + "org.apache.tubemq.inlong.plugin.fetcher.TestTdmFetcher/test[0-9].dat\"\n"
+ + " },\n"
+ + " \"thread\" : {\n"
+ + "\"running\": {\n"
+ + "\"core\": \"4\"\n"
+ + "}\n"
+ + "} \n"
+ + " },\n"
+ + " \"id\": 1,\n"
+ + " \"op\": \"0\",\n"
+ + " \"ip\": \"127.0.0.1\",\n"
+ + " \"name\": \"fileAgentTest\",\n"
+ + " \"source\": \"org.apache.inlong.agent.plugin.sources.TextFileSource\",\n"
+ + " \"sink\": \"org.apache.inlong.agent.plugin.sinks.MockSink\",\n"
+ + " \"channel\": \"org.apache.inlong.agent.plugin.channel.MemoryChannel\",\n"
+ + " \"standalone\": true,\n"
+ + " \"deliveryTime\": \"1231313\",\n"
+ + " \"splitter\": \"&\"\n"
+ + " }\n"
+ + " }";
}
public static void createHugeFiles(String fileName, String rootDir, String record) throws Exception {
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
index 33f1b07..8a21ba4 100755
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
+++ b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
@@ -5,10 +5,12 @@
"wait": 1
}
},
- "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger",
- "dir": {
- "path": "",
- "pattern": "/test.[0-9]"
+ "filejob": {
+ "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger",
+ "dir": {
+ "path": "",
+ "pattern": "/test.[0-9]"
+ }
},
"id": 1,
"name": "fileAgentTest",
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index d014f95..33c5eee 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -63,7 +63,11 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
- <version>1.18.12</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sleepycat</groupId>
+ <artifactId>je</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandEntity.java b/inlong-common/src/main/java/org/apache/inlong/commons/db/CommandEntity.java
similarity index 97%
rename from inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandEntity.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/db/CommandEntity.java
index eae9f87..35bb877 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandEntity.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/db/CommandEntity.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.commons.db;
import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java b/inlong-common/src/main/java/org/apache/inlong/commons/dto/CmdConfig.java
similarity index 94%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/dto/CmdConfig.java
index 6cb4eca..eab12df 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/CmdConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/dto/CmdConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.commons.dto;
import lombok.Data;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/commons/dto/DataConfig.java
similarity index 75%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/dto/DataConfig.java
index 8950199..ea0704a 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/dto/DataConfig.java
@@ -15,29 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.commons.dto;
import lombok.Data;
@Data
public class DataConfig {
- private String additionalAttr;
private String inlongGroupId;
- private String dataName;
private String inlongStreamId;
private String deliveryTime;
- private String fieldSplitter;
- private String cycleUnit;
+ private String uuid;
private String ip;
- private String middlewareType;
- private String mqMasterAddress;
private String op;
- private String scheduleTime;
private Integer taskId;
- private String timeOffset;
- private String topic;
+ private Integer taskType;
+ private String taskConfig;
public boolean isValid() {
return true;
}
-}
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java b/inlong-common/src/main/java/org/apache/inlong/commons/dto/TaskRequestDto.java
similarity index 89%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
copy to inlong-common/src/main/java/org/apache/inlong/commons/dto/TaskRequestDto.java
index 1c3105e..bb3ce75 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/TaskRequestDto.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/dto/TaskRequestDto.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.fetcher.dtos;
+package org.apache.inlong.commons.dto;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
-import org.apache.inlong.agent.db.CommandEntity;
+import org.apache.inlong.commons.db.CommandEntity;
@Data
public class TaskRequestDto {
private String agentIp;
+ private String uuid;
private List<CommandEntity> commandInfo = new ArrayList<>();
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/commons/enums/ManagerOpEnum.java
similarity index 97%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
copy to inlong-common/src/main/java/org/apache/inlong/commons/enums/ManagerOpEnum.java
index 799c474..1501404 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/enums/ManagerOpEnum.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.fetcher.enums;
+package org.apache.inlong.commons.enums;
import static java.util.Objects.requireNonNull;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/commons/enums/TaskTypeEnum.java
similarity index 57%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
rename to inlong-common/src/main/java/org/apache/inlong/commons/enums/TaskTypeEnum.java
index 799c474..e43e5c3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/enums/TaskTypeEnum.java
@@ -15,46 +15,36 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.fetcher.enums;
+package org.apache.inlong.commons.enums;
import static java.util.Objects.requireNonNull;
-public enum ManagerOpEnum {
- ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4), ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
+public enum TaskTypeEnum {
+ SQL(1), BINLOG(2), FILE(3), KAFKA(4);
private int type;
- ManagerOpEnum(int type) {
+ TaskTypeEnum(int type) {
this.type = type;
}
- public static ManagerOpEnum getOpType(int opType) {
- requireNonNull(opType);
- switch (opType) {
- case 0:
- return ADD;
+ public static TaskTypeEnum getTaskType(int taskType) {
+ requireNonNull(taskType);
+ switch (taskType) {
case 1:
- return DEL;
+ return SQL;
case 2:
- return RETRY;
+ return BINLOG;
case 3:
- return BACKTRACK;
+ return FILE;
case 4:
- return FROZEN;
- case 5:
- return ACTIVE;
- case 6:
- return CHECK;
- case 7:
- return REDOMETRIC;
- case 8:
- return MAKEUP;
+ return KAFKA;
default:
- throw new RuntimeException("such op type doesn't exist");
+ throw new RuntimeException("such task type doesn't exist");
}
}
public int getType() {
return type;
}
-}
+}
\ No newline at end of file
diff --git a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
index ca17f58..746f2af 100644
--- a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
+++ b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/InLongPulsarFetcherImplTest.java
@@ -41,9 +41,11 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
+@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
public class InLongPulsarFetcherImplTest {
diff --git a/pom.xml b/pom.xml
index f73742d..d895699 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,7 @@
<docker.organization>inlong</docker.organization>
<lombok.version>1.18.22</lombok.version>
<logback.version>1.2.10</logback.version>
+ <je.version>7.3.7</je.version>
</properties>
<dependencyManagement>
@@ -132,6 +133,11 @@
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.sleepycat</groupId>
+ <artifactId>je</artifactId>
+ <version>${je.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -293,6 +299,17 @@
</build>
<repositories>
+ <repository>
+ <id>berkeleydb-je</id>
+ <name>berkeleydb-je</name>
+ <url>https://download.dcache.org/nexus/repository/berkeleydb-je/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
</repositories>
</project>