You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/06 11:29:40 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5803][Agent] Fix NPE when collect file data (#5804)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new c61b3f69b [INLONG-5803][Agent] Fix NPE when collect file data (#5804)
c61b3f69b is described below

commit c61b3f69ba23df59508bc7cbf3a7f7b552783b6b
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Tue Sep 6 19:20:48 2022 +0800

    [INLONG-5803][Agent] Fix NPE when collect file data (#5804)
---
 .../sources/reader/file/FileReaderOperator.java    |  4 ++--
 .../sources/reader/file/MonitorTextFile.java       | 22 ++++++++++++++++------
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 5c35f85d9..2ff0bf189 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
 import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES;
@@ -192,9 +191,10 @@ public class FileReaderOperator extends AbstractReader {
         }
     }
 
+    // default value is -1 and never stop task
     private void initReadTimeout(JobProfile jobConf) {
         int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
-                DEFAULT_JOB_FILE_MAX_WAIT);
+                NEVER_STOP_SIGN);
         if (waitTime == NEVER_STOP_SIGN) {
             timeout = NEVER_STOP_SIGN;
         } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index e684b2ce7..4a2644dba 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Objects;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +77,7 @@ public final class MonitorTextFile {
      */
     private class MonitorEventRunnable implements Runnable {
 
+        private static final int WAIT_TIME = 30;
         private final FileReaderOperator fileReaderOperator;
         private final TextFileReader textFileReader;
         private final Long interval;
@@ -100,8 +102,9 @@ public final class MonitorTextFile {
 
         @Override
         public void run() {
-            while (!this.fileReaderOperator.finished) {
-                try {
+            try {
+                TimeUnit.SECONDS.sleep(WAIT_TIME);
+                while (!this.fileReaderOperator.finished) {
                     long expireTime = Long.parseLong(fileReaderOperator.jobConf
                             .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
                     long currentTime = System.currentTimeMillis();
@@ -111,17 +114,24 @@ public final class MonitorTextFile {
                     }
                     listen();
                     TimeUnit.MILLISECONDS.sleep(interval);
-                } catch (Exception e) {
-                    LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
                 }
+            } catch (Exception e) {
+                LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
             }
         }
 
         private void listen() throws IOException {
             BasicFileAttributes attributesAfter = Files
                     .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class);
-            if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0
-                    && !this.fileReaderOperator.iterator.hasNext()) {
+            if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0) {
+                // Not triggered during data sending
+                if (Objects.nonNull(this.fileReaderOperator.iterator) && this.fileReaderOperator.iterator.hasNext()) {
+                    return;
+                }
+                // Set position 0 when split file
+                if (attributesBefore.creationTime().compareTo(attributesAfter.creationTime()) < 0) {
+                    this.fileReaderOperator.position = 0;
+                }
                 this.textFileReader.getData();
                 this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator();
                 this.attributesBefore = attributesAfter;