You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/05 03:36:04 UTC

[inlong] branch master updated: [INLONG-5347][Agent] Incremental and full reads of file contents (#5348)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 474b90434 [INLONG-5347][Agent] Incremental and full reads of file contents (#5348)
474b90434 is described below

commit 474b90434653964fe8052fdf16888c199d3fbc57
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Aug 5 11:35:59 2022 +0800

    [INLONG-5347][Agent] Incremental and full reads of file contents (#5348)
---
 .../inlong/agent/constant/DataCollectType.java     | 35 ++++++++++++++++++++++
 .../apache/inlong/agent/constant/JobConstants.java |  1 +
 .../java/org/apache/inlong/agent/pojo/FileJob.java |  6 ++++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  1 +
 .../agent/plugin/sources/TextFileSource.java       | 28 +++++++++++++++--
 .../plugin/sources/reader/TextFileReader.java      |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 33 ++++++++++++++++++++
 7 files changed, 102 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/DataCollectType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/DataCollectType.java
new file mode 100644
index 000000000..77759a51c
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/DataCollectType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Collection type of data content
+ */
+public class DataCollectType {
+
+    /**
+     * increment of data
+     */
+    public static final String INCREMENT = "INCREMENT";
+
+    /**
+     * full data
+     */
+    public static final String FULL = "FULL";
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index cb7db876c..cd2f91f36 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -52,6 +52,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
     public static final String JOB_FILE_COLLECT_TYPE = "job.fileJob.collectType";
     public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern";
+    public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index a37e992bf..22d716df4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -31,6 +31,10 @@ public class FileJob {
     private String addictiveString;
     private String collectType;
     private Line line;
+    // INCREMENT 
+    // FULL
+
+    private String contentCollectType;
 
     @Data
     public static class Dir {
@@ -71,6 +75,8 @@ public class FileJob {
         private String collectType;
 
         private String lineEndPattern;
+
+        private String contentCollectType;
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 754c465cf..285269028 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -105,6 +105,7 @@ public class JobProfileDto {
         dir.setPattern(fileJobTaskConfig.getPattern());
         fileJob.setDir(dir);
         fileJob.setCollectType(fileJobTaskConfig.getCollectType());
+        fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
 
         if (fileJobTaskConfig.getTimeOffset() != null) {
             fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 9d5adf674..a17bb627e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.DataCollectType;
+import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
 import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
@@ -26,6 +28,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -62,10 +67,10 @@ public class TextFileSource implements Source {
         List<Reader> result = new ArrayList<>();
         String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
         for (File file : allFiles) {
-            int seekPosition = jobConf.getInt(file.getAbsolutePath() + POSITION_SUFFIX, 0);
-            LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", seekPosition,
+            int startPosition = getStartPosition(jobConf, file);
+            LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", startPosition,
                     jobConf.getInstanceId(), file.getAbsolutePath());
-            TextFileReader textFileReader = new TextFileReader(file, seekPosition);
+            TextFileReader textFileReader = new TextFileReader(file, startPosition);
             long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT);
             textFileReader.setWaitMillisecond(waitTimeout);
             addValidator(filterPattern, textFileReader);
@@ -76,6 +81,23 @@ public class TextFileSource implements Source {
         return result;
     }
 
+    /**
+     * Start position for {@code file}: its current line count for INCREMENT content collection ("a\nb\n" and
+     * "a\nb" both give 2), else the history position from the job profile (default 0). Read errors are rethrown.
+     */
+    private int getStartPosition(JobProfile jobConf, File file) {
+        if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) && DataCollectType.INCREMENT
+                .equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))) {
+            try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(file.getPath()))) {
+                return (int) lineNumberReader.lines().count();
+            } catch (IOException | java.io.UncheckedIOException ex) {
+                LOGGER.error("get position error, file absolute path: {}", file.getAbsolutePath(), ex);
+                throw new RuntimeException(ex);
+            }
+        }
+        return jobConf.getInt(file.getAbsolutePath() + POSITION_SUFFIX, 0);
+    }
+
     private void addValidator(final String filterPattern, final TextFileReader reader) {
         reader.addPatternValidator(filterPattern); // register the job's line filter pattern on this reader
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index a72e258b1..082a83f07 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -179,7 +179,7 @@ public class TextFileReader extends AbstractReader {
                 LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, md5);
             }
             LOGGER.info("file name for task is {}, md5 is {}", file, md5);
-            //split line and column
+            //split file line
             getFileStream(jobConf);
             if (Objects.nonNull(stream)) {
                 iterator = stream.iterator();
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index c8c08eceb..6ad27e738 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.FileCollectType;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
@@ -52,6 +53,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROU
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYPE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
@@ -156,6 +158,37 @@ public class TestTextFileReader {
         }
     }
 
+    /**
+     * INCREMENT content collection: the reader starts after the file's existing lines,
+     * so reading an unchanged file yields no message.
+     */
+    @Test
+    public void testIncrementData() throws Exception {
+        URI uri = getClass().getClassLoader().getResource("test").toURI();
+        JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
+        String mainPath = Paths.get(uri).toString();
+        jobConfiguration.set(JOB_DIR_FILTER_PATTERN, Paths.get(mainPath,
+                "[1].txt").toFile().getAbsolutePath());
+        jobConfiguration.set(JOB_INSTANCE_ID, "test");
+        jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
+        jobConfiguration.set(JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
+        TextFileSource fileSource = new TextFileSource();
+        List<Reader> readerList = fileSource.split(jobConfiguration);
+        Assert.assertEquals(1, readerList.size());
+        Reader reader = readerList.get(0);
+        reader.init(jobConfiguration);
+
+        // read once and assert unconditionally, so the check can never be skipped
+        Message message = reader.read();
+        Assert.assertNull("existing lines must not be read in INCREMENT mode", message);
+    }
+
     @Test
     public void testTextSeekReader() throws Exception {
         Path localPath = Paths.get(testDir.toString(), "test.txt");