You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/25 08:02:23 UTC

[incubator-inlong] branch master updated: [INLONG-748][Agent] Support file name pattern YYYYMMDD (#1472)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b7d70d  [INLONG-748][Agent] Support file name pattern YYYYMMDD (#1472)
2b7d70d is described below

commit 2b7d70d1dad23ce26b83bb5c97ed9b5154ec0d46
Author: ziruipeng <zp...@connect.ust.hk>
AuthorDate: Wed Aug 25 16:02:20 2021 +0800

    [INLONG-748][Agent] Support file name pattern YYYYMMDD (#1472)
    
    Co-authored-by: stingpeng <st...@tencent.com>
---
 .../plugin/fetcher/ManagerResultFormatter.java     |  3 ++
 .../agent/plugin/fetcher/dtos/DataConfig.java      |  1 +
 .../agent/plugin/fetcher/dtos/JobProfileDto.java   |  1 +
 .../inlong/agent/plugin/utils/PluginUtils.java     |  2 +-
 .../apache/inlong/agent/plugin/TestFileAgent.java  | 34 ++++++++++++++++++++--
 .../apache/inlong/agent/plugin/sinks/MockSink.java | 10 ++++++-
 6 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
index 6af439e..d232b27 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerResultFormatter.java
@@ -118,6 +118,9 @@ public class ManagerResultFormatter {
         if (!dataConfigs.getAdditionalAttr().isEmpty()) {
             job.setAddictiveString(dataConfigs.getAdditionalAttr());
         }
+        if (!dataConfigs.getCycleUnit().isEmpty()) {
+            job.setCycleUnit(dataConfigs.getCycleUnit());
+        }
         return job;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
index e367489..c108d02 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/DataConfig.java
@@ -28,6 +28,7 @@ public class DataConfig {
     private String dataStreamIdentifier;
     private String deliveryTime;
     private String fieldSplitter;
+    private String cycleUnit;
     private String ip;
     private String middlewareType;
     private String mqMasterAddress;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
index 0fee5e1..4261375 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/dtos/JobProfileDto.java
@@ -52,6 +52,7 @@ public class JobProfileDto {
         private String channel;
         private String pattern;
         private String op;
+        private String cycleUnit;
         private String deliveryTime;
         private String addictiveString;
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 0867922..64cc070 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -83,7 +83,7 @@ public class PluginUtils {
 
     public static Collection<File> findSuitFiles(JobProfile jobConf) {
         String dirPattern = jobConf.get(JOB_DIR_FILTER_PATTERN);
-        LOGGER.info("find files with dir pattern {}", dirPattern);
+        LOGGER.info("start to find files with dir pattern {}", dirPattern);
         PathPattern pattern = new PathPattern(dirPattern);
         updateRetryTime(jobConf, pattern);
         int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 0adfb27..ce9cc35 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.agent.plugin;
 
+import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
 import static org.awaitility.Awaitility.await;
@@ -37,6 +38,7 @@ import org.apache.inlong.agent.core.job.JobWrapper;
 import org.apache.inlong.agent.core.trigger.TriggerManager;
 import org.apache.inlong.agent.db.StateSearchKey;
 import org.apache.inlong.agent.plugin.utils.TestUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -67,10 +69,10 @@ public class TestFileAgent {
         helper.teardownAgentHome();
     }
 
-    private void createHugeFiles(String fileName) throws Exception {
+    private void createFiles(String fileName) throws Exception {
         final Path hugeFile = Paths.get(testRootDir.toString(), fileName);
         FileWriter writer = new FileWriter(hugeFile.toFile());
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 10; i++) {
             writer.write(RECORD);
         }
         writer.flush();
@@ -80,7 +82,7 @@ public class TestFileAgent {
     @Test
     public void testFileAgent() throws Exception {
         for (int i = 0; i < 10; i++) {
-            createHugeFiles(String.format("hugeFile.%s.txt", i));
+            createFiles(String.format("hugeFile.%s.txt", i));
         }
         try (InputStream stream = LOADER.getResourceAsStream("fileAgentJob.json")) {
             if (stream != null) {
@@ -130,5 +132,31 @@ public class TestFileAgent {
         return result.get();
     }
 
+    @Test
+    public void testCycleUnit() throws Exception {
+
+        String nowDate = AgentUtils.formatCurrentTimeWithoutOffset("yyyyMMdd");
+
+        try (InputStream stream = LOADER.getResourceAsStream("fileAgentJob.json")) {
+            if (stream != null) {
+                String jobJson = IOUtils.toString(stream, StandardCharsets.UTF_8);
+                JobProfile profile = JobProfile.parseJsonStr(jobJson);
+                profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
+                    "YYYYMMDD").toString());
+                profile.set(JOB_CYCLE_UNIT, "D");
+                agent.submitTriggerJob(profile);
+            }
+        }
+        createFiles(nowDate);
+        await().atMost(2, TimeUnit.MINUTES).until(() -> {
+            JobProfile jobConf = agent.getManager().getJobManager()
+                .getJobConfDb().getJob(StateSearchKey.SUCCESS);
+            return jobConf != null;
+        });
+        JobProfile jobConf = agent.getManager().getJobManager()
+            .getJobConfDb().getJob(StateSearchKey.SUCCESS);
+        Assert.assertEquals(1, jobConf.getInt("job.id"));
+    }
+
 
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index 414f1fd..7fe473c 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
+import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
+import static org.apache.inlong.agent.constants.JobConstants.JOB_DATA_TIME;
 import static org.apache.inlong.agent.constants.JobConstants.JOB_INSTANCE_ID;
 
 import java.util.concurrent.atomic.AtomicLong;
@@ -24,6 +26,7 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Sink;
+import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +37,7 @@ public class MockSink implements Sink {
     private TaskPositionManager taskPositionManager;
     private String sourceFileName;
     private String jobInstanceId;
-
+    private long dataTime;
 
     @Override
     public void write(Message message) {
@@ -53,10 +56,15 @@ public class MockSink implements Sink {
     public void init(JobProfile jobConf) {
         taskPositionManager = TaskPositionManager.getTaskPositionManager();
         jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
+        dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
+            jobConf.get(JOB_CYCLE_UNIT, ""));
+        LOGGER.info("get dataTime is : {}", dataTime);
     }
 
     @Override
     public void destroy() {
         LOGGER.info("destroy mockSink, sink line number is : {}", number.get());
     }
+
+
 }