You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/24 06:33:44 UTC

[inlong] 03/03: [INLONG-5663][Agent] To update data structure for k8s log (#5664)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f1346a65726fdf38d3edde41193d09ee2b39a130
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Wed Aug 24 11:37:23 2022 +0800

    [INLONG-5663][Agent] To update data structure for k8s log (#5664)
---
 .../inlong/agent/constant/KubernetesConstants.java | 19 +++++------
 .../sources/reader/file/FileReaderOperator.java    |  6 ++--
 .../sources/reader/file/KubernetesFileReader.java  | 39 +++++++++++-----------
 .../inlong/agent/plugin/utils/MetaDataUtils.java   | 17 ++++++++--
 4 files changed, 45 insertions(+), 36 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
index e71ae4bae..fb2b41c54 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java
@@ -33,14 +33,13 @@ public class KubernetesConstants {
     public static final String CONTAINER_ID = "container.id";
 
     // k8s metadata
-    public static final String METADATA_CONTAINER_ID = "_container_id_";
-    public static final String METADATA_CONTAINER_NAME = "_container_name_";
-    public static final String METADATA_NAMESPACE = "_namespace_";
-    public static final String METADATA_POD_UID = "_pod_uid_";
-    public static final String METADATA_POD_NAME = "_pod_name_";
-    public static final String METADATA_POD_LABEL = "_pod_label_";
-    public static final String DATA_CONTENT = "_content_";
-    public static final String DATA_CONTENT_TIME = "_time_";
-
-
+    public static final String METADATA_CONTAINER_ID = "__container_id__";
+    public static final String METADATA_CONTAINER_NAME = "__container_name__";
+    public static final String METADATA_NAMESPACE = "__namespace__";
+    public static final String METADATA_POD_UID = "__pod_uid__";
+    public static final String METADATA_POD_NAME = "__pod_name__";
+    public static final String METADATA_POD_LABEL = "__pod_label__";
+    public static final String DATA_CONTENT = "__content__";
+    public static final String DATA_CONTENT_TIME = "__LogTime__";
+    
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index f3d7d91bc..927d06daf 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -178,7 +178,7 @@ public class FileReaderOperator extends AbstractReader {
                     fileReader.getData();
                     fileReader.mergeData(this);
                 } catch (Exception ex) {
-                    LOGGER.error("read file data error:{}", ex.getMessage());
+                    LOGGER.error("read file data error", ex);
                 }
             });
             if (Objects.nonNull(stream)) {
@@ -211,14 +211,14 @@ public class FileReaderOperator extends AbstractReader {
 
     public List<AbstractFileReader> getInstance(FileReaderOperator reader, JobProfile jobConf) {
         List<AbstractFileReader> fileReaders = new ArrayList<>();
-        fileReaders.add(new TextFileReader(this));
+        fileReaders.add(new TextFileReader(reader));
         if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) {
             return fileReaders;
         }
         String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
         Arrays.stream(env).forEach(data -> {
             if (data.equalsIgnoreCase(KUBERNETES)) {
-                fileReaders.add(new KubernetesFileReader(this));
+                fileReaders.add(new KubernetesFileReader(reader));
             }
         });
         return fileReaders;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
index 04331278d..026bdc805 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -36,7 +36,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -75,18 +74,17 @@ public final class KubernetesFileReader extends AbstractFileReader {
         try {
             client = getKubernetesClient();
         } catch (IOException e) {
-            log.error("Get k8s client error: ", e);
+            log.error("get k8s client error: ", e);
         }
         fileReaderOperator.metadata = getK8sMetadata(fileReaderOperator.jobConf);
     }
 
     // TODO only support default config in the POD
     private KubernetesClient getKubernetesClient() throws IOException {
-        String ip = System.getProperty(KUBERNETES_SERVICE_HOST);
-        String port = System.getProperty(KUBERNETES_SERVICE_PORT);
-        if (Objects.isNull(ip) || Objects.isNull(port)) {
-            log.warn("k8s env ip and port is null,can not connect k8s master");
-            return null;
+        String ip = System.getenv(KUBERNETES_SERVICE_HOST);
+        String port = System.getenv(KUBERNETES_SERVICE_PORT);
+        if (Objects.isNull(ip) && Objects.isNull(port)) {
+            throw new RuntimeException("get k8s client error,k8s env ip and port is null");
         }
         String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
         Config config = new ConfigBuilder()
@@ -117,26 +115,27 @@ public final class KubernetesFileReader extends AbstractFileReader {
             return null;
         }
         Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+        log.info("k8s information size:{}", k8sInfo.size());
+        Map<String, String> metadata = new HashMap<>();
         if (k8sInfo.isEmpty()) {
-            return null;
-        }
-        List<String> namespaces = MetaDataUtils.getNamespace(jobConf);
-        if (Objects.isNull(namespaces) || namespaces.isEmpty()) {
-            return null;
+            return metadata;
         }
-        if (!namespaces.contains(k8sInfo.get(NAMESPACE))) {
-            return null;
+
+        metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
+        metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
+        metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
+        metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
+
+        PodResource podResource = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
+                .withName(k8sInfo.get(POD_NAME));
+        if (Objects.isNull(podResource)) {
+            return metadata;
         }
-        Pod pod = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).withName(k8sInfo.get(POD_NAME)).get();
+        Pod pod = podResource.get();
         PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
                 .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
-        Map<String, String> metadata = new HashMap<>();
         podList.getItems().forEach(data -> {
             if (data.equals(pod)) {
-                metadata.put(METADATA_NAMESPACE, k8sInfo.get(NAMESPACE));
-                metadata.put(METADATA_CONTAINER_NAME, k8sInfo.get(CONTAINER_NAME));
-                metadata.put(METADATA_CONTAINER_ID, k8sInfo.get(CONTAINER_ID));
-                metadata.put(METADATA_POD_NAME, k8sInfo.get(POD_NAME));
                 metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
                 metadata.put(METADATA_POD_LABEL, GSON.toJson(pod.getMetadata().getLabels()));
             }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index 7c3201f58..b3e76d201 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -44,6 +44,8 @@ public class MetaDataUtils {
 
     private static final Gson GSON = new Gson();
 
+    private static final String LOG_MARK = ".log";
+
     /**
      * standard log for k8s
      *
@@ -59,8 +61,17 @@ public class MetaDataUtils {
         podInf.put(POD_NAME, str[0]);
         podInf.put(NAMESPACE, str[1]);
         String[] containerInfo = str[2].split(CommonConstants.DELIMITER_HYPHEN);
-        podInf.put(CONTAINER_NAME, containerInfo[0]);
-        podInf.put(CONTAINER_ID, containerInfo[1]);
+        String containerId = containerInfo[containerInfo.length - 1].replace(LOG_MARK, "");
+        String containerName = "";
+        for (int i = 0; i < containerInfo.length - 1; i++) {
+            if (i == containerInfo.length - 2) {
+                containerName = containerName.concat(containerInfo[i]);
+                break;
+            }
+            containerName = containerName.concat(containerInfo[i]).concat(CommonConstants.DELIMITER_HYPHEN);
+        }
+        podInf.put(CONTAINER_NAME, containerName);
+        podInf.put(CONTAINER_ID, containerId);
         return podInf;
     }
 
@@ -71,7 +82,7 @@ public class MetaDataUtils {
      */
     public static Map<String, String> getPodLabels(JobProfile jobProfile) {
         if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
-            return null;
+            return new HashMap<>();
         }
         String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
         Type type = new TypeToken<HashMap<Integer, String>>() {