You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/25 08:02:23 UTC
[incubator-inlong] branch master updated: [INLONG-748][Agent]
Support file name pattern YYYYDDMM (#1472)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2b7d70d [INLONG-748][Agent] Support file name pattern YYYYDDMM (#1472)
2b7d70d is described below
commit 2b7d70d1dad23ce26b83bb5c97ed9b5154ec0d46
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Wed Aug 25 16:02:20 2021 +0800
[INLONG-748][Agent] Support file name pattern YYYYDDMM (#1472)
Co-authored-by: stingpeng <st...@tencent.com>
---
.../plugin/fetcher/ManagerResultFormatter.java | 3 ++
.../agent/plugin/fetcher/dtos/DataConfig.java | 1 +
.../agent/plugin/fetcher/dtos/JobProfileDto.java | 1 +
.../inlong/agent/plugin/utils/PluginUtils.java | 2 +-
.../apache/inlong/agent/plugin/TestFileAgent.java | 34 ++++++++++++++++++++--
.../apache/inlong/agent/plugin/sinks/MockSink.java | 10 ++++++-
6 files changed, 46 insertions(+), 5 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index 6af439e..d232b27 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -118,6 +118,9 @@ public class ManagerResultFormatter {
if (!dataConfigs.getAdditionalAttr().isEmpty()) {
job.setAddictiveString(dataConfigs.getAdditionalAttr());
}
+ if (!dataConfigs.getCycleUnit().isEmpty()) {
+ job.setCycleUnit(dataConfigs.getCycleUnit());
+ }
return job;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
index e367489..c108d02 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
@@ -28,6 +28,7 @@ public class DataConfig {
private String dataStreamIdentifier;
private String deliveryTime;
private String fieldSplitter;
+ private String cycleUnit;
private String ip;
private String middlewareType;
private String mqMasterAddress;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index 0fee5e1..4261375 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -52,6 +52,7 @@ public class JobProfileDto {
private String channel;
private String pattern;
private String op;
+ private String cycleUnit;
private String deliveryTime;
private String addictiveString;
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 0867922..64cc070 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -83,7 +83,7 @@ public class PluginUtils {
public static Collection<File> findSuitFiles(JobProfile jobConf) {
String dirPattern = jobConf.get(JOB_DIR_FILTER_PATTERN);
- LOGGER.info("find files with dir pattern {}", dirPattern);
+ LOGGER.info("start to find files with dir pattern {}", dirPattern);
PathPattern pattern = new PathPattern(dirPattern);
updateRetryTime(jobConf, pattern);
int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 0adfb27..ce9cc35 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin;
+import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
import static org.awaitility.Awaitility.await;
@@ -37,6 +38,7 @@ import org.apache.inlong.agent.core.job.JobWrapper;
import org.apache.inlong.agent.core.trigger.TriggerManager;
import org.apache.inlong.agent.db.StateSearchKey;
import org.apache.inlong.agent.plugin.utils.TestUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -67,10 +69,10 @@ public class TestFileAgent {
helper.teardownAgentHome();
}
- private void createHugeFiles(String fileName) throws Exception {
+ private void createFiles(String fileName) throws Exception {
final Path hugeFile = Paths.get(testRootDir.toString(), fileName);
FileWriter writer = new FileWriter(hugeFile.toFile());
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 10; i++) {
writer.write(RECORD);
}
writer.flush();
@@ -80,7 +82,7 @@ public class TestFileAgent {
@Test
public void testFileAgent() throws Exception {
for (int i = 0; i < 10; i++) {
- createHugeFiles(String.format("hugeFile.%s.txt", i));
+ createFiles(String.format("hugeFile.%s.txt", i));
}
try (InputStream stream = LOADER.getResourceAsStream("fileAgentJob.json")) {
if (stream != null) {
@@ -130,5 +132,31 @@ public class TestFileAgent {
return result.get();
}
+ @Test
+ public void testCycleUnit() throws Exception {
+
+ String nowDate = AgentUtils.formatCurrentTimeWithoutOffset("yyyyMMdd");
+
+ try (InputStream stream = LOADER.getResourceAsStream("fileAgentJob.json")) {
+ if (stream != null) {
+ String jobJson = IOUtils.toString(stream, StandardCharsets.UTF_8);
+ JobProfile profile = JobProfile.parseJsonStr(jobJson);
+ profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
+ "YYYYMMDD").toString());
+ profile.set(JOB_CYCLE_UNIT, "D");
+ agent.submitTriggerJob(profile);
+ }
+ }
+ createFiles(nowDate);
+ await().atMost(2, TimeUnit.MINUTES).until(() -> {
+ JobProfile jobConf = agent.getManager().getJobManager()
+ .getJobConfDb().getJob(StateSearchKey.SUCCESS);
+ return jobConf != null;
+ });
+ JobProfile jobConf = agent.getManager().getJobManager()
+ .getJobConfDb().getJob(StateSearchKey.SUCCESS);
+ Assert.assertEquals(1, jobConf.getInt("job.id"));
+ }
+
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 414f1fd..7fe473c 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.plugin.sinks;
+import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
+import static org.apache.inlong.agent.constants.JobConstants.JOB_DATA_TIME;
import static org.apache.inlong.agent.constants.JobConstants.JOB_INSTANCE_ID;
import java.util.concurrent.atomic.AtomicLong;
@@ -24,6 +26,7 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Sink;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +37,7 @@ public class MockSink implements Sink {
private TaskPositionManager taskPositionManager;
private String sourceFileName;
private String jobInstanceId;
-
+ private long dataTime;
@Override
public void write(Message message) {
@@ -53,10 +56,15 @@ public class MockSink implements Sink {
public void init(JobProfile jobConf) {
taskPositionManager = TaskPositionManager.getTaskPositionManager();
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
+ dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
+ jobConf.get(JOB_CYCLE_UNIT, ""));
+ LOGGER.info("get dataTime is : {}", dataTime);
}
@Override
public void destroy() {
LOGGER.info("destroy mockSink, sink line number is : {}", number.get());
}
+
+
}