You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/21 07:04:47 UTC
[incubator-inlong] branch master updated: [INLONG-2224][DataProxy] A message received by the source is sent to Pulsar twice when both a memory channel and a file channel are configured (#2257)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9301823 [INLONG-2224][DataProxy] A message received by the source is sent to Pulsar twice when both a memory channel and a file channel are configured (#2257)
9301823 is described below
commit 9301823501c3baff7eaa884f3a96ca586ec40919
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Jan 21 15:04:39 2022 +0800
[INLONG-2224][DataProxy] A message received by the source is sent to Pulsar twice when both a memory channel and a file channel are configured (#2257)
---
...o.conf => flume-mulit-pulsar-http-example.conf} | 56 ++++++++++++----------
...mo.conf => flume-mulit-pulsar-tcp-example.conf} | 29 ++++++-----
...mo.conf => flume-mulit-pulsar-udp-example.conf} | 55 +++++++++++----------
.../apache/inlong/dataproxy/sink/PulsarSink.java | 25 +++++-----
.../dataproxy/sink/pulsar/PulsarClientService.java | 7 ++-
.../dataproxy/sink/pulsar/SendMessageCallBack.java | 4 +-
.../apache/inlong/dataproxy/source/BaseSource.java | 13 ++++-
.../inlong/dataproxy/source/SimpleTcpSource.java | 2 -
8 files changed, 106 insertions(+), 85 deletions(-)
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
similarity index 77%
copy from inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
copy to inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
index 599b97a..09ce407 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
@@ -17,30 +17,35 @@
# under the License.
#
-agent1.sources = tcp-source
+#
+# Attention:
+# to use this example, rename this file to flume.conf
+# and use it to replace the file of the same name in the conf directory.
+#
+
+agent1.sources = http-source
agent1.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2 pulsar-sink-msg5 pulsar-sink-msg6
-
-agent1.sources.tcp-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
-agent1.sources.tcp-source.type = org.apache.inlong.dataproxy.source.SimpleTcpSource
-agent1.sources.tcp-source.msg-factory-name = org.apache.inlong.dataproxy.source.ServerMessageFactory
-agent1.sources.tcp-source.host = 0.0.0.0
-agent1.sources.tcp-source.port = 46801
-agent1.sources.tcp-source.max-msg-length = 524288
-agent1.sources.tcp-source.topic = test1
-agent1.sources.tcp-source.attr = m=9
-agent1.sources.tcp-source.connections = 30000
-agent1.sources.tcp-source.max-threads = 64
-agent1.sources.tcp-source.receiveBufferSize = 1048576
-agent1.sources.tcp-source.sendBufferSize = 1048576
-agent1.sources.tcp-source.custom-cp = true
-agent1.sources.tcp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
-agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.tcp-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.tcp-source.metric-agent-port=8003
-agent1.sources.tcp-source.metric-cache-size=1000000
-agent1.sources.tcp-source.set=10
+agent1.sources.http-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
+agent1.sources.http-source.type = org.apache.inlong.dataproxy.http.SimpleHttpSource
+agent1.sources.http-source.message-handler-name = org.apache.inlong.dataproxy.http.SimpleMessageHandler
+agent1.sources.http-source.host = 0.0.0.0
+agent1.sources.http-source.port = 46802
+agent1.sources.http-source.max-msg-length = 524288
+agent1.sources.http-source.topic = test1
+agent1.sources.http-source.attr = m=9
+agent1.sources.http-source.connections = 30000
+agent1.sources.http-source.max-threads = 64
+agent1.sources.http-source.receiveBufferSize = 1048576
+agent1.sources.http-source.sendBufferSize = 1048576
+agent1.sources.http-source.custom-cp = true
+agent1.sources.http-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
+agent1.sources.http-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.http-source.metric-recovery-path=/data/DataProxy/file/recovery
+agent1.sources.http-source.metric-agent-port=8003
+agent1.sources.http-source.metric-cache-size=1000000
+agent1.sources.http-source.set=10
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 10000
@@ -70,10 +75,9 @@ agent1.channels.ch-msg6.dataDirs = /data/DataProxy/file/ch-msg6/data
agent1.channels.ch-msg6.fsyncPerTransaction = false
agent1.channels.ch-msg6.fsyncInterval = 10
-
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,7 +89,7 @@ agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -97,7 +101,7 @@ agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -109,7 +113,7 @@ agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg6.thread-num = 8
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
similarity index 89%
copy from inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
copy to inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
index 599b97a..3658f6c 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
@@ -17,6 +17,12 @@
# under the License.
#
+#
+# Attention:
+# to use this example, rename this file to flume.conf
+# and use it to replace the file of the same name in the conf directory.
+#
+
agent1.sources = tcp-source
agent1.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2 pulsar-sink-msg5 pulsar-sink-msg6
@@ -37,10 +43,10 @@ agent1.sources.tcp-source.sendBufferSize = 1048576
agent1.sources.tcp-source.custom-cp = true
agent1.sources.tcp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.tcp-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.tcp-source.metric-agent-port=8003
-agent1.sources.tcp-source.metric-cache-size=1000000
-agent1.sources.tcp-source.set=10
+agent1.sources.tcp-source.metric-recovery-path = /data/DataProxy/file/recovery
+agent1.sources.tcp-source.metric-agent-port = 8003
+agent1.sources.tcp-source.metric-cache-size = 1000000
+agent1.sources.tcp-source.set = 10
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 10000
@@ -70,10 +76,9 @@ agent1.channels.ch-msg6.dataDirs = /data/DataProxy/file/ch-msg6/data
agent1.channels.ch-msg6.fsyncPerTransaction = false
agent1.channels.ch-msg6.fsyncInterval = 10
-
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,7 +90,7 @@ agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -93,11 +98,11 @@ agent1.sinks.pulsar-sink-msg2.client-id-cache = true
agent1.sinks.pulsar-sink-msg2.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg2.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg2.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -105,11 +110,11 @@ agent1.sinks.pulsar-sink-msg5.client-id-cache = true
agent1.sinks.pulsar-sink-msg5.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg5.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg5.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg6.thread-num = 8
@@ -117,4 +122,4 @@ agent1.sinks.pulsar-sink-msg6.client-id-cache = true
agent1.sinks.pulsar-sink-msg6.max_survived_time = 300000
agent1.sinks.pulsar-sink-msg6.max_survived_size = 3000000
agent1.sinks.pulsar-sink-msg6.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec=20000000
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec = 20000000
\ No newline at end of file
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
similarity index 78%
rename from inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
rename to inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
index 599b97a..c2c688d 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
@@ -17,30 +17,35 @@
# under the License.
#
-agent1.sources = tcp-source
+#
+# Attention:
+# to use this example, rename this file to flume.conf
+# and use it to replace the file of the same name in the conf directory.
+#
+
+agent1.sources = udp-source
agent1.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2 pulsar-sink-msg5 pulsar-sink-msg6
-
-agent1.sources.tcp-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
-agent1.sources.tcp-source.type = org.apache.inlong.dataproxy.source.SimpleTcpSource
-agent1.sources.tcp-source.msg-factory-name = org.apache.inlong.dataproxy.source.ServerMessageFactory
-agent1.sources.tcp-source.host = 0.0.0.0
-agent1.sources.tcp-source.port = 46801
-agent1.sources.tcp-source.max-msg-length = 524288
-agent1.sources.tcp-source.topic = test1
-agent1.sources.tcp-source.attr = m=9
-agent1.sources.tcp-source.connections = 30000
-agent1.sources.tcp-source.max-threads = 64
-agent1.sources.tcp-source.receiveBufferSize = 1048576
-agent1.sources.tcp-source.sendBufferSize = 1048576
-agent1.sources.tcp-source.custom-cp = true
-agent1.sources.tcp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
-agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.tcp-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.tcp-source.metric-agent-port=8003
-agent1.sources.tcp-source.metric-cache-size=1000000
-agent1.sources.tcp-source.set=10
+agent1.sources.udp-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
+agent1.sources.udp-source.type = org.apache.inlong.dataproxy.source.SimpleUdpSource
+agent1.sources.udp-source.msg-factory-name = org.apache.inlong.dataproxy.source.ServerMessageFactory
+agent1.sources.udp-source.host = 0.0.0.0
+agent1.sources.udp-source.port = 46803
+agent1.sources.udp-source.max-msg-length = 524288
+agent1.sources.udp-source.topic = test1
+agent1.sources.udp-source.attr = m=9
+agent1.sources.udp-source.connections = 30000
+agent1.sources.udp-source.max-threads = 64
+agent1.sources.udp-source.receiveBufferSize = 1048576
+agent1.sources.udp-source.sendBufferSize = 1048576
+agent1.sources.udp-source.custom-cp = true
+agent1.sources.udp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
+agent1.sources.udp-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.udp-source.metric-recovery-path=/data/DataProxy/file/recovery
+agent1.sources.udp-source.metric-agent-port=8003
+agent1.sources.udp-source.metric-cache-size=1000000
+agent1.sources.udp-source.set=10
agent1.channels.ch-msg1.type = memory
agent1.channels.ch-msg1.capacity = 10000
@@ -73,7 +78,7 @@ agent1.channels.ch-msg6.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,7 +90,7 @@ agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -97,7 +102,7 @@ agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -109,7 +114,7 @@ agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
agent1.sinks.pulsar-sink-msg6.thread-num = 8
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 9f92131..e846265 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -310,9 +310,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
* switch for lots of metrics
*/
if (isNewMetricOn) {
- monitorIndex = new MonitorIndex("Sink", statIntervalSec, maxMonitorCnt);
+ monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, maxMonitorCnt);
}
- monitorIndexExt = new MonitorIndexExt("TDBus_monitors#" + this.getName(),
+ monitorIndexExt = new MonitorIndexExt("Data_proxy_monitors#" + this.getName(),
statIntervalSec, maxMonitorCnt);
}
@@ -323,7 +323,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
try {
initTopicSet(new HashSet<String>(topicProperties.values()));
} catch (Exception e) {
- logger.info("meta sink start publish topic fail.",e);
+ logger.info("pulsar sink start publish topic fail.",e);
}
for (int i = 0; i < sinkThreadPool.length; i++) {
@@ -332,7 +332,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
+ i);
sinkThreadPool[i].start();
}
- logger.debug("meta sink started");
+ logger.debug("pulsar sink started");
}
@Override
@@ -354,7 +354,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
try {
monitorIndex.shutDown();
} catch (Exception e) {
- logger.warn("statrunner interrupted");
+ logger.warn("stat runner interrupted");
}
}
if (sinkThreadPool != null) {
@@ -406,7 +406,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
try {
tx.rollback();
} catch (Throwable e) {
- logger.error("metasink transaction rollback exception",e);
+ logger.error("pulsar sink transaction rollback exception",e);
}
} finally {
@@ -463,7 +463,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
/*
- * SINK_INTF#metasink1#topic#streamId#clientIp#busIP#pkgTime#successCnt#packcnt
+ * SINK_INTF#pulsarsink1#topic#streamId#clientIp#busIP#pkgTime#successCnt
+ * #packcnt
* #packsize#failCnt
*/
StringBuilder newbase = new StringBuilder();
@@ -505,7 +506,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
@Override
- public void handleMessageSendSuccess(Object result, EventStat eventStat) {
+ public void handleMessageSendSuccess(String topic, Object result, EventStat eventStat) {
/*
* Statistics pulsar performance
*/
@@ -522,19 +523,19 @@ public class PulsarSink extends AbstractSink implements Configurable,
if (nowCnt % logEveryNEvents == 0 && nowCnt != lastSuccessSendCnt.get()) {
lastSuccessSendCnt.set(nowCnt);
t2 = System.currentTimeMillis();
- logger.info("metasink {}, succ put {} events to pulsar,"
+ logger.info("Pulsar sink {}, succ put {} events to pulsar,"
+ " in the past {} millsec", new Object[] {
getName(), (nowCnt - oldCnt), (t2 - t1)
});
t1 = t2;
}
- monitorIndexExt.incrementAndGet("METASINK_SUCCESS");
+ monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
editStatistic(eventStat.getEvent(), null, result.toString());
}
@Override
- public void handleMessageSendException(EventStat eventStat, Object e) {
- monitorIndexExt.incrementAndGet("METASINK_EXP");
+ public void handleMessageSendException(String topic, EventStat eventStat, Object e) {
+ monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
if (e instanceof TooLongFrameException) {
PulsarSink.this.overflow = true;
} else if (e instanceof ProducerQueueIsFullError) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 02dbfca..83b57a0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -166,7 +166,7 @@ public class PulsarClientService {
}
Map<String, String> proMap = new HashMap<>();
- proMap.put("tdbusip", localIp);
+ proMap.put("data_proxy_ip", localIp);
String streamId = "";
if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
@@ -179,11 +179,11 @@ public class PulsarClientService {
forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
.sendAsync().thenAccept((msgId) -> {
forCallBackP.setCanUseSend(true);
- sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl)msgId, es);
+ sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl)msgId, es);
}).exceptionally((e) -> {
forCallBackP.setCanUseSend(false);
- sendMessageCallBack.handleMessageSendException(es, e);
+ sendMessageCallBack.handleMessageSendException(topic, es, e);
return null;
});
return true;
@@ -235,7 +235,6 @@ public class PulsarClientService {
}
public List<TopicProducerInfo> initTopicProducer(String topic) {
- logger.info("initTopicProducer topic = {}", topic);
List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
List<TopicProducerInfo> newList = new ArrayList<>();
if (pulsarClients != null) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
index 66323e7..ce02975 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
@@ -21,7 +21,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
public interface SendMessageCallBack {
- void handleMessageSendSuccess(Object msgId, EventStat es);
+ void handleMessageSendSuccess(String topic, Object msgId, EventStat es);
- void handleMessageSendException(EventStat es, Object exception);
+ void handleMessageSendException(String topic, EventStat es, Object exception);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 5ac6121..9e0c651 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -20,15 +20,18 @@ package org.apache.inlong.dataproxy.source;
import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import org.apache.commons.lang.StringUtils;
+import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
+import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
import org.jboss.netty.bootstrap.Bootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -122,6 +125,13 @@ public abstract class BaseSource
@Override
public synchronized void start() {
+ if (customProcessor) {
+ ChannelSelector selector = getChannelProcessor().getSelector();
+ FailoverChannelProcessor newProcessor = new FailoverChannelProcessor(selector);
+ newProcessor.configure(this.context);
+ setChannelProcessor(newProcessor);
+ FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
+ }
super.start();
/*
* init monitor logic
@@ -166,6 +176,7 @@ public abstract class BaseSource
@Override
public void configure(Context context) {
+
this.context = context;
port = context.getInteger(ConfigConstants.CONFIG_PORT);
@@ -215,8 +226,6 @@ public abstract class BaseSource
attr = attr.trim();
Preconditions.checkArgument(!attr.isEmpty(), "attr is empty");
- filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
-
statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, INTERVAL_SEC);
Preconditions.checkArgument((statIntervalSec >= STAT_INTERVAL_MUST_THAN), "statIntervalSec must be >= 0");
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 60db247..d861c8c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -100,7 +100,6 @@ public class SimpleTcpSource extends BaseSource
protected String topic;
- //
private DataProxyMetricItemSet metricItemSet;
public SimpleTcpSource() {
@@ -204,7 +203,6 @@ public class SimpleTcpSource extends BaseSource
MetricRegister.register(metricItemSet);
checkBlackListThread = new CheckBlackListThread();
checkBlackListThread.start();
-
ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
ChannelFactory factory =
new NioServerSocketChannelFactory(