You are viewing a plain-text version of this content. The canonical version (with the original hyperlink) is available on the mailing-list archive page.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/08/26 07:32:55 UTC
[incubator-inlong] branch master updated: [INLONG-749][Agent]
Support read data from different time offset (#1480)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cbbb29d [INLONG-749][Agent] Support read data from different time offset (#1480)
cbbb29d is described below
commit cbbb29dc4d2bb712e8d020b0ce28c6f383501d39
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Thu Aug 26 15:32:50 2021 +0800
[INLONG-749][Agent] Support read data from different time offset (#1480)
Co-authored-by: stingpeng <st...@tencent.com>
---
.../inlong/agent/constants/JobConstants.java | 2 +-
.../plugin/fetcher/ManagerResultFormatter.java | 1 +
.../agent/plugin/fetcher/dtos/JobProfileDto.java | 1 +
.../agent/plugin/filter/DateFormatRegex.java | 2 +-
.../agent/plugin/trigger/DirectoryTrigger.java | 16 ++++++++--
.../inlong/agent/plugin/trigger/PathPattern.java | 7 +++++
.../apache/inlong/agent/plugin/TestFileAgent.java | 35 +++++++++++++++-------
.../agent/plugin/filter/TestDateFormatRegex.java | 2 +-
8 files changed, 51 insertions(+), 15 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
index b131f3a..b519799 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/JobConstants.java
@@ -47,7 +47,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_CHECKPOINT = "job.checkpoint";
// offset for time
- public static final String JOB_FILE_TIME_OFFSET = "job.file.time.offset";
+ public static final String JOB_FILE_TIME_OFFSET = "job.timeOffset";
public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d";
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index d232b27..04e88ed 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -113,6 +113,7 @@ public class ManagerResultFormatter {
job.setSource(DEFAULT_SOURCE);
job.setSink(DEFAULT_BUS_SINK);
job.setId(dataConfigs.getTaskId());
+ job.setTimeOffset(dataConfigs.getTimeOffset());
job.setOp(dataConfigs.getOp());
job.setDeliveryTime(dataConfigs.getDeliveryTime());
if (!dataConfigs.getAdditionalAttr().isEmpty()) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index 4261375..056e8e3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -53,6 +53,7 @@ public class JobProfileDto {
private String pattern;
private String op;
private String cycleUnit;
+ private String timeOffset;
private String deliveryTime;
private String addictiveString;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
index 387797f..da9604e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/filter/DateFormatRegex.java
@@ -35,7 +35,7 @@ public class DateFormatRegex implements Filter {
private static final String YEAR = "YYYY";
private static final String MONTH = "MM";
private static final String DAY = "DD";
- private static final String HOUR = "hh";
+ private static final String HOUR = "HH";
private static final String NORMAL_FORMATTER = "yyyyMMddHHmm";
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 85336d1..3ea477c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -216,9 +216,17 @@ public class DirectoryTrigger extends AbstractDaemon implements Trigger {
submitWorker(watchEventHandler());
}
-
public void register(String pathPattern) throws IOException {
PathPattern entity = new PathPattern(pathPattern);
+ innerRegister(pathPattern, entity);
+ }
+
+ public void register(String pathPattern, String offset) throws IOException {
+ PathPattern entity = new PathPattern(pathPattern, offset);
+ innerRegister(pathPattern, entity);
+ }
+
+ private void innerRegister(String pathPattern, PathPattern entity) throws IOException {
List<WatchKey> tmpKeyList = new ArrayList<>();
List<WatchKey> keyList = allWatchers.putIfAbsent(entity, tmpKeyList);
if (keyList == null) {
@@ -255,7 +263,11 @@ public class DirectoryTrigger extends AbstractDaemon implements Trigger {
this.profile = profile;
if (this.profile.hasKey(JobConstants.JOB_DIR_FILTER_PATTERN)) {
String pathPattern = this.profile.get(JobConstants.JOB_DIR_FILTER_PATTERN);
- register(pathPattern);
+ String timeOffset = this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
+ if (timeOffset.isEmpty()) {
+ register(pathPattern);
+ }
+ register(pathPattern, timeOffset);
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
index ea24fae..ef373fb 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java
@@ -51,6 +51,13 @@ public class PathPattern {
dateFormatRegex = DateFormatRegex.ofRegex(watchDir);
}
+ public PathPattern(String watchDir, String offset) {
+ this.watchDir = watchDir;
+ rootDir = findRoot(watchDir);
+ subDirs = new HashSet<>();
+ dateFormatRegex = DateFormatRegex.ofRegex(watchDir).withOffset(offset);
+ }
+
/**
* find last existing path by pattern.
* @param watchDir
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index ce9cc35..f842416 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin;
import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_TIME_OFFSET;
import static org.awaitility.Awaitility.await;
import java.io.FileWriter;
@@ -93,15 +94,7 @@ public class TestFileAgent {
agent.submitJob(profile);
}
}
- await().atMost(5, TimeUnit.MINUTES).until(() -> {
- JobProfile jobConf = agent.getManager().getJobManager()
- .getJobConfDb().getJob(StateSearchKey.SUCCESS);
- return jobConf != null;
- });
-
- JobProfile jobConf = agent.getManager().getJobManager()
- .getJobConfDb().getJob(StateSearchKey.SUCCESS);
- Assert.assertEquals(1, jobConf.getInt("job.id"));
+ assertJobSuccess();
}
@Test
@@ -148,7 +141,29 @@ public class TestFileAgent {
}
}
createFiles(nowDate);
- await().atMost(2, TimeUnit.MINUTES).until(() -> {
+ assertJobSuccess();
+ }
+
+ @Test
+ public void testTimeOffset() throws Exception {
+ String theDateBefore = AgentUtils.formatCurrentTimeWithOffset("yyyyMMdd", -1, 0, 0);
+ try (InputStream stream = LOADER.getResourceAsStream("fileAgentJob.json")) {
+ if (stream != null) {
+ String jobJson = IOUtils.toString(stream, StandardCharsets.UTF_8);
+ JobProfile profile = JobProfile.parseJsonStr(jobJson);
+ profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
+ "YYYYMMDD").toString());
+ profile.set(JOB_FILE_TIME_OFFSET, "-1d");
+ profile.set(JOB_CYCLE_UNIT, "D");
+ agent.submitTriggerJob(profile);
+ }
+ }
+ createFiles(theDateBefore);
+ assertJobSuccess();
+ }
+
+ private void assertJobSuccess() {
+ await().atMost(5, TimeUnit.MINUTES).until(() -> {
JobProfile jobConf = agent.getManager().getJobManager()
.getJobConfDb().getJob(StateSearchKey.SUCCESS);
return jobConf != null;
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
index 6bd16d3..1417bbc 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java
@@ -54,7 +54,7 @@ public class TestDateFormatRegex {
public void testRegex() {
File file = Paths.get(helper.getParentPath().toString(), "aad20201201_11.log").toFile();
DateFormatRegex dateFormatRegex = DateFormatRegex
- .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_hh.log").withFile(file);
+ .ofRegex(helper.getParentPath().toString() + "/\\w{3}YYYYMMDD_HH.log").withFile(file);
dateFormatRegex.match();
dateFormatRegex.getFormattedTime();
Assert.assertEquals(helper.getParentPath().toString() + "/\\w{3}"