You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by lu...@apache.org on 2023/11/16 09:32:28 UTC

(inlong) branch master updated: [INLONG-9300][Agent] Divide data time into source time and sink time (#9301)

This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f7827601fe [INLONG-9300][Agent] Divide data time into source time and sink time (#9301)
f7827601fe is described below

commit f7827601fec3f1d772a7065ba7e0afae23d998a6
Author: justinwwhuang <hw...@163.com>
AuthorDate: Thu Nov 16 17:32:22 2023 +0800

    [INLONG-9300][Agent] Divide data time into source time and sink time (#9301)
---
 .../apache/inlong/agent/conf/InstanceProfile.java  | 18 +++++++++++++-----
 .../org/apache/inlong/agent/conf/TaskProfile.java  | 22 +++++++++++++++++++++-
 .../inlong/agent/constant/TaskConstants.java       |  8 ++++++--
 .../message/filecollect/ProxyMessageCache.java     |  2 +-
 .../org/apache/inlong/agent/pojo/FileTask.java     |  3 +++
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  3 +++
 .../inlong/agent/core/AgentBaseTestsHelper.java    |  4 +++-
 .../agent/core/instance/TestInstanceManager.java   |  4 ++--
 .../inlong/agent/plugin/sources/LogFileSource.java |  2 +-
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  |  2 ++
 10 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 5c3e74fe86..acc6444aba 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -155,18 +155,26 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable
         set(TaskConstants.INSTANCE_ID, instanceId);
     }
 
-    public void setDataTime(String dataTime) {
-        set(TaskConstants.JOB_DATA_TIME, dataTime);
+    public void setSourceDataTime(String dataTime) {
+        set(TaskConstants.SOURCE_DATA_TIME, dataTime);
     }
 
-    public String getDataTime() {
-        return get(TaskConstants.JOB_DATA_TIME);
+    public String getSourceDataTime() {
+        return get(TaskConstants.SOURCE_DATA_TIME);
+    }
+
+    public void setSinkDataTime(Long dataTime) {
+        setLong(TaskConstants.SINK_DATA_TIME, dataTime);
+    }
+
+    public Long getSinkDataTime() {
+        return getLong(TaskConstants.SINK_DATA_TIME, 0);
     }
 
     @Override
     public int compareTo(InstanceProfile object) {
         int ret = ComparisonChain.start()
-                .compare(getDataTime(), object.getDataTime())
+                .compare(getSourceDataTime(), object.getSourceDataTime())
                 .compare(FileUtils.getFileCreationTime(getInstanceId()),
                         FileUtils.getFileCreationTime(object.getInstanceId()))
                 .compare(FileUtils.getFileLastModifyTime(getInstanceId()),
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index 319f1abf56..be9b8cd1f3 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -20,11 +20,17 @@ package org.apache.inlong.agent.conf;
 import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.pojo.TaskProfileDto;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
 import org.apache.inlong.common.enums.TaskStateEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
 import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.TimeZone;
 
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
@@ -35,6 +41,7 @@ import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
 public class TaskProfile extends AbstractConfiguration {
 
     private static final Gson GSON = new Gson();
+    private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class);
 
     /**
      * Get a TaskProfile from a DataConfig
@@ -58,6 +65,10 @@ public class TaskProfile extends AbstractConfiguration {
         return get(TaskConstants.TASK_FILE_TIME_OFFSET);
     }
 
+    public String getTimeZone() {
+        return get(TaskConstants.TASK_FILE_TIME_ZONE);
+    }
+
     public TaskStateEnum getState() {
         return TaskStateEnum.getTaskState(getInt(TASK_STATE));
     }
@@ -111,7 +122,16 @@ public class TaskProfile extends AbstractConfiguration {
         InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr());
         instanceProfile.setInstanceClass(instanceClass);
         instanceProfile.setInstanceId(fileName);
-        instanceProfile.setDataTime(dataTime);
+        instanceProfile.setSourceDataTime(dataTime);
+        Long sinkDataTime = 0L;
+        try {
+            sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime, getCycleUnit(),
+                    TimeZone.getTimeZone(getTimeZone()));
+        } catch (ParseException e) {
+            logger.error("createInstanceProfile error: ", e);
+            return null;
+        }
+        instanceProfile.setSinkDataTime(sinkDataTime);
         instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
         instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
         instanceProfile.setState(InstanceStateEnum.DEFAULT);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 3285936951..872c42319f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -64,6 +64,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
     public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
     public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
+    public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
     public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
     public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
     public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType";
@@ -179,8 +180,11 @@ public class TaskConstants extends CommonConstants {
     // job delivery time
     public static final String JOB_DELIVERY_TIME = "job.deliveryTime";
 
-    // job time reading file
-    public static final String JOB_DATA_TIME = "job.dataTime";
+    // data time of the source, used when reading the file
+    public static final String SOURCE_DATA_TIME = "source.dataTime";
+
+    // data time for sink
+    public static final String SINK_DATA_TIME = "sink.dataTime";
 
     // job of the number of seconds to wait before starting the task
     public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 29ebcc75b0..7e2d3c8b8a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -83,7 +83,7 @@ public class ProxyMessageCache {
         this.streamId = streamId;
         this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
         try {
-            dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(),
+            dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(),
                     instanceProfile.get(TASK_CYCLE_UNIT));
         } catch (ParseException e) {
             LOGGER.info("trans dataTime error", e);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
index 7942e74bef..ec8ce9f47f 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java
@@ -33,6 +33,7 @@ public class FileTask {
     private Long startTime;
     private Long endTime;
     private String timeOffset;
+    private String timeZone;
     private String addictiveString;
     private String collectType;
     private Line line;
@@ -109,6 +110,8 @@ public class FileTask {
         // '1d' means one day after, '-1d' means one day before
         // Null means from current timestamp
         private String timeOffset;
+        // Asia/Shanghai
+        private String timeZone;
         // For example: a=b&c=b&e=f
         private String additionalAttr;
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 0fcf7a95f4..6d8cd16816 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -148,6 +148,9 @@ public class TaskProfileDto {
         if (taskConfig.getTimeOffset() != null) {
             fileTask.setTimeOffset(taskConfig.getTimeOffset());
         }
+        if (taskConfig.getTimeZone() != null) {
+            fileTask.setTimeZone(taskConfig.getTimeZone());
+        }
 
         if (taskConfig.getAdditionalAttr() != null) {
             fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index d3d593d345..73017c4215 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -91,8 +91,10 @@ public class AgentBaseTestsHelper {
         FileTaskConfig fileTaskConfig = new FileTaskConfig();
         fileTaskConfig.setPattern(pattern);
         fileTaskConfig.setTimeOffset("0d");
+        // NOTE: "GMT-8:00" is 8 hours behind GMT; it is NOT Asia/Shanghai, which is "GMT+8:00"
+        fileTaskConfig.setTimeZone("GMT-8:00");
         fileTaskConfig.setMaxFileCount(100);
-        fileTaskConfig.setCycleUnit("D");
+        fileTaskConfig.setCycleUnit("h");
         fileTaskConfig.setRetry(retry);
         fileTaskConfig.setStartTime(startTime);
         fileTaskConfig.setEndTime(endTime);
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index aae1c15da9..0a87660c1f 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -65,7 +65,7 @@ public class TestInstanceManager {
     public void testInstanceManager() {
         long timeBefore = AgentUtils.getCurrentTime();
         InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime());
+                helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime());
         String instanceId = profile.getInstanceId();
         InstanceAction action = new InstanceAction();
         action.setActionType(ActionType.ADD);
@@ -87,7 +87,7 @@ public class TestInstanceManager {
 
         // test continue
         profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
-                helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime());
+                helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime());
         action = new InstanceAction();
         action.setActionType(ActionType.ADD);
         action.setProfile(profile);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index cbc1cff372..1d19ffc7de 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -156,7 +156,7 @@ public class LogFileSource extends AbstractSource {
             linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo);
             bytePosition = getBytePositionByLine(linePosition);
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
-            dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(),
+            dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(),
                     profile.get(TASK_CYCLE_UNIT));
             try {
                 registerMeta(profile);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index eccfe50b8c..2410c07ff3 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -97,6 +97,8 @@ public class AgentBaseTestsHelper {
         FileTaskConfig fileTaskConfig = new FileTaskConfig();
         fileTaskConfig.setPattern(pattern);
         fileTaskConfig.setTimeOffset("0d");
+        // NOTE: "GMT-8:00" is 8 hours behind GMT; it is NOT Asia/Shanghai, which is "GMT+8:00"
+        fileTaskConfig.setTimeZone("GMT-8:00");
         fileTaskConfig.setMaxFileCount(100);
         fileTaskConfig.setCycleUnit("D");
         fileTaskConfig.setRetry(retry);