You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/09/19 03:07:52 UTC
[inlong] branch master updated: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 59352497b [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
59352497b is described below
commit 59352497b14fd0dd835a2038b699adc5266cf590
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Sep 19 11:07:46 2022 +0800
[INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
---
.../inlong/dataproxy/consts/ConfigConstants.java | 1 +
.../inlong/dataproxy/http/HttpBaseSource.java | 9 +-
.../dataproxy/http/SimpleMessageHandler.java | 35 ++---
.../dataproxy/metrics/DataProxyMetricItemSet.java | 128 +++++++++++++++
.../apache/inlong/dataproxy/sink/PulsarSink.java | 174 ++++++++++-----------
.../org/apache/inlong/dataproxy/sink/TubeSink.java | 121 +++++---------
.../dataproxy/sink/pulsar/PulsarClientService.java | 8 +-
.../dataproxy/sink/pulsar/SendMessageCallBack.java | 2 +-
.../apache/inlong/dataproxy/source/BaseSource.java | 24 ++-
.../dataproxy/source/ServerMessageHandler.java | 65 +++-----
.../dataproxy/source/SimpleMessageHandler.java | 11 +-
.../inlong/dataproxy/source/SimpleTcpSource.java | 14 --
.../inlong/dataproxy/source/SimpleUdpSource.java | 1 +
13 files changed, 320 insertions(+), 273 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 1a6fcada1..ae6bc380f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -96,6 +96,7 @@ public class ConfigConstants {
public static final String CLUSTER_ID_KEY = "clusterId";
public static final String MANAGER_HOST = "manager.hosts";
public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
+ public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
public static final String PROXY_REPORT_IP = "proxy.report.ip";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 84e45e2fd..973bc128c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -29,6 +29,7 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -77,7 +78,13 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
statIntervalSec, maxMonitorCnt);
}
// register metrics
- this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ ConfigManager configManager = ConfigManager.getInstance();
+ String clusterId =
+ configManager.getCommonProperties().getOrDefault(
+ ConfigConstants.PROXY_CLUSTER_NAME,
+ ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+ this.metricItemSet =
+ new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
MetricRegister.register(metricItemSet);
super.start();
logger.info("{} started!", this.getName());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 16bb5c917..c5ca7c1d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -36,7 +36,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.http.exception.MessageProcessException;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
@@ -175,14 +174,14 @@ public class SimpleMessageHandler implements MessageHandler {
intMsgCnt, 1, data.length, 0);
monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
}
- addMetric(true, data.length, event);
+ addStatistics(true, data.length, event);
} catch (ChannelException ex) {
if (monitorIndex != null) {
monitorIndex.addAndGet(strBuff.toString(),
0, 0, 0, intMsgCnt);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
}
- addMetric(false, data.length, event);
+ addStatistics(false, data.length, event);
logCounter++;
if (logCounter == 1 || logCounter % 1000 == 0) {
LOG.error("Error writing to channel, and will retry after 1s, ex={},"
@@ -223,31 +222,19 @@ public class SimpleMessageHandler implements MessageHandler {
}
/**
- * add audit metric
+ * add statistics information
*
- * @param result success or failure
+ * @param isSuccess success or failure
* @param size message size
* @param event message event
*/
- private void addMetric(boolean result, long size, Event event) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, this.metricItemSet.getName());
- dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, this.metricItemSet.getName());
- DataProxyMetricItem.fillInlongId(event, dimensions);
- DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- if (result) {
- metricItem.readSuccessCount.incrementAndGet();
- metricItem.readSuccessSize.addAndGet(size);
- try {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
- } catch (Exception e) {
- LOG.error("add audit metric has exception e= {}", e);
- }
- } else {
- metricItem.readFailCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(size);
+ private void addStatistics(boolean isSuccess, long size, Event event) {
+ if (event == null) {
+ return;
+ }
+ metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size);
+ if (isSuccess) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index 811442428..e420e03a2 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -17,8 +17,15 @@
package org.apache.inlong.dataproxy.metrics;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flume.Event;
import org.apache.inlong.common.metric.MetricDomain;
import org.apache.inlong.common.metric.MetricItemSet;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.consts.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
/**
*
@@ -26,6 +33,8 @@ import org.apache.inlong.common.metric.MetricItemSet;
*/
@MetricDomain(name = "DataProxy")
public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+ private String clusterId = null;
+ private String sourceDataId = null;
/**
* Constructor
@@ -36,6 +45,125 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
super(name);
}
+ /**
+ * Constructor
+ *
+ * @param clusterId the cluster id
+ * @param name the module name
+ */
+ public DataProxyMetricItemSet(String clusterId, String name) {
+ super(name);
+ this.clusterId = clusterId;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param clusterId the cluster id
+ * @param name the module name
+ * @param sourceDataId the source data id
+ */
+ public DataProxyMetricItemSet(String clusterId, String name, String sourceDataId) {
+ super(name);
+ this.clusterId = clusterId;
+ this.sourceDataId = sourceDataId;
+ }
+
+ /**
+ * Fill source metric items by event
+ *
+ * @param event the event object
+ * @param isSuccess whether success read
+ * @param size the message size
+ */
+ public void fillSrcMetricItemsByEvent(Event event, boolean isSuccess, long size) {
+ fillMetricItemsByEvent(event, true, true, isSuccess, size, 0);
+ }
+
+ /**
+ * Fill sink metric items by event
+ *
+ * @param event the event object
+ * @param isSuccess whether success read or send
+ * @param size the message size
+ */
+ public void fillSinkReadMetricItemsByEvent(Event event, boolean isSuccess, long size) {
+ fillMetricItemsByEvent(event, false, true, isSuccess, size, 0);
+ }
+
+ /**
+ * Fill sink send metric items by event
+ *
+ * @param event the event object
+ * @param sentTime the sent time
+ * @param isSuccess whether success read or send
+ * @param size the message size
+ */
+ public void fillSinkSendMetricItemsByEvent(Event event, long sentTime,
+ boolean isSuccess, long size) {
+ fillMetricItemsByEvent(event, false, false, isSuccess, size, sentTime);
+ }
+
+ /**
+ * Fill metric items by event
+ *
+ * @param event the event object
+ * @param isSource whether source part
+ * @param isReadOp whether read operation
+ * @param isSuccess whether success read or send
+ * @param size the message size
+ */
+ private void fillMetricItemsByEvent(Event event, boolean isSource,
+ boolean isReadOp, boolean isSuccess,
+ long size, long sendTime) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, clusterId);
+ dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID,
+ event.getHeaders().get(AttributeConstants.GROUP_ID));
+ dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID,
+ event.getHeaders().get(AttributeConstants.STREAM_ID));
+ long dataTime = NumberUtils.toLong(
+ event.getHeaders().get(AttributeConstants.DATA_TIME));
+ long auditFormatTime = dataTime - dataTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
+ if (isSource) {
+ dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, name);
+ dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, sourceDataId);
+ } else {
+ dimensions.put(DataProxyMetricItem.KEY_SINK_ID, name);
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+ event.getHeaders().get(ConfigConstants.TOPIC_KEY));
+ }
+ DataProxyMetricItem metricItem = findMetricItem(dimensions);
+ if (isReadOp) {
+ if (isSuccess) {
+ metricItem.readSuccessCount.incrementAndGet();
+ metricItem.readSuccessSize.addAndGet(size);
+ } else {
+ metricItem.readFailCount.incrementAndGet();
+ metricItem.readFailSize.addAndGet(size);
+ }
+ } else {
+ if (isSuccess) {
+ metricItem.sendSuccessCount.incrementAndGet();
+ metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+ if (sendTime > 0) {
+ long currentTime = System.currentTimeMillis();
+ long msgDataTimeL = Long.parseLong(
+ event.getHeaders().get(AttributeConstants.DATA_TIME));
+ long msgRcvTimeL = Long.parseLong(
+ event.getHeaders().get(AttributeConstants.RCV_TIME));
+ metricItem.sinkDuration.addAndGet(currentTime - sendTime);
+ metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL);
+ metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
+ }
+ } else {
+ metricItem.sendFailCount.incrementAndGet();
+ metricItem.sendFailSize.addAndGet(event.getBody().length);
+ }
+ }
+ }
+
/**
* createItem
*
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 69da396bc..3785ae993 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -17,6 +17,9 @@
package org.apache.inlong.dataproxy.sink;
+import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
+
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -24,6 +27,16 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import io.netty.handler.codec.TooLongFrameException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -45,8 +58,8 @@ import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.pulsar.CreatePulsarClientCallBack;
import org.apache.inlong.dataproxy.sink.pulsar.PulsarClientService;
import org.apache.inlong.dataproxy.sink.pulsar.SendMessageCallBack;
@@ -62,19 +75,6 @@ import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
-
/**
* Use pulsarSink need adding such config, if these ara not config in dataproxy-pulsar.conf,
* PulsarSink will use default value.
@@ -172,7 +172,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
/*
* metric
*/
- private Map<String, String> dimensions;
private DataProxyMetricItemSet metricItemSet;
private ConfigManager configManager;
private Map<String, String> topicProperties;
@@ -196,7 +195,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
topicProperties = configManager.getTopicProperties();
pulsarCluster = configManager.getMqClusterUrl2Token();
pulsarConfig = configManager.getMqClusterConfig(); //pulsar common config
- Map<String, String> commonProperties = configManager.getCommonProperties();
sinkThreadPoolSize = pulsarConfig.getThreadNum();
if (sinkThreadPoolSize <= 0) {
sinkThreadPoolSize = 1;
@@ -294,11 +292,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
@Override
public void start() {
logger.info("[{}] pulsar sink starting...", getName());
- // register metrics
- this.dimensions = new HashMap<>();
- this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
-
sinkCounter.start();
pulsarClientService.initCreateConnection(this);
@@ -326,8 +319,13 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
sinkThreadPool[i].start();
}
-
- this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ // register metricItemSet
+ ConfigManager configManager = ConfigManager.getInstance();
+ String clusterId =
+ configManager.getCommonProperties().getOrDefault(
+ ConfigConstants.PROXY_CLUSTER_NAME,
+ ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+ this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
MetricRegister.register(metricItemSet);
this.canTake = true;
logger.info("[{}] Pulsar sink started", getName());
@@ -405,19 +403,13 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
+ "last long time it will cause memoryChannel full and fileChannel write.)", getName());
tx.rollback();
// metric
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().get(ConfigConstants.TOPIC_KEY));
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(event.getBody().length);
+ this.metricItemSet.fillSinkReadMetricItemsByEvent(
+ event, false, event.getBody().length);
} else {
tx.commit();
// metric
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().get(ConfigConstants.TOPIC_KEY));
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readSuccessCount.incrementAndGet();
- metricItem.readSuccessSize.addAndGet(event.getBody().length);
+ this.metricItemSet.fillSinkReadMetricItemsByEvent(
+ event, true, event.getBody().length);
}
} else {
status = Status.BACKOFF;
@@ -437,37 +429,6 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
return status;
}
- private void editStatistic(final Event event, boolean isSuccess, boolean isOrder) {
- if (event == null
- || pulsarConfig.getStatIntervalSec() <= 0) {
- return;
- }
- // get statistic items
- String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
- String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
- int intMsgCnt = Integer.parseInt(
- event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
- long dataTimeL = Long.parseLong(
- event.getHeaders().get(AttributeConstants.DATA_TIME));
- String orderType = isOrder ? "order" : "non-order";
- StringBuilder newBase = new StringBuilder(512)
- .append(this.getName()).append(SEPARATOR).append(topic).append(SEPARATOR)
- .append(streamId).append(SEPARATOR).append(nodeIp)
- .append(SEPARATOR).append(NetworkUtils.getLocalIp())
- .append(SEPARATOR).append(orderType).append(SEPARATOR)
- .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
- long messageSize = event.getBody().length;
- if (isSuccess) {
- monitorIndex.addAndGet(newBase.toString(), intMsgCnt, 1, messageSize, 0);
- } else {
- monitorIndex.addAndGet(newBase.toString(), 0, 0, 0, intMsgCnt);
- if (logPrinterB.shouldPrint()) {
- logger.warn("error cannot send event, {} event size is {}", topic, messageSize);
- }
- }
- }
-
@Override
public void handleCreateClientSuccess(String url) {
logger.info("createConnection success for url = {}", url);
@@ -481,7 +442,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
}
@Override
- public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) {
+ public void handleMessageSendSuccess(String topic, Object result,
+ EventStat eventStat, long startTime) {
/*
* Statistics pulsar performance
*/
@@ -505,20 +467,11 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
getName(), (nowCnt - oldCnt), (t2 - t1));
t1 = t2;
}
- Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.sendSuccessCount.incrementAndGet();
- metricItem.sendSuccessSize.addAndGet(eventStat.getEvent().getBody().length);
- metricItem.sendCount.incrementAndGet();
- metricItem.sendSize.addAndGet(eventStat.getEvent().getBody().length);
- monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
- editStatistic(eventStat.getEvent(), true, eventStat.isOrderMessage());
-
+ addStatistics(eventStat, true, startTime);
}
@Override
public void handleMessageSendException(String topic, EventStat eventStat, Object e) {
- monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
boolean needRetry = true;
if (e instanceof NotFoundException) {
logger.error("NotFoundException for topic " + topic + ", message will be discard!", e);
@@ -533,26 +486,67 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
if (logPrinterB.shouldPrint()) {
logger.error("send failed for " + getName(), e);
}
- if (eventStat.getRetryCnt() == 0) {
- editStatistic(eventStat.getEvent(), false, eventStat.isOrderMessage());
- }
}
- Map<String, String> dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.sendFailCount.incrementAndGet();
- metricItem.sendFailSize.addAndGet(eventStat.getEvent().getBody().length);
+ addStatistics(eventStat, false, 0);
eventStat.incRetryCnt();
if (!eventStat.isOrderMessage() && needRetry) {
processResendEvent(eventStat);
}
}
- private Map<String, String> getNewDimension(String otherKey, String value) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
- dimensions.put(otherKey, value);
- return dimensions;
+ /**
+ * Add statistics information
+ *
+ * @param eventStat the statistic event
+ * @param isSuccess is processed successfully
+ * @param sendTime the send time when success processed
+ */
+ private void addStatistics(EventStat eventStat, boolean isSuccess, long sendTime) {
+ if (eventStat == null || eventStat.getEvent() == null) {
+ return;
+ }
+ Event event = eventStat.getEvent();
+ // add jmx metric items;
+ PulsarSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
+ event, sendTime, isSuccess, event.getBody().length);
+ if (isSuccess) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+ }
+ if (pulsarConfig.getStatIntervalSec() <= 0) {
+ return;
+ }
+ // add monitor items base file storage
+ String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
+ String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
+ String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
+ int intMsgCnt = Integer.parseInt(
+ event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
+ long dataTimeL = Long.parseLong(
+ event.getHeaders().get(AttributeConstants.DATA_TIME));
+ String orderType = eventStat.isOrderMessage() ? "order" : "non-order";
+ // build statistic key
+ StringBuilder newBase = new StringBuilder(512)
+ .append(getName()).append(SEP_HASHTAG).append(topic)
+ .append(SEP_HASHTAG).append(streamId).append(SEP_HASHTAG)
+ .append(nodeIp).append(SEP_HASHTAG).append(NetworkUtils.getLocalIp())
+ .append(SEP_HASHTAG).append(orderType).append(SEP_HASHTAG)
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+ // count data
+ if (isSuccess) {
+ monitorIndex.addAndGet(newBase.toString(),
+ intMsgCnt, 1, event.getBody().length, 0);
+ monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
+ } else {
+ monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
+ if (eventStat.getRetryCnt() == 0) {
+ monitorIndex.addAndGet(newBase.toString(),
+ 0, 0, 0, intMsgCnt);
+ if (logPrinterB.shouldPrint()) {
+ logger.warn("error cannot send event, {} event size is {}",
+ topic, event.getBody().length);
+ }
+ }
+ }
}
private boolean processEvent(EventStat eventStat) {
@@ -626,7 +620,9 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
}
}
} catch (Throwable throwable) {
- monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
+ if (pulsarConfig.getStatIntervalSec() > 0) {
+ monitorIndexExt.incrementAndGet("PULSAR_SINK_DROPPED");
+ }
if (logPrinterC.shouldPrint()) {
logger.error(getName() + " Discard msg because put events to both of "
+ "queue and fileChannel fail", throwable);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index f1f2b6bbb..86787136a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -21,7 +21,6 @@ import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEP_HASHTAG;
import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT;
import com.google.common.base.Preconditions;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -53,7 +52,6 @@ import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler;
@@ -94,7 +92,6 @@ public class TubeSink extends AbstractSink implements Configurable {
// used for RoundRobin different cluster while send message
private RateLimiter diskRateLimiter;
private Thread[] sinkThreadPool;
- private Map<String, String> dimensions;
private DataProxyMetricItemSet metricItemSet;
private final AtomicBoolean started = new AtomicBoolean(false);
private static final LogCounter LOG_SINK_TASK_PRINTER =
@@ -186,12 +183,13 @@ public class TubeSink extends AbstractSink implements Configurable {
monitorIndexExt = new MonitorIndexExt("Tube_Sink_monitors#" + this.getName(),
statIntervalSec, maxMonitorCnt);
}
- // initial dimensions
- this.dimensions = new HashMap<>();
- this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
// register metrics
- this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ ConfigManager configManager = ConfigManager.getInstance();
+ String clusterId =
+ configManager.getCommonProperties().getOrDefault(
+ ConfigConstants.PROXY_CLUSTER_NAME,
+ ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+ this.metricItemSet = new DataProxyMetricItemSet(clusterId, this.getName());
MetricRegister.register(metricItemSet);
// create tube connection
try {
@@ -272,27 +270,17 @@ public class TubeSink extends AbstractSink implements Configurable {
if (diskRateLimiter != null) {
diskRateLimiter.acquire(event.getBody().length);
}
- Map<String, String> dimensions;
- dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().getOrDefault(ConfigConstants.TOPIC_KEY, ""));
if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
tx.commit();
cachedMsgCnt.incrementAndGet();
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readSuccessCount.incrementAndGet();
- metricItem.readSuccessSize.addAndGet(event.getBody().length);
+ this.metricItemSet.fillSinkReadMetricItemsByEvent(
+ event, true, event.getBody().length);
} else {
tx.rollback();
- //logger.info("[{}] Channel --> Queue(has no enough space,current code point) "
- // + "--> TubeMQ, check if TubeMQ server or network is ok.(if this situation last long time "
- // + "it will cause memoryChannel full and fileChannel write.)", getName());
- // metric
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- metricItem.readFailCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(event.getBody().length);
+ this.metricItemSet.fillSinkReadMetricItemsByEvent(
+ event, false, event.getBody().length);
}
} else {
- // logger.info("[{}]No data to process in the channel.",getName());
status = Status.BACKOFF;
tx.commit();
}
@@ -436,17 +424,9 @@ public class TubeSink extends AbstractSink implements Configurable {
successMsgCnt.incrementAndGet();
inflightMsgCnt.decrementAndGet();
takenMsgCnt.decrementAndGet();
- this.addMetric(myEventStat.getEvent(), true, sendTime);
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS);
- }
- this.editStatistic(myEventStat.getEvent(), true);
+ this.addStatistics(myEventStat.getEvent(), true, false, sendTime);
} else {
- this.addMetric(myEventStat.getEvent(), false, 0);
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE);
- }
- this.editStatistic(myEventStat.getEvent(), false);
+ this.addStatistics(myEventStat.getEvent(), false, false, 0);
if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
result.getErrMsg(), resendQueue.size(),
@@ -464,50 +444,34 @@ public class TubeSink extends AbstractSink implements Configurable {
@Override
public void onException(final Throwable e) {
- if (statIntervalSec > 0) {
- monitorIndexExt.incrementAndGet(KEY_SINK_EXP);
- }
- this.editStatistic(myEventStat.getEvent(), false);
+ addStatistics(myEventStat.getEvent(), false, true, 0);
resendEvent(myEventStat, true);
}
/**
- * addMetric
+ * Add statistics information
+ *
+ * @param event the statistic event
+ * @param isSuccess is processed successfully
+ * @param isException is exception when failure processed
+ * @param sendTime the send time when success processed
*/
- private void addMetric(Event event, boolean result, long sendTime) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, TubeSink.this.getName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, TubeSink.this.getName());
- dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
- event.getHeaders().get(ConfigConstants.TOPIC_KEY));
- DataProxyMetricItem.fillInlongId(event, dimensions);
- DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
- DataProxyMetricItem metricItem = TubeSink.this.metricItemSet.findMetricItem(dimensions);
- if (result) {
- metricItem.sendSuccessCount.incrementAndGet();
- metricItem.sendSuccessSize.addAndGet(event.getBody().length);
+ private void addStatistics(Event event, boolean isSuccess,
+ boolean isException, long sendTime) {
+ if (event == null) {
+ return;
+ }
+ // add jmx metric items;
+ TubeSink.this.metricItemSet.fillSinkSendMetricItemsByEvent(
+ event, sendTime, isSuccess, event.getBody().length);
+ // add audit items;
+ if (isSuccess) {
AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
- if (sendTime > 0) {
- long currentTime = System.currentTimeMillis();
- long msgDataTimeL = Long.parseLong(
- event.getHeaders().get(AttributeConstants.DATA_TIME));
- long msgRcvTimeL = Long.parseLong(
- event.getHeaders().get(AttributeConstants.RCV_TIME));
- metricItem.sinkDuration.addAndGet(currentTime - sendTime);
- metricItem.nodeDuration.addAndGet(currentTime - msgRcvTimeL);
- metricItem.wholeDuration.addAndGet(currentTime - msgDataTimeL);
- }
- } else {
- metricItem.sendFailCount.incrementAndGet();
- metricItem.sendFailSize.addAndGet(event.getBody().length);
}
- }
-
- private void editStatistic(final Event event, boolean isSuccess) {
- if (event == null || statIntervalSec <= 0) {
+ if (statIntervalSec <= 0) {
return;
}
- // get statistic items
+ // add monitor items base file storage
String topic = event.getHeaders().get(ConfigConstants.TOPIC_KEY);
String streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
String nodeIp = event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY);
@@ -526,9 +490,14 @@ public class TubeSink extends AbstractSink implements Configurable {
if (isSuccess) {
monitorIndex.addAndGet(newBase.toString(),
intMsgCnt, 1, event.getBody().length, 0);
+ monitorIndexExt.incrementAndGet(KEY_SINK_SUCCESS);
} else {
monitorIndex.addAndGet(newBase.toString(),
0, 0, 0, intMsgCnt);
+ monitorIndexExt.incrementAndGet(KEY_SINK_FAILURE);
+ if (isException) {
+ monitorIndexExt.incrementAndGet(KEY_SINK_EXP);
+ }
}
}
}
@@ -590,14 +559,6 @@ public class TubeSink extends AbstractSink implements Configurable {
}
}
- private Map<String, String> getNewDimension(String otherKey, String value) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
- dimensions.put(otherKey, value);
- return dimensions;
- }
-
/**
* Differentiate unpublished topic sets and publish them
* attention: only append added topics
@@ -695,14 +656,4 @@ public class TubeSink extends AbstractSink implements Configurable {
}
return tmpMasterAddr;
}
-
- /**
- * get metricItemSet
- *
- * @return the metricItemSet
- */
- private DataProxyMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 7ef6228f9..6510bce57 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -34,7 +34,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.EventStat;
import org.apache.inlong.dataproxy.source.MsgType;
import org.apache.inlong.dataproxy.utils.MessageUtils;
@@ -175,6 +174,7 @@ public class PulsarClientService {
// build and send message
Map<String, String> proMap =
MessageUtils.getXfsAttrs(event.getHeaders(), pkgVersion);
+ long startTime = System.currentTimeMillis();
if (es.isOrderMessage()) {
String partitionKey = event.getHeaders().get(AttributeConstants.MESSAGE_PARTITION_KEY);
try {
@@ -183,8 +183,7 @@ public class PulsarClientService {
.key(partitionKey)
.value(event.getBody())
.send();
- sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
+ sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es, startTime);
forCallBackP.setCanUseSend(true);
result = true;
} catch (PulsarClientException ex) {
@@ -204,9 +203,8 @@ public class PulsarClientService {
.value(event.getBody())
.sendAsync()
.thenAccept((msgId) -> {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event);
forCallBackP.setCanUseSend(true);
- sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es);
+ sendMessageCallBack.handleMessageSendSuccess(topic, msgId, es, startTime);
})
.exceptionally((e) -> {
forCallBackP.setCanUseSend(false);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
index ce029759c..4d5b3a393 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
@@ -21,7 +21,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
public interface SendMessageCallBack {
- void handleMessageSendSuccess(String topic, Object msgId, EventStat es);
+ void handleMessageSendSuccess(String topic, Object msgId, EventStat es, long startTime);
void handleMessageSendException(String topic, EventStat es, Object exception);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 12d68bf2a..8fed4bbc3 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -34,10 +34,13 @@ import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +86,8 @@ public abstract class BaseSource
protected boolean customProcessor = false;
+ private DataProxyMetricItemSet metricItemSet;
+
/*
* monitor
*/
@@ -157,6 +162,15 @@ public abstract class BaseSource
FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
}
super.start();
+ // initial metric item set
+ ConfigManager configManager = ConfigManager.getInstance();
+ String clusterId =
+ configManager.getCommonProperties().getOrDefault(
+ ConfigConstants.PROXY_CLUSTER_NAME,
+ ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+ this.metricItemSet =
+ new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
+ MetricRegister.register(metricItemSet);
/*
* init monitor logic
*/
@@ -298,7 +312,7 @@ public abstract class BaseSource
ServiceDecoder serviceDecoder = (ServiceDecoder)Class.forName(serviceDecoderName).newInstance();
Class<? extends ChannelInitializer> clazz =
(Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
- Constructor ctor = clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
+ Constructor ctor = clazz.getConstructor(BaseSource.class, ChannelGroup.class,
String.class, ServiceDecoder.class, String.class, Integer.class,
String.class, String.class, Boolean.class,
Integer.class, Boolean.class, MonitorIndex.class,
@@ -319,6 +333,14 @@ public abstract class BaseSource
return fac;
}
+ /**
+ * get metricItemSet
+ * @return the metricItemSet
+ */
+ public DataProxyMetricItemSet getMetricItemSet() {
+ return metricItemSet;
+ }
+
public Context getContext() {
return context;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 6e92cb6ba..e4a436448 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -42,7 +42,6 @@ import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.InLongMsg;
@@ -53,7 +52,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.MessageIDException;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
@@ -82,7 +80,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private AbstractSource source;
+ private BaseSource source;
private final ChannelGroup allChannels;
@@ -124,7 +122,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
* @param monitorIndexExt MonitorIndexExt
* @param protocolType protocolType
*/
- public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
+ public ServerMessageHandler(BaseSource source, ServiceDecoder serviceDecoder,
ChannelGroup allChannels,
String topic, String attr, Boolean filterEmptyMsg,
Integer maxCons, Boolean isCompressed, MonitorIndex monitorIndex,
@@ -137,16 +135,11 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (null != attr) {
this.defaultMXAttr = attr;
}
-
this.filterEmptyMsg = filterEmptyMsg;
this.isCompressed = isCompressed;
this.maxConnections = maxCons;
this.protocolType = protocolType;
- if (source instanceof SimpleTcpSource) {
- this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet();
- } else {
- this.metricItemSet = new DataProxyMetricItemSet(this.toString());
- }
+ this.metricItemSet = source.getMetricItemSet();
this.monitorIndex = monitorIndex;
this.monitorIndexExt = monitorIndexExt;
}
@@ -461,7 +454,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
try {
processor.processEvent(event);
monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
- this.addMetric(true, data.length, event);
+ this.addStatistics(true, data.length, event);
monitorIndex.addAndGet(strBuff.toString(),
streamMsgCnt, 1, data.length, 0);
strBuff.delete(0, strBuff.length());
@@ -469,7 +462,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
logger.error("Error writting to channel,data will discard.", ex);
monitorIndexExt.incrementAndGet("EVENT_DROPPED");
monitorIndex.addAndGet(strBuff.toString(), 0, 0, 0, streamMsgCnt);
- this.addMetric(false, data.length, event);
+ this.addStatistics(false, data.length, event);
strBuff.delete(0, strBuff.length());
throw new ChannelException("ProcessEvent error can't write event to channel.");
}
@@ -548,7 +541,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null) {
logger.error("Get null msg, just skip!");
- this.addMetric(false, 0, null);
return;
}
ByteBuf cb = (ByteBuf) msg;
@@ -558,7 +550,6 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
int len = cb.readableBytes();
if (len == 0 && this.filterEmptyMsg) {
logger.warn("Get empty msg from {}, just skip!", strRemoteIP);
- this.addMetric(false, 0, null);
return;
}
// parse message
@@ -569,12 +560,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
strRemoteIP, msgRcvTime, remoteChannel);
if (resultMap == null || resultMap.isEmpty()) {
logger.info("Parse message result is null, from {}", strRemoteIP);
- this.addMetric(false, 0, null);
return;
}
} catch (MessageIDException ex) {
logger.error("MessageIDException ex = {}", ex);
- this.addMetric(false, 0, null);
throw new IOException(ex.getCause());
}
// process message by msgType
@@ -583,12 +572,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
remoteChannel.writeAndFlush(heartbeatBuffer);
- this.addMetric(false, 0, null);
return;
}
// process heart beat 8
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
- this.addMetric(false, 0, null);
return;
}
// process data message
@@ -616,10 +603,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
try {
processor.processEvent(event);
- this.addMetric(true, body.length, event);
+ this.addStatistics(true, body.length, event);
} catch (Throwable ex) {
logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
+ this.addStatistics(false, body.length, event);
throw new ChannelException(
"Process Controller Event error can't write event to channel.");
}
@@ -640,10 +627,10 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
try {
processor.processEvent(event);
- this.addMetric(true, body.length, event);
+ this.addStatistics(true, body.length, event);
} catch (Throwable ex) {
logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
+ this.addStatistics(false, body.length, event);
throw new ChannelException(
"Process Controller Event error can't write event to channel.");
}
@@ -677,31 +664,19 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
}
/**
- * addMetric
+ * add statistics information
*
- * @param result
- * @param size
- * @param event
+ * @param isSuccess success or failure
+ * @param size message size
+ * @param event message event
*/
- private void addMetric(boolean result, long size, Event event) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
- dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
- dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName());
- DataProxyMetricItem.fillInlongId(event, dimensions);
- DataProxyMetricItem.fillAuditFormatTime(event, dimensions);
- DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
- if (result) {
- metricItem.readSuccessCount.incrementAndGet();
- metricItem.readSuccessSize.addAndGet(size);
- try {
- AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
- } catch (Exception e) {
- logger.error("add metric has exception e= {}", e);
- }
- } else {
- metricItem.readFailCount.incrementAndGet();
- metricItem.readFailSize.addAndGet(size);
+ private void addStatistics(boolean isSuccess, long size, Event event) {
+ if (event == null) {
+ return;
+ }
+ this.metricItemSet.fillSrcMetricItemsByEvent(event, isSuccess, size);
+ if (isSuccess) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_READ_SUCCESS, event);
}
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 1e2d54405..b101b9651 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -48,7 +48,6 @@ import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
-import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -87,7 +86,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
= DateTimeFormatter.ofPattern("yyyyMMddHHmm");
private static final ZoneId defZoneId = ZoneId.systemDefault();
- private AbstractSource source;
+ private BaseSource source;
private final ChannelGroup allChannels;
private int maxConnections = Integer.MAX_VALUE;
private boolean filterEmptyMsg = false;
@@ -112,7 +111,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
* @param isCompressed
* @param protocolType
*/
- public SimpleMessageHandler(AbstractSource source, ServiceDecoder serProcessor,
+ public SimpleMessageHandler(BaseSource source, ServiceDecoder serProcessor,
ChannelGroup allChannels,
String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength,
Integer maxCons,
@@ -130,11 +129,7 @@ public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
this.isCompressed = isCompressed;
this.maxConnections = maxCons;
this.protocolType = protocolType;
- if (source instanceof SimpleTcpSource) {
- this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet();
- } else {
- this.metricItemSet = new DataProxyMetricItemSet(this.toString());
- }
+ this.metricItemSet = source.getMetricItemSet();
}
private String getRemoteIp(Channel channel) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 1f1a86e66..734fd95dd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -39,9 +39,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
-import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.utils.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,8 +81,6 @@ public class SimpleTcpSource extends BaseSource
private ServerBootstrap bootstrap;
- private DataProxyMetricItemSet metricItemSet;
-
public SimpleTcpSource() {
super();
@@ -182,8 +178,6 @@ public class SimpleTcpSource extends BaseSource
@Override
public synchronized void startSource() {
logger.info("start " + this.getName());
- this.metricItemSet = new DataProxyMetricItemSet(this.getName());
- MetricRegister.register(metricItemSet);
checkBlackListThread = new CheckBlackListThread();
checkBlackListThread.start();
// ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
@@ -270,14 +264,6 @@ public class SimpleTcpSource extends BaseSource
}
}
- /**
- * get metricItemSet
- * @return the metricItemSet
- */
- public DataProxyMetricItemSet getMetricItemSet() {
- return metricItemSet;
- }
-
@Override
public String getProtocolName() {
return "tcp";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index 5c17451d3..be42b4494 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -50,6 +50,7 @@ public class SimpleUdpSource
@Override
public void startSource() {
// setup Netty server
+ logger.info("start " + this.getName());
bootstrap = new Bootstrap();
logger.info("Set max workers : {} ;",maxThreads);
bootstrap.channel(NioDatagramChannel.class);