You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/21 07:04:47 UTC

[incubator-inlong] branch master updated: [INLONG-2224][DataProxy] Source receive one message will be send to pulsar twice when config both memery channel and file channel (#2257)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9301823   [INLONG-2224][DataProxy] Source receive one message will be send to pulsar twice when config both memery channel and file channel (#2257)
9301823 is described below

commit 9301823501c3baff7eaa884f3a96ca586ec40919
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Jan 21 15:04:39 2022 +0800

     [INLONG-2224][DataProxy] Source receive one message will be send to pulsar twice when config both memery channel and file channel (#2257)
---
 ...o.conf => flume-mulit-pulsar-http-example.conf} | 56 ++++++++++++----------
 ...mo.conf => flume-mulit-pulsar-tcp-example.conf} | 29 ++++++-----
 ...mo.conf => flume-mulit-pulsar-udp-example.conf} | 55 +++++++++++----------
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 25 +++++-----
 .../dataproxy/sink/pulsar/PulsarClientService.java |  7 ++-
 .../dataproxy/sink/pulsar/SendMessageCallBack.java |  4 +-
 .../apache/inlong/dataproxy/source/BaseSource.java | 13 ++++-
 .../inlong/dataproxy/source/SimpleTcpSource.java   |  2 -
 8 files changed, 106 insertions(+), 85 deletions(-)

diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
similarity index 77%
copy from inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
copy to inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
index 599b97a..09ce407 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-http-example.conf
@@ -17,30 +17,35 @@
 # under the License.
 #
 
-agent1.sources = tcp-source
+#
+# Attention:
+# use this example, you must rename this file name to flume.conf,
+# and replace the file with the same name in conf directory.
+#
+
+agent1.sources = http-source
 agent1.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
 agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2 pulsar-sink-msg5 pulsar-sink-msg6
 
-
-agent1.sources.tcp-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
-agent1.sources.tcp-source.type = org.apache.inlong.dataproxy.source.SimpleTcpSource
-agent1.sources.tcp-source.msg-factory-name = org.apache.inlong.dataproxy.source.ServerMessageFactory
-agent1.sources.tcp-source.host = 0.0.0.0
-agent1.sources.tcp-source.port = 46801
-agent1.sources.tcp-source.max-msg-length = 524288
-agent1.sources.tcp-source.topic = test1
-agent1.sources.tcp-source.attr = m=9
-agent1.sources.tcp-source.connections = 30000
-agent1.sources.tcp-source.max-threads = 64
-agent1.sources.tcp-source.receiveBufferSize = 1048576
-agent1.sources.tcp-source.sendBufferSize = 1048576
-agent1.sources.tcp-source.custom-cp = true
-agent1.sources.tcp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
-agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.tcp-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.tcp-source.metric-agent-port=8003
-agent1.sources.tcp-source.metric-cache-size=1000000
-agent1.sources.tcp-source.set=10
+agent1.sources.http-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
+agent1.sources.http-source.type = org.apache.inlong.dataproxy.http.SimpleHttpSource
+agent1.sources.http-source.message-handler-name = org.apache.inlong.dataproxy.http.SimpleMessageHandler
+agent1.sources.http-source.host = 0.0.0.0
+agent1.sources.http-source.port = 46802
+agent1.sources.http-source.max-msg-length = 524288
+agent1.sources.http-source.topic = test1
+agent1.sources.http-source.attr = m=9
+agent1.sources.http-source.connections = 30000
+agent1.sources.http-source.max-threads = 64
+agent1.sources.http-source.receiveBufferSize = 1048576
+agent1.sources.http-source.sendBufferSize = 1048576
+agent1.sources.http-source.custom-cp = true
+agent1.sources.http-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
+agent1.sources.http-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.http-source.metric-recovery-path=/data/DataProxy/file/recovery
+agent1.sources.http-source.metric-agent-port=8003
+agent1.sources.http-source.metric-cache-size=1000000
+agent1.sources.http-source.set=10
 
 agent1.channels.ch-msg1.type = memory
 agent1.channels.ch-msg1.capacity = 10000
@@ -70,10 +75,9 @@ agent1.channels.ch-msg6.dataDirs = /data/DataProxy/file/ch-msg6/data
 agent1.channels.ch-msg6.fsyncPerTransaction = false
 agent1.channels.ch-msg6.fsyncInterval = 10
 
-
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,7 +89,7 @@ agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -97,7 +101,7 @@ agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -109,7 +113,7 @@ agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
 agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg6.thread-num = 8
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
similarity index 89%
copy from inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
copy to inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
index 599b97a..3658f6c 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-tcp-example.conf
@@ -17,6 +17,12 @@
 # under the License.
 #
 
