You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/11 07:06:20 UTC
[inlong] branch master updated: [INLONG-6131][Agent] Support file filtering by condition (#6132)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5d676a887 [INLONG-6131][Agent] Support file filtering by condition (#6132)
5d676a887 is described below
commit 5d676a8872ef80c23be694d0f37005df6eb61271
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Tue Oct 11 15:06:16 2022 +0800
[INLONG-6131][Agent] Support file filtering by condition (#6132)
---
.../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +--
.../apache/inlong/agent/pojo/JobProfileDto.java | 6 +-
.../agent/plugin/sources/TextFileSource.java | 2 +
.../sources/reader/file/KubernetesFileReader.java | 32 +--------
.../inlong/agent/plugin/utils/FileDataUtils.java | 78 +++++++++++++++++++++-
.../inlong/agent/plugin/utils/MetaDataUtils.java | 2 +-
.../inlong/agent/plugin/utils/PluginUtils.java | 27 ++++++++
7 files changed, 120 insertions(+), 38 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 436dc3dd5..52d78ab72 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -41,13 +41,16 @@ public class FileJob {
private String envList;
- private List<Map<String, String>> metaFields;
+ // JSON string, the content format is List<Map<String, String>>
+ private String metaFields;
private String dataSeparator;
- private Map<String, String> filterMetaByLabels;
+ // JSON string, the content format is Map<String,string>
+ private String filterMetaByLabels;
- private Map<String, Object> properties;
+ // JSON string, the content format is Map<String,Object>
+ private String properties;
// Monitor interval for file
private Long monitorInterval;
@@ -125,7 +128,7 @@ public class FileJob {
// Monitor switch, 1 true and 0 false
private Integer monitorStatus;
-
+
// Monitor expire time and the time in milliseconds
private Long monitorExpire;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index b0c356732..9ca2b5138 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -111,7 +111,7 @@ public class JobProfileDto {
fileJob.setCollectType(fileJobTaskConfig.getCollectType());
fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
- fileJob.setProperties(fileJobTaskConfig.getProperties());
+ fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties()));
if (fileJobTaskConfig.getTimeOffset() != null) {
fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
}
@@ -131,11 +131,11 @@ public class JobProfileDto {
}
if (null != fileJobTaskConfig.getMetaFields()) {
- fileJob.setMetaFields(fileJob.getMetaFields());
+ fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields()));
}
if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
- fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels());
+ fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels()));
}
if (null != fileJobTaskConfig.getMonitorInterval()) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 6ca88df7c..840f83055 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
+import org.apache.inlong.agent.plugin.utils.FileDataUtils;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public class TextFileSource extends AbstractSource {
public List<Reader> split(JobProfile jobConf) {
super.init(jobConf);
Collection<File> allFiles = PluginUtils.findSuitFiles(jobConf);
+ allFiles = FileDataUtils.filterFile(allFiles, jobConf);
List<Reader> result = new ArrayList<>();
String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
LOGGER.info("file splits size: {}", allFiles.size());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
index 8ea052bc1..66680fac6 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/KubernetesFileReader.java
@@ -20,30 +20,22 @@ package org.apache.inlong.agent.plugin.sources.reader.file;
import com.google.gson.Gson;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
+import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID;
import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME;
-import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
-import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
-import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
import static org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_ID;
import static org.apache.inlong.agent.constant.KubernetesConstants.METADATA_CONTAINER_NAME;
import static org.apache.inlong.agent.constant.KubernetesConstants.METADATA_NAMESPACE;
@@ -72,30 +64,13 @@ public final class KubernetesFileReader extends AbstractFileReader {
return;
}
try {
- client = getKubernetesClient();
+ client = PluginUtils.getKubernetesClient();
} catch (IOException e) {
log.error("get k8s client error: ", e);
}
fileReaderOperator.metadata = getK8sMetadata(fileReaderOperator.jobConf);
}
- // TODO only support default config in the POD
- private KubernetesClient getKubernetesClient() throws IOException {
- String ip = System.getenv(KUBERNETES_SERVICE_HOST);
- String port = System.getenv(KUBERNETES_SERVICE_PORT);
- if (Objects.isNull(ip) && Objects.isNull(port)) {
- throw new RuntimeException("get k8s client error,k8s env ip and port is null");
- }
- String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
- Config config = new ConfigBuilder()
- .withMasterUrl(maserUrl)
- .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
- .withOauthToken(new String(
- Files.readAllBytes((new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
- .build();
- return new KubernetesClientBuilder().withConfig(config).build();
- }
-
/**
* get PODS of kubernetes information
*/
@@ -132,8 +107,7 @@ public final class KubernetesFileReader extends AbstractFileReader {
return metadata;
}
Pod pod = podResource.get();
- PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE))
- .withLabels(MetaDataUtils.getPodLabels(jobConf)).list();
+ PodList podList = client.pods().inNamespace(k8sInfo.get(NAMESPACE)).list();
podList.getItems().forEach(data -> {
if (data.equals(pod)) {
metadata.put(METADATA_POD_UID, pod.getMetadata().getUid());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
index 381ed6745..bbde6f462 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
@@ -20,11 +20,26 @@ package org.apache.inlong.agent.plugin.utils;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.PodResource;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE;
+import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME;
/**
* File job utils
@@ -32,7 +47,7 @@ import java.util.Map;
public class FileDataUtils {
public static final String KUBERNETES_LOG = "log";
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileDataUtils.class);
private static final Gson GSON = new Gson();
/**
@@ -65,4 +80,65 @@ public class FileDataUtils {
return isJson;
}
+ /**
+ * Filter file by conditions
+ */
+ public static Collection<File> filterFile(Collection<File> allFiles, JobProfile jobConf) {
+ // filter file by labels
+ Collection<File> files = null;
+ try {
+ files = filterByLabels(allFiles, jobConf);
+ } catch (IOException e) {
+ LOGGER.error("filter file error: ", e);
+ }
+ return files;
+ }
+
+ /**
+ * Filter file by labels if standard log for k8s
+ */
+ private static Collection<File> filterByLabels(Collection<File> allFiles, JobProfile jobConf) throws IOException {
+ Map<String, String> labelsMap = MetaDataUtils.getPodLabels(jobConf);
+ if (labelsMap.isEmpty()) {
+ return allFiles;
+ }
+ Collection<File> standardK8sLogFiles = new ArrayList<>();
+ Iterator<File> iterator = allFiles.iterator();
+ KubernetesClient client = PluginUtils.getKubernetesClient();
+ while (iterator.hasNext()) {
+ File file = getFile(labelsMap, iterator.next(), client);
+ if (file == null) {
+ continue;
+ }
+ standardK8sLogFiles.add(file);
+ }
+ return standardK8sLogFiles;
+ }
+
+ private static File getFile(Map<String, String> labelsMap, File file, KubernetesClient client) {
+ Map<String, String> logInfo = MetaDataUtils.getLogInfo(file.getName());
+ if (logInfo.isEmpty()) {
+ return null;
+ }
+ PodResource podResource = client.pods().inNamespace(logInfo.get(NAMESPACE))
+ .withName(logInfo.get(POD_NAME));
+ if (Objects.isNull(podResource)) {
+ return null;
+ }
+ Pod pod = podResource.get();
+ Map<String, String> podLabels = pod.getMetadata().getLabels();
+ boolean filterLabelStatus = false;
+ for (String key : labelsMap.keySet()) {
+ if (podLabels.containsKey(key) && labelsMap.get(key).contains(podLabels.get(key))) {
+ filterLabelStatus = true;
+ continue;
+ }
+ if (podLabels.containsKey(key) && !labelsMap.get(key).contains(podLabels.get(key))) {
+ filterLabelStatus = false;
+ break;
+ }
+ }
+ return filterLabelStatus ? file : null;
+ }
+
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index 379e2dc32..a1ce76db8 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -93,7 +93,7 @@ public class MetaDataUtils {
return new HashMap<>();
}
String labels = jobProfile.get(JOB_FILE_META_FILTER_BY_LABELS);
- Type type = new TypeToken<HashMap<Integer, String>>() {
+ Type type = new TypeToken<HashMap<String, String>>() {
}.getType();
return GSON.fromJson(labels, type);
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 1fc8a68a9..afb122c68 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -17,9 +17,14 @@
package org.apache.inlong.agent.plugin.utils;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.plugin.trigger.PathPattern;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -31,10 +36,12 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.List;
+import java.util.Objects;
import static org.apache.inlong.agent.constant.CommonConstants.AGENT_COLON;
import static org.apache.inlong.agent.constant.CommonConstants.AGENT_NIX_OS;
@@ -45,6 +52,9 @@ import static org.apache.inlong.agent.constant.CommonConstants.FILE_MAX_NUM;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
+import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
/**
* Utils for plugin package.
@@ -150,4 +160,21 @@ public class PluginUtils {
}
}
+ // TODO only support default config in the POD
+ public static KubernetesClient getKubernetesClient() throws IOException {
+ String ip = System.getenv(KUBERNETES_SERVICE_HOST);
+ String port = System.getenv(KUBERNETES_SERVICE_PORT);
+ if (Objects.isNull(ip) && Objects.isNull(port)) {
+ throw new RuntimeException("get k8s client error,k8s env ip and port is null");
+ }
+ String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port);
+ Config config = new ConfigBuilder()
+ .withMasterUrl(maserUrl)
+ .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)
+ .withOauthToken(new String(
+ Files.readAllBytes((new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath())))
+ .build();
+ return new KubernetesClientBuilder().withConfig(config).build();
+ }
+
}