You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 11:43:53 UTC

[inlong] 01/05: [INLONG-7156][Agent] Support directly sending raw file data (#7157)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e26610ff1de43db5ecbe1eb548724630e9d3ee7e
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Jan 5 16:32:24 2023 +0800

    [INLONG-7156][Agent] Support directly sending raw file data (#7157)
---
 .../sources/reader/file/FileReaderOperator.java    |  8 ++++-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 34 +++++++++++++++++++++-
 .../inlong/agent/plugin/task/TestTextFileTask.java |  6 +++-
 .../agent-plugins/src/test/resources/test/3.txt    |  5 ++++
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 32ede5da4..583632877 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -21,11 +21,11 @@ import com.google.gson.Gson;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
@@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader {
 
     private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
     private final StringBuffer sb = new StringBuffer();
+    private boolean needMetadata = false;
 
     public FileReaderOperator(File file, int position) {
         this(file, position, "");
@@ -261,6 +262,9 @@ public class FileReaderOperator extends AbstractReader {
     }
 
     public String metadataMessage(String message) {
+        if (!needMetadata) {
+            return message;
+        }
         long timestamp = System.currentTimeMillis();
         boolean isJson = FileDataUtils.isJSON(message);
         Map<String, String> mergeData = new HashMap<>(metadata);
@@ -280,8 +284,10 @@ public class FileReaderOperator extends AbstractReader {
         String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
         Arrays.stream(env).forEach(data -> {
             if (data.equalsIgnoreCase(KUBERNETES)) {
+                needMetadata = true;
                 new KubernetesMetadataProvider(this).getData();
             } else if (data.equalsIgnoreCase(ENV_CVM)) {
+                needMetadata = true;
                 metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
                 metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
                 metadata.put(METADATA_FILE_NAME, file.getName());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index d0bf5d99d..011f70bf7 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -169,7 +169,7 @@ public class TestFileAgent {
         await().atMost(10, TimeUnit.SECONDS).until(() -> {
             Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
             return jobs.size() == 1
-                    && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 4;
+                    && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 5;
         });
     }
 
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6159aa219..f89bf0355 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -56,13 +57,15 @@ import java.util.stream.Stream;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
 
 @PowerMockIgnore({"javax.management.*", "javax.script.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
         "org.w3c.*"})
@@ -135,6 +138,33 @@ public class TestTextFileReader {
         }
     }
 
+    @Test
+    public void testFileRowDataRead() throws URISyntaxException {
+        URI uri = getClass().getClassLoader().getResource("test").toURI();
+        JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
+        String mainPath = Paths.get(uri).toString();
+        jobConfiguration.set(JOB_DIR_FILTER_PATTERNS, Paths.get(mainPath,
+                "3.txt").toFile().getAbsolutePath());
+        jobConfiguration.set(JOB_INSTANCE_ID, "test");
+        jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+        jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+        jobConfiguration.set(JOB_GROUP_ID, "groupid");
+        jobConfiguration.set(JOB_STREAM_ID, "streamid");
+        TextFileSource fileSource = new TextFileSource();
+        List<Reader> readerList = fileSource.split(jobConfiguration);
+        Assert.assertEquals(1, readerList.size());
+        Reader reader = readerList.get(0);
+        reader.init(jobConfiguration);
+        while (!reader.isFinished()) {
+            Message message = reader.read();
+            if (message == null) {
+                break;
+            }
+            Assert.assertEquals("agent text content test", message.toString());
+        }
+
+    }
+
     /**
      * Custom line end character.
      */
@@ -152,6 +182,7 @@ public class TestTextFileReader {
         jobConfiguration.set(JOB_STREAM_ID, "streamid");
         jobConfiguration.set(JOB_FILE_TRIGGER_TYPE, FileTriggerType.FULL);
         jobConfiguration.set(JOB_FILE_LINE_END_PATTERN, "line-end-symbol");
+        jobConfiguration.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
         TextFileSource fileSource = new TextFileSource();
         List<Reader> readerList = fileSource.split(jobConfiguration);
         Assert.assertEquals(1, readerList.size());
@@ -224,6 +255,7 @@ public class TestTextFileReader {
         jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
         jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
         jobProfile.set(JOB_INSTANCE_ID, "1");
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
         fileReaderOperator.init(jobProfile);
 
         Assert.assertEquals("world", getContent(
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index cd7c29aa5..e0245a83d 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -180,7 +182,7 @@ public class TestTextFileTask {
         jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
         jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
         jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
-
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
         // mock data
         final MockSink sink = mockTextTask(jobProfile);
         await().atMost(10, TimeUnit.SECONDS).until(() -> sink.getResult().size() == 100);
@@ -220,6 +222,7 @@ public class TestTextFileTask {
         jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
         jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
         jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
 
         // mock data
         final MockSink sink = mockTextTask(jobProfile);
@@ -252,6 +255,7 @@ public class TestTextFileTask {
         jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
         jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
         jobProfile.set(JobConstants.JOB_FILE_LINE_END_PATTERN, "[0-9]");
+        jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
 
         // mock data
         final MockSink sink = mockTextTask(jobProfile);
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/3.txt b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
new file mode 100644
index 000000000..45d4bba28
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
@@ -0,0 +1,5 @@
+agent text content test
+agent text content test
+agent text content test
+agent text content test
+agent text content test
\ No newline at end of file