+#
+# Attention:
+# use this example, you must rename this file name to flume.conf,
+# and replace the file with the same name in conf directory.
+#
+
 agent1.sources = tcp-source
 agent1.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
 agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2 pulsar-sink-msg5 pulsar-sink-msg6
@@ -37,10 +43,10 @@ agent1.sources.tcp-source.sendBufferSize = 1048576
 agent1.sources.tcp-source.custom-cp = true
 agent1.sources.tcp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
 agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.tcp-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.tcp-source.metric-agent-port=8003
-agent1.sources.tcp-source.metric-cache-size=1000000
-agent1.sources.tcp-source.set=10
+agent1.sources.tcp-source.metric-recovery-path = /data/DataProxy/file/recovery
+agent1.sources.tcp-source.metric-agent-port = 8003
+agent1.sources.tcp-source.metric-cache-size = 1000000
+agent1.sources.tcp-source.set = 10
 
 agent1.channels.ch-msg1.type = memory
 agent1.channels.ch-msg1.capacity = 10000
@@ -70,10 +76,9 @@ agent1.channels.ch-msg6.dataDirs = /data/DataProxy/file/ch-msg6/data
 agent1.channels.ch-msg6.fsyncPerTransaction = false
 agent1.channels.ch-msg6.fsyncInterval = 10
 
-
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,7 +90,7 @@ agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -93,11 +98,11 @@ agent1.sinks.pulsar-sink-msg2.client-id-cache = true
 agent1.sinks.pulsar-sink-msg2.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg2.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg2.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -105,11 +110,11 @@ agent1.sinks.pulsar-sink-msg5.client-id-cache = true
 agent1.sinks.pulsar-sink-msg5.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg5.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg5.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
+agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec = 20000000
 
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
 agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg6.thread-num = 8
@@ -117,4 +122,4 @@ agent1.sinks.pulsar-sink-msg6.client-id-cache = true
 agent1.sinks.pulsar-sink-msg6.max_survived_time = 300000
 agent1.sinks.pulsar-sink-msg6.max_survived_size = 3000000
 agent1.sinks.pulsar-sink-msg6.netty_write_buffer_high_water_mark = 20971520
-agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec=20000000
\ No newline at end of file
+agent1.sinks.pulsar-sink-msg6.disk-io-rate-per-sec = 20000000
\ No newline at end of file
diff --git a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
similarity index 78%
rename from inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
rename to inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
index 599b97a..c2c688d 100644
--- a/inlong-dataproxy/conf/flume-mulit-pulsar-demo.conf
+++ b/inlong-dataproxy/conf/flume-mulit-pulsar-udp-example.conf
@@ -17,30 +17,35 @@
 # under the License.
 #
 
-agent1.sources = tcp-source
+#
+# Attention:
+# use this example, you must rename this file name to flume.conf,
+# and replace the file with the same name in conf directory.
+#
+
+agent1.sources = udp-source
 agent1.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
 agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2 pulsar-sink-msg5 pulsar-sink-msg6
 
-
-agent1.sources.tcp-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
-agent1.sources.tcp-source.type = org.apache.inlong.dataproxy.source.SimpleTcpSource
-agent1.sources.tcp-source.msg-factory-name = org.apache.inlong.dataproxy.source.ServerMessageFactory
-agent1.sources.tcp-source.host = 0.0.0.0
-agent1.sources.tcp-source.port = 46801
-agent1.sources.tcp-source.max-msg-length = 524288
-agent1.sources.tcp-source.topic = test1
-agent1.sources.tcp-source.attr = m=9
-agent1.sources.tcp-source.connections = 30000
-agent1.sources.tcp-source.max-threads = 64
-agent1.sources.tcp-source.receiveBufferSize = 1048576
-agent1.sources.tcp-source.sendBufferSize = 1048576
-agent1.sources.tcp-source.custom-cp = true
-agent1.sources.tcp-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
-agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
-agent1.sources.tcp-source.metric-recovery-path=/data/DataProxy/file/recovery
-agent1.sources.tcp-source.metric-agent-port=8003
-agent1.sources.tcp-source.metric-cache-size=1000000
-agent1.sources.tcp-source.set=10
+agent1.sources.upd-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6
+agent1.sources.upd-source.type = org.apache.inlong.dataproxy.source.SimpleUdpSource
+agent1.sources.upd-source.msg-factory-name = org.apache.inlong.dataproxy.source.ServerMessageFactory
+agent1.sources.upd-source.host = 0.0.0.0
+agent1.sources.upd-source.port = 46803
+agent1.sources.upd-source.max-msg-length = 524288
+agent1.sources.upd-source.topic = test1
+agent1.sources.upd-source.attr = m=9
+agent1.sources.upd-source.connections = 30000
+agent1.sources.upd-source.max-threads = 64
+agent1.sources.upd-source.receiveBufferSize = 1048576
+agent1.sources.upd-source.sendBufferSize = 1048576
+agent1.sources.upd-source.custom-cp = true
+agent1.sources.upd-source.selector.type = org.apache.inlong.dataproxy.channel.FailoverChannelSelector
+agent1.sources.upd-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.upd-source.metric-recovery-path=/data/DataProxy/file/recovery
+agent1.sources.upd-source.metric-agent-port=8003
+agent1.sources.upd-source.metric-cache-size=1000000
+agent1.sources.upd-source.set=10
 
 agent1.channels.ch-msg1.type = memory
 agent1.channels.ch-msg1.capacity = 10000
