You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/08/12 10:12:52 UTC

[GitHub] [inlong] GanfengTan opened a new pull request, #5534: [INLONG-5533][Agent] Support Structured output in the kubernetes

GanfengTan opened a new pull request, #5534:
URL: https://github.com/apache/inlong/pull/5534

   File collects in the k8s, data and metadata of k8s need to connect a message.
   
   - Fixes #5533 
   
   ### Motivation
   File data and k8s metadata to connect a message.
   {
   "_content_":"data",
   "_time_":"time",
   "_pod_name_":"test"
   }
   
   ### Modifications
   
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5534: [INLONG-5533][Agent] Support Structured output in the kubernetes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944320716


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java:
##########
@@ -60,22 +71,30 @@ public void getData() {
         if (Objects.nonNull(client) && Objects.nonNull(fileReaderOperator.metadata)) {
             return;
         }
-        client = getKubernetesClient();
-        Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
-        ObjectMeta objectMeta = getPodMetadata(k8sInfo.get(NAMESPACE), k8sInfo.get(POD_NAME));
-        fileReaderOperator.metadata = Objects.nonNull(objectMeta) ? objectMeta.toString() : null;
+        try {
+            client = getKubernetesClient();
+        } catch (IOException e) {
+            log.error("Get k8s client error: {}", e.getMessage());

Review Comment:
   It is recommended to print all stack traces in error logs.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {

Review Comment:
   For public static method, it is recommended to use Java doc.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {
+        if (!StringUtils.isNoneBlank(log)) {
+            return null;
+        }
+        Gson gson = new Gson();

Review Comment:
   Use a global static Gson object.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java:
##########
@@ -53,10 +62,59 @@ public static Map<String, String> getLogInfo(String fileName) {
         return podInf;
     }
 
-    public static String concatString(String str1, String str2) {
-        if (!StringUtils.isNoneBlank(str2)) {
-            return str1;
+    /**
+     * standard log for k8s
+     *
+     * get labels of pod
+     */
+    public static Map<String, String> getPodLabels(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+            return null;
+        }
+        String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();
+        return gson.fromJson(labels, type);
+    }
+
+    public static List<String> getNamespace(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
+        }
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();

Review Comment:
   Use a global static Gson object.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {

Review Comment:
   For public static method, it is recommended to use Java doc.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java:
##########
@@ -92,11 +111,36 @@ public PodList getPods() {
     /**
      * get pod metadata by namespace and pod name
      */
-    public ObjectMeta getPodMetadata(String namespace, String podName) {
-        List<ObjectMeta> objectMetas = client.pods().list().getItems().stream().map(Pod::getMetadata)
-                .filter(data -> data.getName().equalsIgnoreCase(podName) && data.getNamespace()
-                        .equalsIgnoreCase(namespace)).collect(Collectors.toList());
-        return CollectionUtils.isNotEmpty(objectMetas) ? objectMetas.get(0) : null;
+    public Map<String, String> getK8sMetadata(JobProfile jobConf) {
+        if (Objects.isNull(jobConf)) {
+            return null;
+        }
+        Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+        if (k8sInfo.isEmpty()) {
+            return null;
+        }
+        List<String> namespaces = MetaDataUtils.getNamespace(jobConf);
+        if (Objects.isNull(namespaces) || namespaces.isEmpty()) {
+            return null;
+        }
+        if (!namespaces.contains(k8sInfo.get(NAMESPACE))) {
+            return null;
+        }
+        Pod pod = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).withName(k8sInfo.get(POD_NAME)).get();
+        PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
+                .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
+        Gson gson = new Gson();

Review Comment:
   It is recommended to use a global static Gson object.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java:
##########
@@ -53,10 +62,59 @@ public static Map<String, String> getLogInfo(String fileName) {
         return podInf;
     }
 
-    public static String concatString(String str1, String str2) {
-        if (!StringUtils.isNoneBlank(str2)) {
-            return str1;
+    /**
+     * standard log for k8s
+     *
+     * get labels of pod
+     */
+    public static Map<String, String> getPodLabels(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+            return null;
+        }
+        String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();
+        return gson.fromJson(labels, type);
+    }
+
+    public static List<String> getNamespace(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
+        }
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();
+        Map<String, String> properties = gson.fromJson(property, type);
+        return properties.keySet().stream().map(data -> {
+            if (data.contains(NAMESPACE)) {
+                return properties.get(data);
+            }
+            return null;
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+
+    /**
+     * standard log for k8s
+     *
+     * get name of pod
+     */
+    public static String getPodName(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
         }
-        return str1.concat("\n").concat(str2);
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();

Review Comment:
   Use a global static Gson object.



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java:
##########
@@ -35,9 +44,26 @@ public void mergeData(FileReaderOperator fileReaderOperator) {
         if (null == fileReaderOperator.metadata) {
             return;
         }
-
         List<String> lines = fileReaderOperator.stream.collect(Collectors.toList());
-        lines.forEach(data -> data = MetaDataUtils.concatString(data, fileReaderOperator.metadata));
+        if (fileReaderOperator.jobConf.hasKey(JOB_FILE_CONTENT_COLLECT_TYPE)) {
+            long timestamp = System.currentTimeMillis();
+            Gson gson = new Gson();

Review Comment:
   It is recommended to use a global static Gson object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944356133


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944357229


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {
+        if (!StringUtils.isNoneBlank(log)) {
+            return null;
+        }
+        Gson gson = new Gson();

Review Comment:
   done



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java:
##########
@@ -53,10 +62,59 @@ public static Map<String, String> getLogInfo(String fileName) {
         return podInf;
     }
 
-    public static String concatString(String str1, String str2) {
-        if (!StringUtils.isNoneBlank(str2)) {
-            return str1;
+    /**
+     * standard log for k8s
+     *
+     * get labels of pod
+     */
+    public static Map<String, String> getPodLabels(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+            return null;
+        }
+        String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();
+        return gson.fromJson(labels, type);
+    }
+
+    public static List<String> getNamespace(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
+        }
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944356405


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944321807


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {

Review Comment:
   For public static method, it is recommended to use Java doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944354038


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java:
##########
@@ -35,9 +44,26 @@ public void mergeData(FileReaderOperator fileReaderOperator) {
         if (null == fileReaderOperator.metadata) {
             return;
         }
-
         List<String> lines = fileReaderOperator.stream.collect(Collectors.toList());
-        lines.forEach(data -> data = MetaDataUtils.concatString(data, fileReaderOperator.metadata));
+        if (fileReaderOperator.jobConf.hasKey(JOB_FILE_CONTENT_COLLECT_TYPE)) {
+            long timestamp = System.currentTimeMillis();
+            Gson gson = new Gson();

Review Comment:
   done



##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java:
##########
@@ -60,22 +71,30 @@ public void getData() {
         if (Objects.nonNull(client) && Objects.nonNull(fileReaderOperator.metadata)) {
             return;
         }
-        client = getKubernetesClient();
-        Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
-        ObjectMeta objectMeta = getPodMetadata(k8sInfo.get(NAMESPACE), k8sInfo.get(POD_NAME));
-        fileReaderOperator.metadata = Objects.nonNull(objectMeta) ? objectMeta.toString() : null;
+        try {
+            client = getKubernetesClient();
+        } catch (IOException e) {
+            log.error("Get k8s client error: {}", e.getMessage());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944355431


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java:
##########
@@ -92,11 +111,36 @@ public PodList getPods() {
     /**
      * get pod metadata by namespace and pod name
      */
-    public ObjectMeta getPodMetadata(String namespace, String podName) {
-        List<ObjectMeta> objectMetas = client.pods().list().getItems().stream().map(Pod::getMetadata)
-                .filter(data -> data.getName().equalsIgnoreCase(podName) && data.getNamespace()
-                        .equalsIgnoreCase(namespace)).collect(Collectors.toList());
-        return CollectionUtils.isNotEmpty(objectMetas) ? objectMetas.get(0) : null;
+    public Map<String, String> getK8sMetadata(JobProfile jobConf) {
+        if (Objects.isNull(jobConf)) {
+            return null;
+        }
+        Map<String, String> k8sInfo = MetaDataUtils.getLogInfo(fileReaderOperator.file.getName());
+        if (k8sInfo.isEmpty()) {
+            return null;
+        }
+        List<String> namespaces = MetaDataUtils.getNamespace(jobConf);
+        if (Objects.isNull(namespaces) || namespaces.isEmpty()) {
+            return null;
+        }
+        if (!namespaces.contains(k8sInfo.get(NAMESPACE))) {
+            return null;
+        }
+        Pod pod = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).withName(k8sInfo.get(POD_NAME)).get();
+        PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
+                .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
+        Gson gson = new Gson();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944357496


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java:
##########
@@ -53,10 +62,59 @@ public static Map<String, String> getLogInfo(String fileName) {
         return podInf;
     }
 
-    public static String concatString(String str1, String str2) {
-        if (!StringUtils.isNoneBlank(str2)) {
-            return str1;
+    /**
+     * standard log for k8s
+     *
+     * get labels of pod
+     */
+    public static Map<String, String> getPodLabels(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
+            return null;
+        }
+        String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();
+        return gson.fromJson(labels, type);
+    }
+
+    public static List<String> getNamespace(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
+        }
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();
+        Map<String, String> properties = gson.fromJson(property, type);
+        return properties.keySet().stream().map(data -> {
+            if (data.contains(NAMESPACE)) {
+                return properties.get(data);
+            }
+            return null;
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+
+    /**
+     * standard log for k8s
+     *
+     * get name of pod
+     */
+    public static String getPodName(JobProfile jobProfile) {
+        if (Objects.isNull(jobProfile) || !jobProfile.hasKey(JOB_FILE_PROPERTIES)) {
+            return null;
         }
-        return str1.concat("\n").concat(str2);
+        String property = jobProfile.get(JOB_FILE_PROPERTIES);
+        Type type = new TypeToken<HashMap<Integer, String>>() {
+        }.getType();
+        Gson gson = new Gson();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] GanfengTan commented on a diff in pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
GanfengTan commented on code in PR #5534:
URL: https://github.com/apache/inlong/pull/5534#discussion_r944356405


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * File job utils
+ */
+public class FileDataUtils {
+
+    public static final  String KUBERNETES_LOG = "log";
+
+    // Get standard log for k8s
+    public static String getK8sJsonLog(String log) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow merged pull request #5534: [INLONG-5533][Agent] Support structured output in the Kubernetes

Posted by GitBox <gi...@apache.org>.
healchow merged PR #5534:
URL: https://github.com/apache/inlong/pull/5534


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org