You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/05 07:44:49 UTC
[inlong] branch master updated: [INLONG-5362][Agent] Supports the collection of data splicing metadata information (#5363)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8736a08ca [INLONG-5362][Agent] Supports the collection of data splicing metadata information (#5363)
8736a08ca is described below
commit 8736a08ca23ae40f8d154acb1b5adc1d810f42ed
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Aug 5 15:44:44 2022 +0800
[INLONG-5362][Agent] Supports the collection of data splicing metadata information (#5363)
---
.../inlong/agent/constant/CommonConstants.java | 5 +
.../apache/inlong/agent/constant/JobConstants.java | 1 +
.../inlong/agent/constant/KubernetesConstants.java | 36 +++++++
.../inlong/agent/constant/MetadataConstants.java | 27 ++++++
.../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +++
.../apache/inlong/agent/pojo/JobProfileDto.java | 8 ++
inlong-agent/agent-plugins/pom.xml | 8 +-
.../agent/plugin/sources/TextFileSource.java | 14 +--
.../sources/reader/file/AbstractFileReader.java | 44 +++++++++
.../FileReaderOperator.java} | 104 +++++++++------------
.../sources/reader/file/KubernetesFileReader.java | 102 ++++++++++++++++++++
.../plugin/sources/reader/file/TextFileReader.java | 84 +++++++++++++++++
.../inlong/agent/plugin/utils/MetaDataUtils.java | 62 ++++++++++++
.../agent/plugin/sources/TestTextFileReader.java | 10 +-
licenses/inlong-agent/LICENSE | 1 +
pom.xml | 7 ++
16 files changed, 449 insertions(+), 75 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 1d0860b8b..8a8de1780 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -160,4 +160,9 @@ public class CommonConstants {
public static final String KEY_METRICS_INDEX = "metricsIndex";
+ public static final String COMMA = ",";
+ public static final String DELIMITER_UNDERLINE = "_";
+ public static final String DELIMITER_HYPHEN = "-";
+
+
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index cd2f91f36..9ca8b4d0f 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -53,6 +53,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_FILE_COLLECT_TYPE = "job.fileJob.collectType";
public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern";
public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType";
+ public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList";
//Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
new file mode 100644
index 000000000..dc76a39a9
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Constants used to reach the Kubernetes API server and to key pod / container information.
+ */
+public class KubernetesConstants {
+
+    // API server address
+
+    /** Scheme prefix used when building the API server URL. */
+    public static final String HTTPS = "https://";
+
+    /** Key under which the API server host is looked up. */
+    public static final String KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST";
+
+    /** Key under which the API server port is looked up. */
+    public static final String KUBERNETES_SERVICE_PORT = "KUBERNETES_SERVICE_PORT";
+
+    // pod / container information keys
+
+    /** Namespace the pod runs in. */
+    public static final String NAMESPACE = "namespace";
+
+    /** Name of the pod. */
+    public static final String POD_NAME = "pod.name";
+
+    /** Name of the container inside the pod. */
+    public static final String CONTAINER_NAME = "container.name";
+
+    /** Identifier of the container. */
+    public static final String CONTAINER_ID = "container.id";
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
new file mode 100644
index 000000000..796bfb396
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+/**
+ * Names of the metadata sources that can be attached to collected file data.
+ */
+public class MetadataConstants {
+
+    /** Kubernetes pod metadata source; compared case-insensitively against the job's env list. */
+    public static final String KUBERNETES = "kubernetes";
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 22d716df4..2780366b3 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -19,6 +19,9 @@ package org.apache.inlong.agent.pojo;
import lombok.Data;
+import java.util.List;
+import java.util.Map;
+
@Data
public class FileJob {
@@ -36,6 +39,10 @@ public class FileJob {
private String contentCollectType;
+ private String envList;
+
+ private List<Map<String, String>> metaFields;
+
@Data
public static class Dir {
@@ -77,6 +84,10 @@ public class FileJob {
private String lineEndPattern;
private String contentCollectType;
+
+ private String envList;
+
+ private List<Map<String, String>> metaFields;
}
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 285269028..02beda28a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -119,6 +119,14 @@ public class JobProfileDto {
FileJob.Line line = new Line();
line.setEndPattern(fileJobTaskConfig.getLineEndPattern());
}
+
+        if (null != fileJobTaskConfig.getEnvList()) {
+            fileJob.setEnvList(fileJobTaskConfig.getEnvList());
+        }
+
+        if (null != fileJobTaskConfig.getMetaFields()) {
+            fileJob.setMetaFields(fileJobTaskConfig.getMetaFields());
+        }
return fileJob;
}
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index f0dee553e..48303112a 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -18,8 +18,8 @@
under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-agent</artifactId>
@@ -139,5 +139,9 @@
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index a17bb627e..0cd823cd1 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -22,7 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Source;
-import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
+import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,11 +70,11 @@ public class TextFileSource implements Source {
int startPosition = getStartPosition(jobConf, file);
LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", startPosition,
jobConf.getInstanceId(), file.getAbsolutePath());
- TextFileReader textFileReader = new TextFileReader(file, startPosition);
+ FileReaderOperator fileReader = new FileReaderOperator(file, startPosition);
long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT);
- textFileReader.setWaitMillisecond(waitTimeout);
- addValidator(filterPattern, textFileReader);
- result.add(textFileReader);
+ fileReader.setWaitMillisecond(waitTimeout);
+ addValidator(filterPattern, fileReader);
+ result.add(fileReader);
}
// increment the count of successful sources
GLOBAL_METRICS.incSourceSuccessCount(metricTagName);
@@ -98,7 +98,7 @@ public class TextFileSource implements Source {
return seekPosition;
}
- private void addValidator(String filterPattern, TextFileReader textFileReader) {
- textFileReader.addPatternValidator(filterPattern);
+ private void addValidator(String filterPattern, FileReaderOperator fileReader) {
+ fileReader.addPatternValidator(filterPattern);
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
new file mode 100644
index 000000000..d066ca2e7
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * File reader template
+ */
+public abstract class AbstractFileReader {
+
+ public FileReaderOperator fileReaderOperator;
+
+ public abstract void getData() throws Exception;
+
+ public void mergeData(FileReaderOperator fileReaderOperator) {
+ if (null == fileReaderOperator.metadata) {
+ return;
+ }
+
+ List<String> lines = fileReaderOperator.stream.collect(Collectors.toList());
+ lines.forEach(data -> data = MetaDataUtils.concatString(data, fileReaderOperator.metadata));
+ fileReaderOperator.stream = lines.stream();
+ }
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
old mode 100755
new mode 100644
similarity index 71%
rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 082a83f07..f5e6543f9
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.sources.reader;
+package org.apache.inlong.agent.plugin.sources.reader.file;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
@@ -24,69 +24,62 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
import org.apache.inlong.agent.plugin.except.FileException;
+import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.inlong.agent.constant.AgentConstants.GLOBAL_METRICS;
+import static org.apache.inlong.agent.constant.CommonConstants.COMMA;
import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.MetadataConstants.KUBERNETES;
/**
- * read file data
+ * File reader entrance
*/
-public class TextFileReader extends AbstractReader {
+public class FileReaderOperator extends AbstractReader {
public static final int NEVER_STOP_SIGN = -1;
private static final Logger LOGGER = LoggerFactory.getLogger(TextFileReader.class);
private static final String TEXT_FILE_READER_TAG_NAME = "AgentTextMetric";
- private final File file;
-
- private final int position;
-
- private final String md5;
- private final Map<File, String> lineStringBuffer = new ConcurrentHashMap<>();
+ public File file;
+ public int position;
+ public String md5;
+ public Stream<String> stream;
+ public String metadata;
+ public JobProfile jobConf;
private Iterator<String> iterator;
-
- private Stream<String> stream;
-
private long timeout;
-
private long waitTimeout;
private long lastTime = 0;
private List<Validator> validators = new ArrayList<>();
- public TextFileReader(File file, int position) {
+ public FileReaderOperator(File file, int position) {
this(file, position, "");
}
- public TextFileReader(File file, int position, String md5) {
+ public FileReaderOperator(File file, int position, String md5) {
this.file = file;
this.position = position;
this.md5 = md5;
}
- public TextFileReader(File file) {
+ public FileReaderOperator(File file) {
this(file, 0);
}
@@ -171,6 +164,7 @@ public class TextFileReader extends AbstractReader {
@Override
public void init(JobProfile jobConf) {
try {
+ this.jobConf = jobConf;
super.init(jobConf);
metricTagName = TEXT_FILE_READER_TAG_NAME + "_" + inlongGroupId;
initReadTimeout(jobConf);
@@ -179,8 +173,15 @@ public class TextFileReader extends AbstractReader {
LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, md5);
}
LOGGER.info("file name for task is {}, md5 is {}", file, md5);
- //split file line
- getFileStream(jobConf);
+            // run each reader in order; the text reader produces the stream, later readers add metadata
+            // NOTE(review): failures are logged and skipped, so an unreadable file leaves the stream
+            // unset instead of propagating out of init() as the previous getFileStream() did
+            List<AbstractFileReader> fileReaders = getInstance(this, jobConf);
+            fileReaders.forEach(fileReader -> {
+                try {
+                    fileReader.getData();
+                    fileReader.mergeData(this);
+                } catch (Exception ex) {
+                    LOGGER.error("read file data error:{}", ex.getMessage(), ex);
+                }
+            });
if (Objects.nonNull(stream)) {
iterator = stream.iterator();
}
@@ -188,41 +189,7 @@ public class TextFileReader extends AbstractReader {
throw new FileException("error init stream for " + file.getPath(), ex);
}
}
-
- private void getFileStream(JobProfile jobConf) throws IOException {
- List<String> lines = Files.newBufferedReader(file.toPath()).lines().skip(position).collect(Collectors.toList());
- List<String> resultLines = new ArrayList<>();
- //TODO line regular expression matching
- if (jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
- Pattern pattern = Pattern.compile(jobConf.get(JOB_FILE_LINE_END_PATTERN));
- lines.forEach(line -> {
- lineStringBuffer.put(file,
- lineStringBuffer.isEmpty() ? line : lineStringBuffer.get(file).concat(" ").concat(line));
- String data = lineStringBuffer.get(file);
- Matcher matcher = pattern.matcher(data);
- if (matcher.find() && StringUtils.isNoneBlank(matcher.group())) {
- String[] splitLines = data.split(matcher.group());
- int length = splitLines.length;
- for (int i = 0; i < length; i++) {
- if (i > 0 && i == length - 1 && null != splitLines[i]) {
- lineStringBuffer.put(file, splitLines[i]);
- break;
- }
- resultLines.add(splitLines[i].trim());
- }
- if (1 == length) {
- lineStringBuffer.remove(file);
- }
- }
- });
- if (resultLines.isEmpty()) {
- return;
- }
- }
- lines = resultLines.isEmpty() ? lines : resultLines;
- stream = lines.stream();
- }
-
+
private void initReadTimeout(JobProfile jobConf) {
int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
DEFAULT_JOB_FILE_MAX_WAIT);
@@ -242,4 +209,19 @@ public class TextFileReader extends AbstractReader {
LOGGER.info("destroy reader with read {} num {}",
metricTagName, GLOBAL_METRICS.getReadNum(metricTagName));
}
+
+    /**
+     * Builds the ordered list of file readers that populate {@code reader}.
+     *
+     * <p>A {@link TextFileReader} always comes first, so its line stream is in place when later
+     * readers merge metadata. A {@link KubernetesFileReader} is appended (at most once) when the
+     * comma-separated {@code JOB_FILE_META_ENV_LIST} value contains {@code KUBERNETES}, compared
+     * case-insensitively with surrounding whitespace ignored.
+     *
+     * @param reader operator the created readers read from and write to
+     * @param jobConf job profile supplying the optional metadata env list
+     * @return readers in the order they should be run
+     */
+    public List<AbstractFileReader> getInstance(FileReaderOperator reader, JobProfile jobConf) {
+        List<AbstractFileReader> fileReaders = new ArrayList<>();
+        fileReaders.add(new TextFileReader(reader));
+        if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) {
+            return fileReaders;
+        }
+        String[] envs = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
+        boolean collectKubernetes = Arrays.stream(envs)
+                .map(String::trim)
+                .anyMatch(KUBERNETES::equalsIgnoreCase);
+        if (collectKubernetes) {
+            fileReaders.add(new KubernetesFileReader(reader));
+        }
+        return fileReaders;
+    }
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
new file mode 100644
index 000000000..9f0053351
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.dsl.MixedOperation;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
+import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
+import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
+
+/**
+ * File reader that attaches Kubernetes pod metadata to collected lines.
+ *
+ * <p>The pod name and namespace are parsed from the file name by
+ * {@link MetaDataUtils#getLogInfo(String)} and looked up through the Kubernetes API; the string
+ * form of the pod's {@link ObjectMeta} becomes the operator's metadata.
+ */
+public final class KubernetesFileReader extends AbstractFileReader {
+
+    private static final Logger log = LoggerFactory.getLogger(KubernetesFileReader.class);
+
+    // NOTE(review): created lazily in getData() and never closed; confirm the reader's lifetime
+    // before adding close handling
+    private KubernetesClient client;
+
+    KubernetesFileReader(FileReaderOperator fileReaderOperator) {
+        super.fileReaderOperator = fileReaderOperator;
+    }
+
+    /**
+     * Resolves the pod metadata for the operator's file and stores it in
+     * {@code fileReaderOperator.metadata} ({@code null} when the pod is not found).
+     * Leaves the metadata untouched when no API server address is configured.
+     */
+    @Override
+    public void getData() {
+        if (Objects.nonNull(client) && Objects.nonNull(fileReaderOperator.metadata)) {
+            return;
+        }
+        if (Objects.isNull(client)) {
+            // build once; rebuilding on every call would leak the previous client
+            client = getKubernetesClient();
+        }
+        if (Objects.isNull(client)) {
+            // host/port not configured: nothing can be looked up
+            return;
+        }
+        Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+        ObjectMeta objectMeta = getPodMetadata(k8sInfo.get(NAMESPACE), k8sInfo.get(POD_NAME));
+        fileReaderOperator.metadata = Objects.nonNull(objectMeta) ? objectMeta.toString() : null;
+    }
+
+    /**
+     * Builds a client for the API server at {@code https://<host>:<port>}.
+     *
+     * <p>Host and port come from the {@code KUBERNETES_SERVICE_HOST} / {@code KUBERNETES_SERVICE_PORT}
+     * environment variables, falling back to JVM system properties of the same names.
+     *
+     * @return the client, or {@code null} if host or port is unavailable
+     */
+    private KubernetesClient getKubernetesClient() {
+        String ip = lookupSetting(KUBERNETES_SERVICE_HOST);
+        String port = lookupSetting(KUBERNETES_SERVICE_PORT);
+        if (Objects.isNull(ip) || Objects.isNull(port)) {
+            log.warn("k8s env ip and port is null,can not connect k8s master");
+            return null;
+        }
+        String masterUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
+        Config config = new ConfigBuilder().withMasterUrl(masterUrl).build();
+        return new KubernetesClientBuilder().withConfig(config).build();
+    }
+
+    /** Reads a setting from the process environment, falling back to a JVM system property. */
+    private static String lookupSetting(String key) {
+        String value = System.getenv(key);
+        return Objects.nonNull(value) ? value : System.getProperty(key);
+    }
+
+    /**
+     * Lists pods through the client.
+     *
+     * @return the pod list, or {@code null} if no client has been created
+     */
+    public PodList getPods() {
+        if (Objects.isNull(client)) {
+            return null;
+        }
+        MixedOperation<Pod, PodList, PodResource> pods = client.pods();
+        return pods.list();
+    }
+
+    /**
+     * Looks up a pod's metadata by namespace and name (name compared case-insensitively).
+     *
+     * @param namespace namespace to search
+     * @param podName name of the pod
+     * @return the first matching pod's metadata, or {@code null} if there is no client, either
+     *         argument is {@code null}, or no pod matches
+     */
+    public ObjectMeta getPodMetadata(String namespace, String podName) {
+        if (Objects.isNull(client) || Objects.isNull(namespace) || Objects.isNull(podName)) {
+            return null;
+        }
+        // scope the listing to the target namespace instead of the client's default namespace
+        List<ObjectMeta> objectMetas = client.pods().inNamespace(namespace).list().getItems().stream()
+                .map(Pod::getMetadata)
+                .filter(meta -> podName.equalsIgnoreCase(meta.getName()))
+                .collect(Collectors.toList());
+        return CollectionUtils.isNotEmpty(objectMetas) ? objectMetas.get(0) : null;
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
new file mode 100644
index 000000000..e811efc39
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader.file;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
+
+/**
+ * Reads the operator's file, starting after {@code position}, and publishes the lines as the
+ * operator's stream.
+ *
+ * <p>When {@code JOB_FILE_LINE_END_PATTERN} is configured, consecutive lines are joined with a
+ * single space in a per-file buffer and split where the pattern matches; see {@link #getData()}.
+ */
+public final class TextFileReader extends AbstractFileReader {
+
+    // text accumulated across lines while waiting for a line-end match, keyed by file
+    private final Map<File, String> lineStringBuffer = new ConcurrentHashMap<>();
+
+    public TextFileReader(FileReaderOperator fileReaderOperator) {
+        super.fileReaderOperator = fileReaderOperator;
+    }
+
+    /**
+     * Reads the file and sets {@code fileReaderOperator.stream}.
+     *
+     * <p>Without a line-end pattern each physical line is one record. With a pattern, the buffered
+     * text is split on the matched text; when the split yields more than one piece, the last piece
+     * is kept in the buffer rather than emitted. With a pattern and no resulting record, the stream
+     * is left unchanged.
+     *
+     * @throws IOException if the file cannot be opened
+     */
+    @Override
+    public void getData() throws IOException {
+        File file = fileReaderOperator.file;
+        List<String> lines = readLines(file, fileReaderOperator.position);
+        List<String> resultLines = new ArrayList<>();
+        //TODO line regular expression matching
+        if (fileReaderOperator.jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
+            Pattern pattern = Pattern.compile(
+                    fileReaderOperator.jobConf.get(JOB_FILE_LINE_END_PATTERN));
+            lines.forEach(line -> {
+                lineStringBuffer.put(file,
+                        lineStringBuffer.isEmpty() ? line : lineStringBuffer.get(file).concat(" ").concat(line));
+                String data = lineStringBuffer.get(file);
+                Matcher matcher = pattern.matcher(data);
+                if (matcher.find() && StringUtils.isNoneBlank(matcher.group())) {
+                    // the matched text is literal; quote it so split() does not re-read it as a regex
+                    String[] splitLines = data.split(Pattern.quote(matcher.group()));
+                    int length = splitLines.length;
+                    for (int i = 0; i < length; i++) {
+                        if (i > 0 && i == length - 1 && null != splitLines[i]) {
+                            lineStringBuffer.put(file, splitLines[i]);
+                            break;
+                        }
+                        resultLines.add(splitLines[i].trim());
+                    }
+                    if (1 == length) {
+                        lineStringBuffer.remove(file);
+                    }
+                }
+            });
+            if (resultLines.isEmpty()) {
+                return;
+            }
+        }
+        lines = resultLines.isEmpty() ? lines : resultLines;
+        fileReaderOperator.stream = lines.stream();
+    }
+
+    /** Returns the lines of {@code file} after the first {@code skip}, closing the reader afterwards. */
+    private static List<String> readLines(File file, int skip) throws IOException {
+        try (java.io.BufferedReader reader = Files.newBufferedReader(file.toPath())) {
+            return reader.lines().skip(skip).collect(Collectors.toList());
+        }
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
new file mode 100644
index 000000000..d493ef726
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.constant.CommonConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
+import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
+import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
+import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
+
+/**
+ * Helpers for extracting and attaching metadata to collected file data.
+ */
+public class MetaDataUtils {
+
+    private static final String LOG_SUFFIX = ".log";
+
+    /**
+     * Parses Kubernetes identifiers from a standard container log file name.
+     *
+     * <p>Expected form: {@code /var/log/containers/<pod_name>_<namespace>_<container_name>-<container_id>.log}.
+     * Any leading directory is ignored. The container id is the text after the last {@code -} of the
+     * third segment (container names may themselves contain hyphens), with a trailing {@code .log}
+     * removed.
+     *
+     * @param fileName log file name or path; may be {@code null}
+     * @return map with {@code POD_NAME} and {@code NAMESPACE}, plus {@code CONTAINER_NAME} and, when a
+     *         hyphen separates it, {@code CONTAINER_ID}; empty if the name has fewer than three
+     *         {@code _}-separated segments
+     */
+    public static Map<String, String> getLogInfo(String fileName) {
+        Map<String, String> podInf = new HashMap<>();
+        if (StringUtils.isBlank(fileName)) {
+            return podInf;
+        }
+        // keep only the last path segment so directories cannot shift the fields
+        String name = fileName.substring(fileName.lastIndexOf('/') + 1);
+        String[] str = name.split(CommonConstants.DELIMITER_UNDERLINE);
+        if (str.length < 3) {
+            return podInf;
+        }
+        podInf.put(POD_NAME, str[0]);
+        podInf.put(NAMESPACE, str[1]);
+        String container = StringUtils.removeEnd(str[2], LOG_SUFFIX);
+        int hyphen = container.lastIndexOf(CommonConstants.DELIMITER_HYPHEN);
+        if (hyphen < 0) {
+            podInf.put(CONTAINER_NAME, container);
+            return podInf;
+        }
+        podInf.put(CONTAINER_NAME, container.substring(0, hyphen));
+        podInf.put(CONTAINER_ID, container.substring(hyphen + 1));
+        return podInf;
+    }
+
+    /**
+     * Appends {@code str2} to {@code str1} on a new line.
+     *
+     * @return {@code str1} unchanged when {@code str2} is blank, otherwise {@code str1 + "\n" + str2}
+     */
+    public static String concatString(String str1, String str2) {
+        if (StringUtils.isBlank(str2)) {
+            return str1;
+        }
+        return str1.concat("\n").concat(str2);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6ad27e738..d73c25933 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -23,7 +23,7 @@ import org.apache.inlong.agent.constant.FileCollectType;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
-import org.apache.inlong.agent.plugin.sources.reader.TextFileReader;
+import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.junit.AfterClass;
@@ -203,13 +203,13 @@ public class TestTextFileReader {
afterList.add("world");
}
Files.write(localPath, afterList, StandardOpenOption.APPEND);
- TextFileReader reader = new TextFileReader(localPath.toFile(), 1000);
+ FileReaderOperator fileReaderOperator = new FileReaderOperator(localPath.toFile(), 1000);
JobProfile jobProfile = new JobProfile();
jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
- reader.init(jobProfile);
+ fileReaderOperator.init(jobProfile);
- Assert.assertEquals("world", new String(reader.read().getBody()));
+ Assert.assertEquals("world", new String(fileReaderOperator.read().getBody()));
}
@@ -220,7 +220,7 @@ public class TestTextFileReader {
jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
Path localPath = Paths.get(testDir.toString(), "test1.txt");
- TextFileReader reader = new TextFileReader(localPath.toFile(), 0);
+ FileReaderOperator reader = new FileReaderOperator(localPath.toFile(), 0);
if (localPath.toFile().exists()) {
localPath.toFile().delete();
}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index a63ca9bf9..94420705e 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -400,6 +400,7 @@ The text of each license is the standard Apache 2.0 license.
joda-time:joda-time:2.9.9 - Joda-Time (http://www.joda.org/joda-time/), (Apache 2)
org.apache.kafka:kafka-log4j-appender:3.0.0 - Apache Kafka (https://kafka.apache.org), (The Apache Software License, Version 2.0)
org.apache.kafka:kafka-tools:3.0.0 - Apache Kafka (https://kafka.apache.org), (The Apache Software License, Version 2.0)
+ io.fabric8:kubernetes-client:6.0.0 - kubernetes-client (https://github.com/fabric8io/kubernetes-client/tree/v6.0.0), (The Apache Software License, Version 2.0)
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava - Guava ListenableFuture only (https://github.com/google/guava/listenablefuture), (The Apache Software License, Version 2.0)
org.apache.logging.log4j:log4j-api:2.17.2 - Apache Log4j API (https://logging.apache.org/log4j/2.x/log4j-api/), (Apache License, Version 2.0)
org.apache.logging.log4j:log4j-slf4j-impl:2.17.2 - Apache Log4j SLF4J Binding (https://logging.apache.org/log4j/2.x/log4j-slf4j-impl/), (Apache License, Version 2.0)
diff --git a/pom.xml b/pom.xml
index d87cf1be6..327dfd362 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,6 +250,8 @@
<HikariCP.version>4.0.3</HikariCP.version>
<caffeine.version>2.9.3</caffeine.version>
<kafka.clients.version>3.0.0</kafka.clients.version>
+
+ <kubernetes.client.version>6.0.0</kubernetes.client.version>
</properties>
<dependencyManagement>
@@ -1558,6 +1560,11 @@
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ <version>${kubernetes.client.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>