You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/05 07:44:49 UTC

[inlong] branch master updated: [INLONG-5362][Agent] Supports the collection of data splicing metadata information (#5363)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8736a08ca [INLONG-5362][Agent] Supports the collection of data splicing metadata information (#5363)
8736a08ca is described below

commit 8736a08ca23ae40f8d154acb1b5adc1d810f42ed
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Aug 5 15:44:44 2022 +0800

    [INLONG-5362][Agent] Supports the collection of data splicing metadata information (#5363)
---
 .../inlong/agent/constant/CommonConstants.java     |   5 +
 .../apache/inlong/agent/constant/JobConstants.java |   1 +
 .../inlong/agent/constant/KubernetesConstants.java |  36 +++++++
 .../inlong/agent/constant/MetadataConstants.java   |  27 ++++++
 .../java/org/apache/inlong/agent/pojo/FileJob.java |  11 +++
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   8 ++
 inlong-agent/agent-plugins/pom.xml                 |   8 +-
 .../agent/plugin/sources/TextFileSource.java       |  14 +--
 .../sources/reader/file/AbstractFileReader.java    |  44 +++++++++
 .../FileReaderOperator.java}                       | 104 +++++++++------------
 .../sources/reader/file/KubernetesFileReader.java  | 102 ++++++++++++++++++++
 .../plugin/sources/reader/file/TextFileReader.java |  84 +++++++++++++++++
 .../inlong/agent/plugin/utils/MetaDataUtils.java   |  62 ++++++++++++
 .../agent/plugin/sources/TestTextFileReader.java   |  10 +-
 licenses/inlong-agent/LICENSE                      |   1 +
 pom.xml                                            |   7 ++
 16 files changed, 449 insertions(+), 75 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 1d0860b8b..8a8de1780 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -160,4 +160,9 @@ public class CommonConstants {
 
     public static final String KEY_METRICS_INDEX = "metricsIndex";
 
+    public static final String COMMA = ",";
+    public static final String DELIMITER_UNDERLINE = "_";
+    public static final String DELIMITER_HYPHEN = "-";
+
+
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index cd2f91f36..9ca8b4d0f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -53,6 +53,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_FILE_COLLECT_TYPE = "job.fileJob.collectType";
     public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern";
     public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType";
+    public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
new file mode 100644
index 000000000..dc76a39a9
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Kubernetes constants: URL scheme, names of the API server host/port settings, and pod-information keys.
+ */
+public class KubernetesConstants {
+
+    public static final String HTTPS = "https://";
+    public static final String KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST";
+    public static final String KUBERNETES_SERVICE_PORT = "KUBERNETES_SERVICE_PORT";
+
+    // keys of the pod-information map that is built from a container log file name
+    public static final String NAMESPACE = "namespace";
+    public static final String POD_NAME = "pod.name";
+    public static final String CONTAINER_NAME = "container.name";
+    public static final String CONTAINER_ID = "container.id";
+
+
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
new file mode 100644
index 000000000..796bfb396
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Names of the metadata sources that a file job can list in its env list.
+ */
+public class MetadataConstants {
+    // compared case-insensitively with each env list entry to decide whether a Kubernetes reader is added
+    public static final String KUBERNETES = "kubernetes";
+
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 22d716df4..2780366b3 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -19,6 +19,9 @@ package org.apache.inlong.agent.pojo;
 
 import lombok.Data;
 
+import java.util.List;
+import java.util.Map;
+
 @Data
 public class FileJob {
 
@@ -36,6 +39,10 @@ public class FileJob {
 
     private String contentCollectType;
 
+    private String envList;
+
+    private List<Map<String, String>> metaFields;
+
     @Data
     public static class Dir {
 
@@ -77,6 +84,10 @@ public class FileJob {
         private String lineEndPattern;
 
         private String contentCollectType;
+
+        private String envList;
+
+        private List<Map<String, String>> metaFields;
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 285269028..02beda28a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -119,6 +119,14 @@ public class JobProfileDto {
             FileJob.Line line = new Line();
             line.setEndPattern(fileJobTaskConfig.getLineEndPattern());
         }
+
+        if (null != fileJobTaskConfig.getEnvList()) {
+            fileJob.setEnvList(fileJobTaskConfig.getEnvList());
+        }
+
+        if (null != fileJobTaskConfig.getMetaFields()) {
+            fileJob.setMetaFields(fileJobTaskConfig.getMetaFields());
+        }
         return fileJob;
     }
 
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index f0dee553e..48303112a 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -18,8 +18,8 @@
     under the License.
 -->
 <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xmlns="http://maven.apache.org/POM/4.0.0"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+        xmlns="http://maven.apache.org/POM/4.0.0"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.inlong</groupId>
         <artifactId>inlong-agent</artifactId>
@@ -139,5 +139,9 @@
             <artifactId>powermock-api-mockito2</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index a17bb627e..0cd823cd1 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
+import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,11 +70,11 @@ public class TextFileSource implements Source {
             int startPosition = getStartPosition(jobConf, file);
             LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", startPosition,
                     jobConf.getInstanceId(), file.getAbsolutePath());
-            TextFileReader textFileReader = new TextFileReader(file, startPosition);
+            FileReaderOperator fileReader = new FileReaderOperator(file, startPosition);
             long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT);
-            textFileReader.setWaitMillisecond(waitTimeout);
-            addValidator(filterPattern, textFileReader);
-            result.add(textFileReader);
+            fileReader.setWaitMillisecond(waitTimeout);
+            addValidator(filterPattern, fileReader);
+            result.add(fileReader);
         }
         // increment the count of successful sources
         GLOBAL_METRICS.incSourceSuccessCount(metricTagName);
@@ -98,7 +98,7 @@ public class TextFileSource implements Source {
         return seekPosition;
     }
 
-    private void addValidator(String filterPattern, TextFileReader textFileReader) {
-        textFileReader.addPatternValidator(filterPattern);
+    private void addValidator(String filterPattern, FileReaderOperator fileReader) {
+        fileReader.addPatternValidator(filterPattern);
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
new file mode 100644
index 000000000..d066ca2e7
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * File reader template: getData() loads a reader's lines or metadata, mergeData() joins metadata onto the lines.
+ */
+public abstract class AbstractFileReader {
+    public FileReaderOperator fileReaderOperator;
+
+    /** Loads this reader's lines or metadata into the operator. */
+    public abstract void getData() throws Exception;
+
+    /** Merges the metadata into every pending line; a no-op when the metadata or the lines are missing. */
+    public void mergeData(FileReaderOperator fileReaderOperator) {
+        if (null == fileReaderOperator.metadata || null == fileReaderOperator.stream) {
+            return;
+        }
+        List<String> lines = fileReaderOperator.stream
+                .map(data -> MetaDataUtils.concatString(data, fileReaderOperator.metadata))
+                .collect(Collectors.toList());
+        fileReaderOperator.stream = lines.stream();
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
old mode 100755
new mode 100644
similarity index 71%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 082a83f07..f5e6543f9
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.sources.reader;
+package org.apache.inlong.agent.plugin.sources.reader.file;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
@@ -24,69 +24,62 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
 import org.apache.inlong.agent.plugin.except.FileException;
+import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
+import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
 import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES;
 
 /**
- * read file data
+ * File reader entrance
  */
-public class TextFileReader extends AbstractReader {
+public class FileReaderOperator extends AbstractReader {
 
     public static final int NEVER_STOP_SIGN = -1;
     private static final Logger LOGGER = LoggerFactory.getLogger(TextFileReader.class);
     private static final String TEXT_FILE_READER_TAG_NAME = "AgentTextMetric";
-    private final File file;
-
-    private final int position;
-
-    private final String md5;
-    private final Map<File, String> lineStringBuffer = new ConcurrentHashMap<>();
+    public File file;
+    public int position;
+    public String md5;
+    public Stream<String> stream;
+    public String metadata;
+    public JobProfile jobConf;
     private Iterator<String> iterator;
-
-    private Stream<String> stream;
-
     private long timeout;
-
     private long waitTimeout;
 
     private long lastTime = 0;
 
     private List<Validator> validators = new ArrayList<>();
 
-    public TextFileReader(File file, int position) {
+    public FileReaderOperator(File file, int position) {
         this(file, position, "");
     }
 
-    public TextFileReader(File file, int position, String md5) {
+    public FileReaderOperator(File file, int position, String md5) {
         this.file = file;
         this.position = position;
         this.md5 = md5;
     }
 
-    public TextFileReader(File file) {
+    public FileReaderOperator(File file) {
         this(file, 0);
     }
 
@@ -171,6 +164,7 @@ public class TextFileReader extends AbstractReader {
     @Override
     public void init(JobProfile jobConf) {
         try {
+            this.jobConf = jobConf;
             super.init(jobConf);
             metricTagName = TEXT_FILE_READER_TAG_NAME + "_" + inlongGroupId;
             initReadTimeout(jobConf);
@@ -179,8 +173,15 @@ public class TextFileReader extends AbstractReader {
                 LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, md5);
             }
             LOGGER.info("file name for task is {}, md5 is {}", file, md5);
-            //split file line
-            getFileStream(jobConf);
+            List<AbstractFileReader> fileReaders = getInstance(this, jobConf);
+            fileReaders.forEach(fileReader -> {
+                try {
+                    fileReader.getData();
+                    fileReader.mergeData(this);
+                } catch (Exception ex) {
+                    LOGGER.error("read file data error:{}", ex.getMessage());
+                }
+            });
             if (Objects.nonNull(stream)) {
                 iterator = stream.iterator();
             }
@@ -188,41 +189,7 @@ public class TextFileReader extends AbstractReader {
             throw new FileException("error init stream for " + file.getPath(), ex);
         }
     }
-
-    private void getFileStream(JobProfile jobConf) throws IOException {
-        List<String> lines = Files.newBufferedReader(file.toPath()).lines().skip(position).collect(Collectors.toList());
-        List<String> resultLines = new ArrayList<>();
-        //TODO line regular expression matching
-        if (jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
-            Pattern pattern = Pattern.compile(jobConf.get(JOB_FILE_LINE_END_PATTERN));
-            lines.forEach(line -> {
-                lineStringBuffer.put(file,
-                        lineStringBuffer.isEmpty() ? line : lineStringBuffer.get(file).concat(" ").concat(line));
-                String data = lineStringBuffer.get(file);
-                Matcher matcher = pattern.matcher(data);
-                if (matcher.find() && StringUtils.isNoneBlank(matcher.group())) {
-                    String[] splitLines = data.split(matcher.group());
-                    int length = splitLines.length;
-                    for (int i = 0; i < length; i++) {
-                        if (i > 0 && i == length - 1 && null != splitLines[i]) {
-                            lineStringBuffer.put(file, splitLines[i]);
-                            break;
-                        }
-                        resultLines.add(splitLines[i].trim());
-                    }
-                    if (1 == length) {
-                        lineStringBuffer.remove(file);
-                    }
-                }
-            });
-            if (resultLines.isEmpty()) {
-                return;
-            }
-        }
-        lines = resultLines.isEmpty() ? lines : resultLines;
-        stream = lines.stream();
-    }
-
+    
     private void initReadTimeout(JobProfile jobConf) {
         int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
                 DEFAULT_JOB_FILE_MAX_WAIT);
@@ -242,4 +209,19 @@ public class TextFileReader extends AbstractReader {
         LOGGER.info("destroy reader with read {} num {}",
                 metricTagName, GLOBAL_METRICS.getReadNum(metricTagName));
     }
+
+    /** Creates the text reader plus one Kubernetes reader per "kubernetes" entry of the job's env list. */
+    public List<AbstractFileReader> getInstance(FileReaderOperator reader, JobProfile jobConf) {
+        List<AbstractFileReader> fileReaders = new ArrayList<>();
+        fileReaders.add(new TextFileReader(reader));
+        if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) {
+            return fileReaders;
+        }
+        // Trim each entry so that a list written as "a, kubernetes" still matches; readers are bound to the
+        // operator passed in by the caller.
+        Arrays.stream(jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA))
+                .filter(env -> KUBERNETES.equalsIgnoreCase(env.trim()))
+                .forEach(env -> fileReaders.add(new KubernetesFileReader(reader)));
+        return fileReaders;
+    }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
new file mode 100644
index 000000000..9f0053351
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
+import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
+import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
+
+/** Fills the operator's metadata with the Kubernetes metadata of the pod that the log file name refers to. */
+public final class KubernetesFileReader extends AbstractFileReader {
+    private static final Logger log = LoggerFactory.getLogger(KubernetesFileReader.class);
+
+    private KubernetesClient client;
+
+    KubernetesFileReader(FileReaderOperator fileReaderOperator) {
+        super.fileReaderOperator = fileReaderOperator;
+    }
+
+    /** Looks the pod up once; later calls return at once while both the client and the metadata are set. */
+    @Override
+    public void getData() {
+        if (Objects.nonNull(client) && Objects.nonNull(fileReaderOperator.metadata)) {
+            return;
+        }
+        client = getKubernetesClient();
+        Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+        ObjectMeta objectMeta = getPodMetadata(k8sInfo.get(NAMESPACE), k8sInfo.get(POD_NAME));
+        fileReaderOperator.metadata = Objects.nonNull(objectMeta) ? objectMeta.toString() : null;
+    }
+
+    /** Builds the API server client from the KUBERNETES_SERVICE_* environment variables (or system properties). */
+    private KubernetesClient getKubernetesClient() {
+        String ip = Objects.toString(System.getenv(KUBERNETES_SERVICE_HOST),
+                System.getProperty(KUBERNETES_SERVICE_HOST));
+        String port = Objects.toString(System.getenv(KUBERNETES_SERVICE_PORT),
+                System.getProperty(KUBERNETES_SERVICE_PORT));
+        if (Objects.isNull(ip) || Objects.isNull(port)) {
+            log.warn("k8s env ip and port is null,can not connect k8s master");
+            return null;
+        }
+        String masterUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
+        Config config = new ConfigBuilder().withMasterUrl(masterUrl).build();
+        return new KubernetesClientBuilder().withConfig(config).build();
+    }
+
+    /** Lists pods through the client, or returns null when no client could be created. */
+    public PodList getPods() {
+        if (Objects.isNull(client)) {
+            return null;
+        }
+        MixedOperation<Pod, PodList, PodResource> pods = client.pods();
+        return pods.list();
+    }
+
+    /** Finds pod metadata by namespace and pod name (case-insensitive); null without a client or a match. */
+    public ObjectMeta getPodMetadata(String namespace, String podName) {
+        if (Objects.isNull(client)) {
+            return null;
+        }
+        List<ObjectMeta> objectMetas = client.pods().list().getItems().stream().map(Pod::getMetadata)
+                .filter(data -> data.getName().equalsIgnoreCase(podName) && data.getNamespace()
+                        .equalsIgnoreCase(namespace)).collect(Collectors.toList());
+        return CollectionUtils.isNotEmpty(objectMetas) ? objectMetas.get(0) : null;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
new file mode 100644
index 000000000..e811efc39
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
+
+/**
+ * Text file reader: loads file lines into the operator's stream, regrouping them when a line-end pattern is set.
+ */
+public final class TextFileReader extends AbstractFileReader {
+    private final Map<File, String> lineStringBuffer = new ConcurrentHashMap<>();
+
+    public TextFileReader(FileReaderOperator fileReaderOperator) {
+        super.fileReaderOperator = fileReaderOperator;
+    }
+
+    /** Loads the lines after the start position into the operator, regrouped by the line-end pattern if set. */
+    @Override
+    public void getData() throws IOException {
+        // readAllLines() closes the file before returning, so no reader is left open
+        List<String> lines = Files.readAllLines(fileReaderOperator.file.toPath()).stream()
+                .skip(fileReaderOperator.position).collect(Collectors.toList());
+        List<String> resultLines = new ArrayList<>();
+        //TODO line regular expression matching
+        if (fileReaderOperator.jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
+            Pattern pattern = Pattern.compile(fileReaderOperator.jobConf.get(JOB_FILE_LINE_END_PATTERN));
+            lines.forEach(line -> {
+                lineStringBuffer.put(fileReaderOperator.file,
+                        lineStringBuffer.isEmpty() ? line
+                                : lineStringBuffer.get(fileReaderOperator.file).concat(" ").concat(line));
+                String data = lineStringBuffer.get(fileReaderOperator.file);
+                Matcher matcher = pattern.matcher(data);
+                if (matcher.find() && StringUtils.isNoneBlank(matcher.group())) {
+                    String[] splitLines = data.split(matcher.group());
+                    int length = splitLines.length;
+                    for (int i = 0; i < length; i++) {
+                        if (i > 0 && i == length - 1 && null != splitLines[i]) {
+                            lineStringBuffer.put(fileReaderOperator.file, splitLines[i]);
+                            break;
+                        }
+                        resultLines.add(splitLines[i].trim());
+                    }
+                    if (1 == length) {
+                        lineStringBuffer.remove(fileReaderOperator.file);
+                    }
+                }
+            });
+            if (resultLines.isEmpty()) {
+                return;
+            }
+        }
+        lines = resultLines.isEmpty() ? lines : resultLines;
+        fileReaderOperator.stream = lines.stream();
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
new file mode 100644
index 000000000..d493ef726
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.constant.CommonConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
+import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
+import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
+import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
+
+/**
+ * Static helpers for deriving and splicing metadata onto collected log data.
+ */
+public class MetaDataUtils {
+
+    /**
+     * Parses the pod metadata encoded in a standard Kubernetes container log file name.
+     *
+     * <p>Expected file name layout: {@code <pod_name>_<namespace>_<container_name>-<container_id>.log}.
+     * The name is split on the underscore delimiter; the third segment is then divided at its
+     * LAST hyphen, because a container name may itself contain hyphens.
+     *
+     * <p>No path prefix or file extension is stripped here: whatever precedes the first underscore
+     * is reported as the pod name and whatever follows the last hyphen as the container id.
+     *
+     * @param fileName log file name to parse; may be {@code null} or blank
+     * @return a map holding POD_NAME, NAMESPACE, CONTAINER_NAME and CONTAINER_ID, or an empty map
+     *         when the file name is blank or does not follow the expected layout
+     */
+    public static Map<String, String> getLogInfo(String fileName) {
+        Map<String, String> podInf = new HashMap<>();
+        // Reject null/blank names and names without any underscore before touching the segments.
+        if (StringUtils.isBlank(fileName) || !fileName.contains(CommonConstants.DELIMITER_UNDERLINE)) {
+            return podInf;
+        }
+        // file name example: /var/log/containers/<pod_name>_<namespace>_<container_name>-<container_id>.log
+        String[] str = fileName.split(CommonConstants.DELIMITER_UNDERLINE);
+        if (str.length < 3) {
+            // Fewer than three segments: not a standard container log name.
+            return podInf;
+        }
+        String containerPart = str[2];
+        int hyphenIndex = containerPart.lastIndexOf(CommonConstants.DELIMITER_HYPHEN);
+        int idStart = hyphenIndex + CommonConstants.DELIMITER_HYPHEN.length();
+        if (hyphenIndex <= 0 || idStart >= containerPart.length()) {
+            // Either the container name or the container id would be empty.
+            return podInf;
+        }
+        podInf.put(POD_NAME, str[0]);
+        podInf.put(NAMESPACE, str[1]);
+        podInf.put(CONTAINER_NAME, containerPart.substring(0, hyphenIndex));
+        podInf.put(CONTAINER_ID, containerPart.substring(idStart));
+        return podInf;
+    }
+
+    /**
+     * Appends {@code str2} to {@code str1} on a new line.
+     *
+     * @param str1 leading text; must not be {@code null} when {@code str2} is non-blank
+     * @param str2 text to append; when {@code null}, empty or whitespace-only nothing is appended
+     * @return {@code str1} unchanged if {@code str2} is blank, otherwise {@code str1 + "\n" + str2}
+     */
+    public static String concatString(String str1, String str2) {
+        if (StringUtils.isBlank(str2)) {
+            return str1;
+        }
+        return str1.concat("\n").concat(str2);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6ad27e738..d73c25933 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -23,7 +23,7 @@ import org.apache.inlong.agent.constant.FileCollectType;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
-import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
+import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.junit.AfterClass;
@@ -203,13 +203,13 @@ public class TestTextFileReader {
             afterList.add("world");
         }
         Files.write(localPath, afterList, StandardOpenOption.APPEND);
-        TextFileReader reader = new TextFileReader(localPath.toFile(), 1000);
+        FileReaderOperator fileReaderOperator = new FileReaderOperator(localPath.toFile(), 1000);
         JobProfile jobProfile = new JobProfile();
         jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
-        reader.init(jobProfile);
+        fileReaderOperator.init(jobProfile);
 
-        Assert.assertEquals("world", new String(reader.read().getBody()));
+        Assert.assertEquals("world", new String(fileReaderOperator.read().getBody()));
 
     }
 
@@ -220,7 +220,7 @@ public class TestTextFileReader {
         jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
         Path localPath = Paths.get(testDir.toString(), "test1.txt");
-        TextFileReader reader = new TextFileReader(localPath.toFile(), 0);
+        FileReaderOperator reader = new FileReaderOperator(localPath.toFile(), 0);
         if (localPath.toFile().exists()) {
             localPath.toFile().delete();
         }
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index a63ca9bf9..94420705e 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -400,6 +400,7 @@ The text of each license is the standard Apache 2.0 license.
   joda-time:joda-time:2.9.9 - Joda-Time (http://www.joda.org/joda-time/), (Apache 2)
   org.apache.kafka:kafka-log4j-appender:3.0.0 - Apache Kafka (https://kafka.apache.org), (The Apache Software License, Version 2.0)
   org.apache.kafka:kafka-tools:3.0.0 - Apache Kafka (https://kafka.apache.org), (The Apache Software License, Version 2.0)
+  io.fabric8:kubernetes-client:6.0.0 - kubernetes-client (https://github.com/fabric8io/kubernetes-client/tree/v6.0.0), (The Apache Software License, Version 2.0)
   com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava - Guava ListenableFuture only (https://github.com/google/guava/listenablefuture), (The Apache Software License, Version 2.0)
   org.apache.logging.log4j:log4j-api:2.17.2 - Apache Log4j API (https://logging.apache.org/log4j/2.x/log4j-api/), (Apache License, Version 2.0)
   org.apache.logging.log4j:log4j-slf4j-impl:2.17.2 - Apache Log4j SLF4J Binding (https://logging.apache.org/log4j/2.x/log4j-slf4j-impl/), (Apache License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index d87cf1be6..327dfd362 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,6 +250,8 @@
         <HikariCP.version>4.0.3</HikariCP.version>
         <caffeine.version>2.9.3</caffeine.version>
         <kafka.clients.version>3.0.0</kafka.clients.version>
+
+        <kubernetes.client.version>6.0.0</kubernetes.client.version>
     </properties>
 
     <dependencyManagement>
@@ -1558,6 +1560,11 @@
                 <artifactId>caffeine</artifactId>
                 <version>${caffeine.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.fabric8</groupId>
+                <artifactId>kubernetes-client</artifactId>
+                <version>${kubernetes.client.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>