You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/06 06:47:22 UTC
[incubator-inlong] branch master updated: [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager (#2929)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 43cfef7 [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager (#2929)
43cfef7 is described below
commit 43cfef77b432971bc1ee7ec5ebe96b7fe8e9c9fa
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Sun Mar 6 14:47:14 2022 +0800
[INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager (#2929)
* [INLONG-2928][Agent] Fix multiple bugs when the agent communicates with the manager
* [INLONG-2928][Agent] Fix Kafka consumer multiple thread modification bugs
---
.../org/apache/inlong/agent/conf/JobProfile.java | 3 +-
.../apache/inlong/agent/conf/TriggerProfile.java | 2 +-
.../org/apache/inlong/agent/db/JobProfileDb.java | 12 +++---
.../org/apache/inlong/agent/plugin/Reader.java | 5 +++
.../org/apache/inlong/agent/pojo/BinlogJob.java | 6 +--
.../apache/inlong/agent/pojo/JobProfileDto.java | 15 +++----
.../org/apache/inlong/agent/utils/AgentUtils.java | 6 +--
.../apache/inlong/agent/db/TestBerkeleyDBImp.java | 2 +-
.../org/apache/inlong/agent/core/AgentManager.java | 2 +
.../apache/inlong/agent/core/HeartbeatManager.java | 46 ++++++++++------------
.../apache/inlong/agent/core/job/JobManager.java | 4 +-
.../apache/inlong/agent/core/task/TaskWrapper.java | 3 +-
.../apache/inlong/agent/task/TestTaskWrapper.java | 5 +++
.../agent/plugin/fetcher/ManagerFetcher.java | 12 +++---
.../agent/plugin/sources/reader/BinlogReader.java | 24 ++++++++---
.../agent/plugin/sources/reader/KafkaReader.java | 17 ++++++--
.../agent/plugin/sources/reader/SqlReader.java | 5 +++
.../plugin/sources/reader/TextFileReader.java | 5 +++
18 files changed, 109 insertions(+), 65 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
index 8448284..3d9d24b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/JobProfile.java
@@ -71,8 +71,7 @@ public class JobProfile extends AbstractConfiguration {
@Override
public boolean allRequiredKeyExist() {
return hasKey(JobConstants.JOB_ID) && hasKey(JobConstants.JOB_SOURCE_CLASS)
- && hasKey(JobConstants.JOB_SINK) && hasKey(JobConstants.JOB_CHANNEL) && hasKey(
- JobConstants.JOB_NAME);
+ && hasKey(JobConstants.JOB_SINK) && hasKey(JobConstants.JOB_CHANNEL);
}
public String toJsonStr() {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
index 6d1aa79..6c09e82 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
@@ -56,7 +56,7 @@ public class TriggerProfile extends JobProfile {
@Override
public boolean allRequiredKeyExist() {
- return hasKey(JobConstants.JOB_TRIGGER) && super.allRequiredKeyExist();
+ return super.allRequiredKeyExist();
}
public String getTriggerId() {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
index 6469efc..93769ec 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/JobProfileDb.java
@@ -36,8 +36,11 @@ public class JobProfileDb {
this.db = db;
}
- public List<JobProfile> getAcceptedJobs() {
- return getJobsByState(StateSearchKey.ACCEPTED);
+ public List<JobProfile> getRestartJobs() {
+ List<JobProfile> jobsByState = getJobsByState(StateSearchKey.ACCEPTED);
+ jobsByState.addAll(getJobsByState(StateSearchKey.RUNNING));
+ LOGGER.info("try to get restart jobs from db {}", jobsByState);
+ return jobsByState;
}
/**
@@ -64,6 +67,7 @@ public class JobProfileDb {
KeyValueEntity entity = new KeyValueEntity(keyName,
jobProfile.toJsonStr(), jobProfile.get(JobConstants.JOB_DIR_FILTER_PATTERN, ""));
entity.setStateSearchKey(StateSearchKey.ACCEPTED);
+ LOGGER.info("store job {} to db", jobProfile.toJsonStr());
db.put(entity);
}
}
@@ -164,9 +168,7 @@ public class JobProfileDb {
List<KeyValueEntity> entityList = db.search(stateSearchKey);
List<JobProfile> profileList = new ArrayList<>();
for (KeyValueEntity entity : entityList) {
- if (entity.getKey().startsWith(JobConstants.JOB_ID_PREFIX)) {
- profileList.add(entity.getAsJobProfile());
- }
+ profileList.add(entity.getAsJobProfile());
}
return profileList;
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
index cec8285..4206b6b 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Reader.java
@@ -58,4 +58,9 @@ public interface Reader extends Stage {
* @return
*/
String getSnapshot();
+
+ /**
+ * finish read
+ */
+ void finishRead();
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java
index 8edaa53..8c8ae69 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/BinlogJob.java
@@ -61,7 +61,7 @@ public class BinlogJob {
private String password;
private String hostname;
private String port;
- private String schema;
+ private String includeSchema;
private String databaseWhiteList;
private String tableWhiteList;
@@ -69,8 +69,8 @@ public class BinlogJob {
private String intervalMs;
private String offsetFilename;
private String historyFilename;
- private String mode;
- private String ddl;
+ private String snapshotMode;
+ private String monitoredDdl;
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index ffca852..2b02ab2 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -17,6 +17,10 @@
package org.apache.inlong.agent.pojo;
+import static java.util.Objects.requireNonNull;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+
import com.google.gson.Gson;
import lombok.Data;
import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -24,10 +28,6 @@ import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
-import static java.util.Objects.requireNonNull;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-
@Data
public class JobProfileDto {
@@ -64,10 +64,10 @@ public class JobProfileDto {
binlogJob.setUser(binlogJobTaskConfig.getUser());
binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList());
binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList());
- binlogJob.setSchema(binlogJobTaskConfig.getSchema());
+ binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema());
binlogJob.setPort(binlogJobTaskConfig.getPort());
binlogJob.setOffsets(dataConfigs.getSnapshot());
- binlogJob.setDdl(binlogJobTaskConfig.getDdl());
+ binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl());
binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone());
BinlogJob.Offset offset = new BinlogJob.Offset();
@@ -77,7 +77,7 @@ public class JobProfileDto {
binlogJob.setOffset(offset);
BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot();
- snapshot.setMode(binlogJobTaskConfig.getMode());
+ snapshot.setMode(binlogJobTaskConfig.getSnapshotMode());
binlogJob.setSnapshot(snapshot);
@@ -174,6 +174,7 @@ public class JobProfileDto {
job.setOp(dataConfigs.getOp());
job.setDeliveryTime(dataConfigs.getDeliveryTime());
job.setUuid(dataConfigs.getUuid());
+ job.setSink(DEFAULT_DATAPROXY_SINK);
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index c60bdcc..5d22f4c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -337,8 +337,7 @@ public class AgentUtils {
* check agent ip from manager
*/
public static String fetchLocalIp() {
- String localIp = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP, DEFAULT_LOCAL_IP);
- return localIp;
+ return AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP, DEFAULT_LOCAL_IP);
}
/**
@@ -346,8 +345,7 @@ public class AgentUtils {
*/
public static String fetchLocalUuid() {
String result = ExcuteLinux.exeCmd("dmidecode | grep UUID");
- String uuid = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID, result);
- return uuid;
+ return AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID, result);
}
/**
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
index 2fda763..4458f0c 100755
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
@@ -80,7 +80,7 @@ public class TestBerkeleyDBImp {
@Test
public void testCommandDb() {
- CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "2022-03-05 12:10:10");
+ CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "1");
db.putCommand(commandEntity);
CommandEntity command = db.getCommand("1");
Assert.assertEquals("1", command.getId());
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index df0d6a2..64b1cff 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -168,6 +168,7 @@ public class AgentManager extends AbstractDaemon {
triggerManager.start();
jobManager.start();
taskManager.start();
+ heartbeatManager.start();
taskPositionManager.start();
// read job profiles from local
List<JobProfile> profileList = localProfile.readFromLocal();
@@ -207,6 +208,7 @@ public class AgentManager extends AbstractDaemon {
triggerManager.stop();
jobManager.stop();
taskManager.stop();
+ heartbeatManager.stop();
taskPositionManager.stop();
this.db.close();
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index fd129d1..783168f 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.core;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -26,10 +27,8 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
-import org.springframework.scheduling.annotation.Scheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
@@ -42,8 +41,8 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VI
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
+import static org.apache.inlong.agent.core.task.TaskPositionManager.DEFAULT_FLUSH_TIMEOUT;
-@Component
public class HeartbeatManager extends AbstractDaemon {
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);
@@ -72,14 +71,10 @@ public class HeartbeatManager extends AbstractDaemon {
* @return
*/
private TaskSnapshotRequest getHeartBeat() {
- AgentManager agentManager = new AgentManager();
- HeartbeatManager heartbeatManager = new HeartbeatManager(agentManager);
- JobManager jobManager = agentManager.getJobManager();
- Map<String, JobWrapper> jobWrapperMap = jobManager.getJobs();
+ Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
List<TaskSnapshotMessage> taskSnapshotMessageList = new ArrayList<>();
TaskSnapshotRequest taskSnapshotRequest = new TaskSnapshotRequest();
- TaskSnapshotMessage snapshotMessage = new TaskSnapshotMessage();
Date date = new Date(System.currentTimeMillis());
@@ -90,10 +85,10 @@ public class HeartbeatManager extends AbstractDaemon {
}
String offset = entry.getValue().getSnapshot();
String jobId = entry.getKey();
+ TaskSnapshotMessage snapshotMessage = new TaskSnapshotMessage();
snapshotMessage.setSnapshot(offset);
snapshotMessage.setJobId(Integer.valueOf(jobId));
taskSnapshotMessageList.add(snapshotMessage);
-
}
taskSnapshotRequest.setSnapshotList(taskSnapshotMessageList);
taskSnapshotRequest.setReportTime(date);
@@ -103,21 +98,6 @@ public class HeartbeatManager extends AbstractDaemon {
}
/**
- * report heartbeat (default : per minute)
- */
- @Scheduled(cron = "${agent.scheduled.snapshotreport:0 0/1 * * * ? *}")
- private void sendHeartBeat() {
- TaskSnapshotRequest taskSnapshotRequest = getHeartBeat();
- try {
- String returnStr = httpManager.doSentPost(reportSnapshotUrl,taskSnapshotRequest);
- LOGGER.info(" {} report to manager",taskSnapshotRequest);
- } catch (Throwable e) {
- LOGGER.error(" sendHeartBeat to" + reportSnapshotUrl
- + " exception {}, {}", e.toString(), e.getStackTrace());
- }
- }
-
- /**
* build base url for manager according to config
*
* @example - http://127.0.0.1:8080/api/inlong/manager/openapi
@@ -135,11 +115,27 @@ public class HeartbeatManager extends AbstractDaemon {
@Override
public void start() throws Exception {
+ submitWorker(heartBeatReportThread());
+ }
+ private Runnable heartBeatReportThread() {
+ return () -> {
+ while (isRunnable()) {
+ try {
+ TaskSnapshotRequest taskSnapshotRequest = getHeartBeat();
+ httpManager.doSentPost(reportSnapshotUrl,taskSnapshotRequest);
+ LOGGER.info(" {} report to manager",taskSnapshotRequest);
+ TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
+ } catch (Exception ex) {
+ LOGGER.error("error caught", ex);
+ }
+ }
+ };
}
@Override
public void stop() throws Exception {
-
+ waitForTerminate();
}
+
}
\ No newline at end of file
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 80e4181..1735a1c 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -178,8 +178,8 @@ public class JobManager extends AbstractDaemon {
* @param jobInstancId
*/
public boolean deleteJob(String jobInstancId) {
- JobWrapper jobWrapper = jobs.remove(jobInstancId);
LOGGER.info("start to delete job, job id set {}", jobs.keySet());
+ JobWrapper jobWrapper = jobs.remove(jobInstancId);
if (jobWrapper != null) {
LOGGER.info("delete job instance with job id {}", jobInstancId);
jobWrapper.cleanup();
@@ -193,7 +193,7 @@ public class JobManager extends AbstractDaemon {
* start all accepted jobs.
*/
private void startJobs() {
- List<JobProfile> profileList = getJobConfDb().getAcceptedJobs();
+ List<JobProfile> profileList = getJobConfDb().getRestartJobs();
for (JobProfile profile : profileList) {
LOGGER.info("init starting job from db {}", profile.toJsonStr());
addJob(new Job(profile));
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index 4e8c029..7bdfc7b 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -91,6 +91,7 @@ public class TaskWrapper extends AbstractStateWrapper {
isException(), task.isReadFinished());
// write end message
task.getChannel().push(new EndMessage());
+ task.getReader().destroy();
}, executorService);
}
@@ -149,7 +150,7 @@ public class TaskWrapper extends AbstractStateWrapper {
*/
void destroyTask() {
LOGGER.info("destroy task id is {}", task.getTaskId());
- task.getReader().destroy();
+ task.getReader().finishRead();
}
/**
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
index 11a3400..8a9131b 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/task/TestTaskWrapper.java
@@ -149,6 +149,11 @@ public class TestTaskWrapper {
return null;
}
+ @Override
+ public void finishRead() {
+
+ }
+
public int getCount() {
return count;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 4ce560c..84063da 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.fetcher;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@@ -96,7 +97,8 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
public static final String AGENT = "agent";
private static final Logger LOGGER = LoggerFactory.getLogger(ManagerFetcher.class);
- private static final Gson GSON = new Gson();
+ private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
+ private static final Gson GSON = gsonBuilder.create();
private static final int MAX_RETRY = 2;
private final String managerVipUrl;
private final String baseManagerUrl;
@@ -261,7 +263,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
}
JobProfile profile = taskResult.getJobProfile();
if (profile == null) {
- LOGGER.error("profile is null");
return;
}
agentManager.getJobManager().submitSqlJobProfile(profile);
@@ -272,10 +273,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
*/
private void dealWithFileTaskResult(TaskResult taskResult) {
LOGGER.info("deal with fetch result {}", taskResult);
-
for (DataConfig dataConfig : taskResult.getDataConfigs()) {
TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
- LOGGER.info("the triggerProfile: {}", profile);
+ LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
if (profile.hasKey(JOB_TRIGGER)) {
dealWithTdmTriggerProfile(profile);
} else {
@@ -497,8 +497,8 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
@Override
public void start() throws Exception {
// when agent start, check local ip and fetch manager ip list;
- fetchLocalIp();
- fetchLocalUuid();
+ localIp = fetchLocalIp();
+ uuid = fetchLocalUuid();
fetchTdmList(true, 0);
submitWorker(profileFetchThread());
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 052695b..6a3bd06 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -98,7 +98,7 @@ public class BinlogReader implements Reader {
private BinlogSnapshotBase binlogSnapshot;
private JobProfile jobProfile;
private static final Gson gson = new Gson();
-
+ private boolean destroyed = false;
private boolean enableReportConfigLog;
private StreamConfigLogMetric streamConfigLogMetric;
@@ -231,6 +231,7 @@ public class BinlogReader implements Reader {
props.setProperty("database.history.file.filename", databaseStoreHistoryName);
props.setProperty("database.snapshot.mode", snapshotMode);
props.setProperty("database.history.store.only.monitored.tables.ddl", historyMonitorDdl);
+ props.setProperty("database.allowPublicKeyRetrieval", "true");
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");
props.setProperty("include.schema.changes", includeSchemaChanges);
@@ -242,9 +243,13 @@ public class BinlogReader implements Reader {
@Override
public void destroy() {
- finished = true;
- executor.shutdownNow();
- binlogSnapshot.close();
+ synchronized (this) {
+ if (!destroyed) {
+ executor.shutdownNow();
+ binlogSnapshot.close();
+ destroyed = true;
+ }
+ }
}
@Override
@@ -269,7 +274,16 @@ public class BinlogReader implements Reader {
@Override
public String getSnapshot() {
- return binlogSnapshot.getSnapshot();
+ if (binlogSnapshot != null) {
+ return binlogSnapshot.getSnapshot();
+ } else {
+ return "";
+ }
+ }
+
+ @Override
+ public void finishRead() {
+ finished = true;
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 8374617..4ec53ae 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -23,6 +23,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROU
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_BYTE_SPEED_LIMIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_OFFSET;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_PARTITION_OFFSET_DELIMITER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_KAFKA_RECORD_SPEED_LIMIT;
import java.nio.charset.StandardCharsets;
@@ -89,6 +90,7 @@ public class KafkaReader<K, V> implements Reader {
private String inlongStreamId;
private String snapshot;
private boolean isFinished = false;
+ private boolean destroyed = false;
/**
* init attribute
@@ -137,7 +139,7 @@ public class KafkaReader<K, V> implements Reader {
// commit offset
consumer.commitAsync();
// commit succeed,then record current offset
- snapshot = String.valueOf(record.offset());
+ snapshot = record.partition() + JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
DefaultMessage message = new DefaultMessage(recordValue.getBytes(StandardCharsets.UTF_8), headerMap);
recordReadLimit(1L, message.getBody().length);
return message;
@@ -190,8 +192,12 @@ public class KafkaReader<K, V> implements Reader {
@Override
public void destroy() {
- isFinished = true;
- consumer.close();
+ synchronized (this) {
+ if (!destroyed) {
+ consumer.close();
+ destroyed = true;
+ }
+ }
}
private void initReadTimeout(JobProfile jobConf) {
@@ -222,6 +228,11 @@ public class KafkaReader<K, V> implements Reader {
return snapshot;
}
+ @Override
+ public void finishRead() {
+ isFinished = true;
+ }
+
private boolean fetchData(long fetchDataTimeout) {
// cosume data
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(fetchDataTimeout));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index fea6908..70b4328 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -163,6 +163,11 @@ public class SqlReader extends AbstractReader {
return StringUtils.EMPTY;
}
+ @Override
+ public void finishRead() {
+ destroy();
+ }
+
/**
* Init column meta data.
*
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index d987d33..3223dc2 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -145,6 +145,11 @@ public class TextFileReader extends AbstractReader {
return StringUtils.EMPTY;
}
+ @Override
+ public void finishRead() {
+ destroy();
+ }
+
public void addPatternValidator(String pattern) {
if (pattern.isEmpty()) {
return;