You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/31 11:06:19 UTC

[inlong] branch release-1.3.0 updated (b5df3e01f -> d20a0ae69)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from b5df3e01f [INLONG-5746]][Sort] Add spi dictionary of Inlong pb msg (#5747)
     new 6f84779ea [INLONG-5743][Agent] Support function read file data continuously through the monitoring file (#5744)
     new d20a0ae69 [INLONG-5754][Sort] Fix Pulsar connecotr deserialize complex format with multiple data will cause data loss (#5755)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/inlong/agent/constant/JobConstants.java |  16 +++
 .../java/org/apache/inlong/agent/pojo/FileJob.java |  22 +++-
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   8 ++
 .../sources/reader/file/FileReaderOperator.java    |   8 +-
 .../sources/reader/file/MonitorTextFile.java       | 131 +++++++++++++++++++++
 .../plugin/sources/reader/file/TextFileReader.java |   7 ++
 .../src/test/resources/agent.properties            |   5 +-
 .../sort/pulsar/withoutadmin/PulsarFetcher.java    |   6 +-
 .../sort/pulsar/withoutadmin/ReaderThread.java     |  42 ++++++-
 .../inlongmsg/InLongMsgDeserializationSchema.java  |   2 +
 10 files changed, 235 insertions(+), 12 deletions(-)
 create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java


[inlong] 02/02: [INLONG-5754][Sort] Fix Pulsar connecotr deserialize complex format with multiple data will cause data loss (#5755)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d20a0ae69309e6c1045cacb04a16a628c941350c
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Wed Aug 31 19:04:56 2022 +0800

    [INLONG-5754][Sort] Fix Pulsar connecotr deserialize complex format with multiple data will cause data loss (#5755)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../sort/pulsar/withoutadmin/PulsarFetcher.java    |  6 ++--
 .../sort/pulsar/withoutadmin/ReaderThread.java     | 42 +++++++++++++++++++---
 .../inlongmsg/InLongMsgDeserializationSchema.java  |  2 ++
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
index cb5933d56..bcaa66301 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
@@ -50,6 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -398,14 +399,15 @@ public class PulsarFetcher<T> {
      * @param pulsarEventTimestamp The timestamp of the pulsar record
      */
     protected void emitRecordsWithTimestamps(
-            T record,
+            Queue<T> records,
             PulsarTopicState<T> partitionState,
             MessageId offset,
             long pulsarEventTimestamp) {
         // emit the records, using the checkpoint lock to guarantee
         // atomicity of record emission and offset state update
         synchronized (checkpointLock) {
-            if (record != null) {
+            T record;
+            while ((record = records.poll()) != null) {
                 long timestamp = partitionState.extractTimestamp(record, pulsarEventTimestamp);
                 sourceContext.collectWithTimestamp(record, timestamp);
 
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
index cff23fc00..af1fac9fc 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
 import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
 import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.util.Collector;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,7 +37,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -59,6 +62,7 @@ public class ReaderThread<T> extends Thread {
     protected boolean excludeMessageId = false;
     private boolean failOnDataLoss = true;
     private boolean useEarliestWhenDataLoss = false;
+    private final PulsarCollector pulsarCollector;
 
     protected volatile boolean running = true;
     protected volatile boolean closed = false;
@@ -85,6 +89,7 @@ public class ReaderThread<T> extends Thread {
 
         this.topicRange = state.getTopicRange();
         this.startMessageId = state.getOffset();
+        this.pulsarCollector = new PulsarCollector();
     }
 
     public ReaderThread(
@@ -151,12 +156,13 @@ public class ReaderThread<T> extends Thread {
 
     protected void emitRecord(Message<T> message) throws IOException {
         MessageId messageId = message.getMessageId();
-        final T record = deserializer.deserialize(message);
-        if (deserializer.isEndOfStream(record)) {
+        deserializer.deserialize(message, pulsarCollector);
+
+        owner.emitRecordsWithTimestamps(pulsarCollector.getRecords(), state, messageId, message.getEventTime());
+        if (pulsarCollector.isEndOfStreamSignalled()) {
+            // end of stream signaled
             running = false;
-            return;
         }
-        owner.emitRecordsWithTimestamps(record, state, messageId, message.getEventTime());
     }
 
     public void cancel() throws IOException {
@@ -219,4 +225,32 @@ public class ReaderThread<T> extends Thread {
                             r.getClass().toString()));
         }
     }
+
+    private class PulsarCollector implements Collector<T> {
+        private final Queue<T> records = new ArrayDeque<>();
+
+        private boolean endOfStreamSignalled = false;
+
+        @Override
+        public void collect(T record) {
+            // do not emit subsequent elements if the end of the stream reached
+            if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
+                endOfStreamSignalled = true;
+                return;
+            }
+            records.add(record);
+        }
+
+        public Queue<T> getRecords() {
+            return records;
+        }
+
+        public boolean isEndOfStreamSignalled() {
+            return endOfStreamSignalled;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
 }
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index ba41a7447..d14c982e9 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import com.esotericsoftware.minlog.Log;
 import com.google.common.base.Objects;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -82,6 +83,7 @@ public class InLongMsgDeserializationSchema implements DeserializationSchema<Row
                 head = InLongMsgUtils.parseHead(attr);
             } catch (Throwable t) {
                 if (ignoreErrors) {
+                    Log.warn("Ignore inlong msg attr({})parse error.", attr, t);
                     continue;
                 }
                 throw new IOException(


[inlong] 01/02: [INLONG-5743][Agent] Support function read file data continuously through the monitoring file (#5744)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 6f84779eaa7845b0ba675710f6b380c364917d17
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Wed Aug 31 19:04:10 2022 +0800

    [INLONG-5743][Agent] Support function read file data continuously through the monitoring file (#5744)
---
 .../apache/inlong/agent/constant/JobConstants.java |  16 +++
 .../java/org/apache/inlong/agent/pojo/FileJob.java |  22 +++-
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   8 ++
 .../sources/reader/file/FileReaderOperator.java    |   8 +-
 .../sources/reader/file/MonitorTextFile.java       | 131 +++++++++++++++++++++
 .../plugin/sources/reader/file/TextFileReader.java |   7 ++
 .../src/test/resources/agent.properties            |   5 +-
 7 files changed, 191 insertions(+), 6 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index d913c0257..1894d1507 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -57,6 +57,9 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_FILE_META_FILTER_BY_LABELS = "job.fileJob.filterMetaByLabels";
     public static final String JOB_FILE_PROPERTIES = "job.fileJob.properties";
     public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileJob.dataSeparator";
+    public static final String JOB_FILE_MONITOR_INTERVAL = "job.fileJob.monitorInterval";
+    public static final String JOB_FILE_MONITOR_STATUS = "job.fileJob.monitorStatus";
+    public static final String JOB_FILE_MONITOR_EXPIRE = "job.fileJob.monitorExpire";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
@@ -141,5 +144,18 @@ public class JobConstants extends CommonConstants {
      */
     public static final int SYNC_SEND_OPEN = 1;
 
+    public static final String INTERVAL_MILLISECONDS = "500";
+
+    /**
+     * monitor switch, 1 true and 0 false
+     */
+    public static final String JOB_FILE_MONITOR_DEFAULT_STATUS = "1";
+
+    /**
+     * monitor expire time and the time in milliseconds.
+     * default value is -1 and stand for not expire time.
+     */
+    public static final String JOB_FILE_MONITOR_DEFAULT_EXPIRE = "-1";
+
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index bde908dbd..436dc3dd5 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -34,9 +34,9 @@ public class FileJob {
     private String addictiveString;
     private String collectType;
     private Line line;
+
     // INCREMENT 
     // FULL
-
     private String contentCollectType;
 
     private String envList;
@@ -49,6 +49,15 @@ public class FileJob {
 
     private Map<String, Object> properties;
 
+    // Monitor interval for file
+    private Long monitorInterval;
+
+    // Monitor switch, 1 true and 0 false
+    private Integer monitorStatus;
+
+    // Monitor expire time and the time in milliseconds
+    private Long monitorExpire;
+
     @Data
     public static class Dir {
 
@@ -108,9 +117,18 @@ public class FileJob {
         // Metadata filters by label, special parameters for K8S
         private Map<String, String> filterMetaByLabels;
 
-        // Properties for File
+        // Properties for file
         private Map<String, Object> properties;
 
+        // Monitor interval for file
+        private Long monitorInterval;
+
+        // Monitor switch, 1 true and 0 false
+        private Integer monitorStatus;
+        
+        // Monitor expire time and the time in milliseconds
+        private Long monitorExpire;
+
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index aa53b8346..e1a1482f0 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -132,6 +132,14 @@ public class JobProfileDto {
         if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
             fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels());
         }
+
+        if (null != fileJobTaskConfig.getMonitorInterval()) {
+            fileJob.setMonitorInterval(fileJobTaskConfig.getMonitorInterval());
+        }
+
+        if (null != fileJobTaskConfig.getMonitorStatus()) {
+            fileJob.setMonitorStatus(fileJobTaskConfig.getMonitorStatus());
+        }
         return fileJob;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 927d06daf..5c35f85d9 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -61,10 +61,10 @@ public class FileReaderOperator extends AbstractReader {
     public Stream<String> stream;
     public Map<String, String> metadata;
     public JobProfile jobConf;
-    private Iterator<String> iterator;
+    public Iterator<String> iterator;
+    public volatile boolean finished = false;
     private long timeout;
     private long waitTimeout;
-
     private long lastTime = 0;
 
     private List<Validator> validators = new ArrayList<>();
@@ -107,6 +107,9 @@ public class FileReaderOperator extends AbstractReader {
 
     @Override
     public boolean isFinished() {
+        if (finished) {
+            return true;
+        }
         if (timeout == NEVER_STOP_SIGN) {
             return false;
         }
@@ -201,6 +204,7 @@ public class FileReaderOperator extends AbstractReader {
 
     @Override
     public void destroy() {
+        finished = true;
         if (stream == null) {
             return;
         }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
new file mode 100644
index 000000000..e684b2ce7
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.JobConstants.INTERVAL_MILLISECONDS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_DEFAULT_EXPIRE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_EXPIRE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INTERVAL;
+
+/**
+ * monitor files
+ */
+public final class MonitorTextFile {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class);
+    private static volatile MonitorTextFile monitorTextFile = null;
+    // monitor thread pool
+    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            60L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("monitor-file"));
+
+    private MonitorTextFile() {
+
+    }
+
+    /**
+     * Mode of singleton
+     * @return MonitorTextFile instance
+     */
+    public static MonitorTextFile getInstance() {
+        if (monitorTextFile == null) {
+            synchronized (MonitorTextFile.class) {
+                if (monitorTextFile == null) {
+                    monitorTextFile = new MonitorTextFile();
+                }
+            }
+        }
+        return monitorTextFile;
+    }
+
+    public void monitor(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
+        MonitorEventRunnable monitorEvent = new MonitorEventRunnable(fileReaderOperator, textFileReader);
+        runningPool.execute(monitorEvent);
+    }
+
+    /**
+     * monitor file event
+     */
+    private class MonitorEventRunnable implements Runnable {
+
+        private final FileReaderOperator fileReaderOperator;
+        private final TextFileReader textFileReader;
+        private final Long interval;
+        private final long startTime = System.currentTimeMillis();
+        /**
+         * the last modify time of the file
+         */
+        private BasicFileAttributes attributesBefore;
+
+        public MonitorEventRunnable(FileReaderOperator fileReaderOperator, TextFileReader textFileReader) {
+            this.fileReaderOperator = fileReaderOperator;
+            this.textFileReader = textFileReader;
+            this.interval = Long
+                    .parseLong(fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_INTERVAL, INTERVAL_MILLISECONDS));
+            try {
+                this.attributesBefore = Files
+                        .readAttributes(fileReaderOperator.file.toPath(), BasicFileAttributes.class);
+            } catch (IOException e) {
+                LOGGER.error("get {} last modify time error:", fileReaderOperator.file.getName(), e);
+            }
+        }
+
+        @Override
+        public void run() {
+            while (!this.fileReaderOperator.finished) {
+                try {
+                    long expireTime = Long.parseLong(fileReaderOperator.jobConf
+                            .get(JOB_FILE_MONITOR_EXPIRE, JOB_FILE_MONITOR_DEFAULT_EXPIRE));
+                    long currentTime = System.currentTimeMillis();
+                    if (expireTime != Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE)
+                            && currentTime - this.startTime > expireTime) {
+                        break;
+                    }
+                    listen();
+                    TimeUnit.MILLISECONDS.sleep(interval);
+                } catch (Exception e) {
+                    LOGGER.error("monitor {} error:", this.fileReaderOperator.file.getName(), e);
+                }
+            }
+        }
+
+        private void listen() throws IOException {
+            BasicFileAttributes attributesAfter = Files
+                    .readAttributes(this.fileReaderOperator.file.toPath(), BasicFileAttributes.class);
+            if (attributesBefore.lastModifiedTime().compareTo(attributesAfter.lastModifiedTime()) < 0
+                    && !this.fileReaderOperator.iterator.hasNext()) {
+                this.textFileReader.getData();
+                this.fileReaderOperator.iterator = fileReaderOperator.stream.iterator();
+                this.attributesBefore = attributesAfter;
+            }
+        }
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
index 73ed01c98..bda3a5436 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -34,6 +34,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_DEFAULT_STATUS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_STATUS;
 
 /**
  * Text file reader
@@ -46,6 +48,10 @@ public final class TextFileReader extends AbstractFileReader {
 
     public TextFileReader(FileReaderOperator fileReaderOperator) {
         super.fileReaderOperator = fileReaderOperator;
+        if (fileReaderOperator.jobConf.get(JOB_FILE_MONITOR_STATUS, JOB_FILE_MONITOR_DEFAULT_STATUS)
+                .equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) {
+            MonitorTextFile.getInstance().monitor(fileReaderOperator, this);
+        }
     }
 
     public void getData() throws IOException {
@@ -84,6 +90,7 @@ public final class TextFileReader extends AbstractFileReader {
         }
         lines = resultLines.isEmpty() ? lines : resultLines;
         fileReaderOperator.stream = lines.stream();
+        fileReaderOperator.position = fileReaderOperator.position + lines.size();
     }
 
 }
diff --git a/inlong-agent/agent-plugins/src/test/resources/agent.properties b/inlong-agent/agent-plugins/src/test/resources/agent.properties
index aedb395f8..0d2524f26 100755
--- a/inlong-agent/agent-plugins/src/test/resources/agent.properties
+++ b/inlong-agent/agent-plugins/src/test/resources/agent.properties
@@ -23,5 +23,6 @@ job.thread.running.core=10
 agent.manager.vip.http.host=127.0.0.1
 agent.manager.vip.http.port=8083
 agent.fetcher.classname=org.apache.inlong.agent.plugin.fetcher.ManagerFetcher
-agent.manager.auth.secretId=
-agent.manager.auth.secretKey=
+agent.manager.auth.secretId=admin
+agent.manager.auth.secretKey=admin
+agent.cluster.name=default_agent