You are viewing a plain-text version of this content. The hyperlink to the canonical version is not preserved in this plain-text rendering.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/08/26 07:32:55 UTC

[incubator-inlong] branch master updated: [INLONG-749][Agent] Support read data from different time offset (#1480)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cbbb29d  [INLONG-749][Agent] Support read data from different time offset (#1480)
cbbb29d is described below

commit cbbb29dc4d2bb712e8d020b0ce28c6f383501d39
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Thu Aug 26 15:32:50 2021 +0800

    [INLONG-749][Agent] Support read data from different time offset (#1480)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../inlong/agent/constants/JobConstants.java       |  2 +-
 .../plugin/fetcher/ManagerResultFormatter.java     |  1 +
 .../agent/plugin/fetcher/dtos/JobProfileDto.java   |  1 +
 .../agent/plugin/filter/DateFormatRegex.java       |  2 +-
 .../agent/plugin/trigger/DirectoryTrigger.java     | 16 ++++++++--
 .../inlong/agent/plugin/trigger/PathPattern.java   |  7 +++++
 .../apache/inlong/agent/plugin/TestFileAgent.java  | 35 +++++++++++++++-------
 .../agent/plugin/filter/TestDateFormatRegex.java   |  2 +-
 8 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
index b131f3a..b519799 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
@@ -47,7 +47,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_CHECKPOINT = "job.checkpoint";
 
     // offset for time
-    public static final String JOB_FILE_TIME_OFFSET = "job.file.time.offset";
+    public static final String JOB_FILE_TIME_OFFSET = "job.timeOffset";
 
     public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index d232b27..04e88ed 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -113,6 +113,7 @@ public class ManagerResultFormatter {
         job.setSource(DEFAULT_SOURCE);
         job.setSink(DEFAULT_BUS_SINK);
         job.setId(dataConfigs.getTaskId());
+        job.setTimeOffset(dataConfigs.getTimeOffset());
         job.setOp(dataConfigs.getOp());
         job.setDeliveryTime(dataConfigs.getDeliveryTime());
         if (!dataConfigs.getAdditionalAttr().isEmpty()) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index 4261375..056e8e3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -53,6 +53,7 @@ public class JobProfileDto {
         private String pattern;
         private String op;
         private String cycleUnit;
+        private String timeOffset;
         private String deliveryTime;
         private String addictiveString;
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
index 387797f..da9604e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
@@ -35,7 +35,7 @@ public class DateFormatRegex implements Filter {
     private static final String YEAR = "YYYY";
     private static final String MONTH = "MM";
     private static final String DAY = "DD";
-    private static final String HOUR = "hh";
+    private static final String HOUR = "HH";
 
     private static final String NORMAL_FORMATTER = "yyyyMMddHHmm";
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 85336d1..3ea477c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -216,9 +216,17 @@ public class DirectoryTrigger extends AbstractDaemon implements Trigger {
         submitWorker(watchEventHandler());
     }
 
-
     public void register(String pathPattern) throws IOException {
         PathPattern entity = new PathPattern(pathPattern);
+        innerRegister(pathPattern, entity);
+    }
+
+    public void register(String pathPattern, String offset) throws IOException {
+        PathPattern entity = new PathPattern(pathPattern, offset);
+        innerRegister(pathPattern, entity);
+    }
+
+    private void innerRegister(String pathPattern, PathPattern entity) throws IOException {
         List<WatchKey> tmpKeyList = new ArrayList<>();
         List<WatchKey> keyList = allWatchers.putIfAbsent(entity, tmpKeyList);
         if (keyList == null) {
@@ -255,7 +263,13 @@ public class DirectoryTrigger extends AbstractDaemon implements Trigger {
         this.profile = profile;
         if (this.profile.hasKey(JobConstants.JOB_DIR_FILTER_PATTERN)) {
             String pathPattern = this.profile.get(JobConstants.JOB_DIR_FILTER_PATTERN);
-            register(pathPattern);
+            String timeOffset = this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
+            // register the pattern exactly once: an empty/absent offset keeps the offset-free PathPattern
+            if (timeOffset.isEmpty()) {
+                register(pathPattern);
+            } else {
+                register(pathPattern, timeOffset);
+            }
         }
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
index ea24fae..ef373fb 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
@@ -51,6 +51,13 @@ public class PathPattern {
         dateFormatRegex = DateFormatRegex.ofRegex(watchDir);
     }
 
+    public PathPattern(String watchDir, String offset) {
+        this.watchDir = watchDir;
+        rootDir = findRoot(watchDir);
+        subDirs = new HashSet<>();
+        dateFormatRegex = DateFormatRegex.ofRegex(watchDir).withOffset(offset);
+    }
+
     /**
      * find last existing path by pattern.
      * @param watchDir
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index ce9cc35..f842416 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_TIME_OFFSET;
 import static org.awaitility.Awaitility.await;
 
 import java.io.FileWriter;
@@ -93,15 +94,7 @@ public class TestFileAgent {
                 agent.submitJob(profile);
             }
         }
-        await().atMost(5, TimeUnit.MINUTES).until(() -> {
-            JobProfile jobConf = agent.getManager().getJobManager()
-                    .getJobConfDb().getJob(StateSearchKey.SUCCESS);
-            return jobConf != null;
-        });
-
-        JobProfile jobConf = agent.getManager().getJobManager()
-                .getJobConfDb().getJob(StateSearchKey.SUCCESS);
-        Assert.assertEquals(1, jobConf.getInt("job.id"));
+        assertJobSuccess();
     }
 
     @Test
@@ -148,7 +141,29 @@ public class TestFileAgent {
             }
         }
         createFiles(nowDate);
-        await().atMost(2, TimeUnit.MINUTES).until(() -> {
+        assertJobSuccess();
+    }
+
+    @Test
+    public void testTimeOffset() throws Exception {
+        String theDateBefore = AgentUtils.formatCurrentTimeWithOffset("yyyyMMdd", -1, 0, 0);
+        try (InputStream stream = LOADER.getResourceAsStream("fileAgentJob.json")) {
+            if (stream != null) {
+                String jobJson = IOUtils.toString(stream, StandardCharsets.UTF_8);
+                JobProfile profile = JobProfile.parseJsonStr(jobJson);
+                profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
+                    "YYYYMMDD").toString());
+                profile.set(JOB_FILE_TIME_OFFSET, "-1d");
+                profile.set(JOB_CYCLE_UNIT, "D");
+                agent.submitTriggerJob(profile);
+            }
+        }
+        createFiles(theDateBefore);
+        assertJobSuccess();
+    }
+
+    private void assertJobSuccess() {
+        await().atMost(5, TimeUnit.MINUTES).until(() -> {
             JobProfile jobConf = agent.getManager().getJobManager()
                 .getJobConfDb().getJob(StateSearchKey.SUCCESS);
             return jobConf != null;
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
index 6bd16d3..1417bbc 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
@@ -54,7 +54,7 @@ public class TestDateFormatRegex {
     public void testRegex() {
         File file = Paths.get(helper.getParentPath().toString(), "aad20201201_11.log").toFile();
         DateFormatRegex dateFormatRegex = DateFormatRegex
-            .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_hh.log").withFile(file);
+            .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_HH.log").withFile(file);
         dateFormatRegex.match();
         dateFormatRegex.getFormattedTime();
         Assert.assertEquals(helper.getParentPath().toString() + "/\\w{3}"