You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/08/19 08:05:38 UTC

[inlong] branch master updated: [INLONG-5589][Agent] To extend the type of file data for the k8s log (#5590)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d7f03fe4 [INLONG-5589][Agent] To extend the type of file data for the k8s log (#5590)
1d7f03fe4 is described below

commit 1d7f03fe46d286ecae6defb507f01f9af2337bb2
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Fri Aug 19 16:05:33 2022 +0800

    [INLONG-5589][Agent] To extend the type of file data for the k8s log (#5590)
---
 .../org/apache/inlong/agent/utils/AgentUtils.java  |  1 +
 .../org/apache/inlong/agent/core/AgentManager.java |  9 +++------
 .../apache/inlong/agent/core/job/JobWrapper.java   |  1 +
 .../agent/plugin/fetcher/ManagerFetcher.java       |  6 ++++--
 .../agent/plugin/sources/TextFileSource.java       |  1 +
 .../sources/reader/file/AbstractFileReader.java    |  5 +++--
 .../plugin/sources/reader/file/TextFileReader.java |  5 +++++
 .../inlong/agent/plugin/utils/FileDataUtils.java   | 22 ++++++++++++++++++++--
 8 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index d8ec603e3..20ef03e31 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -291,6 +291,7 @@ public class AgentUtils {
                 }
             }
         }
