You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/05 03:36:04 UTC
[inlong] branch master updated: [INLONG-5347][Agent] Incremental and full reads of file contents (#5348)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 474b90434 [INLONG-5347][Agent] Incremental and full reads of file contents (#5348)
474b90434 is described below
commit 474b90434653964fe8052fdf16888c199d3fbc57
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Aug 5 11:35:59 2022 +0800
[INLONG-5347][Agent] Incremental and full reads of file contents (#5348)
---
.../inlong/agent/constant/DataCollectType.java | 35 ++++++++++++++++++++++
.../apache/inlong/agent/constant/JobConstants.java | 1 +
.../java/org/apache/inlong/agent/pojo/FileJob.java | 6 ++++
.../apache/inlong/agent/pojo/JobProfileDto.java | 1 +
.../agent/plugin/sources/TextFileSource.java | 28 +++++++++++++++--
.../plugin/sources/reader/TextFileReader.java | 2 +-
.../agent/plugin/sources/TestTextFileReader.java | 33 ++++++++++++++++++++
7 files changed, 102 insertions(+), 4 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/DataCollectType.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/DataCollectType.java
new file mode 100644
index 000000000..77759a51c
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/DataCollectType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Constants naming which part of a file's content a job collects.
+ *
+ * <p>The value is carried by the job property {@code job.fileJob.contentCollectType}.
+ * {@code TextFileSource} compares it against {@link #INCREMENT} ignoring case. Any other
+ * value, including {@link #FULL}, reads from the recorded history position (0 by default).
+ */
+public final class DataCollectType {
+
+    /**
+     * Collect only content added after the job starts; lines already in the file are skipped.
+     */
+    public static final String INCREMENT = "INCREMENT";
+
+    /**
+     * Collect the file's full content rather than only newly appended data.
+     */
+    public static final String FULL = "FULL";
+
+    private DataCollectType() {
+        // constants holder; not meant to be instantiated or extended
+    }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index cb7db876c..cd2f91f36 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -52,6 +52,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
public static final String JOB_FILE_COLLECT_TYPE = "job.fileJob.collectType";
public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern";
+ public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType";
//Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index a37e992bf..22d716df4 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -31,6 +31,10 @@ public class FileJob {
private String addictiveString;
private String collectType;
private Line line;
+
+ // Which part of the file content to collect: DataCollectType.INCREMENT ("INCREMENT", skip the
+ // lines already in the file) or DataCollectType.FULL ("FULL").
+ private String contentCollectType;
@Data
public static class Dir {
@@ -71,6 +75,8 @@ public class FileJob {
private String collectType;
private String lineEndPattern;
+
+ private String contentCollectType;
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 754c465cf..285269028 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -105,6 +105,7 @@ public class JobProfileDto {
dir.setPattern(fileJobTaskConfig.getPattern());
fileJob.setDir(dir);
fileJob.setCollectType(fileJobTaskConfig.getCollectType());
+ fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
if (fileJobTaskConfig.getTimeOffset() != null) {
fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 9d5adf674..a17bb627e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -18,6 +18,8 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.DataCollectType;
+import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
@@ -26,6 +28,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -62,10 +67,10 @@ public class TextFileSource implements Source {
List<Reader> result = new ArrayList<>();
String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
for (File file : allFiles) {
- int seekPosition = jobConf.getInt(file.getAbsolutePath() + POSITION_SUFFIX, 0);
- LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", seekPosition,
+ int startPosition = getStartPosition(jobConf, file);
+ LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", startPosition,
jobConf.getInstanceId(), file.getAbsolutePath());
- TextFileReader textFileReader = new TextFileReader(file, seekPosition);
+ TextFileReader textFileReader = new TextFileReader(file, startPosition);
long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT);
textFileReader.setWaitMillisecond(waitTimeout);
addValidator(filterPattern, textFileReader);
@@ -76,6 +81,23 @@ public class TextFileSource implements Source {
return result;
}
+    /**
+     * Returns the position at which reading of {@code file} should start.
+     *
+     * <p>If the job property {@code JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE} is present and equals
+     * {@code DataCollectType.INCREMENT} (ignoring case), the content already in the file is skipped
+     * by returning its current number of lines. Otherwise the history position stored under
+     * {@code file.getAbsolutePath() + POSITION_SUFFIX} is returned, or 0 if none is recorded.
+     *
+     * <p>NOTE(review): assumes TextFileReader treats the position as a count of lines to skip; confirm.
+     *
+     * @param jobConf job profile holding the content collect type and history positions
+     * @param file the file a reader is being created for
+     * @return the start position handed to {@code TextFileReader}
+     * @throws RuntimeException if the file cannot be read while counting its lines
+     */
+    private int getStartPosition(JobProfile jobConf, File file) {
+        boolean incremental = jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE)
+                && DataCollectType.INCREMENT.equalsIgnoreCase(
+                        jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE));
+        if (!incremental) {
+            return jobConf.getInt(file.getAbsolutePath() + POSITION_SUFFIX, 0);
+        }
+        try (LineNumberReader lineReader = new LineNumberReader(new FileReader(file.getPath()))) {
+            // Count lines exactly as readLine() splits them. Deriving the count as
+            // getLineNumber() + 1 is one too high for an empty file or one ending in a line
+            // terminator. Also, getLineNumber()'s handling of a final unterminated line differs
+            // across JDK versions. Over-counting would skip the first line appended afterwards.
+            long lineCount = 0;
+            while (lineReader.readLine() != null) {
+                lineCount++;
+            }
+            return (int) Math.min(lineCount, Integer.MAX_VALUE);
+        } catch (IOException ex) {
+            LOGGER.error("get position error, file absolute path: {}", file.getAbsolutePath(), ex);
+            throw new RuntimeException("failed to count lines of " + file.getAbsolutePath(), ex);
+        }
+    }
+
private void addValidator(String filterPattern, TextFileReader textFileReader) {
textFileReader.addPatternValidator(filterPattern);
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index a72e258b1..082a83f07 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -179,7 +179,7 @@ public class TextFileReader extends AbstractReader {
LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, md5);
}
LOGGER.info("file name for task is {}, md5 is {}", file, md5);
- //split line and column
+ //split file line
getFileStream(jobConf);
if (Objects.nonNull(stream)) {
iterator = stream.iterator();
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index c8c08eceb..6ad27e738 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.FileCollectType;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
@@ -52,6 +53,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROU
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_COLLECT_TYPE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
@@ -156,6 +158,37 @@ public class TestTextFileReader {
}
}
+    /**
+     * With INCREMENT content collection, a reader created for an existing file starts past the
+     * lines already present, so a read issued right after init yields no message.
+     */
+    @Test
+    public void testIncrementData() throws Exception {
+        String mainPath = Paths.get(getClass().getClassLoader().getResource("test").toURI()).toString();
+        JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
+        String filePattern = Paths.get(mainPath, "[1].txt").toFile().getAbsolutePath();
+        jobConfiguration.set(JOB_DIR_FILTER_PATTERN, filePattern);
+        jobConfiguration.set(JOB_INSTANCE_ID, "test");
+        jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_FILE_COLLECT_TYPE, FileCollectType.FULL);
+        jobConfiguration.set(JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
+
+        List<Reader> readers = new TextFileSource().split(jobConfiguration);
+        Assert.assertEquals(1, readers.size());
+        Reader reader = readers.get(0);
+        reader.init(jobConfiguration);
+
+        // A single read is enough: every pre-existing line should have been skipped.
+        if (!reader.isFinished()) {
+            Message message = reader.read();
+            if (message != null) {
+                LOGGER.info("message is {}", message.toString());
+            }
+            Assert.assertNull(message);
+        }
+    }
+
@Test
public void testTextSeekReader() throws Exception {
Path localPath = Paths.get(testDir.toString(), "test.txt");