You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/24 03:37:28 UTC
[inlong] branch master updated: [INLONG-5663][Agent] To update data structure for k8s log (#5664)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f150ddbeb [INLONG-5663][Agent] To update data structure for k8s log (#5664)
f150ddbeb is described below
commit f150ddbeb8f3c1143afc164849cb485bedca5707
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Wed Aug 24 11:37:23 2022 +0800
[INLONG-5663][Agent] To update data structure for k8s log (#5664)
---
.../inlong/agent/constant/KubernetesConstants.java | 19 +++++------
.../sources/reader/file/FileReaderOperator.java | 6 ++--
.../sources/reader/file/KubernetesFileReader.java | 39 +++++++++++-----------
.../inlong/agent/plugin/utils/MetaDataUtils.java | 17 ++++++++--
4 files changed, 45 insertions(+), 36 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
index e71ae4bae..fb2b41c54 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
@@ -33,14 +33,13 @@ public class KubernetesConstants {
public static final String CONTAINER_ID = "container.id";
// k8s metadata
- public static final String METADATA_CONTAINER_ID = "_container_id_";
- public static final String METADATA_CONTAINER_NAME = "_container_name_";
- public static final String METADATA_NAMESPACE = "_namespace_";
- public static final String METADATA_POD_UID = "_pod_uid_";
- public static final String METADATA_POD_NAME = "_pod_name_";
- public static final String METADATA_POD_LABEL = "_pod_label_";
- public static final String DATA_CONTENT = "_content_";
- public static final String DATA_CONTENT_TIME = "_time_";
-
-
+ public static final String METADATA_CONTAINER_ID = "__container_id__";
+ public static final String METADATA_CONTAINER_NAME = "__container_name__";
+ public static final String METADATA_NAMESPACE = "__namespace__";
+ public static final String METADATA_POD_UID = "__pod_uid__";
+ public static final String METADATA_POD_NAME = "__pod_name__";
+ public static final String METADATA_POD_LABEL = "__pod_label__";
+ public static final String DATA_CONTENT = "__content__";
+ public static final String DATA_CONTENT_TIME = "__LogTime__";
+
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index f3d7d91bc..927d06daf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -178,7 +178,7 @@ public class FileReaderOperator extends AbstractReader {
fileReader.getData();
fileReader.mergeData(this);
} catch (Exception ex) {
- LOGGER.error("read file data error:{}", ex.getMessage());
+ LOGGER.error("read file data error", ex);
}
});
if (Objects.nonNull(stream)) {
@@ -211,14 +211,14 @@ public class FileReaderOperator extends AbstractReader {
public List<AbstractFileReader> getInstance(FileReaderOperator reader, JobProfile jobConf) {
List<AbstractFileReader> fileReaders = new ArrayList<>();
- fileReaders.add(new TextFileReader(this));
+ fileReaders.add(new TextFileReader(reader));
if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) {
return fileReaders;
}
String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
Arrays.stream(env).forEach(data -> {
if (data.equalsIgnoreCase(KUBERNETES)) {
- fileReaders.add(new KubernetesFileReader(this));
+ fileReaders.add(new KubernetesFileReader(reader));
}
});
return fileReaders;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
index 04331278d..026bdc805 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -36,7 +36,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -75,18 +74,17 @@ public final class KubernetesFileReader extends AbstractFileReader {
try {
client = getKubernetesClient();
} catch (IOException e) {
- log.error("Get k8s client error: ", e);
+ log.error("get k8s client error: ", e);
}
fileReaderOperator.metadata = getK8sMetadata(fileReaderOperator.jobConf);
}
// TODO only support default config in the POD
private KubernetesClient getKubernetesClient() throws IOException {
- String ip = System.getProperty(KUBERNETES_SERVICE_HOST);
- String port = System.getProperty(KUBERNETES_SERVICE_PORT);
- if (Objects.isNull(ip) || Objects.isNull(port)) {
- log.warn("k8s env ip and port is null,can not connect k8s master");
- return null;
+ String ip = System.getenv(KUBERNETES_SERVICE_HOST);
+ String port = System.getenv(KUBERNETES_SERVICE_PORT);
+ if (Objects.isNull(ip) && Objects.isNull(port)) {
+ throw new RuntimeException("get k8s client error,k8s env ip and port is null");
}
String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
Config config = new ConfigBuilder()
@@ -117,26 +115,27 @@ public final class KubernetesFileReader extends AbstractFileReader {
return null;
}
Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+ log.info("k8s information size:{}", k8sInfo.size());
+ Map<String, String> metadata = new HashMap<>();
if (k8sInfo.isEmpty()) {
- return null;
- }
- List<String> namespaces = MetaDataUtils.getNamespace(jobConf);
- if (Objects.isNull(namespaces) || namespaces.isEmpty()) {
- return null;
+ return metadata;
}
- if (!namespaces.contains(k8sInfo.get(NAMESPACE))) {
- return null;
+
+ metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
+ metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
+ metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
+ metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
+
+ PodResource podResource = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
+ .withName(k8sInfo.get(POD_NAME));
+ if (Objects.isNull(podResource)) {
+ return metadata;
}
- Pod pod = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).withName(k8sInfo.get(POD_NAME)).get();
+ Pod pod = podResource.get();
PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
.withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
- Map<String, String> metadata = new HashMap<>();
podList.getItems().forEach(data -> {
if (data.equals(pod)) {
- metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
- metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
- metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
- metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
metadata.put(METADATA_POD_LABEL, GSON.toJson(pod.getMetadata().getLabels()));
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index 7c3201f58..b3e76d201 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -44,6 +44,8 @@ public class MetaDataUtils {
private static final Gson GSON = new Gson();
+ private static final String LOG_MARK = ".log";
+
/**
* standard log for k8s
*
@@ -59,8 +61,17 @@ public class MetaDataUtils {
podInf.put(POD_NAME, str[0]);
podInf.put(NAMESPACE, str[1]);
String[] containerInfo = str[2].split(CommonConstants.DELIMITER_HYPHEN);
- podInf.put(CONTAINER_NAME, containerInfo[0]);
- podInf.put(CONTAINER_ID, containerInfo[1]);
+ String containerId = containerInfo[containerInfo.length - 1].replace(LOG_MARK, "");
+ String containerName = "";
+ for (int i = 0; i < containerInfo.length - 1; i++) {
+ if (i == containerInfo.length - 2) {
+ containerName = containerName.concat(containerInfo[i]);
+ break;
+ }
+ containerName = containerName.concat(containerInfo[i]).concat(CommonConstants.DELIMITER_HYPHEN);
+ }
+ podInf.put(CONTAINER_NAME, containerName);
+ podInf.put(CONTAINER_ID, containerId);
return podInf;
}
@@ -71,7 +82,7 @@ public class MetaDataUtils {
*/
public static Map<String, String> getPodLabels(JobProfile jobProfile) {
if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
- return null;
+ return new HashMap<>();
}
String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
Type type = new TypeToken<HashMap<Integer, String>>() {