You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/25 03:31:48 UTC
[incubator-inlong] branch master updated: [INLONG-2297][agent] support audit for source and sink (#2307)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 256ebf9 [INLONG-2297][agent] support audit for source and sink (#2307)
256ebf9 is described below
commit 256ebf94057467356c4204b1ff6b367af41f6435
Author: justinwwhuang <hw...@163.com>
AuthorDate: Tue Jan 25 11:31:41 2022 +0800
[INLONG-2297][agent] support audit for source and sink (#2307)
---
inlong-agent/agent-common/pom.xml | 9 +-
.../inlong/agent/constants/AgentConstants.java | 10 +-
.../inlong/agent/constants/CommonConstants.java | 2 +
.../inlong/agent/metrics/audit/AuditUtils.java | 93 ++++++++++++++++++
.../org/apache/inlong/agent/utils/ConfigUtil.java | 5 +-
.../org/apache/inlong/agent/core/AgentMain.java | 4 +-
.../org/apache/inlong/agent/core/AgentManager.java | 3 +-
.../inlong/agent/plugin/sinks/ProxySink.java | 50 +++++-----
.../inlong/agent/plugin/sinks/PulsarSink.java | 106 ++++++++++++---------
.../agent/plugin/sources/reader/SqlReader.java | 21 +++-
.../plugin/sources/reader/TextFileReader.java | 21 +++-
.../apache/inlong/agent/plugin/TestFileAgent.java | 4 +
.../agent/plugin/sources/TestTextFileReader.java | 14 ++-
13 files changed, 255 insertions(+), 87 deletions(-)
diff --git a/inlong-agent/agent-common/pom.xml b/inlong-agent/agent-common/pom.xml
index d71a0e5..2010cb0 100755
--- a/inlong-agent/agent-common/pom.xml
+++ b/inlong-agent/agent-common/pom.xml
@@ -18,8 +18,8 @@
under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-agent</artifactId>
@@ -102,6 +102,11 @@
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
index 5b4e7af..2c5bc81 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/AgentConstants.java
@@ -28,7 +28,9 @@ public class AgentConstants {
public static final String DEFAULT_AGENT_LOCAL_CACHE = ".local";
public static final String AGENT_LOCAL_CACHE_TIMEOUT = "agent.local.cache.timeout";
- /** cache timeout in minutes. **/
+ /**
+ * cache timeout in minutes.
+ **/
public static final int DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT = 30;
public static final String AGENT_LOCAL_STORE_PATH = "agent.localStore.path";
@@ -166,4 +168,10 @@ public class AgentConstants {
public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port";
public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
+ public static final String AUDIT_ENABLE = "agent.audit.enable";
+ public static final boolean DEFAULT_AUDIT_ENABLE = true;
+
+ public static final String AUDIT_KEY_PROXYS = "audit.proxys";
+ public static final String DEFAULT_AUDIT_PROXYS = "";
+
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
index d65ad66..855904c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constants/CommonConstants.java
@@ -25,9 +25,11 @@ public class CommonConstants {
public static final String DEFAULT_PROXY_NET_TAG = "";
public static final String PROXY_INLONG_GROUP_ID = "proxy.inlongGroupId";
+ public static final String DEFAULT_PROXY_INLONG_GROUP_ID = "default_inlong_group_id";
public static final String POSITION_SUFFIX = ".position";
public static final String PROXY_INLONG_STREAM_ID = "proxy.inlongStreamId";
+ public static final String DEFAULT_PROXY_INLONG_STREAM_ID = "default_inlong_stream_id";
public static final String PROXY_LOCAL_HOST = "proxy.localHost";
public static final String DEFAULT_PROXY_LOCALHOST = AgentUtils.getLocalIp();
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
new file mode 100644
index 0000000..87baa0e
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics.audit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.util.AuditConfig;
+
+import java.util.HashSet;
+
+import static org.apache.inlong.agent.constants.AgentConstants.AUDIT_ENABLE;
+import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AUDIT_ENABLE;
+import static org.apache.inlong.agent.constants.AgentConstants.AUDIT_KEY_PROXYS;
+import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_AUDIT_PROXYS;
+
+/**
+ * AuditUtils
+ */
+public class AuditUtils {
+
+ public static final String AUDIT_KEY_FILE_PATH = "audit.filePath";
+ public static final String AUDIT_DEFAULT_FILE_PATH = "/data/inlong/audit/";
+ public static final String AUDIT_KEY_MAX_CACHE_ROWS = "audit.maxCacheRows";
+ public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
+ public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
+ public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
+
+ private static boolean IS_AUDIT = true;
+
+ /**
+ * initAudit
+ */
+ public static void initAudit() {
+ AgentConfiguration conf = AgentConfiguration.getAgentConf();
+ // IS_AUDIT
+ IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
+ if (IS_AUDIT) {
+ // AuditProxy
+ String strIpPorts = conf.get(AUDIT_KEY_PROXYS, DEFAULT_AUDIT_PROXYS);
+ HashSet<String> proxys = new HashSet<>();
+ if (!StringUtils.isBlank(strIpPorts)) {
+ String[] ipPorts = strIpPorts.split("\\s+");
+ for (String ipPort : ipPorts) {
+ proxys.add(ipPort);
+ }
+ }
+ AuditImp.getInstance().setAuditProxy(proxys);
+ // AuditConfig
+ String filePath = conf.get(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
+ int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, AUDIT_DEFAULT_MAX_CACHE_ROWS);
+ AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
+ AuditImp.getInstance().setAuditConfig(auditConfig);
+ }
+ }
+
+ /**
+ * add
+ *
+ * @param auditID
+ */
+ public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime) {
+ if (!IS_AUDIT) {
+ return;
+ }
+ AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, 0);
+ }
+
+ /**
+ * sendReport
+ */
+ public static void sendReport() {
+ if (!IS_AUDIT) {
+ return;
+ }
+ AuditImp.getInstance().sendReport();
+ }
+}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
index a922af0..5bae21d 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/ConfigUtil.java
@@ -19,16 +19,15 @@
package org.apache.inlong.agent.utils;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
import static org.apache.inlong.agent.constants.AgentConstants.DEFAULT_PROMETHEUS_ENABLE;
import static org.apache.inlong.agent.constants.AgentConstants.PROMETHEUS_ENABLE;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-
public class ConfigUtil {
public static boolean isPrometheusEnabled() {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
return conf.getBoolean(PROMETHEUS_ENABLE, DEFAULT_PROMETHEUS_ENABLE);
}
-
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
index 2c8d655..a4f6303 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +124,7 @@ public class AgentMain {
CommandLine cl = initOptions(args);
assert cl != null;
initAgentConf(cl);
+ AuditUtils.initAudit();
AgentManager manager = new AgentManager();
try {
manager.start();
@@ -141,7 +143,7 @@ public class AgentMain {
LOGGER.error("exception caught", ex);
} finally {
manager.stop();
-
+ AuditUtils.sendReport();
if (metricsServer != null) {
metricsServer.stop();
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index ceae99a..274106f 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -23,6 +23,7 @@ import static org.apache.inlong.agent.constants.JobConstants.JOB_TRIGGER;
import java.lang.reflect.Constructor;
import java.util.List;
+
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
@@ -112,7 +113,7 @@ public class AgentManager extends AbstractDaemon {
// db is a required component, so if not init correctly,
// throw exception and stop running.
return (Db) Class.forName(conf.get(
- AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
+ AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
.newInstance();
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index a0e30e7..9800b04 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -50,12 +50,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constants.CommonConstants;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.message.PackProxyMessage;
@@ -98,10 +100,10 @@ public class ProxySink extends AbstractSink {
public ProxySink() {
if (ConfigUtil.isPrometheusEnabled()) {
this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(
- PROXY_SINK_TAG_NAME, index.incrementAndGet()));
+ PROXY_SINK_TAG_NAME, index.incrementAndGet()));
} else {
this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(
- PROXY_SINK_TAG_NAME, index.incrementAndGet()));
+ PROXY_SINK_TAG_NAME, index.incrementAndGet()));
}
}
@@ -113,19 +115,21 @@ public class ProxySink extends AbstractSink {
extractStreamFromMessage(message, fieldSplitter);
if (!(message instanceof EndMessage)) {
ProxyMessage proxyMessage = ProxyMessage.parse(message);
- // add proxy message to cache.
+ // add proxy message to cache.
cache.compute(proxyMessage.getInlongStreamId(),
- (s, packProxyMessage) -> {
- if (packProxyMessage == null) {
- packProxyMessage = new PackProxyMessage(
- maxBatchSize, maxQueueNumber,
- maxBatchTimeoutMs, proxyMessage.getInlongStreamId());
- }
- // add message to package proxy
- packProxyMessage.addProxyMessage(proxyMessage);
- //
- return packProxyMessage;
- });
+ (s, packProxyMessage) -> {
+ if (packProxyMessage == null) {
+ packProxyMessage = new PackProxyMessage(
+ maxBatchSize, maxQueueNumber,
+ maxBatchTimeoutMs, proxyMessage.getInlongStreamId());
+ }
+ // add message to package proxy
+ packProxyMessage.addProxyMessage(proxyMessage);
+ //
+ return packProxyMessage;
+ });
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+ inlongGroupId, inlongStreamId, System.currentTimeMillis());
// increment the count of successful sinks
sinkMetrics.incSinkSuccessCount();
} else {
@@ -138,12 +142,13 @@ public class ProxySink extends AbstractSink {
/**
* extract stream id from message if message filter is presented
* or use the default stream id
+ *
* @param message
*/
private void extractStreamFromMessage(Message message, byte[] fieldSplitter) {
if (messageFilter != null) {
message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
- messageFilter.filterStreamId(message, fieldSplitter));
+ messageFilter.filterStreamId(message, fieldSplitter));
} else {
message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
}
@@ -170,8 +175,8 @@ public class ProxySink extends AbstractSink {
senderManager.sendBatch(jobInstanceId, inlongGroupId, result.getKey(),
result.getValue(), 0, dataTime);
LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}"
- + "dataTime is {}", inlongGroupId, result.getRight().size(),
- jobInstanceId, sourceName, dataTime);
+ + "dataTime is {}", inlongGroupId, result.getRight().size(),
+ jobInstanceId, sourceName, dataTime);
}
});
@@ -187,20 +192,20 @@ public class ProxySink extends AbstractSink {
public void init(JobProfile jobConf) {
maxBatchSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
maxQueueNumber = jobConf.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
- DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
+ DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
maxBatchTimeoutMs = jobConf.getInt(
- PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
+ PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
- DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+ DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
cache = new ConcurrentHashMap<>(10);
dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
- jobConf.get(JOB_CYCLE_UNIT, ""));
+ jobConf.get(JOB_CYCLE_UNIT, ""));
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
messageFilter = initMessageFilter(jobConf);
fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes(
- StandardCharsets.UTF_8);
+ StandardCharsets.UTF_8);
executorService.execute(flushCache());
senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
try {
@@ -240,6 +245,7 @@ public class ProxySink extends AbstractSink {
/**
* check whether all stream id messages finished
+ *
* @return
*/
private boolean sinkFinish() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index 560068b..8133a7e 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -17,37 +17,16 @@
package org.apache.inlong.agent.plugin.sinks;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ASYNC;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_THREAD_NUM;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_CACHE_CAPACITY;
-import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_POLL_TIMEOUT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ASYNC;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXCOUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXSIZE;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BLOCK_QUEUE;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_COMPRESS_TYPE;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ENABLE_BATCH;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_MAX_PENDING_COUNT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_THREAD_NUM;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SERVERS;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_CACHE_CAPACITY;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_POLL_TIMEOUT;
-import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_TOPIC;
-
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
@@ -68,11 +47,39 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_THREAD_NUM;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_THREAD_NUM;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ASYNC;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ASYNC;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_POLL_TIMEOUT;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_POLL_TIMEOUT;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SINK_CACHE_CAPACITY;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_SINK_CACHE_CAPACITY;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_SERVERS;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_TOPIC;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_MAX_PENDING_COUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXSIZE;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BATCH_MAXCOUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_ENABLE_BATCH;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_BLOCK_QUEUE;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE;
+import static org.apache.inlong.agent.constants.CommonConstants.PULSAR_PRODUCER_COMPRESS_TYPE;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE;
+
public class PulsarSink extends AbstractDaemon implements Sink {
+
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSink.class);
private static final String PULSAR_SINK_TAG_NAME = "AgentPulsarMetric";
+ private String inlongGroupId;
+ private String inlongStreamId;
private boolean async;
private long pollTimeout;
private int threadNum;
@@ -90,14 +97,14 @@ public class PulsarSink extends AbstractDaemon implements Sink {
public PulsarSink() {
if (ConfigUtil.isPrometheusEnabled()) {
this.pluginMetricNew = new PluginPrometheusMetric(AgentUtils.getUniqId(
- PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+ PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(
- PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+ PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
} else {
this.pluginMetricNew = new PluginJmxMetric(AgentUtils.getUniqId(
- PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+ PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(
- PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
+ PULSAR_SINK_TAG_NAME, metricsIndex.incrementAndGet()));
}
}
@@ -106,6 +113,8 @@ public class PulsarSink extends AbstractDaemon implements Sink {
if (message != null && writing) {
// if message is not null
try {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+ inlongGroupId, inlongStreamId, System.currentTimeMillis());
// put message to cache, wait until cache is not full.
pluginMetricNew.incSendNum();
cache.put(message.getBody());
@@ -131,7 +140,8 @@ public class PulsarSink extends AbstractDaemon implements Sink {
@Override
public void init(JobProfile jobConf) {
-
+ inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
+ inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
threadNum = jobConf.getInt(PULSAR_PRODUCER_THREAD_NUM, DEFAULT_PULSAR_PRODUCER_THREAD_NUM);
async = jobConf.getBoolean(PULSAR_PRODUCER_ASYNC, DEFAULT_PULSAR_PRODUCER_ASYNC);
pollTimeout = jobConf.getLong(PULSAR_SINK_POLL_TIMEOUT, DEFAULT_PULSAR_SINK_POLL_TIMEOUT);
@@ -146,7 +156,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
try {
stop();
LOGGER.info("send success num is {}, failed num is {}",
- pluginMetricNew.getSendSuccessNum(), pluginMetricNew.getSendFailedNum());
+ pluginMetricNew.getSendSuccessNum(), pluginMetricNew.getSendFailedNum());
} catch (Exception ex) {
LOGGER.error("exception caught", ex);
}
@@ -154,6 +164,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
/**
* sending data with producer.
+ *
* @param item
* @param producer
* @throws PulsarClientException
@@ -181,6 +192,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
/**
* thread for sending data.
+ *
* @return runnable thread.
*/
private Runnable sendThread(Producer<byte[]> producer) {
@@ -189,7 +201,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
try {
byte[] item = cache.poll(pollTimeout, TimeUnit.MILLISECONDS);
if (item != null) {
- // sending to pulsar
+ // sending to pulsar
sendingData(item, producer);
}
} catch (Exception ex) {
@@ -201,6 +213,7 @@ public class PulsarSink extends AbstractDaemon implements Sink {
/**
* construct producer for every thread.
+ *
* @return
*/
private Producer<byte[]> constructProducer() {
@@ -211,29 +224,30 @@ public class PulsarSink extends AbstractDaemon implements Sink {
String pulsarServers = profile.get(PULSAR_SERVERS);
String pulsarTopic = profile.get(PULSAR_TOPIC);
int pendingNum = profile.getInt(PULSAR_PRODUCER_MAX_PENDING_COUNT,
- DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT);
+ DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT);
int batchSize = profile.getInt(PULSAR_PRODUCER_BATCH_MAXSIZE,
- DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE);
+ DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE);
int batchCount = profile.getInt(PULSAR_PRODUCER_BATCH_MAXCOUNT,
- DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT);
+ DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT);
boolean enableBatch = profile.getBoolean(PULSAR_PRODUCER_ENABLE_BATCH,
- DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH);
+ DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH);
boolean blockQueue = profile.getBoolean(PULSAR_PRODUCER_BLOCK_QUEUE, DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE);
CompressionType compressionType = PluginUtils.convertType(
- profile.get(PULSAR_PRODUCER_COMPRESS_TYPE,
- DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE));
- LOGGER.info("init producer, pulsarServers: {}, topic: {}, pendingNum: {}, batchSize: {}, "
- + "batchCount: {}, enableBatch: {}, compressType: {}, blockQueue: {}", pulsarServers, pulsarTopic,
- pendingNum, batchSize, batchCount, enableBatch, compressionType, blockQueue);
+ profile.get(PULSAR_PRODUCER_COMPRESS_TYPE,
+ DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE));
+ LOGGER.info("init producer, pulsarServers: {}, topic: {}, pendingNum: {}, batchSize: {},"
+ + " batchCount: {}, enableBatch: {}, compressType: {}, blockQueue: {}",
+ pulsarServers, pulsarTopic, pendingNum, batchSize, batchCount,
+ enableBatch, compressionType, blockQueue);
client = PulsarClient.builder()
- .serviceUrl(pulsarServers).build();
+ .serviceUrl(pulsarServers).build();
return client.newProducer().topic(pulsarTopic)
- .compressionType(compressionType)
- .batchingMaxBytes(batchSize)
- .batchingMaxMessages(batchCount)
- .blockIfQueueFull(blockQueue)
- .maxPendingMessages(pendingNum)
- .enableBatching(enableBatch).create();
+ .compressionType(compressionType)
+ .batchingMaxBytes(batchSize)
+ .batchingMaxMessages(batchCount)
+ .blockIfQueueFull(blockQueue)
+ .maxPendingMessages(pendingNum)
+ .enableBatching(enableBatch).create();
} catch (Exception exception) {
LOGGER.error("error while init producer", exception);
throw new RuntimeException(exception);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index e8647e9..fa3e9f5 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -21,6 +21,8 @@ import static java.sql.Types.BINARY;
import static java.sql.Types.BLOB;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.VARBINARY;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
@@ -30,11 +32,13 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.CharUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
@@ -53,6 +57,9 @@ public class SqlReader implements Reader {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
+ private String inlongGroupId;
+ private String inlongStreamId;
+
private static final String SQL_READER_TAG_NAME = "AgentSqlMetric";
private static final String JOB_DATABASE_USER = "job.sql.user";
@@ -98,10 +105,10 @@ public class SqlReader implements Reader {
if (ConfigUtil.isPrometheusEnabled()) {
this.sqlFileMetric = new PluginPrometheusMetric(
- AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+ AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
} else {
this.sqlFileMetric = new PluginJmxMetric(
- AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+ AgentUtils.getUniqId(SQL_READER_TAG_NAME, metricsIndex.incrementAndGet()));
}
}
@@ -128,6 +135,8 @@ public class SqlReader implements Reader {
}
lineColumns.add(dataValue);
}
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
+ inlongGroupId, inlongStreamId, System.currentTimeMillis());
sqlFileMetric.incReadNum();
return generateMessage(lineColumns);
@@ -186,6 +195,8 @@ public class SqlReader implements Reader {
@Override
public void init(JobProfile jobConf) {
+ inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID);
+ inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, "");
int batchSize = jobConf.getInt(JOB_DATABASE_BATCH_SIZE, DEFAULT_JOB_DATABASE_BATCH_SIZE);
String userName = jobConf.get(JOB_DATABASE_USER);
String password = jobConf.get(JOB_DATABASE_PASSWORD);
@@ -193,17 +204,17 @@ public class SqlReader implements Reader {
int port = jobConf.getInt(JOB_DATABASE_PORT);
String driverClass = jobConf.get(JOB_DATABASE_DRIVER_CLASS,
- DEFAULT_JOB_DATABASE_DRIVER_CLASS);
+ DEFAULT_JOB_DATABASE_DRIVER_CLASS);
separator = jobConf.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
finished = false;
try {
String databaseType = jobConf.get(JOB_DATABASE_TYPE, MYSQL);
String url = String.format("jdbc:%s://%s:%d", databaseType, hostName, port);
conn = AgentDbUtils.getConnectionFailover(
- driverClass, url, userName, password);
+ driverClass, url, userName, password);
if (databaseType.equals(MYSQL)) {
statement = conn.createStatement(
- ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(Integer.MIN_VALUE);
resultSet = statement.executeQuery(sql);
} else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
index 646fe9d..d3c0328 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/TextFileReader.java
@@ -17,6 +17,11 @@
package org.apache.inlong.agent.plugin.sources.reader;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+
import static org.apache.inlong.agent.constants.JobConstants.DEFAULT_JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
@@ -29,9 +34,11 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Validator;
@@ -53,6 +60,8 @@ public class TextFileReader implements Reader {
public static final int NEVER_STOP_SIGN = -1;
+ private String inlongGroupId;
+ private String inlongStreamId;
private final File file;
private final int position;
private final String md5;
@@ -76,10 +85,10 @@ public class TextFileReader implements Reader {
if (ConfigUtil.isPrometheusEnabled()) {
textFileMetric = new PluginPrometheusMetric(AgentUtils.getUniqId(
- TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+ TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
} else {
textFileMetric = new PluginJmxMetric(AgentUtils.getUniqId(
- TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
+ TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
}
}
@@ -92,6 +101,8 @@ public class TextFileReader implements Reader {
if (iterator != null && iterator.hasNext()) {
String message = iterator.next();
if (validateMessage(message)) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
+ inlongGroupId, inlongStreamId, System.currentTimeMillis());
textFileMetric.incReadNum();
return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8));
}
@@ -159,6 +170,8 @@ public class TextFileReader implements Reader {
LOGGER.info("file name for task is {}, md5 is {}", file, md5);
stream = Files.newBufferedReader(file.toPath()).lines().skip(position);
iterator = stream.iterator();
+ inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
} catch (Exception ex) {
throw new FileException("error init stream for " + file.getPath(), ex);
}
@@ -166,7 +179,7 @@ public class TextFileReader implements Reader {
private void initReadTimeout(JobProfile jobConf) {
int waitTime = jobConf.getInt(JOB_FILE_MAX_WAIT,
- DEFAULT_JOB_FILE_MAX_WAIT);
+ DEFAULT_JOB_FILE_MAX_WAIT);
if (waitTime == NEVER_STOP_SIGN) {
timeout = NEVER_STOP_SIGN;
} else {
@@ -178,6 +191,6 @@ public class TextFileReader implements Reader {
public void destroy() {
AgentUtils.finallyClose(stream);
LOGGER.info("destroy reader with read {} num {}",
- textFileMetric.getTagName(), textFileMetric.getReadNum());
+ textFileMetric.getTagName(), textFileMetric.getReadNum());
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index 570fc89..c548f5e 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -18,6 +18,8 @@
package org.apache.inlong.agent.plugin;
import static org.apache.inlong.agent.constants.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constants.JobConstants.JOB_CYCLE_UNIT;
import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
@@ -109,6 +111,8 @@ public class TestFileAgent {
profile.set(JOB_DIR_FILTER_PATTERN, Paths.get(testRootDir.toString(),
"hugeFile.[0-9].txt").toString());
profile.set(JOB_READ_WAIT_TIMEOUT, String.valueOf(readWaitTimeMilliseconds));
+ profile.set(PROXY_INLONG_GROUP_ID, "groupid");
+ profile.set(PROXY_INLONG_STREAM_ID, "streamid");
agent.submitJob(profile);
}
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 12c39c8..abdc1bb 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.plugin.sources;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constants.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constants.JobConstants.JOB_DIR_FILTER_PATTERN;
import static org.apache.inlong.agent.constants.JobConstants.JOB_FILE_MAX_WAIT;
import static org.apache.inlong.agent.constants.JobConstants.JOB_INSTANCE_ID;
@@ -35,6 +37,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.plugin.Message;
@@ -93,8 +96,10 @@ public class TestTextFileReader {
JobProfile jobConfiguration = JobProfile.parseJsonStr("{}");
String mainPath = Paths.get(uri).toString();
jobConfiguration.set(JOB_DIR_FILTER_PATTERN, Paths.get(mainPath,
- "[1-2].txt").toFile().getAbsolutePath());
+ "[1-2].txt").toFile().getAbsolutePath());
jobConfiguration.set(JOB_INSTANCE_ID, "test");
+ jobConfiguration.set(PROXY_INLONG_GROUP_ID, "groupid");
+ jobConfiguration.set(PROXY_INLONG_STREAM_ID, "streamid");
TextFileSource fileSource = new TextFileSource();
List<Reader> readerList = fileSource.split(jobConfiguration);
Assert.assertEquals(2, readerList.size());
@@ -126,7 +131,10 @@ public class TestTextFileReader {
}
Files.write(localPath, afterList, StandardOpenOption.APPEND);
TextFileReader reader = new TextFileReader(localPath.toFile(), 1000);
- reader.init(new JobProfile());
+ JobProfile jobProfile = new JobProfile();
+ jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+ jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
+ reader.init(jobProfile);
Assert.assertEquals("world", new String(reader.read().getBody()));
@@ -136,6 +144,8 @@ public class TestTextFileReader {
public void testTextTailTimeout() throws Exception {
JobProfile jobProfile = new JobProfile();
jobProfile.setInt(JOB_FILE_MAX_WAIT, 1);
+ jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid");
+ jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid");
Path localPath = Paths.get(testDir.toString(), "test1.txt");
TextFileReader reader = new TextFileReader(localPath.toFile(), 0);
if (localPath.toFile().exists()) {