You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/11 07:06:20 UTC

[inlong] branch master updated: [INLONG-6131][Agent] Support file filtering by condition (#6132)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d676a887 [INLONG-6131][Agent] Support file filtering by condition (#6132)
5d676a887 is described below

commit 5d676a8872ef80c23be694d0f37005df6eb61271
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Tue Oct 11 15:06:16 2022 +0800

    [INLONG-6131][Agent] Support file filtering by condition (#6132)
---
 .../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +--
 .../apache/inlong/agent/pojo/JobProfileDto.java    |  6 +-
 .../agent/plugin/sources/TextFileSource.java       |  2 +
 .../sources/reader/file/KubernetesFileReader.java  | 32 +--------
 .../inlong/agent/plugin/utils/FileDataUtils.java   | 78 +++++++++++++++++++++-
 .../inlong/agent/plugin/utils/MetaDataUtils.java   |  2 +-
 .../inlong/agent/plugin/utils/PluginUtils.java     | 27 ++++++++
 7 files changed, 120 insertions(+), 38 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 436dc3dd5..52d78ab72 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -41,13 +41,16 @@ public class FileJob {
 
     private String envList;
 
-    private List<Map<String, String>> metaFields;
+    // JSON string, the content format is List<Map<String, String>>
+    private String metaFields;
 
     private String dataSeparator;
 
-    private Map<String, String> filterMetaByLabels;
+    // JSON string, the content format is Map<String,string>
+    private String filterMetaByLabels;
 
-    private Map<String, Object> properties;
+    // JSON string, the content format is Map<String,Object>
+    private String properties;
 
     // Monitor interval for file
     private Long monitorInterval;
@@ -125,7 +128,7 @@ public class FileJob {
 
         // Monitor switch, 1 true and 0 false
         private Integer monitorStatus;
-        
+
         // Monitor expire time and the time in milliseconds
         private Long monitorExpire;
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index b0c356732..9ca2b5138 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -111,7 +111,7 @@ public class JobProfileDto {
         fileJob.setCollectType(fileJobTaskConfig.getCollectType());
         fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
         fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
-        fileJob.setProperties(fileJobTaskConfig.getProperties());
+        fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties()));
         if (fileJobTaskConfig.getTimeOffset() != null) {
             fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
         }
@@ -131,11 +131,11 @@ public class JobProfileDto {
         }
 
         if (null != fileJobTaskConfig.getMetaFields()) {
-            fileJob.setMetaFields(fileJob.getMetaFields());
+            fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields()));
         }
 
         if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
-            fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels());
+            fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels()));
         }
 
         if (null != fileJobTaskConfig.getMonitorInterval()) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 6ca88df7c..840f83055 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