@@ -73,7 +78,7 @@ agent1.channels.ch-msg6.fsyncInterval = 10
 
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg1.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg1.thread-num = 8
@@ -85,7 +90,7 @@ agent1.sinks.pulsar-sink-msg1.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg2.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg2.thread-num = 8
@@ -97,7 +102,7 @@ agent1.sinks.pulsar-sink-msg2.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg5.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg5.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg5.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg5.thread-num = 8
@@ -109,7 +114,7 @@ agent1.sinks.pulsar-sink-msg5.disk-io-rate-per-sec=20000000
 
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
 agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://PULSAR_LIST
+agent1.sinks.pulsar-sink-msg6.pulsar_server_url_list = pulsar://127.0.0.1:6650
 agent1.sinks.pulsar-sink-msg6.send_timeout_MILL = 30000
 agent1.sinks.pulsar-sink-msg6.stat_interval_sec = 60
 agent1.sinks.pulsar-sink-msg6.thread-num = 8
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 9f92131..e846265 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -310,9 +310,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
              * switch for lots of metrics
              */
             if (isNewMetricOn) {
-                monitorIndex = new MonitorIndex("Sink", statIntervalSec, maxMonitorCnt);
+                monitorIndex = new MonitorIndex("Pulsar_Sink", statIntervalSec, maxMonitorCnt);
             }
-            monitorIndexExt = new MonitorIndexExt("TDBus_monitors#" + this.getName(),
+            monitorIndexExt = new MonitorIndexExt("Data_proxy_monitors#" + this.getName(),
                     statIntervalSec, maxMonitorCnt);
         }
 
@@ -323,7 +323,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
         try {
             initTopicSet(new HashSet<String>(topicProperties.values()));
         } catch (Exception e) {
-            logger.info("meta sink start publish topic fail.",e);
+            logger.info("pulsar sink start publish topic fail.",e);
         }
 
         for (int i = 0; i < sinkThreadPool.length; i++) {
@@ -332,7 +332,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     + i);
             sinkThreadPool[i].start();
         }
-        logger.debug("meta sink started");
+        logger.debug("pulsar sink started");
     }
 
     @Override
@@ -354,7 +354,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
             try {
                 monitorIndex.shutDown();
             } catch (Exception e) {
-                logger.warn("statrunner interrupted");
+                logger.warn("stat runner interrupted");
             }
         }
         if (sinkThreadPool != null) {
@@ -406,7 +406,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
             try {
                 tx.rollback();
             } catch (Throwable e) {
-                logger.error("metasink transaction rollback exception",e);
+                logger.error("pulsar sink transaction rollback exception",e);
 
             }
         } finally {
@@ -463,7 +463,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     }
 
                     /*
-                     * SINK_INTF#metasink1#topic#streamId#clientIp#busIP#pkgTime#successCnt#packcnt
+                     * SINK_INTF#pulsarsink1#topic#streamId#clientIp#busIP#pkgTime#successCnt
+                     * #packcnt
                      * #packsize#failCnt
                      */
                     StringBuilder newbase = new StringBuilder();
@@ -505,7 +506,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
     }
 
     @Override