+        LOGGER.info("path name:{} , pattern name:{}", Arrays.toString(pathNames), Arrays.toString(patternNames));
         return true;
     }
 
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 5708b6a33..8aaede49c 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -55,17 +55,14 @@ public class AgentManager extends AbstractDaemon {
     private final TriggerManager triggerManager;
     private final TaskPositionManager taskPositionManager;
     private final HeartbeatManager heartbeatManager;
-
-
-    // jetty for config operations via http.
-    private ConfigJetty configJetty;
-
     private final ProfileFetcher fetcher;
     private final AgentConfiguration conf;
     private final Db db;
     private final LocalProfile localProfile;
     private final CommandDb commandDb;
     private final JobProfileDb jobProfileDb;
+    // jetty for config operations via http.
+    private ConfigJetty configJetty;
 
     public AgentManager() {
         conf = AgentConfiguration.getAgentConf();
@@ -99,7 +96,7 @@ public class AgentManager extends AbstractDaemon {
             return
                     (ProfileFetcher) constructor.newInstance(agentManager);
         } catch (Exception ex) {
-            LOGGER.warn("cannot find fetcher, ignore it {}", ex.getMessage());
+            LOGGER.warn("cannot find fetcher: ", ex);
         }
         return null;
     }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 2bdc3874c..cd51aff40 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -102,6 +102,7 @@ public class JobWrapper extends AbstractStateWrapper {
      */
     private void submitAllTasks() {
         List<Task> tasks = job.createTasks();
+        LOGGER.info("job name is {} and task size {}", job.getName(), tasks.size());
         tasks.forEach(task -> {
             allTasks.add(task);
             taskManager.submitTask(task);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index daaa60930..7eab6d2f4 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -282,7 +282,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
      * the fetch file command can be normal or special
      */
     private void dealWithFetchResult(TaskResult taskResult) {
-        LOGGER.info("deal with fetch result {}", taskResult);
+        if (!taskResult.getCmdConfigs().isEmpty() || !taskResult.getDataConfigs().isEmpty()) {
+            LOGGER.info("deal with fetch result {}", taskResult);
+        }
         for (DataConfig dataConfig : taskResult.getDataConfigs()) {
             TriggerProfile profile = TriggerProfile.getTriggerProfiles(dataConfig);
             LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
@@ -385,7 +387,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         Collection<File> suitFiles = PluginUtils.findSuitFiles(triggerProfile);
         // filter files exited before
         List<File> pendingFiles = suitFiles.stream().filter(file ->
-                        !agentManager.getJobManager().checkJobExsit(file.getAbsolutePath()))
+                !agentManager.getJobManager().checkJobExsit(file.getAbsolutePath()))
                 .collect(Collectors.toList());
         for (File pendingFile : pendingFiles) {
             JobProfile copiedProfile = copyJobProfile(triggerProfile, dataTime,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 65e25f4e1..6ca88df7c 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -56,6 +56,7 @@ public class TextFileSource extends AbstractSource {
         Collection<File> allFiles = PluginUtils.findSuitFiles(jobConf);
         List<Reader> result = new ArrayList<>();
         String filterPattern = jobConf.get(JOB_LINE_FILTER_PATTERN, DEFAULT_JOB_LINE_FILTER);
+        LOGGER.info("file splits size: {}", allFiles.size());
         for (File file : allFiles) {
             int startPosition = getStartPosition(jobConf, file);
             LOGGER.info("read from history position {} with job profile {}, file absolute path: {}", startPosition,
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
index f61cb4c28..c3e4e1144 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractFileReader.java
@@ -48,17 +48,18 @@ public abstract class AbstractFileReader {
         List<String> lines = fileReaderOperator.stream.collect(Collectors.toList());
         if (fileReaderOperator.jobConf.hasKey(JOB_FILE_CONTENT_COLLECT_TYPE)) {
             long timestamp = System.currentTimeMillis();
+            boolean isJson = FileDataUtils.isJSON(lines.isEmpty() ? null : lines.get(0));
             if (Objects.nonNull(fileReaderOperator.metadata)) {
                 lines = lines.stream().map(data -> {
                     Map<String, String> mergeData = new HashMap<>(fileReaderOperator.metadata);
-                    mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data));
+                    mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data, isJson));
                     mergeData.put(DATA_CONTENT_TIME, String.valueOf(timestamp));
                     return GSON.toJson(mergeData);
                 }).collect(Collectors.toList());
             } else if (!fileReaderOperator.jobConf.hasKey(JOB_FILE_META_FILTER_BY_LABELS)) {
                 lines = lines.stream().map(data -> {
                     Map<String, String> mergeData = new HashMap<>();
-                    mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data));
+                    mergeData.put(DATA_CONTENT, FileDataUtils.getK8sJsonLog(data, isJson));
                     mergeData.put(DATA_CONTENT_TIME, String.valueOf(timestamp));
                     return GSON.toJson(mergeData);
                 }).collect(Collectors.toList());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
index e811efc39..73ed01c98 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TextFileReader.java
@@ -19,6 +19,8 @@
 package org.apache.inlong.agent.plugin.sources.reader.file;
 
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,6 +40,8 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PA
  */
 public final class TextFileReader extends AbstractFileReader {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(TextFileReader.class);
+
     private final Map<File, String> lineStringBuffer = new ConcurrentHashMap<>();
 
     public TextFileReader(FileReaderOperator fileReaderOperator) {
@@ -48,6 +52,7 @@ public final class TextFileReader extends AbstractFileReader {
         List<String> lines = Files.newBufferedReader(fileReaderOperator.file.toPath()).lines().skip(
                 fileReaderOperator.position)
                 .collect(Collectors.toList());
+        LOGGER.info("path is {}, data reads size {}", fileReaderOperator.file.getName(), lines.size());
         List<String> resultLines = new ArrayList<>();
         //TODO line regular expression matching
         if (fileReaderOperator.jobConf.hasKey(JOB_FILE_LINE_END_PATTERN)) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
index 4f892fa81..923c0dcc4 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.plugin.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang3.StringUtils;
 
@@ -37,14 +38,31 @@ public class FileDataUtils {
     /**
      * Get standard log for k8s
      */
-    public static String getK8sJsonLog(String log) {
+    public static String getK8sJsonLog(String log, Boolean isJson) {
         if (!StringUtils.isNoneBlank(log)) {
             return null;
         }
+        if (!isJson) {
+            return log;
+        }
         Type type = new TypeToken<HashMap<Integer, String>>() {
         }.getType();
         Map<String, String> logJson = GSON.fromJson(log, type);
-        return logJson.get(KUBERNETES_LOG);
+        return logJson.getOrDefault(KUBERNETES_LOG, log);
+    }
+
+    /**
+     * Check whether the given string parses as a JSON object (false on any parse failure)
+     */
+    public static boolean isJSON(String json) {
+        boolean isJson;
+        try {
+            JsonObject convertedObject = new Gson().fromJson(json, JsonObject.class);
+            isJson = convertedObject.isJsonObject();
+        } catch (Exception exception) {
+            return false;
+        }
+        return isJson;
     }
 
 }