+import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public class TextFileSource extends AbstractSource {
     public List<Reader> split(JobProfile jobConf) {
         super.init(jobConf);
         Collection<File> allFiles = PluginUtils.findSuitFiles(jobConf);
+        allFiles = FileDataUtils.filterFile(allFiles, jobConf);
         List<Reader> result = new ArrayList<>();
         String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
         LOGGER.info("file splits size: {}", allFiles.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
index 8ea052bc1..66680fac6 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -20,30 +20,22 @@ package org.apache.inlong.agent.plugin.sources.reader.file;
 import com.google.gson.Gson;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import io.fabric8.kubernetes.client.dsl.MixedOperation;
 import io.fabric8.kubernetes.client.dsl.PodResource;
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
 import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
-import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
-import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
-import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
 import static org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_ID;
 import static org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_NAME;
 import static org.apache.inlong.agent.constant.KubernetesConstants.METADATA_NAMESPACE;
@@ -72,30 +64,13 @@ public final class KubernetesFileReader extends AbstractFileReader {
             return;
         }
         try {
-            client = getKubernetesClient();
+            client = PluginUtils.getKubernetesClient();
         } catch (IOException e) {
             log.error("get k8s client error: ", e);
         }
         fileReaderOperator.metadata = getK8sMetadata(fileReaderOperator.jobConf);
     }
 
-    // TODO only support default config in the POD
-    private KubernetesClient getKubernetesClient() throws IOException {
-        String ip = System.getenv(KUBERNETES_SERVICE_HOST);
-        String port = System.getenv(KUBERNETES_SERVICE_PORT);
-        if (Objects.isNull(ip) && Objects.isNull(port)) {
-            throw new RuntimeException("get k8s client error,k8s env ip and port is null");
-        }
-        String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
-        Config config = new ConfigBuilder()
-                .withMasterUrl(maserUrl)
-                .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
-                .withOauthToken(new String(
-                        Files.readAllBytes((new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
-                .build();
-        return new KubernetesClientBuilder().withConfig(config).build();
-    }
-
     /**
      * get PODS of kubernetes information
      */
@@ -132,8 +107,7 @@ public final class KubernetesFileReader extends AbstractFileReader {
             return metadata;
         }
         Pod pod = podResource.get();
-        PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
-                .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
+        PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).list();
         podList.getItems().forEach(data -> {
             if (data.equals(pod)) {
                 metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
index 381ed6745..bbde6f462 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
@@ -20,11 +20,26 @@ package org.apache.inlong.agent.plugin.utils;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.PodResource;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
+import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
 
 /**
  * File job utils
@@ -32,7 +47,7 @@ import java.util.Map;
 public class FileDataUtils {
 
     public static final String KUBERNETES_LOG = "log";
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileDataUtils.class);
     private static final Gson GSON = new Gson();
 
     /**
@@ -65,4 +80,65 @@ public class FileDataUtils {
         return isJson;
     }
 
+    /**
+     * Filter file by conditions
+     */
+    public static Collection<File> filterFile(Collection<File> allFiles, JobProfile jobConf) {
+        // filter file by labels 
+        Collection<File> files = null;
+        try {
+            files = filterByLabels(allFiles, jobConf);
+        } catch (IOException e) {
+            LOGGER.error("filter file error: ", e);
+        }
+        return files;
+    }
+
+    /**
+     * Filter file by labels if standard log for k8s
+     */
+    private static Collection<File> filterByLabels(Collection<File> allFiles, JobProfile jobConf) throws IOException {
+        Map<String, String> labelsMap = MetaDataUtils.getPodLabels(jobConf);
+        if (labelsMap.isEmpty()) {
+            return allFiles;
+        }
+        Collection<File> standardK8sLogFiles = new ArrayList<>();
+        Iterator<File> iterator = allFiles.iterator();
+        KubernetesClient client = PluginUtils.getKubernetesClient();
+        while (iterator.hasNext()) {
+            File file = getFile(labelsMap, iterator.next(), client);
+            if (file == null) {
+                continue;
+            }
+            standardK8sLogFiles.add(file);
+        }
+        return standardK8sLogFiles;
+    }
+
+    private static File getFile(Map<String, String> labelsMap, File file, KubernetesClient client) {
+        Map<String, String> logInfo = MetaDataUtils.getLogInfo(file.getName());
+        if (logInfo.isEmpty()) {
+            return null;
+        }
+        PodResource podResource = client.pods().inNamespace(logInfo.get(NAMESPACE))
+                .withName(logInfo.get(POD_NAME));
+        if (Objects.isNull(podResource)) {
+            return null;
+        }
+        Pod pod = podResource.get();
+        Map<String, String> podLabels = pod.getMetadata().getLabels();
+        boolean filterLabelStatus = false;
+        for (String key : labelsMap.keySet()) {
+            if (podLabels.containsKey(key) && labelsMap.get(key).contains(podLabels.get(key))) {
+                filterLabelStatus = true;
+                continue;
+            }
+            if (podLabels.containsKey(key) && !labelsMap.get(key).contains(podLabels.get(key))) {
+                filterLabelStatus = false;
+                break;
+            }
+        }
+        return filterLabelStatus ? file : null;
+    }
+
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index 379e2dc32..a1ce76db8 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -93,7 +93,7 @@ public class MetaDataUtils {
             return new HashMap<>();
         }
         String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
-        Type type = new TypeToken<HashMap<Integer, String>>() {
+        Type type = new TypeToken<HashMap<String, String>>() {
         }.getType();
         return GSON.fromJson(labels, type);
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 1fc8a68a9..afb122c68 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -17,9 +17,14 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.trigger.PathPattern;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -31,10 +36,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.inlong.agent.constant.CommonConstants.AGENT_COLON;
 import static org.apache.inlong.agent.constant.CommonConstants.AGENT_NIX_OS;
@@ -45,6 +52,9 @@ import static org.apache.inlong.agent.constant.CommonConstants.FILE_MAX_NUM;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
+import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
 
 /**
  * Utils for plugin package.
@@ -150,4 +160,21 @@ public class PluginUtils {
         }
     }
 
+    // TODO only support default config in the POD
+    public static KubernetesClient getKubernetesClient() throws IOException {
+        String ip = System.getenv(KUBERNETES_SERVICE_HOST);
+        String port = System.getenv(KUBERNETES_SERVICE_PORT);
+        if (Objects.isNull(ip) && Objects.isNull(port)) {
+            throw new RuntimeException("get k8s client error,k8s env ip and port is null");
+        }
+        String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
+        Config config = new ConfigBuilder()
+                .withMasterUrl(maserUrl)
+                .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
+                .withOauthToken(new String(
+                        Files.readAllBytes((new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
+                .build();
+        return new KubernetesClientBuilder().withConfig(config).build();
+    }
+
 }