You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/27 12:19:12 UTC
[inlong] branch master updated: [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1f310a1e5 [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
1f310a1e5 is described below
commit 1f310a1e587b0e69d44019f9f66eef42f14b7af3
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:19:07 2022 +0800
[INLONG-6031][Audit] Clean code for InLong Audit (#6032)
Co-authored-by: healchow <he...@gmail.com>
---
.../inlong/agent/metrics/audit/AuditUtils.java | 31 +-
.../org/apache/inlong/agent/core/AgentMain.java | 22 +-
.../org/apache/inlong/audit/protocol/Commands.java | 19 +-
.../audit-common/src/main/proto/AuditApi.proto | 30 +-
.../inlong/audit/source/ServerMessageHandler.java | 180 ++++----
.../audit/{AuditImp.java => AuditOperator.java} | 174 ++++----
.../apache/inlong/audit/send/SenderHandler.java | 19 +-
.../apache/inlong/audit/send/SenderManager.java | 115 +++---
.../org/apache/inlong/audit/util/AuditData.java | 20 +-
.../apache/inlong/audit/util/AuditDataTest.java | 16 +-
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 62 ++-
.../apache/inlong/dataproxy/node/Application.java | 458 ++++++++++-----------
...ovider.java => ManagerPropsConfigProvider.java} | 38 +-
.../sort/standalone/SortStandaloneApplication.java | 19 +-
.../sort/standalone/metrics/audit/AuditUtils.java | 50 +--
.../inlong/sort/base/metric/SinkMetricData.java | 26 +-
.../inlong/sort/base/metric/SourceMetricData.java | 18 +-
.../server/broker/stats/audit/AuditUtils.java | 42 +-
18 files changed, 605 insertions(+), 734 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index d3ad42538..ff6518ae5 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -19,9 +19,10 @@ package org.apache.inlong.agent.metrics.audit;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
+import java.util.Collections;
import java.util.HashSet;
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
@@ -44,49 +45,47 @@ public class AuditUtils {
private static boolean IS_AUDIT = true;
/**
- * initAudit
+ * Init audit config
*/
public static void initAudit() {
AgentConfiguration conf = AgentConfiguration.getAgentConf();
- // IS_AUDIT
IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
if (IS_AUDIT) {
// AuditProxy
String strIpPorts = conf.get(AUDIT_KEY_PROXYS, DEFAULT_AUDIT_PROXYS);
- HashSet<String> proxys = new HashSet<>();
+ HashSet<String> proxySet = new HashSet<>();
if (!StringUtils.isBlank(strIpPorts)) {
String[] ipPorts = strIpPorts.split("\\s+");
- for (String ipPort : ipPorts) {
- proxys.add(ipPort);
- }
+ Collections.addAll(proxySet, ipPorts);
}
- AuditImp.getInstance().setAuditProxy(proxys);
+ AuditOperator.getInstance().setAuditProxy(proxySet);
+
// AuditConfig
String filePath = conf.get(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, AUDIT_DEFAULT_MAX_CACHE_ROWS);
AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
- AuditImp.getInstance().setAuditConfig(auditConfig);
+ AuditOperator.getInstance().setAuditConfig(auditConfig);
}
}
/**
- * add audit metric
+ * Add audit metric
*/
- public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count,
- long size) {
+ public static void add(int auditID, String inlongGroupId, String inlongStreamId,
+ long logTime, int count, long size) {
if (!IS_AUDIT) {
return;
}
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
+ AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
}
/**
- * sendReport
+ * Send audit data
*/
- public static void sendReport() {
+ public static void send() {
if (!IS_AUDIT) {
return;
}
- AuditImp.getInstance().sendReport();
+ AuditOperator.getInstance().send();
}
}
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
index ff48cf47f..6816f57f3 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentMain.java
@@ -85,44 +85,42 @@ public class AgentMain {
}
/**
- * Stopping agent gracefully if get killed.
+ * Stopping agent manager gracefully if it was killed.
*
- * @param manager agent manager
+ * @param agentManager agent manager
*/
- private static void stopManagerIfKilled(AgentManager manager) {
+ private static void stopAgentIfKilled(AgentManager agentManager) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
LOGGER.info("stopping agent gracefully");
- manager.stop();
+ agentManager.stop();
} catch (Exception ex) {
- LOGGER.error("exception while stopping threads", ex);
+ LOGGER.error("stop agent manager error: ", ex);
}
}));
}
/**
* Main entrance.
- *
- * @param args arguments
- * @throws Exception exceptions
*/
public static void main(String[] args) throws Exception {
CommandLine cl = initOptions(args);
assert cl != null;
initAgentConf(cl);
AuditUtils.initAudit();
+
AgentManager manager = new AgentManager();
try {
manager.start();
- stopManagerIfKilled(manager);
- //metrics
+ stopAgentIfKilled(manager);
+ // metrics
MetricObserver.init(AgentConfiguration.getAgentConf().getConfigProperties());
manager.join();
} catch (Exception ex) {
- LOGGER.error("exception caught", ex);
+ LOGGER.error("agent running exception: ", ex);
} finally {
manager.stop();
- AuditUtils.sendReport();
+ AuditUtils.send();
}
}
}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
index fc833bf73..7ae674b77 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/Commands.java
@@ -25,6 +25,9 @@ import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
import org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type;
import org.apache.inlong.audit.protocol.AuditApi.Pong;
+/**
+ * Audit commands, used to get various of ByteBuf.
+ */
public class Commands {
public static int HEAD_LENGTH = 4;
@@ -45,24 +48,24 @@ public class Commands {
public static ByteBuf getAuditRequestBuffer(AuditRequest auditRequest) {
BaseCommand cmdAuditRequest = BaseCommand.newBuilder()
- .setType(Type.AUDITREQUEST)
+ .setType(Type.AUDIT_REQUEST)
.setAuditRequest(auditRequest).build();
return getChannelBuffer(cmdAuditRequest.toByteArray());
}
- public static ByteBuf getAuditReplylBuffer(AuditReply auditReply) {
+ public static ByteBuf getAuditReplyBuffer(AuditReply auditReply) {
BaseCommand cmdAuditReply = BaseCommand.newBuilder()
- .setType(Type.AUDITREPLY)
+ .setType(Type.AUDIT_REPLY)
.setAuditReply(auditReply).build();
return getChannelBuffer(cmdAuditReply.toByteArray());
}
private static ByteBuf getChannelBuffer(byte[] body) {
- /* [totalSize] | [body]*/
+ // [totalSize] | [body]
int totalLength = body.length;
- ByteBuf cmdPingBuffer = ByteBufAllocator.DEFAULT.buffer(HEAD_LENGTH + totalLength);
- cmdPingBuffer.writeInt(totalLength);
- cmdPingBuffer.writeBytes(body);
- return cmdPingBuffer;
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(HEAD_LENGTH + totalLength);
+ buffer.writeInt(totalLength);
+ buffer.writeBytes(body);
+ return buffer;
}
}
diff --git a/inlong-audit/audit-common/src/main/proto/AuditApi.proto b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
index ce98fcdcf..ce89b112b 100644
--- a/inlong-audit/audit-common/src/main/proto/AuditApi.proto
+++ b/inlong-audit/audit-common/src/main/proto/AuditApi.proto
@@ -20,17 +20,17 @@ syntax = "proto3";
package org.apache.inlong.audit.protocol;
message BaseCommand {
- enum Type {
- PING = 0;
- PONG = 1;
- AUDITREQUEST = 2;
- AUDITREPLY = 3;
- }
- Type type = 1;
- AuditRequest audit_request = 2;
- AuditReply audit_reply = 3;
- Ping ping = 4;
- Pong pong = 5;
+ enum Type {
+ PING = 0;
+ PONG = 1;
+ AUDIT_REQUEST = 2;
+ AUDIT_REPLY = 3;
+ }
+ Type type = 1;
+ AuditRequest audit_request = 2;
+ AuditReply audit_reply = 3;
+ Ping ping = 4;
+ Pong pong = 5;
}
message Ping {
@@ -55,8 +55,8 @@ message AuditMessageHeader {
message AuditMessageBody {
uint64 log_ts = 1;
- string inlong_group_id= 2;
- string inlong_stream_id= 3;
+ string inlong_group_id = 2;
+ string inlong_stream_id = 3;
string audit_id = 4;
uint64 count = 5;
uint64 size = 6;
@@ -65,8 +65,8 @@ message AuditMessageBody {
message AuditReply {
enum RSP_CODE {
- SUCCESS = 0;
- FAILED = 1;
+ SUCCESS = 0;
+ FAILED = 1;
DISASTER = 2;
}
uint64 request_id = 1;
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 483698e79..4f4c86345 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -17,22 +17,16 @@
package org.apache.inlong.audit.source;
-import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
-
import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody;
import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE;
@@ -43,43 +37,40 @@ import org.apache.inlong.audit.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
/**
* Server message handler
- *
*/
public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
- private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
-
- private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerMessageHandler.class);
+ private static final Gson GSON = new Gson();
- private AbstractSource source;
private final ChannelGroup allChannels;
- private int maxConnections = Integer.MAX_VALUE;
-
private final ChannelProcessor processor;
private final ServiceDecoder serviceDecoder;
-
- private final Gson gson = new Gson();
+ private final int maxConnections;
public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
- ChannelGroup allChannels, Integer maxCons) {
- this.source = source;
+ ChannelGroup allChannels, Integer maxCons) {
this.processor = source.getChannelProcessor();
this.serviceDecoder = serviceDecoder;
this.allChannels = allChannels;
this.maxConnections = maxCons;
-
}
@Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) {
if (allChannels.size() - 1 >= maxConnections) {
- logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1)
- + ", maxConnections="
- + maxConnections + ",channel is " + ctx.channel());
ctx.channel().disconnect();
ctx.channel().close();
+ LOGGER.warn("refuse to connect to channel: {}, connections={}, maxConnections={}",
+ ctx.channel(), allChannels.size() - 1, maxConnections);
}
allChannels.add(ctx.channel());
ctx.fireChannelActive();
@@ -93,126 +84,115 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- logger.debug("message received");
if (msg == null) {
- logger.warn("get null message event, just skip");
+ LOGGER.warn("get null message event, just skip");
return;
}
- ByteBuf cb = (ByteBuf) msg;
- int len = cb.readableBytes();
+ ByteBuf buf = (ByteBuf) msg;
+ int len = buf.readableBytes();
if (len == 0) {
- logger.warn("receive message skip empty msg.");
- cb.clear();
+ LOGGER.warn("receive message skip empty msg");
+ buf.clear();
return;
}
Channel remoteChannel = ctx.channel();
- BaseCommand cmd = null;
+ BaseCommand cmd;
try {
- cmd = serviceDecoder.extractData(cb, remoteChannel);
+ cmd = serviceDecoder.extractData(buf, remoteChannel);
} catch (Exception ex) {
- logger.error("extractData has error e {}", ex);
- throw new IOException(ex.getCause());
+ LOGGER.error("extract data error: ", ex);
+ throw new IOException(ex);
}
-
if (cmd == null) {
- logger.warn("receive message extractData is null");
+ LOGGER.warn("extract data from received msg is null");
return;
}
+
ByteBuf channelBuffer = null;
switch (cmd.getType()) {
case PING:
checkArgument(cmd.hasPing());
- channelBuffer = Commands.getPongChannelBuffer();
+ channelBuffer = Commands.getPongChannelBuffer();
break;
case PONG:
checkArgument(cmd.hasPong());
- channelBuffer = Commands.getPingChannelBuffer();
+ channelBuffer = Commands.getPingChannelBuffer();
break;
- case AUDITREQUEST:
+ case AUDIT_REQUEST:
checkArgument(cmd.hasAuditRequest());
AuditReply auditReply = handleRequest(cmd.getAuditRequest());
- channelBuffer = Commands.getAuditReplylBuffer(auditReply);
+ channelBuffer = Commands.getAuditReplyBuffer(auditReply);
break;
- case AUDITREPLY:
+ case AUDIT_REPLY:
checkArgument(cmd.hasAuditReply());
break;
default:
- channelBuffer = null;
}
if (channelBuffer != null) {
writeResponse(remoteChannel, channelBuffer);
}
}
- private AuditReply handleRequest(AuditRequest auditRequest) {
- AuditReply reply = null;
- if (auditRequest != null) {
- List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
- if (bodyList != null) {
- int errorMsgBody = 0;
- for (AuditMessageBody auditMessageBody : bodyList) {
- AuditData auditData = new AuditData();
- auditData.setIp(auditRequest.getMsgHeader().getIp());
- auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
- auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
- auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
- auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
-
- auditData.setLogTs(auditMessageBody.getLogTs());
- auditData.setAuditId(auditMessageBody.getAuditId());
- auditData.setCount(auditMessageBody.getCount());
- auditData.setDelay(auditMessageBody.getDelay());
- auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
- auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
- auditData.setSize(auditMessageBody.getSize());
-
- byte[] body = null;
- try {
- body = gson.toJson(auditData).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- logger.error("UnsupportedEncodingException = {}", e);
- }
- if (body != null) {
- Event event = null;
- try {
- event = EventBuilder.withBody(body, null);
- processor.processEvent(event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- errorMsgBody++;
- }
- }
- }
- if (errorMsgBody != 0) {
- reply = AuditReply.newBuilder().setRequestId(auditRequest.getRequestId())
- .setMessage("Error writing to controller,data "
- + "will discard. error body num = "
- + errorMsgBody).setRspCode(RSP_CODE.FAILED).build();
- }
+ private AuditReply handleRequest(AuditRequest auditRequest) throws Exception {
+ if (auditRequest == null) {
+ throw new Exception("audit request cannot be null");
+ }
+ AuditReply reply = AuditReply.newBuilder()
+ .setRequestId(auditRequest.getRequestId())
+ .setRspCode(RSP_CODE.SUCCESS)
+ .build();
+ List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
+ int errorMsgBody = 0;
+ for (AuditMessageBody auditMessageBody : bodyList) {
+ AuditData auditData = new AuditData();
+ auditData.setIp(auditRequest.getMsgHeader().getIp());
+ auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
+ auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
+ auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
+ auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
+
+ auditData.setLogTs(auditMessageBody.getLogTs());
+ auditData.setAuditId(auditMessageBody.getAuditId());
+ auditData.setCount(auditMessageBody.getCount());
+ auditData.setDelay(auditMessageBody.getDelay());
+ auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
+ auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
+ auditData.setSize(auditMessageBody.getSize());
+
+ try {
+ byte[] body = GSON.toJson(auditData).getBytes(StandardCharsets.UTF_8);
+ Event event = EventBuilder.withBody(body, null);
+ processor.processEvent(event);
+ } catch (Throwable ex) {
+ LOGGER.error("writing data error, discard it: ", ex);
+ errorMsgBody++;
}
}
- if (reply == null) {
- reply = AuditReply.newBuilder().setRequestId(auditRequest.getRequestId())
- .setRspCode(RSP_CODE.SUCCESS).build();
+
+ if (errorMsgBody != 0) {
+ reply = reply.toBuilder()
+ .setMessage("writing data error, discard it, error body count=" + errorMsgBody)
+ .setRspCode(RSP_CODE.FAILED)
+ .build();
}
+
return reply;
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.error("exception caught", cause);
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOGGER.error("exception caught", cause);
}
- private void writeResponse(Channel remoteChannel, ByteBuf buffer) throws Exception {
- if (remoteChannel.isWritable()) {
- remoteChannel.writeAndFlush(buffer);
- } else {
- logger.warn(
- "the send buffer2 is full, so disconnect it!please check remote client"
- + "; Connection info:" + remoteChannel);
- throw new Exception(new Throwable(
- "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
- + remoteChannel));
+ private void writeResponse(Channel channel, ByteBuf buffer) throws Exception {
+ if (channel.isWritable()) {
+ channel.writeAndFlush(buffer);
+ return;
}
+
+ String msg = String.format("remote channel=%s is not writable, please check remote client!", channel);
+ LOGGER.warn(msg);
+ throw new Exception(msg);
}
+
}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
similarity index 60%
rename from inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
rename to inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
index 2ef56cac9..ddbafd916 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditImp.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java
@@ -36,47 +36,61 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDITREQUEST;
+import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;
-public class AuditImp {
- private static final Logger logger = LoggerFactory.getLogger(AuditImp.class);
- private static AuditImp auditImp = new AuditImp();
+/**
+ * Audit operator, which is singleton.
+ */
+public class AuditOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
private static final String FIELD_SEPARATORS = ":";
- private ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<String, StatInfo>();
- private HashMap<String, StatInfo> threadSumMap = new HashMap<String, StatInfo>();
- private ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<String, StatInfo>();
- private List<String> deleteKeyList = new ArrayList<String>();
- private AuditConfig auditConfig = null;
- private Config config = new Config();
- private Long sdkTime;
+ private static final int BATCH_NUM = 100;
+ private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
+ private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
+ private static final int PERIOD = 1000 * 60;
+ private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
+ private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
+ private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
+ private final List<String> deleteKeyList = new ArrayList<>();
+ private final Config config = new Config();
+ private final Timer timer = new Timer();
private int packageId = 1;
private int dataId = 0;
- private static final int BATCH_NUM = 100;
- boolean inited = false;
+ private boolean initialized = false;
private SenderManager manager;
- private static ReentrantLock globalLock = new ReentrantLock();
- private static int PERIOD = 1000 * 60;
- private Timer timer = new Timer();
- private TimerTask timerTask = new TimerTask() {
+
+ private final TimerTask timerTask = new TimerTask() {
@Override
public void run() {
try {
- sendReport();
+ send();
} catch (Exception e) {
- logger.error(e.getMessage());
+ LOGGER.error(e.getMessage());
}
}
};
+ private AuditConfig auditConfig = null;
+
+ /**
+ * Not support create from outer.
+ */
+ private AuditOperator() {
- public static AuditImp getInstance() {
- return auditImp;
+ }
+
+ /**
+ * Get AuditOperator instance.
+ */
+ public static AuditOperator getInstance() {
+ return AUDIT_OPERATOR;
}
/**
* init
*/
private void init() {
- if (inited) {
+ if (initialized) {
return;
}
config.init();
@@ -88,29 +102,25 @@ public class AuditImp {
}
/**
- * setAuditProxy
- *
- * @param ipPortList
+ * Set AuditProxy from the ip
*/
public void setAuditProxy(HashSet<String> ipPortList) {
try {
- globalLock.lockInterruptibly();
- if (!inited) {
+ GLOBAL_LOCK.lockInterruptibly();
+ if (!initialized) {
init();
- inited = true;
+ initialized = true;
}
this.manager.setAuditProxy(ipPortList);
} catch (InterruptedException e) {
- logger.error(e.getMessage());
+ LOGGER.error(e.getMessage());
} finally {
- globalLock.unlock();
+ GLOBAL_LOCK.unlock();
}
}
/**
* set audit config
- *
- * @param config
*/
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
@@ -118,14 +128,7 @@ public class AuditImp {
}
/**
- * api
- *
- * @param auditID
- * @param inlongGroupID
- * @param inlongStreamID
- * @param logTime
- * @param count
- * @param size
+ * Add audit data
*/
public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
long delayTime = System.currentTimeMillis() - logTime;
@@ -135,30 +138,21 @@ public class AuditImp {
}
/**
- * add by key
- *
- * @param key
- * @param count
- * @param size
- * @param delayTime
+ * Add audit info by key.
*/
private void addByKey(String key, long count, long size, long delayTime) {
- try {
- if (countMap.get(key) == null) {
- countMap.put(key, new StatInfo(0L, 0L, 0L));
- }
- countMap.get(key).count.addAndGet(count);
- countMap.get(key).size.addAndGet(size);
- countMap.get(key).delay.addAndGet(delayTime * count);
- } catch (Exception e) {
- return;
+ if (countMap.get(key) == null) {
+ countMap.put(key, new StatInfo(0L, 0L, 0L));
}
+ countMap.get(key).count.addAndGet(count);
+ countMap.get(key).size.addAndGet(size);
+ countMap.get(key).delay.addAndGet(delayTime * count);
}
/**
- * Report audit data
+ * Send audit data
*/
- public synchronized void sendReport() {
+ public synchronized void send() {
manager.clearBuffer();
resetStat();
// Retrieve statistics from the list of objects without statistics to be eliminated
@@ -183,76 +177,78 @@ public class AuditImp {
this.deleteCountMap.put(key, value);
}
this.deleteKeyList.clear();
- sdkTime = Calendar.getInstance().getTimeInMillis();
- AuditApi.AuditMessageHeader mssageHeader = AuditApi.AuditMessageHeader.newBuilder()
+
+ long sdkTime = Calendar.getInstance().getTimeInMillis();
+ AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder()
.setIp(config.getLocalIP()).setDockerId(config.getDockerId())
.setThreadId(String.valueOf(Thread.currentThread().getId()))
.setSdkTs(sdkTime).setPacketId(packageId)
.build();
- AuditApi.AuditRequest.Builder requestBulid = AuditApi.AuditRequest.newBuilder();
- requestBulid.setMsgHeader(mssageHeader).setRequestId(manager.nextRequestId());
- for (Map.Entry<String, StatInfo> entry : threadSumMap.entrySet()) {
+ AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
+ requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());
+
+ // process the stat info for all threads
+ for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
long logTime = Long.parseLong(keyArray[0]) * PERIOD;
String inlongGroupID = keyArray[1];
String inlongStreamID = keyArray[2];
String auditID = keyArray[3];
StatInfo value = entry.getValue();
- AuditApi.AuditMessageBody mssageBody = AuditApi.AuditMessageBody.newBuilder()
- .setLogTs(logTime).setInlongGroupId(inlongGroupID)
- .setInlongStreamId(inlongStreamID).setAuditId(auditID)
- .setCount(value.count.get()).setSize(value.size.get())
+ AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder()
+ .setLogTs(logTime)
+ .setInlongGroupId(inlongGroupID)
+ .setInlongStreamId(inlongStreamID)
+ .setAuditId(auditID)
+ .setCount(value.count.get())
+ .setSize(value.size.get())
.setDelay(value.delay.get())
.build();
- requestBulid.addMsgBody(mssageBody);
+ requestBuild.addMsgBody(msgBody);
+
if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
- sendByBaseCommand(sdkTime, requestBulid.build());
- requestBulid.clearMsgBody();
+ sendByBaseCommand(requestBuild.build());
+ requestBuild.clearMsgBody();
}
}
- if (requestBulid.getMsgBodyCount() > 0) {
- sendByBaseCommand(sdkTime, requestBulid.build());
- requestBulid.clearMsgBody();
+ if (requestBuild.getMsgBodyCount() > 0) {
+ sendByBaseCommand(requestBuild.build());
+ requestBuild.clearMsgBody();
}
- threadSumMap.clear();
- logger.info("finish send report.");
+ threadCountMap.clear();
+
+ LOGGER.info("finish report audit data");
}
/**
- * send base command
- *
- * @param sdkTime
- * @param auditRequest
+ * Send base command
*/
- private void sendByBaseCommand(long sdkTime, AuditApi.AuditRequest auditRequest) {
+ private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
- baseCommand.setType(AUDITREQUEST).setAuditRequest(auditRequest).build();
- manager.send(sdkTime, baseCommand.build());
+ baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
+ manager.send(baseCommand.build());
}
/**
* Summary
- *
- * @param key
- * @param statInfo
*/
private void sumThreadGroup(String key, StatInfo statInfo) {
long count = statInfo.count.getAndSet(0);
if (0 == count) {
return;
}
- if (threadSumMap.get(key) == null) {
- threadSumMap.put(key, new StatInfo(0, 0, 0));
+ if (threadCountMap.get(key) == null) {
+ threadCountMap.put(key, new StatInfo(0, 0, 0));
}
long size = statInfo.size.getAndSet(0);
long delay = statInfo.delay.getAndSet(0);
- threadSumMap.get(key).count.addAndGet(count);
- threadSumMap.get(key).size.addAndGet(size);
- threadSumMap.get(key).delay.addAndGet(delay);
+ threadCountMap.get(key).count.addAndGet(count);
+ threadCountMap.get(key).size.addAndGet(size);
+ threadCountMap.get(key).delay.addAndGet(delay);
}
/**
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
index dd8ff1a3a..e2f8fc5b9 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderHandler.java
@@ -23,13 +23,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
- private static final Logger logger = LoggerFactory.getLogger(SenderHandler.class);
- private SenderManager manager;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SenderHandler.class);
+ private final SenderManager manager;
/**
* Constructor
- *
- * @param manager
*/
public SenderHandler(SenderManager manager) {
this.manager = manager;
@@ -39,11 +38,11 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
* Message Received
*/
@Override
- public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, byte[] e) {
+ public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, byte[] e) {
try {
manager.onMessageReceived(ctx, e);
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ LOGGER.error("channelRead0 error: ", ex);
}
}
@@ -55,7 +54,7 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
try {
manager.onExceptionCaught(ctx, e);
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ LOGGER.error("caught exception: ", ex);
}
}
@@ -63,11 +62,11 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
* Disconnected channel
*/
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
try {
super.channelInactive(ctx);
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ LOGGER.error("channelInactive error: ", ex);
}
}
@@ -79,7 +78,7 @@ public class SenderHandler extends SimpleChannelInboundHandler<byte[]> {
try {
super.channelUnregistered(ctx);
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ LOGGER.error("channelUnregistered error: ", ex);
}
}
}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
index b841014d5..9d317a238 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderManager.java
@@ -42,28 +42,27 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
- * sender manager
+ * Audit sender manager
*/
public class SenderManager {
- private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+
public static final Long MAX_REQUEST_ID = 1000000000L;
- private static final int SEND_INTERVAL_MS = 20;
public static final int ALL_CONNECT_CHANNEL = -1;
public static final int DEFAULT_CONNECT_CHANNEL = 2;
- public static final String LF = "\n";
+ private static final Logger logger = LoggerFactory.getLogger(SenderManager.class);
+ private static final int SEND_INTERVAL_MS = 20;
+ private final SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
+ private final AtomicLong requestIdSeq = new AtomicLong(0L);
+ private final ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>();
+
private SenderGroup sender;
private int maxConnectChannels = ALL_CONNECT_CHANNEL;
- private SecureRandom sRandom = new SecureRandom(Long.toString(System.currentTimeMillis()).getBytes());
// IPList
- private HashSet<String> currentIpPorts = new HashSet<String>();
- private AtomicLong requestIdSeq = new AtomicLong(0L);
- private ConcurrentHashMap<Long, AuditData> dataMap = new ConcurrentHashMap<>();
+ private HashSet<String> currentIpPorts = new HashSet<>();
private AuditConfig auditConfig;
/**
* Constructor
- *
- * @param config
*/
public SenderManager(AuditConfig config) {
this(config, DEFAULT_CONNECT_CHANNEL);
@@ -71,9 +70,6 @@ public class SenderManager {
/**
* Constructor
- *
- * @param config
- * @param maxConnectChannels
*/
public SenderManager(AuditConfig config, int maxConnectChannels) {
try {
@@ -95,15 +91,15 @@ public class SenderManager {
this.sender.setHasSendError(false);
this.currentIpPorts = ipPortList;
int ipSize = ipPortList.size();
- int needNewSize = 0;
+ int needNewSize;
if (this.maxConnectChannels == ALL_CONNECT_CHANNEL || this.maxConnectChannels >= ipSize) {
needNewSize = ipSize;
} else {
needNewSize = maxConnectChannels;
}
+
HashSet<String> updateConfigIpLists = new HashSet<>();
- List<String> availableIpLists = new ArrayList<String>();
- availableIpLists.addAll(ipPortList);
+ List<String> availableIpLists = new ArrayList<>(ipPortList);
for (int i = 0; i < needNewSize; i++) {
int availableIpSize = availableIpLists.size();
int newIpPortIndex = this.sRandom.nextInt(availableIpSize);
@@ -116,12 +112,10 @@ public class SenderManager {
}
/**
- * next requestid
- *
- * @return
+ * next request id
*/
public Long nextRequestId() {
- Long requestId = requestIdSeq.getAndIncrement();
+ long requestId = requestIdSeq.getAndIncrement();
if (requestId > MAX_REQUEST_ID) {
requestId = 0L;
requestIdSeq.set(requestId);
@@ -130,22 +124,17 @@ public class SenderManager {
}
/**
- * send data
- *
- * @param sdkTime
- * @param baseCommand
+ * Send data with command
*/
- public void send(long sdkTime, AuditApi.BaseCommand baseCommand) {
- AuditData data = new AuditData(sdkTime, baseCommand);
- // Cache first
+ public void send(AuditApi.BaseCommand baseCommand) {
+ AuditData data = new AuditData(baseCommand);
+ // cache first
this.dataMap.putIfAbsent(baseCommand.getAuditRequest().getRequestId(), data);
this.sendData(data.getDataByte());
}
/**
- * send data
- *
- * @param data
+ * Send data byte array
*/
private void sendData(byte[] data) {
if (data == null || data.length <= 0) {
@@ -167,7 +156,7 @@ public class SenderManager {
logger.info("audit failed cache size: {}", this.dataMap.size());
for (AuditData data : this.dataMap.values()) {
this.sendData(data.getDataByte());
- sleep(SEND_INTERVAL_MS);
+ this.sleep();
}
if (this.dataMap.size() == 0) {
checkAuditFile();
@@ -190,10 +179,10 @@ public class SenderManager {
File file = new File(auditConfig.getDisasterFile());
if (!file.exists()) {
if (!file.createNewFile()) {
- logger.error("create {} {}", auditConfig.getDisasterFile(), " failed");
+ logger.error("create file {} failed", auditConfig.getDisasterFile());
return;
}
- logger.info("create {}", auditConfig.getDisasterFile());
+ logger.info("create file {} success", auditConfig.getDisasterFile());
}
if (file.length() > auditConfig.getMaxFileSize()) {
file.delete();
@@ -204,15 +193,13 @@ public class SenderManager {
objectOutputStream.writeObject(dataMap);
objectOutputStream.close();
fos.close();
- } catch (IOException ioException) {
- logger.error(ioException.getMessage());
+ } catch (IOException e) {
+ logger.error("write local file error: ", e);
}
}
/**
* check file path
- *
- * @return
*/
private boolean checkFilePath() {
File file = new File(auditConfig.getFilePath());
@@ -220,7 +207,7 @@ public class SenderManager {
if (!file.mkdirs()) {
return false;
}
- logger.info("create {}", auditConfig.getFilePath());
+ logger.info("create file {} success", auditConfig.getFilePath());
}
return true;
}
@@ -235,26 +222,26 @@ public class SenderManager {
return;
}
FileInputStream inputStream = new FileInputStream(auditConfig.getDisasterFile());
- ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
+ ObjectInputStream objectStream = new ObjectInputStream(inputStream);
ConcurrentHashMap<Long, AuditData> fileData =
- (ConcurrentHashMap<Long, AuditData>) objectInputStream.readObject();
+ (ConcurrentHashMap<Long, AuditData>) objectStream.readObject();
for (Map.Entry<Long, AuditData> entry : fileData.entrySet()) {
if (this.dataMap.size() < (auditConfig.getMaxCacheRow() / 2)) {
this.dataMap.putIfAbsent(entry.getKey(), entry.getValue());
}
this.sendData(entry.getValue().getDataByte());
- sleep(SEND_INTERVAL_MS);
+ this.sleep();
}
- objectInputStream.close();
+ objectStream.close();
inputStream.close();
file.delete();
- } catch (IOException | ClassNotFoundException ioException) {
- logger.error(ioException.getMessage());
+ } catch (IOException | ClassNotFoundException e) {
+ logger.error("check audit file error: ", e);
}
}
/**
- * get data map szie
+ * get data map size
*/
public int getDataMapSize() {
return this.dataMap.size();
@@ -262,68 +249,60 @@ public class SenderManager {
/**
* processing return package
- *
- * @param ctx ctx
- * @param msg msg
*/
public void onMessageReceived(ChannelHandlerContext ctx, byte[] msg) {
try {
- //Analyze abnormal events
- byte[] readBytes = msg;
- AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(readBytes);
+ // Analyze abnormal events
+ AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.parseFrom(msg);
// Parse request id
Long requestId = baseCommand.getAuditReply().getRequestId();
AuditData data = this.dataMap.get(requestId);
if (data == null) {
- logger.error("can not find the requestid onMessageReceived:" + requestId);
+ logger.error("can not find the request id onMessageReceived: " + requestId);
return;
}
- logger.info("audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode().toString());
+
+ logger.info("audit-proxy response code: {}", baseCommand.getAuditReply().getRspCode());
if (AuditApi.AuditReply.RSP_CODE.SUCCESS.equals(baseCommand.getAuditReply().getRspCode())) {
this.dataMap.remove(requestId);
return;
}
+
int resendTimes = data.increaseResendTimes();
- if (resendTimes < org.apache.inlong.audit.send.SenderGroup.MAX_SEND_TIMES) {
+ if (resendTimes < SenderGroup.MAX_SEND_TIMES) {
this.sendData(data.getDataByte());
}
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ logger.error("onMessageReceived exception: ", ex);
this.sender.setHasSendError(true);
}
}
/**
* Handle the packet return exception
- *
- * @param ctx
- * @param e
*/
public void onExceptionCaught(ChannelHandlerContext ctx, Throwable e) {
- logger.error(e.getCause().getMessage());
+ logger.error("channel context " + ctx + " occurred exception: ", e);
try {
this.sender.setHasSendError(true);
} catch (Throwable ex) {
- logger.error(ex.getMessage());
+ logger.error("setHasSendError error: ", ex);
}
}
/**
- * sleep
- *
- * @param millisecond
+ * sleep SEND_INTERVAL_MS
*/
- private void sleep(int millisecond) {
+ private void sleep() {
try {
- Thread.sleep(millisecond);
- } catch (Throwable e) {
- logger.error(e.getMessage());
+ Thread.sleep(SEND_INTERVAL_MS);
+ } catch (Throwable ex) {
+ logger.error("sleep error: ", ex);
}
}
/***
* set audit config
- * @param config
*/
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
index 4dde7e118..a7064b996 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditData.java
@@ -24,33 +24,27 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
public class AuditData implements Serializable {
+
public static int HEAD_LENGTH = 4;
- private final long sdkTime;
private final AuditApi.BaseCommand content;
- private AtomicInteger resendTimes = new AtomicInteger(0);
+ private final AtomicInteger resendTimes = new AtomicInteger(0);
/**
* Constructor
- *
- * @param sdkTime
- * @param content
*/
- public AuditData(long sdkTime, AuditApi.BaseCommand content) {
- this.sdkTime = sdkTime;
+ public AuditData(AuditApi.BaseCommand content) {
this.content = content;
}
/**
- * set resendTimes
+ * Increase and get resend times
*/
public int increaseResendTimes() {
return this.resendTimes.incrementAndGet();
}
/**
- * getDataByte
- *
- * @return
+ * Get data byte array
*/
public byte[] getDataByte() {
return addBytes(ByteBuffer.allocate(HEAD_LENGTH).putInt(content.toByteArray().length).array(),
@@ -60,9 +54,7 @@ public class AuditData implements Serializable {
/**
* Concatenated byte array
*
- * @param data1
- * @param data2
- * @return data1 and data2 combined package result
+ * @return data1 and data2 combined package result
*/
public byte[] addBytes(byte[] data1, byte[] data2) {
byte[] data3 = new byte[data1.length + data2.length];
diff --git a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
index 82babece5..4de0413ff 100644
--- a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
+++ b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditDataTest.java
@@ -20,19 +20,20 @@ package org.apache.inlong.audit.util;
import org.apache.inlong.audit.protocol.AuditApi;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AuditDataTest {
+
@Test
public void increaseResendTimes() {
- AuditApi.BaseCommand content = null;
- AuditData test = new AuditData(System.currentTimeMillis(), content);
+ AuditData test = new AuditData(null);
int resendTimes = test.increaseResendTimes();
- assertTrue(resendTimes == 1);
+ assertEquals(1, resendTimes);
resendTimes = test.increaseResendTimes();
- assertTrue(resendTimes == 2);
+ assertEquals(2, resendTimes);
resendTimes = test.increaseResendTimes();
- assertTrue(resendTimes == 3);
+ assertEquals(3, resendTimes);
}
@Test
@@ -54,12 +55,9 @@ public class AuditDataTest {
AuditApi.AuditRequest request = AuditApi.AuditRequest.newBuilder().setMsgHeader(headerBuilder.build())
.addMsgBody(bodyBuilder.build()).build();
AuditApi.BaseCommand baseCommand = AuditApi.BaseCommand.newBuilder().setAuditRequest(request).build();
- AuditData test = new AuditData(System.currentTimeMillis(), baseCommand);
+ AuditData test = new AuditData(baseCommand);
byte[] data = test.getDataByte();
assertTrue(data.length > 0);
}
- @Test
- public void addBytes() {
- }
}
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 24c251626..331620af9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,13 +17,11 @@
package org.apache.inlong.dataproxy.metrics.audit;
-import java.util.HashSet;
-import java.util.Map;
import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -32,9 +30,12 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
/**
- *
- * AuditUtils
+ * Audit utils
*/
public class AuditUtils {
@@ -51,7 +52,7 @@ public class AuditUtils {
private static boolean IS_AUDIT = true;
/**
- * initAudit
+ * Init audit
*/
public static void initAudit() {
// IS_AUDIT
@@ -62,26 +63,21 @@ public class AuditUtils {
HashSet<String> proxys = new HashSet<>();
if (!StringUtils.isBlank(strIpPorts)) {
String[] ipPorts = strIpPorts.split("\\s+");
- for (String ipPort : ipPorts) {
- proxys.add(ipPort);
- }
+ Collections.addAll(proxys, ipPorts);
}
- AuditImp.getInstance().setAuditProxy(proxys);
+ AuditOperator.getInstance().setAuditProxy(proxys);
// AuditConfig
String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
int maxCacheRow = NumberUtils.toInt(
CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
AUDIT_DEFAULT_MAX_CACHE_ROWS);
AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
- AuditImp.getInstance().setAuditConfig(auditConfig);
+ AuditOperator.getInstance().setAuditConfig(auditConfig);
}
}
/**
- * add
- *
- * @param auditID
- * @param event
+ * Add audit data
*/
public static void add(int auditID, Event event) {
if (!IS_AUDIT || event == null) {
@@ -97,23 +93,20 @@ public class AuditUtils {
if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
}
- AuditImp.getInstance().add(auditID, inlongGroupId,
+ AuditOperator.getInstance().add(auditID, inlongGroupId,
inlongStreamId, logTime, msgCount, event.getBody().length);
} else {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
- AuditImp.getInstance().add(auditID, groupId,
+ AuditOperator.getInstance().add(auditID, groupId,
streamId, dataTime, msgCount, event.getBody().length);
}
}
/**
- * getLogTime
- *
- * @param headers
- * @return
+ * Get LogTime from headers
*/
public static long getLogTime(Map<String, String> headers) {
String strLogTime = headers.get(Constants.HEADER_KEY_MSG_TIME);
@@ -131,10 +124,7 @@ public class AuditUtils {
}
/**
- * getLogTime
- *
- * @param event
- * @return
+ * Get LogTime from event
*/
public static long getLogTime(Event event) {
if (event != null) {
@@ -145,20 +135,16 @@ public class AuditUtils {
}
/**
- * getAuditFormatTime
- *
- * @param msgTime
- * @return
+ * Get AuditFormatTime
*/
public static long getAuditFormatTime(long msgTime) {
- long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
- return auditFormatTime;
+ return msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
}
/**
- * sendReport
+ * Send audit data
*/
- public static void sendReport() {
- AuditImp.getInstance().sendReport();
+ public static void send() {
+ AuditOperator.getInstance().send();
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index 836ba0ce2..881e2b0b2 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -66,35 +66,30 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
/**
- * Application
+ * DataProxy application
*/
public class Application {
- private static final Logger logger = LoggerFactory
- .getLogger(Application.class);
-
- public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
- public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+ private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
+ private static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+ private static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
private final List<LifecycleAware> components;
private final LifecycleSupervisor supervisor;
+ private final ReentrantLock lifecycleLock = new ReentrantLock();
private MaterializedConfiguration materializedConfiguration;
private MonitorService monitorServer;
- private final ReentrantLock lifecycleLock = new ReentrantLock();
private AdminTask adminTask;
- private HeartbeatManager heartbeatManager;
/**
* Constructor
*/
public Application() {
- this(new ArrayList<LifecycleAware>(0));
+ this(new ArrayList<>(0));
}
/**
* Constructor
- *
- * @param components
*/
public Application(List<LifecycleAware> components) {
this.components = components;
@@ -102,7 +97,161 @@ public class Application {
}
/**
- * start
+ * Main entrance
+ */
+ public static void main(String[] args) {
+ try {
+ SSLUtil.initGlobalSSLParameters();
+ Options options = new Options();
+
+ Option option = new Option("n", "name", true, "the name of this agent");
+ option.setRequired(true);
+ options.addOption(option);
+
+ option = new Option("f", "conf-file", true,
+ "specify a config file (required if -z missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option(null, "no-reload-conf", false,
+ "do not reload config file if changed");
+ options.addOption(option);
+
+ // Options for Zookeeper
+ option = new Option("z", "zkConnString", true,
+ "specify the ZooKeeper connection to use (required if -f missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("p", "zkBasePath", true,
+ "specify the base path in ZooKeeper for agent configs");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("h", "help", false, "display help text");
+ options.addOption(option);
+
+ // load configuration data from manager
+ option = new Option(null, "load-conf-from-manager", false,
+ "load configuration data from manager");
+ option.setRequired(false);
+ options.addOption(option);
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine commandLine = parser.parse(options, args);
+
+ if (commandLine.hasOption('h')) {
+ new HelpFormatter().printHelp("flume-ng agent", options, true);
+ return;
+ }
+
+ // start by manager configuration
+ if (commandLine.hasOption("load-conf-from-manager")) {
+ startByManagerConf(commandLine);
+ return;
+ }
+
+ String agentName = commandLine.getOptionValue('n');
+ boolean reload = !commandLine.hasOption("no-reload-conf");
+
+ boolean isZkConfigured = commandLine.hasOption('z') || commandLine.hasOption("zkConnString");
+
+ Application application;
+ if (isZkConfigured) {
+ // get options
+ String zkConnectionStr = commandLine.getOptionValue('z');
+ String baseZkPath = commandLine.getOptionValue('p');
+
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ List<LifecycleAware> components = Lists.newArrayList();
+ PollingZooKeeperConfigurationProvider zProvider = new PollingZooKeeperConfigurationProvider(
+ agentName, zkConnectionStr, baseZkPath, eventBus);
+ components.add(zProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider(
+ agentName, zkConnectionStr, baseZkPath);
+ application = new Application();
+ application.handleConfigurationEvent(zProvider.getConfiguration());
+ }
+ } else {
+ File configurationFile = new File(commandLine.getOptionValue('f'));
+ // The following is to ensure that by default the agent will fail on startup
+ // if the file does not exist.
+ if (!configurationFile.exists()) {
+ // If command line invocation, then need to fail fast
+ if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
+ String path = configurationFile.getPath();
+ try {
+ path = configurationFile.getCanonicalPath();
+ } catch (IOException ex) {
+ LOGGER.error("failed to read canonical path for file: " + path, ex);
+ }
+ throw new ParseException("configuration file does not exist: " + path);
+ }
+ }
+
+ List<LifecycleAware> components = Lists.newArrayList();
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ PollingPropertiesFileConfigurationProvider configurationProvider;
+ configurationProvider = new PollingPropertiesFileConfigurationProvider(
+ agentName, configurationFile, eventBus, 30);
+ components.add(configurationProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ PropertiesFileConfigurationProvider configurationProvider;
+ configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile);
+ application = new Application();
+ application.handleConfigurationEvent(configurationProvider.getConfiguration());
+ }
+ }
+ // metrics
+ MetricObserver.init(CommonPropertiesHolder.get());
+ // audit
+ AuditUtils.initAudit();
+
+ final Application appReference = application;
+ Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") {
+ @Override
+ public void run() {
+ AuditUtils.send();
+ appReference.stop();
+ }
+ });
+
+ // start application
+ application.start();
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ LOGGER.error("fatal error occurred while running data-proxy: ", e);
+ }
+ }
+
+ /**
+ * Start by Manager config
+ */
+ private static void startByManagerConf(CommandLine commandLine) {
+ String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+ ManagerPropsConfigProvider configurationProvider = new ManagerPropsConfigProvider(proxyName);
+ Application application = new Application();
+ application.handleConfigurationEvent(configurationProvider.getConfiguration());
+ application.start();
+
+ final Application appReference = application;
+ Runtime.getRuntime().addShutdownHook(new Thread("data-proxy-shutdown-hook") {
+ @Override
+ public void run() {
+ appReference.stop();
+ }
+ });
+ }
+
+ /**
+ * Start all components
*/
public void start() {
lifecycleLock.lock();
@@ -110,27 +259,23 @@ public class Application {
for (LifecycleAware component : components) {
// update dataproxy config
if (component instanceof IDataProxyConfigHolder) {
- ((IDataProxyConfigHolder) component)
- .setDataProxyConfig(
- RemoteConfigManager.getInstance().getCurrentClusterConfigRef());
+ ((IDataProxyConfigHolder) component).setDataProxyConfig(
+ RemoteConfigManager.getInstance().getCurrentClusterConfigRef());
}
- supervisor.supervise(component,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
// start admin task
this.adminTask = new AdminTask(new Context(CommonPropertiesHolder.get()));
this.adminTask.start();
- this.heartbeatManager = new HeartbeatManager();
- this.heartbeatManager.start();
+ HeartbeatManager heartbeatManager = new HeartbeatManager();
+ heartbeatManager.start();
} finally {
lifecycleLock.unlock();
}
}
/**
- * handleConfigurationEvent
- *
- * @param conf
+ * Handle the configuration event
*/
@Subscribe
public void handleConfigurationEvent(MaterializedConfiguration conf) {
@@ -139,8 +284,7 @@ public class Application {
stopAllComponents();
startAllComponents(conf);
} catch (InterruptedException e) {
- logger.info("Interrupted while trying to handle configuration event");
- return;
+ LOGGER.info("interrupted while handle the configuration event");
} finally {
// If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
if (lifecycleLock.isHeldByCurrentThread()) {
@@ -150,7 +294,7 @@ public class Application {
}
/**
- * stop
+ * Stop the application
*/
public void stop() {
lifecycleLock.lock();
@@ -170,102 +314,93 @@ public class Application {
}
/**
- * stopAllComponents
+ * Stop all components
*/
private void stopAllComponents() {
- if (this.materializedConfiguration != null) {
- logger.info("Shutting down configuration: {}", this.materializedConfiguration);
- for (Entry<String, SourceRunner> entry : this.materializedConfiguration
- .getSourceRunners().entrySet()) {
+ LOGGER.info("shutting down configuration: {}", materializedConfiguration);
+ if (materializedConfiguration != null) {
+ for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) {
try {
- logger.info("Stopping Source " + entry.getKey());
+ LOGGER.info("stopping source " + entry.getKey());
supervisor.unsupervise(entry.getValue());
} catch (Exception e) {
- logger.error("Error while stopping {}", entry.getValue(), e);
+ LOGGER.error("error while stopping source " + entry.getValue(), e);
}
}
- for (Entry<String, SinkRunner> entry : this.materializedConfiguration.getSinkRunners()
- .entrySet()) {
+ for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
try {
- logger.info("Stopping Sink " + entry.getKey());
+ LOGGER.info("stopping sink " + entry.getKey());
supervisor.unsupervise(entry.getValue());
} catch (Exception e) {
- logger.error("Error while stopping {}", entry.getValue(), e);
+ LOGGER.error("error while stopping sink " + entry.getValue(), e);
}
}
- for (Entry<String, Channel> entry : this.materializedConfiguration.getChannels()
- .entrySet()) {
+ for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
try {
- logger.info("Stopping Channel " + entry.getKey());
+ LOGGER.info("stopping channel " + entry.getKey());
supervisor.unsupervise(entry.getValue());
} catch (Exception e) {
- logger.error("Error while stopping {}", entry.getValue(), e);
+ LOGGER.error("error while stopping channel " + entry.getValue(), e);
}
}
}
+
+ LOGGER.info("shutting down monitor server: {}", monitorServer);
if (monitorServer != null) {
monitorServer.stop();
}
}
/**
- * startAllComponents
- *
- * @param materializedConfiguration
+ * Start all components
*/
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
- logger.info("Starting new configuration:{}", materializedConfiguration);
+ LOGGER.info("Starting new configuration:{}", materializedConfiguration);
this.materializedConfiguration = materializedConfiguration;
-
for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
try {
- logger.info("Starting Channel " + entry.getKey());
- supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ LOGGER.info("starting channel " + entry.getKey());
+ supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(),
+ LifecycleState.START);
} catch (Exception e) {
- logger.error("Error while starting {}", entry.getValue(), e);
+ LOGGER.error("error while starting channel " + entry.getValue(), e);
}
}
- /*
- * Wait for all channels to start.
- */
+ // Wait for all channels to start.
for (Channel ch : materializedConfiguration.getChannels().values()) {
while (ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)) {
try {
- logger.info("Waiting for channel: " + ch.getName()
- + " to start. Sleeping for 500 ms");
+ LOGGER.info("sleeping for 500 ms to wait for channel: {} to start", ch.getName());
Thread.sleep(500);
} catch (InterruptedException e) {
- logger.error("Interrupted while waiting for channel to start.", e);
+ LOGGER.error("interrupted while waiting for channel to start: ", e);
Throwables.propagate(e);
}
}
}
- for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
- .entrySet()) {
+ for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
try {
- logger.info("Starting Sink " + entry.getKey());
- supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ LOGGER.info("starting sink " + entry.getKey());
+ supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(),
+ LifecycleState.START);
} catch (Exception e) {
- logger.error("Error while starting {}", entry.getValue(), e);
+ LOGGER.error("error while starting sink: " + entry.getValue(), e);
}
}
- for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners()
- .entrySet()) {
+ for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) {
try {
- logger.info("Starting Source " + entry.getKey());
- supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+ LOGGER.info("starting source " + entry.getKey());
+ supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(),
+ LifecycleState.START);
} catch (Exception e) {
- logger.error("Error while starting {}", entry.getValue(), e);
+ LOGGER.error("error while starting source: " + entry.getValue(), e);
}
}
@@ -273,7 +408,7 @@ public class Application {
}
/**
- * loadMonitoring
+ * Load monitoring
*/
@SuppressWarnings("unchecked")
private void loadMonitoring() {
@@ -285,8 +420,7 @@ public class Application {
Class<? extends MonitorService> klass;
try {
// Is it a known type?
- klass = MonitoringType.valueOf(
- monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
+ klass = MonitoringType.valueOf(monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
} catch (Exception e) {
// Not a known type, use FQCN
klass = (Class<? extends MonitorService>) Class.forName(monitorType);
@@ -295,187 +429,15 @@ public class Application {
Context context = new Context();
for (String key : keys) {
if (key.startsWith(CONF_MONITOR_PREFIX)) {
- context.put(key.substring(CONF_MONITOR_PREFIX.length()),
- systemProps.getProperty(key));
+ context.put(key.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(key));
}
}
monitorServer.configure(context);
monitorServer.start();
}
} catch (Exception e) {
- logger.warn("Error starting monitoring. "
- + "Monitoring might not be available.", e);
- }
-
- }
-
- /**
- * main
- *
- * @param args
- */
- public static void main(String[] args) {
-
- try {
- SSLUtil.initGlobalSSLParameters();
-
- Options options = new Options();
-
- Option option = new Option("n", "name", true, "the name of this agent");
- option.setRequired(true);
- options.addOption(option);
-
- option = new Option("f", "conf-file", true,
- "specify a config file (required if -z missing)");
- option.setRequired(false);
- options.addOption(option);
-
- option = new Option(null, "no-reload-conf", false,
- "do not reload config file if changed");
- options.addOption(option);
-
- // Options for Zookeeper
- option = new Option("z", "zkConnString", true,
- "specify the ZooKeeper connection to use (required if -f missing)");
- option.setRequired(false);
- options.addOption(option);
-
- option = new Option("p", "zkBasePath", true,
- "specify the base path in ZooKeeper for agent configs");
- option.setRequired(false);
- options.addOption(option);
-
- option = new Option("h", "help", false, "display help text");
- options.addOption(option);
-
- // load configuration data from manager
- option = new Option(null, "load-conf-from-manager", false,
- "load configuration data from manager");
- option.setRequired(false);
- options.addOption(option);
-
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = parser.parse(options, args);
-
- if (commandLine.hasOption('h')) {
- new HelpFormatter().printHelp("flume-ng agent", options, true);
- return;
- }
-
- // start by manager configuation
- if (commandLine.hasOption("load-conf-from-manager")) {
- startByManagerConf(commandLine);
- return;
- }
-
- String agentName = commandLine.getOptionValue('n');
- boolean reload = !commandLine.hasOption("no-reload-conf");
-
- boolean isZkConfigured = false;
- if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
- isZkConfigured = true;
- }
-
- Application application;
- if (isZkConfigured) {
- // get options
- String zkConnectionStr = commandLine.getOptionValue('z');
- String baseZkPath = commandLine.getOptionValue('p');
-
- if (reload) {
- EventBus eventBus = new EventBus(agentName + "-event-bus");
- List<LifecycleAware> components = Lists.newArrayList();
- PollingZooKeeperConfigurationProvider zProvider = new PollingZooKeeperConfigurationProvider(
- agentName, zkConnectionStr, baseZkPath, eventBus);
- components.add(zProvider);
- application = new Application(components);
- eventBus.register(application);
- } else {
- StaticZooKeeperConfigurationProvider zProvider = new StaticZooKeeperConfigurationProvider(
- agentName, zkConnectionStr, baseZkPath);
- application = new Application();
- application.handleConfigurationEvent(zProvider.getConfiguration());
- }
- } else {
- File configurationFile = new File(commandLine.getOptionValue('f'));
-
- // The following is to ensure that by default the agent will fail on startup
- // if the file does not exist.
- if (!configurationFile.exists()) {
- // If command line invocation, then need to fail fast
- if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
- String path = configurationFile.getPath();
- try {
- path = configurationFile.getCanonicalPath();
- } catch (IOException ex) {
- logger.error("Failed to read canonical path for file: " + path,
- ex);
- }
- throw new ParseException(
- "The specified configuration file does not exist: " + path);
- }
- }
- List<LifecycleAware> components = Lists.newArrayList();
-
- if (reload) {
- EventBus eventBus = new EventBus(agentName + "-event-bus");
- PollingPropertiesFileConfigurationProvider configurationProvider;
- configurationProvider = new PollingPropertiesFileConfigurationProvider(
- agentName, configurationFile, eventBus, 30);
- components.add(configurationProvider);
- application = new Application(components);
- eventBus.register(application);
- } else {
- PropertiesFileConfigurationProvider configurationProvider;
- configurationProvider = new PropertiesFileConfigurationProvider(
- agentName, configurationFile);
- application = new Application();
- application.handleConfigurationEvent(configurationProvider.getConfiguration());
- }
- }
- // metrics
- MetricObserver.init(CommonPropertiesHolder.get());
- // audit
- AuditUtils.initAudit();
-
- final Application appReference = application;
- Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
-
- @Override
- public void run() {
- AuditUtils.sendReport();
- appReference.stop();
- }
- });
-
- // start application
- application.start();
- Thread.sleep(5000);
- } catch (Exception e) {
- logger.error("A fatal error occurred while running. Exception follows.", e);
+ LOGGER.warn("starting monitoring error, the monitoring might not be available: ", e);
}
}
- /**
- * startByManagerConf
- *
- * @param commandLine
- */
- private static void startByManagerConf(CommandLine commandLine) {
- String proxyName = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
- ManagerPropertiesConfigurationProvider configurationProvider = new ManagerPropertiesConfigurationProvider(
- proxyName);
- Application application = new Application();
- application.handleConfigurationEvent(configurationProvider.getConfiguration());
- application.start();
-
- final Application appReference = application;
- Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
-
- @Override
- public void run() {
- appReference.stop();
- }
- });
- }
-}
\ No newline at end of file
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java
similarity index 62%
rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java
rename to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java
index d1451ef28..04adf218b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropertiesConfigurationProvider.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/ManagerPropsConfigProvider.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,48 +17,38 @@
package org.apache.inlong.dataproxy.node;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flume.conf.FlumeConfiguration;
import org.apache.flume.node.AbstractConfigurationProvider;
import org.apache.inlong.dataproxy.config.RemoteConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
- *
- * ManagerPropertiesConfigurationProvider
+ * Manager properties configuration provider
*/
-public class ManagerPropertiesConfigurationProvider extends
- AbstractConfigurationProvider {
+public class ManagerPropsConfigProvider extends AbstractConfigurationProvider {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(ManagerPropertiesConfigurationProvider.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ManagerPropsConfigProvider.class);
- /**
- * ManagerPropertiJesConfigurationProvider
- *
- * @param agentName
- */
- public ManagerPropertiesConfigurationProvider(String agentName) {
+ public ManagerPropsConfigProvider(String agentName) {
super(agentName);
}
/**
- * getFlumeConfiguration
- *
- * @return
+ * Get Flume configuration
*/
@Override
public FlumeConfiguration getFlumeConfiguration() {
try {
Map<String, String> flumeProperties = RemoteConfigManager.getInstance().getFlumeProperties();
- LOGGER.info("flumeProperties:{}", flumeProperties);
+ LOGGER.info("all flume props: {}", flumeProperties);
return new FlumeConfiguration(flumeProperties);
} catch (Exception e) {
- LOGGER.error("exception catch:" + e.getMessage(), e);
+ LOGGER.error("get flume props error: ", e);
}
- return new FlumeConfiguration(new HashMap<String, String>());
+ return new FlumeConfiguration(new HashMap<>());
}
}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index 50d6f69ce..e0094a3ce 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -25,38 +25,35 @@ import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
/**
- *
- * SortStandaloneApplication
+ * Sort Standalone Application
*/
public class SortStandaloneApplication {
- public static final Logger LOG = InlongLoggerFactory.getLogger(Application.class);
+ public static final Logger LOGGER = InlongLoggerFactory.getLogger(Application.class);
/**
- * main
- *
- * @param args
+ * Main entrance
*/
public static void main(String[] args) {
- LOG.info("start to sort-standalone");
+ LOGGER.info("start to sort-standalone");
try {
SortCluster cluster = new SortCluster();
- Runtime.getRuntime().addShutdownHook(new Thread("sortstandalone-shutdown-hook") {
+ Runtime.getRuntime().addShutdownHook(new Thread("sort-standalone-shutdown-hook") {
@Override
public void run() {
- AuditUtils.sendReport();
+ AuditUtils.send();
cluster.close();
}
});
- //
+ // start the cluster
cluster.start();
// metrics
MetricObserver.init(CommonPropertiesHolder.get());
AuditUtils.initAudit();
Thread.sleep(5000);
} catch (Exception e) {
- LOG.error("A fatal error occurred while running. Exception follows.", e);
+ LOGGER.error("fatal error occurred while running sort-standalone: ", e);
}
}
}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
index bc0d219b8..6a7a8900c 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/metrics/audit/AuditUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,22 +17,22 @@
package org.apache.inlong.sort.standalone.metrics.audit;
-import java.util.HashSet;
-import java.util.Map;
-
import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
/**
- *
- * AuditUtils
+ * Audit utils
*/
public class AuditUtils {
@@ -49,7 +49,7 @@ public class AuditUtils {
private static boolean IS_AUDIT = true;
/**
- * initAudit
+ * Init audit
*/
public static void initAudit() {
// IS_AUDIT
@@ -60,11 +60,9 @@ public class AuditUtils {
HashSet<String> proxys = new HashSet<>();
if (!StringUtils.isBlank(strIpPorts)) {
String[] ipPorts = strIpPorts.split("\\s+");
- for (String ipPort : ipPorts) {
- proxys.add(ipPort);
- }
+ Collections.addAll(proxys, ipPorts);
}
- AuditImp.getInstance().setAuditProxy(proxys);
+ AuditOperator.getInstance().setAuditProxy(proxys);
// AuditConfig
String filePath = CommonPropertiesHolder.getString(AUDIT_KEY_FILE_PATH,
AUDIT_DEFAULT_FILE_PATH);
@@ -72,30 +70,24 @@ public class AuditUtils {
CommonPropertiesHolder.getString(AUDIT_KEY_MAX_CACHE_ROWS),
AUDIT_DEFAULT_MAX_CACHE_ROWS);
AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
- AuditImp.getInstance().setAuditConfig(auditConfig);
+ AuditOperator.getInstance().setAuditConfig(auditConfig);
}
}
/**
- * add
- *
- * @param auditID
- * @param event
+ * Add audit data
*/
public static void add(int auditID, ProfileEvent event) {
if (IS_AUDIT && event != null) {
String inlongGroupId = event.getInlongGroupId();
String inlongStreamId = event.getInlongStreamId();
long logTime = event.getRawLogTime();
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
}
}
/**
- * add
- *
- * @param auditID
- * @param event
+ * Add audit data
*/
public static void add(int auditID, Event event) {
if (IS_AUDIT && event != null) {
@@ -103,14 +95,14 @@ public class AuditUtils {
String inlongGroupId = SortMetricItem.getInlongGroupId(headers);
String inlongStreamId = SortMetricItem.getInlongStreamId(headers);
long logTime = SortMetricItem.getLogTime(headers);
- AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
+ AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, 1, event.getBody().length);
}
}
/**
- * sendReport
+ * Send audit data
*/
- public static void sendReport() {
- AuditImp.getInstance().sendReport();
+ public static void send() {
+ AuditOperator.getInstance().send();
}
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 45e33c3c2..a117a3f27 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -22,7 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
@@ -43,9 +43,10 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
*/
public class SinkMetricData implements MetricData {
- private MetricGroup metricGroup;
- private Map<String, String> labels;
- private AuditImp auditImp;
+ private final MetricGroup metricGroup;
+ private final Map<String, String> labels;
+ private final RegisteredMetric registeredMetric;
+ private AuditOperator auditOperator;
private Counter numRecordsOut;
private Counter numBytesOut;
private Counter numRecordsOutForMeter;
@@ -54,7 +55,6 @@ public class SinkMetricData implements MetricData {
private Counter dirtyBytes;
private Meter numRecordsOutPerSecond;
private Meter numBytesOutPerSecond;
- private RegisteredMetric registeredMetric;
public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
@@ -94,8 +94,8 @@ public class SinkMetricData implements MetricData {
}
if (option.getIpPorts().isPresent()) {
- AuditImp.getInstance().setAuditProxy(option.getIpPortList());
- this.auditImp = AuditImp.getInstance();
+ AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
+ this.auditOperator = AuditOperator.getInstance();
}
}
@@ -271,8 +271,8 @@ public class SinkMetricData implements MetricData {
numBytesOutForMeter.inc(rowSize);
}
- if (auditImp != null) {
- auditImp.add(
+ if (auditOperator != null) {
+ auditOperator.add(
Constants.AUDIT_SORT_OUTPUT,
getGroupId(),
getStreamId(),
@@ -296,10 +296,10 @@ public class SinkMetricData implements MetricData {
public String toString() {
switch (registeredMetric) {
case DIRTY:
- return "SinkMetricData{"
+ return "SinkMetricData{"
+ "metricGroup=" + metricGroup
+ ", labels=" + labels
- + ", auditImp=" + auditImp
+ + ", auditOperator=" + auditOperator
+ ", dirtyRecords=" + dirtyRecords.getCount()
+ ", dirtyBytes=" + dirtyBytes.getCount()
+ '}';
@@ -307,7 +307,7 @@ public class SinkMetricData implements MetricData {
return "SinkMetricData{"
+ "metricGroup=" + metricGroup
+ ", labels=" + labels
- + ", auditImp=" + auditImp
+ + ", auditOperator=" + auditOperator
+ ", numRecordsOut=" + numRecordsOut.getCount()
+ ", numBytesOut=" + numBytesOut.getCount()
+ ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
@@ -319,7 +319,7 @@ public class SinkMetricData implements MetricData {
return "SinkMetricData{"
+ "metricGroup=" + metricGroup
+ ", labels=" + labels
- + ", auditImp=" + auditImp
+ + ", auditOperator=" + auditOperator
+ ", numRecordsOut=" + numRecordsOut.getCount()
+ ", numBytesOut=" + numBytesOut.getCount()
+ ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 3ac6a96f8..b93c5bb53 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -22,7 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.sort.base.Constants;
import java.nio.charset.StandardCharsets;
@@ -40,15 +40,15 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
*/
public class SourceMetricData implements MetricData {
- private MetricGroup metricGroup;
- private Map<String, String> labels;
+ private final MetricGroup metricGroup;
+ private final Map<String, String> labels;
private Counter numRecordsIn;
private Counter numBytesIn;
private Counter numRecordsInForMeter;
private Counter numBytesInForMeter;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
- private AuditImp auditImp;
+ private AuditOperator auditOperator;
public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
@@ -70,8 +70,8 @@ public class SourceMetricData implements MetricData {
}
if (option.getIpPorts().isPresent()) {
- AuditImp.getInstance().setAuditProxy(option.getIpPortList());
- this.auditImp = AuditImp.getInstance();
+ AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
+ this.auditOperator = AuditOperator.getInstance();
}
}
@@ -211,8 +211,8 @@ public class SourceMetricData implements MetricData {
this.numBytesInForMeter.inc(rowDataSize);
}
- if (auditImp != null) {
- auditImp.add(
+ if (auditOperator != null) {
+ auditOperator.add(
Constants.AUDIT_SORT_INPUT,
getGroupId(),
getStreamId(),
@@ -233,7 +233,7 @@ public class SourceMetricData implements MetricData {
+ ", numBytesInForMeter=" + numBytesInForMeter.getCount()
+ ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
- + ", auditImp=" + auditImp
+ + ", auditOperator=" + auditOperator
+ '}';
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
index b4fbbb9da..dea86d46a 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/stats/audit/AuditUtils.java
@@ -1,10 +1,10 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,8 +17,7 @@
package org.apache.inlong.tubemq.server.broker.stats.audit;
-import java.util.Map;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
@@ -26,18 +25,21 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.server.broker.stats.TrafficInfo;
import org.apache.inlong.tubemq.server.common.fileconfig.ADConfig;
+import java.util.Map;
+
/**
* AuditUtils
*
* A wrapper class for Audit report operations
*/
public class AuditUtils {
+
private static ADConfig auditConfig = new ADConfig();
/**
* init audit instance
*
- * @param adConfig the initial configure
+ * @param adConfig the initial configure
*/
public static void initAudit(ADConfig adConfig) {
// check whether enable audit
@@ -48,28 +50,27 @@ public class AuditUtils {
auditConfig = adConfig;
// initial audit instance
- AuditImp.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet());
+ AuditOperator.getInstance().setAuditProxy(adConfig.getAuditProxyAddrSet());
AuditConfig auditConfig =
new AuditConfig(adConfig.getAuditCacheFilePath(),
adConfig.getAuditCacheMaxRows());
- AuditImp.getInstance().setAuditConfig(auditConfig);
+ AuditOperator.getInstance().setAuditConfig(auditConfig);
}
/**
* add produce record
*
- * @param groupId the group id
- * @param streamId the stream id
- * @param logTime the record time
- * @param count the record count
- * @param size the record size
+ * @param groupId the group id
+ * @param streamId the stream id
+ * @param logTime the record time
+ * @param count the record count
+ * @param size the record size
*/
- public static void addProduceRecord(String groupId, String streamId,
- String logTime, long count, long size) {
+ public static void addProduceRecord(String groupId, String streamId, String logTime, long count, long size) {
if (!auditConfig.isAuditEnable()) {
return;
}
- AuditImp.getInstance().add(auditConfig.getAuditIdProduce(),
+ AuditOperator.getInstance().add(auditConfig.getAuditIdProduce(),
groupId, streamId, DateTimeConvertUtils.yyyyMMddHHmm2ms(logTime), count, size);
}
@@ -79,8 +80,7 @@ public class AuditUtils {
* @param trafficInfos the consumed traffic information
*/
public static void addConsumeRecord(Map<String, TrafficInfo> trafficInfos) {
- if (!auditConfig.isAuditEnable()
- || trafficInfos == null || trafficInfos.isEmpty()) {
+ if (!auditConfig.isAuditEnable() || trafficInfos == null || trafficInfos.isEmpty()) {
return;
}
for (Map.Entry<String, TrafficInfo> entry : trafficInfos.entrySet()) {
@@ -99,19 +99,19 @@ public class AuditUtils {
// #127.0.0.1#32677#test_consume#2#202207041219
// topicName, brokerIP, clientId,
// clientIP, client processId, consume group, partitionId, msgTime
- AuditImp.getInstance().add(auditConfig.getAuditIdConsume(),
+ AuditOperator.getInstance().add(auditConfig.getAuditIdConsume(),
statKeyItems[0], statKeyItems[5], DateTimeConvertUtils.yyyyMMddHHmm2ms(statKeyItems[7]),
entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
}
}
/**
- * close audit report
+ * Close audit, if it was enabled, send its data first.
*/
public static void closeAudit() {
if (!auditConfig.isAuditEnable()) {
return;
}
- AuditImp.getInstance().sendReport();
+ AuditOperator.getInstance().send();
}
}