-    public void handleMessageSendSuccess(Object result,  EventStat eventStat) {
+    public void handleMessageSendSuccess(String topic, Object result,  EventStat eventStat) {
         /*
          * Statistics pulsar performance
          */
@@ -522,19 +523,19 @@ public class PulsarSink extends AbstractSink implements Configurable,
         if (nowCnt % logEveryNEvents == 0 && nowCnt != lastSuccessSendCnt.get()) {
             lastSuccessSendCnt.set(nowCnt);
             t2 = System.currentTimeMillis();
-            logger.info("metasink {}, succ put {} events to pulsar,"
+            logger.info("Pulsar sink {}, succ put {} events to pulsar,"
                     + " in the past {} millsec", new Object[] {
                     getName(), (nowCnt - oldCnt), (t2 - t1)
             });
             t1 = t2;
         }
-        monitorIndexExt.incrementAndGet("METASINK_SUCCESS");
+        monitorIndexExt.incrementAndGet("PULSAR_SINK_SUCCESS");
         editStatistic(eventStat.getEvent(), null, result.toString());
     }
 
     @Override
-    public void handleMessageSendException(EventStat eventStat,  Object e) {
-        monitorIndexExt.incrementAndGet("METASINK_EXP");
+    public void handleMessageSendException(String topic, EventStat eventStat,  Object e) {
+        monitorIndexExt.incrementAndGet("PULSAR_SINK_EXP");
         if (e instanceof TooLongFrameException) {
             PulsarSink.this.overflow = true;
         } else if (e instanceof ProducerQueueIsFullError) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 02dbfca..83b57a0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -166,7 +166,7 @@ public class PulsarClientService {
         }
 
         Map<String, String> proMap = new HashMap<>();
-        proMap.put("tdbusip", localIp);
+        proMap.put("data_proxy_ip", localIp);
         String streamId = "";
         if (event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
             streamId = event.getHeaders().get(AttributeConstants.INTERFACE_ID);
@@ -179,11 +179,11 @@ public class PulsarClientService {
         forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
                 .sendAsync().thenAccept((msgId) -> {
             forCallBackP.setCanUseSend(true);
-            sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl)msgId, es);
+            sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl)msgId, es);
 
         }).exceptionally((e) -> {
             forCallBackP.setCanUseSend(false);
-            sendMessageCallBack.handleMessageSendException(es, e);
+            sendMessageCallBack.handleMessageSendException(topic, es, e);
             return null;
         });
         return true;
@@ -235,7 +235,6 @@ public class PulsarClientService {
     }
 
     public List<TopicProducerInfo> initTopicProducer(String topic) {
-        logger.info("initTopicProducer topic = {}", topic);
         List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
             List<TopicProducerInfo> newList = new ArrayList<>();
             if (pulsarClients != null) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
index 66323e7..ce02975 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/SendMessageCallBack.java
@@ -21,7 +21,7 @@ import org.apache.inlong.dataproxy.sink.EventStat;
 
 public interface SendMessageCallBack {
 
-    void handleMessageSendSuccess(Object msgId, EventStat es);
+    void handleMessageSendSuccess(String topic, Object msgId, EventStat es);
 
-    void handleMessageSendException(EventStat es, Object exception);
+    void handleMessageSendException(String topic, EventStat es, Object exception);
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 5ac6121..9e0c651 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -20,15 +20,18 @@ package org.apache.inlong.dataproxy.source;
 import com.google.common.base.Preconditions;
 import java.lang.reflect.Constructor;
 import org.apache.commons.lang.StringUtils;
+import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.commons.monitor.MonitorIndex;
 import org.apache.inlong.commons.monitor.MonitorIndexExt;
+import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
 import org.jboss.netty.bootstrap.Bootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -122,6 +125,13 @@ public abstract class BaseSource
 
   @Override
   public synchronized void start() {
+    if (customProcessor) {
+      ChannelSelector selector = getChannelProcessor().getSelector();
+      FailoverChannelProcessor newProcessor = new FailoverChannelProcessor(selector);
+      newProcessor.configure(this.context);
+      setChannelProcessor(newProcessor);
+      FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
+    }
     super.start();
     /*
      * init monitor logic
@@ -166,6 +176,7 @@ public abstract class BaseSource
 
   @Override
   public void configure(Context context) {
+
     this.context = context;
 
     port = context.getInteger(ConfigConstants.CONFIG_PORT);
@@ -215,8 +226,6 @@ public abstract class BaseSource
     attr = attr.trim();
     Preconditions.checkArgument(!attr.isEmpty(), "attr is empty");
 
-    filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
-
     statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, INTERVAL_SEC);
     Preconditions.checkArgument((statIntervalSec >= STAT_INTERVAL_MUST_THAN), "statIntervalSec must be >= 0");
 
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 60db247..d861c8c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -100,7 +100,6 @@ public class SimpleTcpSource extends BaseSource
 
     protected String topic;
 
-    //
     private DataProxyMetricItemSet metricItemSet;
 
     public SimpleTcpSource() {
@@ -204,7 +203,6 @@ public class SimpleTcpSource extends BaseSource
         MetricRegister.register(metricItemSet);
         checkBlackListThread = new CheckBlackListThread();
         checkBlackListThread.start();
-
         ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
         ChannelFactory factory =
                 new NioServerSocketChannelFactory(