You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/19 08:05:38 UTC
[inlong] branch master updated: [INLONG-5589][Agent] To extend the type of file data for the k8s log (#5590)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1d7f03fe4 [INLONG-5589][Agent] To extend the type of file data for the k8s log (#5590)
1d7f03fe4 is described below
commit 1d7f03fe46d286ecae6defb507f01f9af2337bb2
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Aug 19 16:05:33 2022 +0800
[INLONG-5589][Agent] To extend the type of file data for the k8s log (#5590)
---
.../org/apache/inlong/agent/utils/AgentUtils.java | 1 +
.../org/apache/inlong/agent/core/AgentManager.java | 9 +++------
.../apache/inlong/agent/core/job/JobWrapper.java | 1 +
.../agent/plugin/fetcher/ManagerFetcher.java | 6 ++++--
.../agent/plugin/sources/TextFileSource.java | 1 +
.../sources/reader/file/AbstractFileReader.java | 5 +++--
.../plugin/sources/reader/file/TextFileReader.java | 5 +++++
.../inlong/agent/plugin/utils/FileDataUtils.java | 22 ++++++++++++++++++++--
8 files changed, 38 insertions(+), 12 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index d8ec603e3..20ef03e31 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -291,6 +291,7 @@ public class AgentUtils {
}
}
}
+ LOGGER.info("path name:{} , pattern name:{}", Arrays.toString(pathNames), Arrays.toString(patternNames));
return true;
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 5708b6a33..8aaede49c 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -55,17 +55,14 @@ public class AgentManager extends AbstractDaemon {
private final TriggerManager triggerManager;
private final TaskPositionManager taskPositionManager;
private final HeartbeatManager heartbeatManager;
-
-
- // jetty for config operations via http.
- private ConfigJetty configJetty;
-
private final ProfileFetcher fetcher;
private final AgentConfiguration conf;
private final Db db;
private final LocalProfile localProfile;
private final CommandDb commandDb;
private final JobProfileDb jobProfileDb;
+ // jetty for config operations via http.
+ private ConfigJetty configJetty;
public AgentManager() {
conf = AgentConfiguration.getAgentConf();
@@ -99,7 +96,7 @@ public class AgentManager extends AbstractDaemon {
return
(ProfileFetcher) constructor.newInstance(agentManager);
} catch (Exception ex) {
- LOGGER.warn("cannot find fetcher, ignore it {}", ex.getMessage());
+ LOGGER.warn("cannot find fetcher: ", ex);
}
return null;
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 2bdc3874c..cd51aff40 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -102,6 +102,7 @@ public class JobWrapper extends AbstractStateWrapper {
*/
private void submitAllTasks() {
List<Task> tasks = job.createTasks();
+ LOGGER.info("job name is {} and task size {}", job.getName(), tasks.size());
tasks.forEach(task -> {
allTasks.add(task);
taskManager.submitTask(task);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index daaa60930..7eab6d2f4 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -282,7 +282,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
* the fetch file command can be normal or special
*/
private void dealWithFetchResult(TaskResult taskResult) {
- LOGGER.info("deal with fetch result {}", taskResult);
+ if (!taskResult.getCmdConfigs().isEmpty() || !taskResult.getDataConfigs().isEmpty()) {
+ LOGGER.info("deal with fetch result {}", taskResult);
+ }
for (DataConfig dataConfig : taskResult.getDataConfigs()) {
TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
@@ -385,7 +387,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
Collection<File> suitFiles = PluginUtils.findSuitFiles(triggerProfile);
// filter files exited before
List<File> pendingFiles = suitFiles.stream().filter(file ->
- !agentManager.getJobManager().checkJobExsit(file.getAbsolutePath()))
+ !agentManager.getJobManager().checkJobExsit(file.getAbsolutePath()))
.collect(Collectors.toList());
for (File pendingFile : pendingFiles) {
JobProfile copiedProfile = copyJobProfile(triggerProfile, dataTime,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 65e25f4e1..6ca88df7c 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -56,6 +56,7 @@ public class TextFileSource extends AbstractSource {
Collection<File> allFiles = PluginUtils.findSuitFiles(jobConf);
List<Reader> result = new ArrayList<>();
String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
+ LOGGER.info("file splits size: {}", allFiles.size());
for (File file : allFiles) {
int startPosition = getStartPosition(jobConf, file);
LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", startPosition,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
index f61cb4c28..c3e4e1144 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
@@ -48,17 +48,18 @@ public abstract class AbstractFileReader {
List<String> lines = fileReaderOperator.stream.collect(Collectors.toList());
if (fileReaderOperator.jobConf.hasKey(JOB_FILE_CONTENT_COLLECT_TYPE)) {
long timestamp = System.currentTimeMillis();
+ boolean isJson = FileDataUtils.isJSON(lines.isEmpty() ? null : lines.get(0));
if (Objects.nonNull(fileReaderOperator.metadata)) {
lines = lines.stream().map(data -> {
Map<String, String> mergeData = new HashMap<>(fileReaderOperator.metadata);
- mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data));
+ mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data, isJson));
mergeData.put(DATA_CONTENT_TIME, String.valueOf(timestamp));
return GSON.toJson(mergeData);
}).collect(Collectors.toList());
} else if (!fileReaderOperator.jobConf.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
lines = lines.stream().map(data -> {
Map<String, String> mergeData = new HashMap<>();
- mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data));
+ mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data, isJson));
mergeData.put(DATA_CONTENT_TIME, String.valueOf(timestamp));
return GSON.toJson(mergeData);
}).collect(Collectors.toList());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
index e811efc39..73ed01c98 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -19,6 +19,8 @@
package org.apache.inlong.agent.plugin.sources.reader.file;
import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -38,6 +40,8 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PA
*/
public final class TextFileReader extends AbstractFileReader {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TextFileReader.class);
+
private final Map<File, String> lineStringBuffer = new ConcurrentHashMap<>();
public TextFileReader(FileReaderOperator fileReaderOperator) {
@@ -48,6 +52,7 @@ public final class TextFileReader extends AbstractFileReader {
List<String> lines = Files.newBufferedReader(fileReaderOperator.file.toPath()).lines().skip(
fileReaderOperator.position)
.collect(Collectors.toList());
+ LOGGER.info("path is {}, data reads size {}", fileReaderOperator.file.getName(), lines.size());
List<String> resultLines = new ArrayList<>();
//TODO line regular expression matching
if (fileReaderOperator.jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
index 4f892fa81..923c0dcc4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.utils;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
@@ -37,14 +38,31 @@ public class FileDataUtils {
/**
* Get standard log for k8s
*/
- public static String getK8sJsonLog(String log) {
+ public static String getK8sJsonLog(String log, Boolean isJson) {
if (!StringUtils.isNoneBlank(log)) {
return null;
}
+ if (!isJson) {
+ return log;
+ }
Type type = new TypeToken<HashMap<Integer, String>>() {
}.getType();
Map<String, String> logJson = GSON.fromJson(log, type);
- return logJson.get(KUBERNETES_LOG);
+ return logJson.getOrDefault(KUBERNETES_LOG, log);
+ }
+
+ /**
+ * To judge json
+ */
+ public static boolean isJSON(String json) {
+ boolean isJson;
+ try {
+ JsonObject convertedObject = new Gson().fromJson(json, JsonObject.class);
+ isJson = convertedObject.isJsonObject();
+ } catch (Exception exception) {
+ return false;
+ }
+ return isJson;
}
}