You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by lu...@apache.org on 2023/11/16 09:32:28 UTC
(inlong) branch master updated: [INLONG-9300][Agent] Divide data time into source time and sink time (#9301)
This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f7827601fe [INLONG-9300][Agent] Divide data time into source time and sink time (#9301)
f7827601fe is described below
commit f7827601fec3f1d772a7065ba7e0afae23d998a6
Author: justinwwhuang <hw...@163.com>
AuthorDate: Thu Nov 16 17:32:22 2023 +0800
[INLONG-9300][Agent] Divide data time into source time and sink time (#9301)
---
.../apache/inlong/agent/conf/InstanceProfile.java | 18 +++++++++++++-----
.../org/apache/inlong/agent/conf/TaskProfile.java | 22 +++++++++++++++++++++-
.../inlong/agent/constant/TaskConstants.java | 8 ++++++--
.../message/filecollect/ProxyMessageCache.java | 2 +-
.../org/apache/inlong/agent/pojo/FileTask.java | 3 +++
.../apache/inlong/agent/pojo/TaskProfileDto.java | 3 +++
.../inlong/agent/core/AgentBaseTestsHelper.java | 4 +++-
.../agent/core/instance/TestInstanceManager.java | 4 ++--
.../inlong/agent/plugin/sources/LogFileSource.java | 2 +-
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 2 ++
10 files changed, 55 insertions(+), 13 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 5c3e74fe86..acc6444aba 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -155,18 +155,26 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable
set(TaskConstants.INSTANCE_ID, instanceId);
}
- public void setDataTime(String dataTime) {
- set(TaskConstants.JOB_DATA_TIME, dataTime);
+ public void setSourceDataTime(String dataTime) {
+ set(TaskConstants.SOURCE_DATA_TIME, dataTime);
}
- public String getDataTime() {
- return get(TaskConstants.JOB_DATA_TIME);
+ public String getSourceDataTime() {
+ return get(TaskConstants.SOURCE_DATA_TIME);
+ }
+
+ public void setSinkDataTime(Long dataTime) {
+ setLong(TaskConstants.SINK_DATA_TIME, dataTime);
+ }
+
+ public Long getSinkDataTime() {
+ return getLong(TaskConstants.SINK_DATA_TIME, 0);
}
@Override
public int compareTo(InstanceProfile object) {
int ret = ComparisonChain.start()
- .compare(getDataTime(), object.getDataTime())
+ .compare(getSourceDataTime(), object.getSourceDataTime())
.compare(FileUtils.getFileCreationTime(getInstanceId()),
FileUtils.getFileCreationTime(object.getInstanceId()))
.compare(FileUtils.getFileLastModifyTime(getInstanceId()),
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 319f1abf56..be9b8cd1f3 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -20,11 +20,17 @@ package org.apache.inlong.agent.conf;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.pojo.TaskProfileDto;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.TimeZone;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
@@ -35,6 +41,7 @@ import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
public class TaskProfile extends AbstractConfiguration {
private static final Gson GSON = new Gson();
+ private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class);
/**
* Get a TaskProfile from a DataConfig
@@ -58,6 +65,10 @@ public class TaskProfile extends AbstractConfiguration {
return get(TaskConstants.TASK_FILE_TIME_OFFSET);
}
+ public String getTimeZone() {
+ return get(TaskConstants.TASK_FILE_TIME_ZONE);
+ }
+
public TaskStateEnum getState() {
return TaskStateEnum.getTaskState(getInt(TASK_STATE));
}
@@ -111,7 +122,16 @@ public class TaskProfile extends AbstractConfiguration {
InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
- instanceProfile.setDataTime(dataTime);
+ instanceProfile.setSourceDataTime(dataTime);
+ Long sinkDataTime = 0L;
+ try {
+ sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime, getCycleUnit(),
+ TimeZone.getTimeZone(getTimeZone()));
+ } catch (ParseException e) {
+ logger.error("createInstanceProfile error: ", e);
+ return null;
+ }
+ instanceProfile.setSinkDataTime(sinkDataTime);
instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
instanceProfile.setState(InstanceStateEnum.DEFAULT);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 3285936951..872c42319f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -64,6 +64,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
+ public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType";
@@ -179,8 +180,11 @@ public class TaskConstants extends CommonConstants {
// job delivery time
public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
- // job time reading file
- public static final String JOB_DATA_TIME = "job.dataTime";
+ // data time reading file
+ public static final String SOURCE_DATA_TIME = "source.dataTime";
+
+ // data time for sink
+ public static final String SINK_DATA_TIME = "sink.dataTime";
// job of the number of seconds to wait before starting the task
public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 29ebcc75b0..7e2d3c8b8a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -83,7 +83,7 @@ public class ProxyMessageCache {
this.streamId = streamId;
this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
try {
- dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(),
+ dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(),
instanceProfile.get(TASK_CYCLE_UNIT));
} catch (ParseException e) {
LOGGER.info("trans dataTime error", e);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index 7942e74bef..ec8ce9f47f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -33,6 +33,7 @@ public class FileTask {
private Long startTime;
private Long endTime;
private String timeOffset;
+ private String timeZone;
private String addictiveString;
private String collectType;
private Line line;
@@ -109,6 +110,8 @@ public class FileTask {
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
+ // Asia/Shanghai
+ private String timeZone;
// For example: a=b&c=b&e=f
private String additionalAttr;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 0fcf7a95f4..6d8cd16816 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -148,6 +148,9 @@ public class TaskProfileDto {
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
}
+ if (taskConfig.getTimeZone() != null) {
+ fileTask.setTimeZone(taskConfig.getTimeZone());
+ }
if (taskConfig.getAdditionalAttr() != null) {
fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index d3d593d345..73017c4215 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -91,8 +91,10 @@ public class AgentBaseTestsHelper {
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
+ // GMT-8:00 same with Asia/Shanghai
+ fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
- fileTaskConfig.setCycleUnit("D");
+ fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index aae1c15da9..0a87660c1f 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -65,7 +65,7 @@ public class TestInstanceManager {
public void testInstanceManager() {
long timeBefore = AgentUtils.getCurrentTime();
InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime());
+ helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime());
String instanceId = profile.getInstanceId();
InstanceAction action = new InstanceAction();
action.setActionType(ActionType.ADD);
@@ -87,7 +87,7 @@ public class TestInstanceManager {
// test continue
profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime());
+ helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime());
action = new InstanceAction();
action.setActionType(ActionType.ADD);
action.setProfile(profile);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index cbc1cff372..1d19ffc7de 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -156,7 +156,7 @@ public class LogFileSource extends AbstractSource {
linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
bytePosition = getBytePositionByLine(linePosition);
queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
- dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(),
+ dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(),
profile.get(TASK_CYCLE_UNIT));
try {
registerMeta(profile);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index eccfe50b8c..2410c07ff3 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -97,6 +97,8 @@ public class AgentBaseTestsHelper {
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0d");
+ // GMT-8:00 same with Asia/Shanghai
+ fileTaskConfig.setTimeZone("GMT-8:00");
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("D");
fileTaskConfig.setRetry(retry);