You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 11:43:53 UTC
[inlong] 01/05: [INLONG-7156][Agent] Support directly sending raw file data (#7157)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit e26610ff1de43db5ecbe1eb548724630e9d3ee7e
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Thu Jan 5 16:32:24 2023 +0800
[INLONG-7156][Agent] Support directly sending raw file data (#7157)
---
.../sources/reader/file/FileReaderOperator.java | 8 ++++-
.../apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
.../agent/plugin/sources/TestTextFileReader.java | 34 +++++++++++++++++++++-
.../inlong/agent/plugin/task/TestTextFileTask.java | 6 +++-
.../agent-plugins/src/test/resources/test/3.txt | 5 ++++
5 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 32ede5da4..583632877 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -21,11 +21,11 @@ import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
import org.apache.inlong.agent.plugin.utils.FileDataUtils;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
@@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
private final StringBuffer sb = new StringBuffer();
+ private boolean needMetadata = false;
public FileReaderOperator(File file, int position) {
this(file, position, "");
@@ -261,6 +262,9 @@ public class FileReaderOperator extends AbstractReader {
}
public String metadataMessage(String message) {
+ if (!needMetadata) {
+ return message;
+ }
long timestamp = System.currentTimeMillis();
boolean isJson = FileDataUtils.isJSON(message);
Map<String, String> mergeData = new HashMap<>(metadata);
@@ -280,8 +284,10 @@ public class FileReaderOperator extends AbstractReader {
String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
Arrays.stream(env).forEach(data -> {
if (data.equalsIgnoreCase(KUBERNETES)) {
+ needMetadata = true;
new KubernetesMetadataProvider(this).getData();
} else if (data.equalsIgnoreCase(ENV_CVM)) {
+ needMetadata = true;
metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
metadata.put(METADATA_FILE_NAME, file.getName());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index d0bf5d99d..011f70bf7 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -169,7 +169,7 @@ public class TestFileAgent {
await().atMost(10, TimeUnit.SECONDS).until(() -> {
Map<String, JobWrapper> jobs = agent.getManager().getJobManager().getJobs();
return jobs.size() == 1
- && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 4;
+ && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 5;
});
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6159aa219..f89bf0355 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -56,13 +57,15 @@ import java.util.stream.Stream;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_LINE_END_PATTERN;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER_TYPE;
import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
@PowerMockIgnore({"javax.management.*", "javax.script.*", "com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*",
"org.w3c.*"})
@@ -135,6 +138,33 @@ public class TestTextFileReader {
}
}
+ @Test
+ public void testFileRowDataRead() throws URISyntaxException { // reads test/3.txt with no JOB_FILE_META_ENV_LIST set and checks each message is the unmodified line
+ URI uri = getClass().getClassLoader().getResource("test").toURI(); // "test" resource directory on the classpath
+ JobProfile jobConfiguration = JobProfile.parseJsonStr("{}"); // start from an empty job profile
+ String mainPath = Paths.get(uri).toString();
+ jobConfiguration.set(JOB_DIR_FILTER_PATTERNS, Paths.get(mainPath,
+ "3.txt").toFile().getAbsolutePath()); // the pattern is the absolute path of the single fixture file
+ jobConfiguration.set(JOB_INSTANCE_ID, "test");
+ jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+ jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
+ jobConfiguration.set(JOB_GROUP_ID, "groupid");
+ jobConfiguration.set(JOB_STREAM_ID, "streamid");
+ TextFileSource fileSource = new TextFileSource();
+ List<Reader> readerList = fileSource.split(jobConfiguration);
+ Assert.assertEquals(1, readerList.size()); // exactly one reader is expected for the one configured file
+ Reader reader = readerList.get(0);
+ reader.init(jobConfiguration);
+ while (!reader.isFinished()) {
+ Message message = reader.read();
+ if (message == null) { // stop at the first null message instead of waiting for isFinished()
+ break;
+ }
+ Assert.assertEquals("agent text content test", message.toString()); // every line of 3.txt is this exact text, so no metadata may be merged in
+ }
+
+ }
+
/**
* Custom line end character.
*/
@@ -152,6 +182,7 @@ public class TestTextFileReader {
jobConfiguration.set(JOB_STREAM_ID, "streamid");
jobConfiguration.set(JOB_FILE_TRIGGER_TYPE, FileTriggerType.FULL);
jobConfiguration.set(JOB_FILE_LINE_END_PATTERN, "line-end-symbol");
+ jobConfiguration.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
TextFileSource fileSource = new TextFileSource();
List<Reader> readerList = fileSource.split(jobConfiguration);
Assert.assertEquals(1, readerList.size());
@@ -224,6 +255,7 @@ public class TestTextFileReader {
jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
jobProfile.set(JOB_INSTANCE_ID, "1");
+ jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
fileReaderOperator.init(jobProfile);
Assert.assertEquals("world", getContent(
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
index cd7c29aa5..e0245a83d 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java
@@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST;
+import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -180,7 +182,7 @@ public class TestTextFileTask {
jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
-
+ jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
// mock data
final MockSink sink = mockTextTask(jobProfile);
await().atMost(10, TimeUnit.SECONDS).until(() -> sink.getResult().size() == 100);
@@ -220,6 +222,7 @@ public class TestTextFileTask {
jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.INCREMENT);
jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
+ jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
// mock data
final MockSink sink = mockTextTask(jobProfile);
@@ -252,6 +255,7 @@ public class TestTextFileTask {
jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
jobProfile.set(JobConstants.JOB_FILE_LINE_END_PATTERN, "[0-9]");
+ jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);
// mock data
final MockSink sink = mockTextTask(jobProfile);
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/3.txt b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
new file mode 100644
index 000000000..45d4bba28
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
@@ -0,0 +1,5 @@
+agent text content test
+agent text content test
+agent text content test
+agent text content test
+agent text content test
\ No newline at end of file