You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/31 11:06:20 UTC

[inlong] 01/02: [INLONG-5743][Agent] Support function read file data continuously through the monitoring file (#5744)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 6f84779eaa7845b0ba675710f6b380c364917d17
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Wed Aug 31 19:04:10 2022 +0800

    [INLONG-5743][Agent] Support function read file data continuously through the monitoring file (#5744)
---
 .../apache/inlong/agent/constant/JobConstants.java |  16 +++
 .../java/org/apache/inlong/agent/pojo/FileJob.java |  22 +++-
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   8 ++
 .../sources/reader/file/FileReaderOperator.java    |   8 +-
 .../sources/reader/file/MonitorTextFile.java       | 131 +++++++++++++++++++++
 .../plugin/sources/reader/file/TextFileReader.java |   7 ++
 .../src/test/resources/agent.properties            |   5 +-
 7 files changed, 191 insertions(+), 6 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index d913c0257..1894d1507 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -57,6 +57,9 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_FILE_META_FILTER_BY_LABELS = "job.fileJob.filterMetaByLabels";
     public static final String JOB_FILE_PROPERTIES = "job.fileJob.properties";
     public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileJob.dataSeparator";
+    public static final String JOB_FILE_MONITOR_INTERVAL = "job.fileJob.monitorInterval";
+    public static final String JOB_FILE_MONITOR_STATUS = "job.fileJob.monitorStatus";
+    public static final String JOB_FILE_MONITOR_EXPIRE = "job.fileJob.monitorExpire";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
@@ -141,5 +144,18 @@ public class JobConstants extends CommonConstants {
      */
     public static final int SYNC_SEND_OPEN = 1;
 
+    public static final String INTERVAL_MILLISECONDS = "500";
+
+    /**
+     * Default monitor switch: "1" enables file monitoring, "0" disables it.
+     */
+    public static final String JOB_FILE_MONITOR_DEFAULT_STATUS = "1";
+
+    /**
+     * Default monitor expiration time, in milliseconds.
+     * The default value of -1 means the monitor never expires.
+     */
+    public static final String JOB_FILE_MONITOR_DEFAULT_EXPIRE = "-1";
+
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index bde908dbd..436dc3dd5 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -34,9 +34,9 @@ public class FileJob {
     private String addictiveString;
     private String collectType;
     private Line line;
+
     // INCREMENT 
     // FULL
-
     private String contentCollectType;
 
     private String envList;
@@ -49,6 +49,15 @@ public class FileJob {
 
     private Map<String, Object> properties;
 
+    // Monitor interval for file
+    private Long monitorInterval;
+
+    // Monitor switch, 1 true and 0 false
+    private Integer monitorStatus;
+
+    // Monitor expiration time, in milliseconds
+    private Long monitorExpire;
+
     @Data
     public static class Dir {
 
@@ -108,9 +117,18 @@ public class FileJob {
         // Metadata filters by label, special parameters for K8S
         private Map<String, String> filterMetaByLabels;
 
-        // Properties for File
+        // Properties for file
         private Map<String, Object> properties;
 
+        // Monitor interval for file
+        private Long monitorInterval;
+
+        // Monitor switch, 1 true and 0 false
+        private Integer monitorStatus;
+        
+        // Monitor expiration time, in milliseconds
+        private Long monitorExpire;
+
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index aa53b8346..e1a1482f0 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -132,6 +132,14 @@ public class JobProfileDto {
         if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
             fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels());
         }
+
+        if (null != fileJobTaskConfig.getMonitorInterval()) {
+            fileJob.setMonitorInterval(fileJobTaskConfig.getMonitorInterval());
+        }
+
+        if (null != fileJobTaskConfig.getMonitorStatus()) {
+            fileJob.setMonitorStatus(fileJobTaskConfig.getMonitorStatus());
+        }
         return fileJob;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 927d06daf..5c35f85d9 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -61,10 +61,10 @@ public class FileReaderOperator extends AbstractReader {
     public Stream<String> stream;
     public Map<String, String> metadata;
     public JobProfile jobConf;
-    private Iterator<String> iterator;
+    public Iterator<String> iterator;
+    public volatile boolean finished = false;
     private long timeout;
     private long waitTimeout;
-
     private long lastTime = 0;
 
     private List<Validator> validators = new ArrayList<>();
@@ -107,6 +107,9 @@ public class FileReaderOperator extends AbstractReader {
 
     @Override
     public boolean isFinished() {
+        if (finished) {
+            return true;
+        }
         if (timeout == NEVER_STOP_SIGN) {
             return false;
         }
@@ -201,6 +204,7 @@ public class FileReaderOperator extends AbstractReader {
 
     @Override
     public void destroy() {
+        finished = true;
         if (stream == null) {
             return;
         }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
new file mode 100644
index 000000000..e684b2ce7
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.JobConstants.INTERVAL_MILLISECONDS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_DEFAULT_EXPIRE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXPIRE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;
+
+/**
+ * Monitors text files for modifications and re-reads their data after they change.
+ */
+public final class MonitorTextFile {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class);
+    private static volatile MonitorTextFile monitorTextFile = null;
+    // Thread pool that runs the monitor task submitted by each monitor() call
+    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            60L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("monitor-file"));
+
+    private MonitorTextFile() {
+
+    }
+
+    /**
+     * Returns the singleton instance, creating it on the first call.
+     * @return MonitorTextFile instance
+     */
+    public static MonitorTextFile getInstance() {
+        if (monitorTextFile == null) {
+            synchronized (MonitorTextFile.class) {
+                if (monitorTextFile == null) {
+                    monitorTextFile = new MonitorTextFile();
+                }
+            }
+        }
+        return monitorTextFile;
+    }
+
+    public void monitor(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
+        MonitorEventRunnable monitorEvent = new MonitorEventRunnable(fileReaderOperator, textFileReader);
+        runningPool.execute(monitorEvent);
+    }
+
+    /**
+     * Task that periodically checks a file's last-modified time and re-reads the file after it changes.
+     */
+    private class MonitorEventRunnable implements Runnable {
+
+        private final FileReaderOperator fileReaderOperator;
+        private final TextFileReader textFileReader;
+        private final Long interval;
+        private final long startTime = System.currentTimeMillis();
+        /**
+         * Attributes of the file as of construction or the last reload; the baseline for detecting changes.
+         */
+        private BasicFileAttributes attributesBefore;
+
+        public MonitorEventRunnable(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
+            this.fileReaderOperator = fileReaderOperator;
+            this.textFileReader = textFileReader;
+            this.interval = Long
+                    .parseLong(fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, INTERVAL_MILLISECONDS));
+            try {
+                this.attributesBefore = Files
+                        .readAttributes(fileReaderOperator.file.toPath(), BasicFileAttributes.class);
+            } catch (IOException e) {
+                LOGGER.error("get {} last modify time error:", fileReaderOperator.file.getName(), e);
+            }
+        }
+
+        @Override
+        public void run() {
+            while (!this.fileReaderOperator.finished) {
+                try {
+                    long expireTime = Long.parseLong(fileReaderOperator.jobConf
+                            .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
+                    long currentTime = System.currentTimeMillis();
+                    if (expireTime != Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
+                            && currentTime - this.startTime > expireTime) {
+                        break;
+                    }
+                    listen();
+                    TimeUnit.MILLISECONDS.sleep(interval);
+                } catch (Exception e) {
+                    LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
+                }
+            }
+        }
+
+        private void listen() throws IOException {
+            BasicFileAttributes attributesAfter = Files
+                    .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class);
+            if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0
+                    && !this.fileReaderOperator.iterator.hasNext()) {
+                this.textFileReader.getData();
+                this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator();
+                this.attributesBefore = attributesAfter;
+            }
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
index 73ed01c98..bda3a5436 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -34,6 +34,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_DEFAULT_STATUS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_STATUS;
 
 /**
  * Text file reader
@@ -46,6 +48,10 @@ public final class TextFileReader extends AbstractFileReader {
 
     public TextFileReader(FileReaderOperator fileReaderOperator) {
         super.fileReaderOperator = fileReaderOperator;
+        if (fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_STATUS, JOB_FILE_MONITOR_DEFAULT_STATUS)
+                .equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) {
+            MonitorTextFile.getInstance().monitor(fileReaderOperator, this);
+        }
     }
 
     public void getData() throws IOException {
@@ -84,6 +90,7 @@ public final class TextFileReader extends AbstractFileReader {
         }
         lines = resultLines.isEmpty() ? lines : resultLines;
         fileReaderOperator.stream = lines.stream();
+        fileReaderOperator.position = fileReaderOperator.position + lines.size();
     }
 
 }
diff --git a/inlong-agent/agent-plugins/src/test/resources/agent.properties b/inlong-agent/agent-plugins/src/test/resources/agent.properties
index aedb395f8..0d2524f26 100755
--- a/inlong-agent/agent-plugins/src/test/resources/agent.properties
+++ b/inlong-agent/agent-plugins/src/test/resources/agent.properties
@@ -23,5 +23,6 @@ job.thread.running.core=10
 agent.manager.vip.http.host=127.0.0.1
 agent.manager.vip.http.port=8083
 agent.fetcher.classname=org.apache.inlong.agent.plugin.fetcher.ManagerFetcher
-agent.manager.auth.secretId=
-agent.manager.auth.secretKey=
+agent.manager.auth.secretId=admin
+agent.manager.auth.secretKey=admin
+agent.cluster.name=default_agent