You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/06 11:29:40 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5803][Agent] Fix NPE when collect file data (#5804)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new c61b3f69b [INLONG-5803][Agent] Fix NPE when collect file data (#5804)
c61b3f69b is described below
commit c61b3f69ba23df59508bc7cbf3a7f7b552783b6b
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Tue Sep 6 19:20:48 2022 +0800
[INLONG-5803][Agent] Fix NPE when collect file data (#5804)
---
.../sources/reader/file/FileReaderOperator.java | 4 ++--
.../sources/reader/file/MonitorTextFile.java | 22 ++++++++++++++++------
2 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 5c35f85d9..2ff0bf189 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES;
@@ -192,9 +191,10 @@ public class FileReaderOperator extends AbstractReader {
}
}
+ // default value is -1 and never stop task
private void initReadTimeout(JobProfile jobConf) {
int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
- DEFAULT_JOB_FILE_MAX_WAIT);
+ NEVER_STOP_SIGN);
if (waitTime == NEVER_STOP_SIGN) {
timeout = NEVER_STOP_SIGN;
} else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index e684b2ce7..4a2644dba 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -76,6 +77,7 @@ public final class MonitorTextFile {
*/
private class MonitorEventRunnable implements Runnable {
+ private static final int WAIT_TIME = 30;
private final FileReaderOperator fileReaderOperator;
private final TextFileReader textFileReader;
private final Long interval;
@@ -100,8 +102,9 @@ public final class MonitorTextFile {
@Override
public void run() {
- while (!this.fileReaderOperator.finished) {
- try {
+ try {
+ TimeUnit.SECONDS.sleep(WAIT_TIME);
+ while (!this.fileReaderOperator.finished) {
long expireTime = Long.parseLong(fileReaderOperator.jobConf
.get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
long currentTime = System.currentTimeMillis();
@@ -111,17 +114,24 @@ public final class MonitorTextFile {
}
listen();
TimeUnit.MILLISECONDS.sleep(interval);
- } catch (Exception e) {
- LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
}
+ } catch (Exception e) {
+ LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
}
}
private void listen() throws IOException {
BasicFileAttributes attributesAfter = Files
.readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class);
- if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0
- && !this.fileReaderOperator.iterator.hasNext()) {
+ if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0) {
+ // Not triggered during data sending
+ if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator.hasNext()) {
+ return;
+ }
+ // Set position 0 when split file
+ if (attributesBefore.creationTime().compareTo(attributesAfter.creationTime()) < 0) {
+ this.fileReaderOperator.position = 0;
+ }
this.textFileReader.getData();
this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator();
this.attributesBefore = attributesAfter;