You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/19 12:45:56 UTC
[incubator-inlong] branch master updated: [INLONG-2491][Dataproxy] update dataproxy netty version to 4.1.72.Final and log4j to log4j2 (#2599)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5f4a259 [INLONG-2491][Dataproxy] update dataproxy netty version to 4.1.72.Final and log4j to log4j2 (#2599)
5f4a259 is described below
commit 5f4a2590ec2d5fdb43f08425ed54343c33df8000
Author: baomingyu <ba...@163.com>
AuthorDate: Sat Feb 19 20:45:51 2022 +0800
[INLONG-2491][Dataproxy] update dataproxy netty version to 4.1.72.Final and log4j to log4j2 (#2599)
---
inlong-dataproxy/bin/dataproxy-start.sh | 2 +-
inlong-dataproxy/bin/flume-ng | 24 +-
inlong-dataproxy/conf/log4j.properties | 119 ----------
inlong-dataproxy/conf/log4j2.xml | 114 +++++++++
inlong-dataproxy/dataproxy-source/pom.xml | 12 +-
.../inlong/dataproxy/config/pojo/PulsarConfig.java | 28 ++-
.../inlong/dataproxy/consts/ConfigConstants.java | 2 +
.../apache/inlong/dataproxy/sink/PulsarSink.java | 63 +++--
.../dataproxy/sink/pulsar/PulsarClientService.java | 30 +--
.../apache/inlong/dataproxy/source/BaseSource.java | 85 +++++--
.../dataproxy/source/DefaultServiceDecoder.java | 40 ++--
.../dataproxy/source/ServerMessageFactory.java | 63 ++---
.../dataproxy/source/ServerMessageHandler.java | 256 ++++++++++-----------
.../inlong/dataproxy/source/ServiceDecoder.java | 11 +-
.../dataproxy/source/SimpleMessageHandler.java | 234 ++++++++++---------
.../inlong/dataproxy/source/SimpleTcpSource.java | 94 ++++----
.../inlong/dataproxy/source/SimpleUdpSource.java | 37 ++-
.../inlong/dataproxy/utils/EventLoopUtil.java | 108 +++++++++
inlong-dataproxy/pom.xml | 58 ++++-
19 files changed, 791 insertions(+), 589 deletions(-)
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh
index 94c780c..f2b1188 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -28,4 +28,4 @@ do
done
cd .. || exit
-nohup bin/flume-ng agent --conf conf/ -f conf/flume.conf -n agent1 --no-reload-conf > dataproxy.log 2>&1 &
\ No newline at end of file
+nohup bin/flume-ng agent --conf conf/ -f conf/flume.conf -n agent1 --no-reload-conf > /dev/null 2>&1 &
\ No newline at end of file
diff --git a/inlong-dataproxy/bin/flume-ng b/inlong-dataproxy/bin/flume-ng
index e0d2e30..8923353 100755
--- a/inlong-dataproxy/bin/flume-ng
+++ b/inlong-dataproxy/bin/flume-ng
@@ -226,7 +226,29 @@ run_flume() {
# set default params
FLUME_CLASSPATH=""
-JAVA_OPTS="-Xmx1024m -Xms512m -Xmn512m -XX:SurvivorRatio=6"
+
+# Extra options to be passed to the jvm
+DATA_PROXY_MEM=${DATA_PROXY_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
+
+# Garbage collection options
+DATA_PROXY_GC=${DATA_PROXY_GC:-"-XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC"}
+
+# Garbage collection log.
+IS_JAVA_8=`java -version 2>&1 |grep version|grep '"1\.8'`
+# java version has space, use [[ -n $PARAM ]] to judge if variable exists
+if [[ -n $IS_JAVA_8 ]]; then
+ DATA_PROXY_GC_LOG=${DATA_PROXY_GC_LOG:-"-Xloggc:logs/dataproxy_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=20M"}
+else
+# After jdk 9, gc log param should config like this. Ignoring version less than jdk 8
+ DATA_PROXY_GC_LOG=${DATA_PROXY_GC_LOG:-"-Xlog:gc*:logs/dataproxy_gc_%p.log:time,uptime:filecount=10,filesize=20M"}
+fi
+
+LOG_PATH=$(pwd)/logs
+# Extra options to be passed to the jvm
+DATA_PROXY_EXTRA_OPTS=${DATA_PROXY_EXTRA_OPTS:-"-Ddataproxy.log.path="$LOG_PATH" -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}
+
+JAVA_OPTS="$DATA_PROXY_MEM $DATA_PROXY_GC $DATA_PROXY_GC_LOG $DATA_PROXY_EXTRA_OPTS"
+
#LD_LIBRARY_PATH=""
opt_conf=""
diff --git a/inlong-dataproxy/conf/log4j.properties b/inlong-dataproxy/conf/log4j.properties
deleted file mode 100644
index 53b6dd7..0000000
--- a/inlong-dataproxy/conf/log4j.properties
+++ /dev/null
@@ -1,119 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,debug,info,warn,error
-log.dir = ./logs
-log.file.name.prefix=dataproxy
-log.file.info.name.postfix=.info
-log.file.debug.name.postfix=.debug
-log.file.warn.name.postfix=.warn
-log.file.error.name.postfix=.error
-log.file.stat.name=dataproxy_stat.log
-log.file.monitors.name=dataproxy_monitors.log
-log.file.index.name=dataproxy_index.log
-
-
-log4j.logger.org.apache.inlong.commons.monitor.MonitorIndexExt = info,monitor
-log4j.additivity.org.apache.inlong.commons.monitor.MonitorIndexExt = false
-log4j.appender.monitor=org.apache.log4j.RollingFileAppender
-log4j.appender.monitor.MaxFileSize=100MB
-log4j.appender.monitor.MaxBackupIndex=10
-log4j.appender.monitor.BufferedIO=false
-log4j.appender.monitor.BufferSize=8192
-log4j.appender.monitor.layout=org.apache.log4j.PatternLayout
-log4j.appender.monitor.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
-log4j.appender.monitor.Threshold = INFO
-log4j.appender.monitor.Append=true
-log4j.appender.monitor.File=${log.dir}/${log.file.monitors.name}
-
-log4j.logger.org.apache.inlong.commons.monitor.MonitorIndex=info,index
-log4j.additivity.org.apache.inlong.commons.monitor.MonitorIndex = false
-log4j.appender.index=org.apache.log4j.RollingFileAppender
-log4j.appender.index.MaxFileSize=100MB
-log4j.appender.index.MaxBackupIndex=10
-log4j.appender.index.BufferedIO=false
-log4j.appender.index.BufferSize=8192
-log4j.appender.index.layout=org.apache.log4j.PatternLayout
-log4j.appender.index.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
-log4j.appender.index.Threshold = INFO
-log4j.appender.index.Append=true
-log4j.appender.index.File=${log.dir}/${log.file.index.name}
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-
-log4j.appender.stat=org.apache.log4j.RollingFileAppender
-log4j.appender.stat.MaxFileSize=100MB
-log4j.appender.stat.MaxBackupIndex=10
-log4j.appender.stat.BufferedIO=false
-log4j.appender.stat.BufferSize=8192
-log4j.appender.stat.layout=org.apache.log4j.PatternLayout
-log4j.appender.stat.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.stat.Threshold = INFO
-log4j.appender.stat.Append=true
-log4j.appender.stat.File=${log.dir}/${log.file.stat.name}
-
-log4j.logger.info=info
-log4j.appender.info=org.apache.log4j.RollingFileAppender
-log4j.appender.info.MaxFileSize=100MB
-log4j.appender.info.MaxBackupIndex=10
-log4j.appender.info.BufferedIO=false
-log4j.appender.info.BufferSize=8192
-log4j.appender.info.layout=org.apache.log4j.PatternLayout
-log4j.appender.info.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.info.Threshold = INFO
-log4j.appender.info.Append=true
-log4j.appender.info.File=${log.dir}/${log.file.name.prefix}${log.file.info.name.postfix}
-
-log4j.logger.debug=debug
-log4j.appender.debug=org.apache.log4j.RollingFileAppender
-log4j.appender.debug.MaxFileSize=100MB
-log4j.appender.debug.MaxBackupIndex=10
-log4j.appender.debug.BufferedIO=false
-log4j.appender.debug.BufferSize=8192
-log4j.appender.debug.layout=org.apache.log4j.PatternLayout
-log4j.appender.debug.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.debug.Threshold = DEBUG
-log4j.appender.debug.Append=true
-log4j.appender.debug.File=${log.dir}/${log.file.name.prefix}${log.file.debug.name.postfix}
-
-log4j.logger.warn=warn
-log4j.appender.warn=org.apache.log4j.RollingFileAppender
-log4j.appender.warn.MaxFileSize=100MB
-log4j.appender.warn.MaxBackupIndex=10
-log4j.appender.warn.BufferedIO=false
-log4j.appender.warn.BufferSize=8192
-log4j.appender.warn.layout=org.apache.log4j.PatternLayout
-log4j.appender.warn.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.warn.Threshold = WARN
-log4j.appender.warn.Append=true
-log4j.appender.warn.File=${log.dir}/${log.file.name.prefix}${log.file.warn.name.postfix}
-
-log4j.logger.error=error
-log4j.appender.error=org.apache.log4j.RollingFileAppender
-log4j.appender.error.MaxFileSize=100MB
-log4j.appender.error.MaxBackupIndex=10
-log4j.appender.error.BufferedIO=false
-log4j.appender.error.BufferSize=8192
-log4j.appender.error.layout=org.apache.log4j.PatternLayout
-log4j.appender.error.layout.ConversionPattern=%d{ISO8601} %p %c{2} (%F:%M:%L): %m%n
-log4j.appender.error.Threshold = ERROR
-log4j.appender.error.Append=true
-log4j.appender.error.File=${log.dir}/${log.file.name.prefix}${log.file.error.name.postfix}
diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
new file mode 100644
index 0000000..615de41
--- /dev/null
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration status="WARN" monitorInterval="30">
+ <Properties>
+ <property name="basePath">${sys:dataproxy.log.path}</property>
+ <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.} : %m%n</property>
+ <property name="every_file_size">1G</property>
+ <property name="output_log_level">DEBUG</property>
+ <property name="rolling_max">50</property>
+ <property name="info_fileName">${basePath}/info.log</property>
+ <property name="info_filePattern">${basePath}/info-%d{yyyy-MM-dd}-%i.log.gz</property>
+ <property name="info_max">10</property>
+ <property name="warn_fileName">${basePath}/warn.log</property>
+ <property name="warn_filePattern">${basePath}/warn-%d{yyyy-MM-dd}-%i.log.gz</property>
+ <property name="warn_max">10</property>
+ <property name="error_fileName">${basePath}/error.log</property>
+ <property name="error_filePattern">${basePath}/error-%d{yyyy-MM-dd}-%i.log.gz</property>
+ <property name="error_max">10</property>
+ <property name="console_print_level">DEBUG</property>
+ <property name="index_fileName">${basePath}/index.log</property>
+ <property name="index_filePattern">${basePath}/index-%d{yyyy-MM-dd}-%i.log.gz</property>
+ <property name="monitors_fileName">${basePath}/monitors.log</property>
+ <property name="monitors_filePattern">${basePath}/monitors-%d{yyyy-MM-dd}-%i.log.gz</property>
+ </Properties>
+
+ <appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="${log_pattern}"/>
+ </Console>
+
+ <RollingFile name="InfoFile" fileName="${info_fileName}" filePattern="${info_filePattern}">
+ <PatternLayout pattern="${log_pattern}"/>
+ <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+ <DefaultRolloverStrategy max="${info_max}" />
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ </RollingFile>
+
+ <RollingFile name="IndexFile" fileName="${index_fileName}" filePattern="${index_filePattern}">
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %m%n"/>
+ <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+ <DefaultRolloverStrategy max="${info_max}" />
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ </RollingFile>
+
+ <RollingFile name="MonitorFile" fileName="${monitors_fileName}" filePattern="${monitors_filePattern}">
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %m%n"/>
+ <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+ <DefaultRolloverStrategy max="${info_max}" />
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ </RollingFile>
+
+ <RollingFile name="WarnFile" fileName="${warn_fileName}" filePattern="${warn_filePattern}">
+ <PatternLayout pattern="${log_pattern}"/>
+ <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+ <DefaultRolloverStrategy max="${warn_max}" />
+ <Filters>
+ <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ </RollingFile>
+
+ <RollingFile name="ErrorFile" fileName="${error_fileName}" filePattern="${error_filePattern}">
+ <PatternLayout pattern="${log_pattern}"/>
+ <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+ <DefaultRolloverStrategy max="${error_max}" />
+ <Filters>
+ <ThresholdFilter level="FATAL" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ </RollingFile>
+ </appenders>
+
+ <loggers>
+ <logger name="org.apache.inlong.commons.monitor.MonitorIndexExt" level="info" additivity="false">
+ <appender-ref ref="MonitorFile"/>
+ </logger>
+ <logger name="org.apache.inlong.commons.monitor.MonitorIndex" level="info" additivity="false">
+ <appender-ref ref="IndexFile"/>
+ </logger>
+ <root level="${output_log_level}">
+ <appender-ref ref="Console"/>
+ <appender-ref ref="InfoFile"/>
+ <appender-ref ref="WarnFile"/>
+ <appender-ref ref="ErrorFile"/>
+ </root>
+ </loggers>
+</configuration>
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/pom.xml b/inlong-dataproxy/dataproxy-source/pom.xml
index 6950f75..6bffcf4 100644
--- a/inlong-dataproxy/dataproxy-source/pom.xml
+++ b/inlong-dataproxy/dataproxy-source/pom.xml
@@ -39,14 +39,14 @@
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
- <artifactId>tubemq-client</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java
index 10ef942..a3b2d38 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/PulsarConfig.java
@@ -48,6 +48,12 @@ public class PulsarConfig extends Context {
private static final String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
private static final String PULSAR_IO_THREADS = "pulsar_io_threads";
private static final String PULSAR_CONNECTIONS_PRE_BROKER = "connections_pre_broker";
+ private static final String MAX_BATCHING_BYTES = "max_batching_bytes";
+ private static final String MAX_BATCHING_PUBLISH_DELAY_MILLIS =
+ "max_batching_publish_delay_millis";
+ private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+
+ private static final String BAD_EVENT_QUEUE_SIZE = "bad_event_queue_size";
/*
* properties for stat
*/
@@ -64,16 +70,28 @@ public class PulsarConfig extends Context {
private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+ private static final int DEFAULT_MAX_BATCHING_BYTES = 128 * 1024;
+ private static final long DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS = 1L;
private static final int DEFAULT_RETRY_CNT = -1;
private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
private static final int DEFAULT_STAT_INTERVAL_SEC = 60;
private static final int DEFAULT_THREAD_NUM = 4;
private static final boolean DEFAULT_CLIENT_ID_CACHE = true;
private static final long DEFAULT_DISK_IO_RATE_PER_SEC = 0L;
- private static int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
+ private static final int DEFAULT_EVENT_QUEUE_SIZE = 10000;
+ private static final int DEFAULT_BAD_EVENT_QUEUE_SIZE = 10000;
+ private static final int DEFAULT_PULSAR_IO_THREADS = Math.max(1, SystemPropertyUtil
.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
private static final int DEFAULT_CONNECTIONS_PRE_BROKER = 1;
+ public int getEventQueueSize() {
+ return getInteger(EVENT_QUEUE_SIZE, DEFAULT_EVENT_QUEUE_SIZE);
+ }
+
+ public int getBadEventQueueSize() {
+ return getInteger(BAD_EVENT_QUEUE_SIZE, DEFAULT_BAD_EVENT_QUEUE_SIZE);
+ }
+
public Map<String, String> getUrl2token() {
return url2token;
}
@@ -102,6 +120,14 @@ public class PulsarConfig extends Context {
return getInteger(CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT_SECOND);
}
+ public int getMaxBatchingBytes() {
+ return getInteger(MAX_BATCHING_BYTES, DEFAULT_MAX_BATCHING_BYTES);
+ }
+
+ public long getMaxBatchingPublishDelayMillis() {
+ return getLong(MAX_BATCHING_PUBLISH_DELAY_MILLIS, DEFAULT_MAX_BATCHING_PUBLISH_DELAY_MILLIS);
+ }
+
public boolean getEnableBatch() {
return getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index fedcc80..d498b9c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -54,6 +54,8 @@ public class ConfigConstants {
public static final String MAX_THREADS = "max-threads";
+ public static final String ENABLE_BUSY_WAIT = "enableBusyWait";
+
public static final String STAT_INTERVAL_SEC = "stat-interval-sec";
public static final String HEART_INTERVAL_SEC = "heart-interval-sec";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 7d2d90e..2c20525 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -91,9 +91,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
/*
* default value
*/
- private static int BAD_EVENT_QUEUE_SIZE = 10000;
private static int BATCH_SIZE = 10000;
-
/*
* for log
*/
@@ -141,12 +139,16 @@ public class PulsarSink extends AbstractSink implements Configurable,
private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60 * 1000);
private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60 * 1000);
- private static int EVENT_QUEUE_SIZE = 1000;
+ private static final String SINK_THREAD_NUM = "thread-num";
+ private int eventQueueSize = 10000;
+ private int badEventQueueSize = 10000;
+ private int threadNum;
/*
* send thread pool
*/
private Thread[] sinkThreadPool;
+ private PulsarClientService pulsarClientService;
private LinkedBlockingQueue<Event> eventQueue;
private static final String SEPARATOR = "#";
@@ -156,9 +158,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
private SinkCounter sinkCounter;
private ConfigManager configManager;
private Map<String, String> topicProperties;
+
private Map<String, String> pulsarCluster;
private PulsarConfig pulsarConfig;
- private PulsarClientService pulsarClientService;
private static final Long PRINT_INTERVAL = 30L;
@@ -194,6 +196,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
* configure
* @param context
*/
+ @Override
public void configure(Context context) {
logger.info("PulsarSink started and context = {}", context.toString());
isNewMetricOn = context.getBoolean("new-metric-on", true);
@@ -203,11 +206,15 @@ public class PulsarSink extends AbstractSink implements Configurable,
topicProperties = configManager.getTopicProperties();
pulsarCluster = configManager.getPulsarUrl2Token();
pulsarConfig = configManager.getPulsarConfig(); //pulsar common config
+ pulsarClientService = new PulsarClientService(pulsarConfig);
configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
@Override
public void update() {
- diffSetPublish(new HashSet<String>(topicProperties.values()),
- new HashSet<String>(configManager.getTopicProperties().values()));
+ if (pulsarClientService != null) {
+ diffSetPublish(pulsarClientService,
+ new HashSet<String>(topicProperties.values()),
+ new HashSet<String>(configManager.getTopicProperties().values()));
+ }
}
});
configManager.getPulsarCluster().addUpdateCallback(new ConfigUpdateCallback() {
@@ -216,23 +223,24 @@ public class PulsarSink extends AbstractSink implements Configurable,
diffRestartPulsarClient(pulsarCluster.keySet(), configManager.getPulsarUrl2Token().keySet());
}
});
- resendQueue = new LinkedBlockingQueue<EventStat>(BAD_EVENT_QUEUE_SIZE);
+ badEventQueueSize = pulsarConfig.getBadEventQueueSize();
+ resendQueue = new LinkedBlockingQueue<EventStat>(badEventQueueSize);
Preconditions.checkArgument(pulsarConfig.getThreadNum() > 0, "threadNum must be > 0");
sinkThreadPool = new Thread[pulsarConfig.getThreadNum()];
- eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+ eventQueueSize = pulsarConfig.getEventQueueSize();
+ eventQueue = new LinkedBlockingQueue<Event>(eventQueueSize);
if (pulsarConfig.getDiskIoRatePerSec() != 0) {
diskRateLimiter = RateLimiter.create(pulsarConfig.getDiskIoRatePerSec());
}
- pulsarClientService = new PulsarClientService(pulsarConfig);
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
}
- private void initTopicSet(Set<String> topicSet) throws Exception {
+ private void initTopicSet(PulsarClientService pulsarClientService, Set<String> topicSet) {
long startTime = System.currentTimeMillis();
if (topicSet != null) {
for (String topic : topicSet) {
@@ -250,8 +258,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
* @param originalSet
* @param endSet
*/
- public void diffSetPublish(Set<String> originalSet, Set<String> endSet) {
-
+ public void diffSetPublish(PulsarClientService pulsarClientService, Set<String> originalSet,
+ Set<String> endSet) {
boolean changed = false;
for (String s : endSet) {
if (!originalSet.contains(s)) {
@@ -284,7 +292,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
configManager.getPulsarConfig().setUrl2token(pulsarCluster);
pulsarClientService.initCreateConnection(this);
try {
- initTopicSet(new HashSet<String>(topicProperties.values()));
+ initTopicSet(pulsarClientService, new HashSet<String>(topicProperties.values()));
} catch (Exception e) {
logger.info("pulsar sink restart, publish topic fail.", e);
}
@@ -314,14 +322,14 @@ public class PulsarSink extends AbstractSink implements Configurable,
this.canSend = true;
this.canTake = true;
- try {
- initTopicSet(new HashSet<String>(topicProperties.values()));
- } catch (Exception e) {
- logger.info("pulsar sink start publish topic fail.", e);
- }
-
for (int i = 0; i < sinkThreadPool.length; i++) {
- sinkThreadPool[i] = new Thread(new SinkTask(), getName()
+ try {
+ initTopicSet(pulsarClientService,
+ new HashSet<String>(topicProperties.values()));
+ } catch (Exception e) {
+ logger.info("pulsar sink start publish topic fail.", e);
+ }
+ sinkThreadPool[i] = new Thread(new SinkTask(pulsarClientService), getName()
+ "_pulsar_sink_sender-"
+ i);
sinkThreadPool[i].start();
@@ -332,7 +340,6 @@ public class PulsarSink extends AbstractSink implements Configurable,
@Override
public void stop() {
logger.info("pulsar sink stopping");
- pulsarClientService.close();
this.canTake = false;
int waitCount = 0;
while (eventQueue.size() != 0 && waitCount++ < 10) {
@@ -351,6 +358,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
logger.warn("stat runner interrupted");
}
}
+ if (pulsarClientService != null) {
+ pulsarClientService.close();
+ }
if (sinkThreadPool != null) {
for (Thread thread : sinkThreadPool) {
if (thread != null) {
@@ -359,6 +369,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
sinkThreadPool = null;
}
+
super.stop();
if (!scheduledExecutorService.isShutdown()) {
scheduledExecutorService.shutdown();
@@ -620,6 +631,13 @@ public class PulsarSink extends AbstractSink implements Configurable,
}
class SinkTask implements Runnable {
+
+ private PulsarClientService pulsarClientService;
+
+ public SinkTask(PulsarClientService pulsarClientService) {
+ this.pulsarClientService = pulsarClientService;
+ }
+
@Override
public void run() {
logger.info("Sink task {} started.", Thread.currentThread().getName());
@@ -673,6 +691,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
if (logger.isDebugEnabled()) {
logger.debug("Event is {}, topic = {} ", event, topic);
}
+
if (event == null) {
continue;
}
@@ -714,7 +733,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
getName(), clientId);
}
} else {
- if (clientId != null) {
+ if (pulsarConfig.getClientIdCache() && clientId != null) {
agentIdCache.put(clientId, System.currentTimeMillis());
}
boolean sendResult = pulsarClientService.sendMessage(topic, event,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index d219395..b121875 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -55,7 +55,6 @@ public class PulsarClientService {
/*
* for pulsar client
*/
- private String[] pulsarServerUrls;
private Map<String, String> pulsarUrl2token;
private String authType;
@@ -67,7 +66,9 @@ public class PulsarClientService {
private boolean enableBatch = true;
private boolean blockIfQueueFull = true;
private int maxPendingMessages = 10000;
+ private int maxBatchingBytes = 128 * 1024;
private int maxBatchingMessages = 1000;
+ private long maxBatchingPublishDelayMillis = 1;
private long retryIntervalWhenSendMsgError = 30 * 1000L;
public Map<String, List<TopicProducerInfo>> producerInfoMap;
public Map<String, AtomicLong> topicSendIndexMap;
@@ -99,18 +100,11 @@ public class PulsarClientService {
blockIfQueueFull = pulsarConfig.getBlockIfQueueFull();
maxPendingMessages = pulsarConfig.getMaxPendingMessages();
maxBatchingMessages = pulsarConfig.getMaxBatchingMessages();
+ maxBatchingBytes = pulsarConfig.getMaxBatchingBytes();
+ maxBatchingPublishDelayMillis = pulsarConfig.getMaxBatchingPublishDelayMillis();
producerInfoMap = new ConcurrentHashMap<String, List<TopicProducerInfo>>();
topicSendIndexMap = new ConcurrentHashMap<String, AtomicLong>();
localIp = NetworkUtils.getLocalIp();
-
- // retryIntervalWhenSendMsgError = context.getLong(RETRY_INTERVAL_WHEN_SEND_ERROR_MILL,
-// DEFAULT_RETRY_INTERVAL_WHEN_SEND_ERROR_MILL);
-// clientTimeout = context.getInteger(CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT_SECOND);
-// enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
-// blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, DEFAULT_BLOCK_IF_QUEUE_FULL);
-// maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, DEFAULT_MAX_PENDING_MESSAGES);
-// maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, DEFAULT_MAX_BATCHING_MESSAGES);
-
}
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
@@ -157,6 +151,8 @@ public class PulsarClientService {
* After 30s, reopen the topic check, if it is still a null value,
* put it back into the illegal map
*/
+ sendMessageCallBack.handleMessageSendException(topic, es, new Exception("producer is "
+ + "null"));
return false;
}
@@ -240,12 +236,18 @@ public class PulsarClientService {
public List<TopicProducerInfo> initTopicProducer(String topic) {
List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
- List<TopicProducerInfo> newList = new ArrayList<>();
+ List<TopicProducerInfo> newList = null;
if (pulsarClients != null) {
+ newList = new ArrayList<>();
for (PulsarClient pulsarClient : pulsarClients) {
TopicProducerInfo info = new TopicProducerInfo(pulsarClient, topic);
info.initProducer();
- newList.add(info);
+ if (info.isCanUseToSendMessage()) {
+ newList.add(info);
+ }
+ }
+ if (newList.size() == 0) {
+ newList = null;
}
}
return newList;
@@ -258,7 +260,7 @@ public class PulsarClientService {
AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> {
return new AtomicLong(0);
});
- int maxTryToGetProducer = producerList.size();
+ int maxTryToGetProducer = producerList == null ? 0 : producerList.size();
if (maxTryToGetProducer == 0) {
return null;
}
@@ -329,6 +331,8 @@ public class PulsarClientService {
.blockIfQueueFull(blockIfQueueFull)
.maxPendingMessages(maxPendingMessages)
.batchingMaxMessages(maxBatchingMessages)
+ .batchingMaxBytes(maxBatchingBytes)
+ .batchingMaxPublishDelay(maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS)
.create();
isFinishInit = true;
} catch (PulsarClientException e) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 9e0c651..83bc962 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -18,6 +18,13 @@
package org.apache.inlong.dataproxy.source;
import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.lang.reflect.Constructor;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelSelector;
@@ -32,11 +39,6 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder;
-import org.jboss.netty.bootstrap.Bootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,11 +94,17 @@ public abstract class BaseSource
* netty server
*/
- protected Bootstrap serverBootstrap = null;
+ protected EventLoopGroup acceptorGroup;
+
+ protected EventLoopGroup workerGroup;
+
+ protected DefaultThreadFactory acceptorThreadFactory;
+
+ protected boolean enableBusyWait = false;
protected ChannelGroup allChannels;
- protected Channel nettyChannel = null;
+ protected ChannelFuture channelFuture;
private static String HOST_DEFAULT_VALUE = "0.0.0.0";
@@ -114,13 +122,29 @@ public abstract class BaseSource
private static int INTERVAL_SEC = 60;
+ protected static int BUFFER_SIZE_MUST_THAN = 0;
+
protected static int DEFAULT_MAX_THREADS = 32;
+ protected static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+ protected static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+ protected static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+ protected static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+ protected int receiveBufferSize;
+
+ protected int sendBufferSize;
+
protected int maxThreads = 32;
+ protected int acceptorThreads = 1;
+
public BaseSource() {
super();
- allChannels = new DefaultChannelGroup();
+ allChannels = new DefaultChannelGroup("DefaultChannelGroup", GlobalEventExecutor.INSTANCE);
}
@Override
@@ -149,21 +173,12 @@ public abstract class BaseSource
try {
allChannels.close().awaitUninterruptibly();
} catch (Exception e) {
- logger.warn("Simple UDP Source netty server stop ex, {}", e);
+ logger.warn("Simple Source netty server stop ex, {}", e);
} finally {
allChannels.clear();
}
}
- if (serverBootstrap != null) {
- try {
- serverBootstrap.releaseExternalResources();
- } catch (Exception e) {
- logger.warn("Simple UDP Source serverBootstrap stop ex {}", e);
- } finally {
- serverBootstrap = null;
- }
- }
super.stop();
if (monitorIndex != null) {
monitorIndex.shutDown();
@@ -171,6 +186,14 @@ public abstract class BaseSource
if (monitorIndexExt != null) {
monitorIndexExt.shutDown();
}
+
+ if (channelFuture != null) {
+ try {
+ channelFuture.channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ logger.warn("Simple Source netty server stop ex, {}", e);
+ }
+ }
logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.getName());
}
@@ -245,6 +268,22 @@ public abstract class BaseSource
context.getString(ConfigConstants.MAX_THREADS));
}
+ receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, RECEIVE_BUFFER_DEFAULT_SIZE);
+ if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
+ receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
+ }
+ Preconditions.checkArgument(receiveBufferSize > BUFFER_SIZE_MUST_THAN,
+ "receiveBufferSize must be > 0");
+
+ sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, SEND_BUFFER_DEFAULT_SIZE);
+ if (sendBufferSize > SEND_BUFFER_MAX_SIZE) {
+ sendBufferSize = SEND_BUFFER_MAX_SIZE;
+ }
+ Preconditions.checkArgument(sendBufferSize > BUFFER_SIZE_MUST_THAN,
+ "sendBufferSize must be > 0");
+
+ enableBusyWait = context.getBoolean(ConfigConstants.ENABLE_BUSY_WAIT, false);
+
this.customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);
}
@@ -252,21 +291,21 @@ public abstract class BaseSource
* channel factory
* @return
*/
- public ChannelPipelineFactory getChannelPiplineFactory() {
+ public ChannelInitializer getChannelInitializerFactory() {
logger.info(new StringBuffer("load msgFactory=").append(msgFactoryName)
.append(" and serviceDecoderName=").append(serviceDecoderName).toString());
- ChannelPipelineFactory fac = null;
+ ChannelInitializer fac = null;
try {
ServiceDecoder serviceDecoder = (ServiceDecoder)Class.forName(serviceDecoderName).newInstance();
- Class<? extends ChannelPipelineFactory> clazz =
- (Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
+ Class<? extends ChannelInitializer> clazz =
+ (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
Constructor ctor = clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
String.class, ServiceDecoder.class, String.class, Integer.class,
String.class, String.class, Boolean.class,
Integer.class, Boolean.class, MonitorIndex.class,
MonitorIndexExt.class, String.class);
logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
- fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
+ fac = (ChannelInitializer) ctor.newInstance(this, allChannels,
this.getProtocolName(), serviceDecoder, messageHandlerName, maxMsgLength,
topic, attr, filterEmptyMsg,
maxConnections, isCompressed, monitorIndex,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index d2c7ff6..c79a4a8 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -19,6 +19,8 @@ package org.apache.inlong.dataproxy.source;
import com.google.common.base.Splitter;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -34,9 +36,6 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
@@ -77,14 +76,12 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* @param resultMap
* @param cb
* @param channel
- * @param msgEvent
* @param totalDataLen
* @return
* @throws
*/
private Map<String, Object> extractNewBinHB(Map<String, Object> resultMap,
- ChannelBuffer cb, Channel channel,
- MessageEvent msgEvent, int totalDataLen) throws Exception {
+ ByteBuf cb, Channel channel, int totalDataLen) throws Exception {
int msgHeadPos = cb.readerIndex() - 5;
// check validation
@@ -119,7 +116,6 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
private void handleDateTime(Map<String, String> commonAttrMap, Channel channel,
- MessageEvent msgEvent,
long uniq, long dataTime, int msgCount) {
commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
String time = "";
@@ -134,10 +130,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* udp need use msgEvent get remote address
*/
String remoteAddress = "";
- if (channel != null && channel.getRemoteAddress() != null) {
- remoteAddress = channel.getRemoteAddress().toString();
- } else if (msgEvent != null && msgEvent.getRemoteAddress() != null) {
- remoteAddress = msgEvent.getRemoteAddress().toString();
+ if (channel != null && channel.remoteAddress() != null) {
+ remoteAddress = channel.remoteAddress().toString();
}
sidBuilder.append(remoteAddress).append("#").append(time)
.append("#").append(uniq);
@@ -151,7 +145,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
String.valueOf(msgCount != 0 ? msgCount : 1));
}
- private boolean handleExtMap(Map<String, String> commonAttrMap, ChannelBuffer cb,
+ private boolean handleExtMap(Map<String, String> commonAttrMap, ByteBuf cb,
Map<String, Object> resultMap, int extendField, int msgHeadPos) {
boolean index = false;
if ((extendField & 0x8) == 0x8) {
@@ -177,7 +171,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
return index;
}
- private ByteBuffer handleTrace(Channel channel, ChannelBuffer cb, int extendField,
+ private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int extendField,
int msgHeadPos, int totalDataLen, int attrLen, String strAttr, int bodyLen) {
// whether enable trace
boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
@@ -190,7 +184,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
String traceInfo;
String strNode2Ip = null;
- SocketAddress loacalSockAddr = channel.getLocalAddress();
+ SocketAddress loacalSockAddr = channel.localAddress();
if (null != loacalSockAddr) {
strNode2Ip = loacalSockAddr.toString();
try {
@@ -237,14 +231,13 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* @param resultMap
* @param cb
* @param channel
- * @param msgEvent
* @param totalDataLen
* @param msgType
* @return
* @throws Exception
*/
private Map<String, Object> extractNewBinData(Map<String, Object> resultMap,
- ChannelBuffer cb, Channel channel, MessageEvent msgEvent,
+ ByteBuf cb, Channel channel,
int totalDataLen, MsgType msgType) throws Exception {
int msgHeadPos = cb.readerIndex() - 5;
@@ -306,7 +299,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
try {
- handleDateTime(commonAttrMap, channel, msgEvent, uniq, dataTime, msgCount);
+ handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount);
final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
totalDataLen, attrLen, strAttr, bodyLen);
@@ -358,14 +351,12 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* @param cb
* @param channel
* @param totalDataLen
- * @param msgEvent
* @param msgType
* @return
* @throws Exception
*/
private Map<String, Object> extractDefaultData(Map<String, Object> resultMap,
- ChannelBuffer cb, Channel channel,
- MessageEvent msgEvent,
+ ByteBuf cb, Channel channel,
int totalDataLen, MsgType msgType) throws Exception {
int bodyLen = cb.readInt();
if (bodyLen == 0) {
@@ -481,8 +472,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* +--------+--------+--------+----------------+--------+----------------+------------------------+
*/
@Override
- public Map<String, Object> extractData(ChannelBuffer cb, Channel channel,
- MessageEvent msgEvent) throws Exception {
+ public Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception {
Map<String, Object> resultMap = new HashMap<String, Object>();
if (null == cb) {
LOG.error("cb == null");
@@ -509,14 +499,14 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
// if it's bin heart beat.
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
- return extractNewBinHB(resultMap, cb, channel, msgEvent, totalDataLen);
+ return extractNewBinHB(resultMap, cb, channel, totalDataLen);
}
if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) {
resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : "");
- return extractNewBinData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
+ return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType);
} else {
- return extractDefaultData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
+ return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType);
}
} else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index fa42169..91dd75d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -17,6 +17,12 @@
package org.apache.inlong.dataproxy.source;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.ReadTimeoutHandler;
import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
import org.apache.flume.channel.ChannelProcessor;
@@ -24,21 +30,11 @@ import org.apache.flume.source.AbstractSource;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.commons.monitor.MonitorIndex;
import org.apache.inlong.commons.monitor.MonitorIndexExt;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerMessageFactory implements ChannelPipelineFactory {
+public class ServerMessageFactory
+ extends ChannelInitializer<SocketChannel> {
private static final Logger LOG = LoggerFactory.getLogger(ServerMessageFactory.class);
@@ -56,8 +52,6 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
private ChannelGroup allChannels;
- private ExecutionHandler executionHandler;
-
private String protocolType;
private ServiceDecoder serviceDecoder;
@@ -82,8 +76,6 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
private MonitorIndexExt monitorIndexExt;
- private Timer timer = new HashedWheelTimer();
-
/**
* get server factory
*
@@ -122,37 +114,24 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
this.isCompressed = isCompressed;
this.monitorIndex = monitorIndex;
this.monitorIndexExt = monitorIndexExt;
- if (protocolType.equalsIgnoreCase(ConfigConstants.UDP_PROTOCOL)) {
- this.executionHandler = new ExecutionHandler(
- new OrderedMemoryAwareThreadPoolExecutor(cores * 2,
- MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE));
- }
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline cp = Channels.pipeline();
- return addMessageHandlersTo(cp);
- }
-
- /**
- * get message handlers
- * @param cp
- * @return
- */
- public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
+ protected void initChannel(SocketChannel ch) throws Exception {
if (this.protocolType
.equalsIgnoreCase(ConfigConstants.TCP_PROTOCOL)) {
- cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
- this.maxMsgLength, 0, MSG_LENGTH_LEN, 0, 0, true));
- cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
- DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+ ch.pipeline().addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
+ this.maxMsgLength, 0,
+ MSG_LENGTH_LEN, 0, 0, true));
+ ch.pipeline().addLast("readTimeoutHandler",
+ new ReadTimeoutHandler(DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
}
if (processor != null) {
try {
- Class<? extends SimpleChannelHandler> clazz = (Class<? extends SimpleChannelHandler>) Class
+ Class<? extends ChannelInboundHandlerAdapter> clazz
+ = (Class<? extends ChannelInboundHandlerAdapter>) Class
.forName(messageHandlerName);
Constructor<?> ctor = clazz.getConstructor(
@@ -161,22 +140,16 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
Integer.class, Boolean.class, MonitorIndex.class,
MonitorIndexExt.class, String.class);
- SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
+ ChannelInboundHandlerAdapter messageHandler = (ChannelInboundHandlerAdapter) ctor
.newInstance(source, serviceDecoder, allChannels, topic, attr,
filterEmptyMsg, maxConnections,
isCompressed, monitorIndex, monitorIndexExt, protocolType
);
- cp.addLast("messageHandler", messageHandler);
+ ch.pipeline().addLast("messageHandler", messageHandler);
} catch (Exception e) {
LOG.info("SimpleChannelHandler.newInstance has error:" + name, e);
}
}
-
- if (this.protocolType.equalsIgnoreCase(ConfigConstants.UDP_PROTOCOL)) {
- cp.addLast("execution", executionHandler);
- }
-
- return cp;
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index baef74b..e433716 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -22,6 +22,12 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -52,15 +58,6 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +68,7 @@ import com.google.common.base.Splitter;
* Server message handler
*
*/
-public class ServerMessageHandler extends SimpleChannelHandler {
+public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
@@ -120,8 +117,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
private String defaultMXAttr = "m=3";
- private final ChannelBuffer heartbeatBuffer;
-
private final String protocolType;
@@ -148,7 +143,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
this.filterEmptyMsg = filterEmptyMsg;
this.isCompressed = isCompressed;
- this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
this.maxConnections = maxCons;
this.protocolType = protocolType;
if (source instanceof SimpleTcpSource) {
@@ -166,7 +160,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
private String getRemoteIp(Channel channel, SocketAddress remoteAddress) {
String strRemoteIp = DEFAULT_REMOTE_IP_VALUE;
- SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+ SocketAddress remoteSocketAddress = channel.remoteAddress();
if (remoteSocketAddress == null) {
remoteSocketAddress = remoteAddress;
}
@@ -242,21 +236,28 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (allChannels.size() - 1 >= maxConnections) {
logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1)
+ ", maxConnections="
- + maxConnections + ",channel is " + e.getChannel());
- e.getChannel().disconnect();
- e.getChannel().close();
+ + maxConnections + ",channel is " + ctx.channel());
+ ctx.channel().disconnect();
+ ctx.channel().close();
}
- if (!checkBlackIp(e.getChannel())) {
+ if (!checkBlackIp(ctx.channel())) {
logger.info("connections={},maxConnections={}", allChannels.size() - 1, maxConnections);
- allChannels.add(e.getChannel());
- super.channelOpen(ctx, e);
+ allChannels.add(ctx.channel());
+ ctx.fireChannelActive();
}
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ logger.error("channel inactive {}", ctx.channel());
+ ctx.fireChannelInactive();
+ allChannels.remove(ctx.channel());
+ }
+
private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
Map<String, String> attrMap, AtomicReference<String> topicInfo) {
String groupId = message.getGroupId();
@@ -510,7 +511,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
backBody = new byte[]{50};
}
int backTotalLen = 1 + 4 + backBody.length + 4 + backAttr.length;
- ChannelBuffer buffer = ChannelBuffers.buffer(4 + backTotalLen);
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
buffer.writeInt(backTotalLen);
buffer.writeByte(msgType.getValue());
buffer.writeInt(backBody.length);
@@ -518,13 +519,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
buffer.writeInt(backAttr.length);
buffer.writeBytes(backAttr);
if (remoteChannel.isWritable()) {
- remoteChannel.write(buffer, remoteSocketAddress);
+ remoteChannel.write(buffer);
} else {
String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
logger.warn(
"the send buffer1 is full, so disconnect it!please check remote client"
+ "; Connection info:"
+ remoteChannel + ";attr is " + backAttrStr);
+ buffer.release();
throw new Exception(new Throwable(
"the send buffer1 is full, so disconnect it!please check remote client"
+
@@ -543,7 +545,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
binTotalLen += backattrs.length();
}
- ChannelBuffer binBuffer = ChannelBuffers.buffer(4 + binTotalLen);
+ ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
binBuffer.writeByte(msgType.getValue());
@@ -564,8 +566,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
binBuffer.writeShort(0xee01);
if (remoteChannel.isWritable()) {
- remoteChannel.write(binBuffer, remoteSocketAddress);
+ remoteChannel.write(binBuffer);
} else {
+ binBuffer.release();
logger.warn(
"the send buffer2 is full, so disconnect it!please check remote client"
+ "; Connection info:" + remoteChannel + ";attr is "
@@ -579,129 +582,122 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("message received");
- if (e == null) {
- logger.error("get null messageevent, just skip");
- this.addMetric(false, 0, null);
- return;
- }
- ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
- String strRemoteIP = getRemoteIp(e.getChannel(), e.getRemoteAddress());
- SocketAddress remoteSocketAddress = e.getRemoteAddress();
- int len = cb.readableBytes();
- if (len == 0 && this.filterEmptyMsg) {
- logger.warn("skip empty msg.");
- cb.clear();
+ if (msg == null) {
+ logger.error("get null msg, just skip");
this.addMetric(false, 0, null);
return;
}
-
- Channel remoteChannel = e.getChannel();
- Map<String, Object> resultMap = null;
+ ByteBuf cb = (ByteBuf) msg;
try {
- resultMap = serviceDecoder.extractData(cb, remoteChannel, e);
- } catch (MessageIDException ex) {
- logger.error("MessageIDException ex = {}", ex);
- this.addMetric(false, 0, null);
- throw new IOException(ex.getCause());
- }
+ Channel remoteChannel = ctx.channel();
+ String strRemoteIP = getRemoteIp(remoteChannel);
+ int len = cb.readableBytes();
+ if (len == 0 && this.filterEmptyMsg) {
+ logger.warn("skip empty msg.");
+ this.addMetric(false, 0, null);
+ return;
+ }
- if (resultMap == null) {
- logger.info("result is null");
- this.addMetric(false, 0, null);
- return;
- }
+ Map<String, Object> resultMap = null;
+ try {
+ resultMap = serviceDecoder.extractData(cb, remoteChannel);
+ } catch (MessageIDException ex) {
+ logger.error("MessageIDException ex = {}", ex);
+ this.addMetric(false, 0, null);
+ throw new IOException(ex.getCause());
+ }
- MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
- if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
- remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
- this.addMetric(false, 0, null);
- return;
- }
+ if (resultMap == null) {
+ logger.info("result is null");
+ this.addMetric(false, 0, null);
+ return;
+ }
- if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
- this.addMetric(false, 0, null);
- return;
- }
+ MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
+ if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+ ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
+ heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+ remoteChannel.write(heartbeatBuffer);
+ this.addMetric(false, 0, null);
+ return;
+ }
- Map<String, String> commonAttrMap =
- (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
- if (commonAttrMap == null) {
- commonAttrMap = new HashMap<String, String>();
- }
+ if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+ this.addMetric(false, 0, null);
+ return;
+ }
- List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
- if (msgList != null
- && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
- && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
- new HashMap<String, HashMap<String, List<ProxyMessage>>>(
- msgList.size());
-
- updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
-
- formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
-
- } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "filestatus");
- headers.put(ConfigConstants.FILE_CHECK_DATA,
- "true");
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
- Event event = EventBuilder.withBody(body, headers);
- try {
- processor.processEvent(event);
- this.addMetric(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
- }
+ Map<String, String> commonAttrMap =
+ (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+ if (commonAttrMap == null) {
+ commonAttrMap = new HashMap<String, String>();
}
- } else if (msgList != null && commonAttrMap
- .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- logger.info("i am in MINUTE_CHECK_DATA");
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "measure");
- headers.put(ConfigConstants.FILE_CHECK_DATA,
- "true");
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
- Event event = EventBuilder.withBody(body, headers);
- try {
- processor.processEvent(event);
- this.addMetric(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
+
+ List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
+ if (msgList != null
+ && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
+ && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+ new HashMap<String, HashMap<String, List<ProxyMessage>>>(
+ msgList.size());
+
+ updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
+
+ formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
+
+ } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("msgtype", "filestatus");
+ headers.put(ConfigConstants.FILE_CHECK_DATA,
+ "true");
+ for (ProxyMessage message : msgList) {
+ byte[] body = message.getData();
+ Event event = EventBuilder.withBody(body, headers);
+ try {
+ processor.processEvent(event);
+ this.addMetric(true, body.length, event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data will discard.", ex);
+ this.addMetric(false, body.length, event);
+ throw new ChannelException(
+ "Process Controller Event error can't write event to channel.");
+ }
+ }
+ } else if (msgList != null && commonAttrMap
+ .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+ logger.info("i am in MINUTE_CHECK_DATA");
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("msgtype", "measure");
+ headers.put(ConfigConstants.FILE_CHECK_DATA,
+ "true");
+ for (ProxyMessage message : msgList) {
+ byte[] body = message.getData();
+ Event event = EventBuilder.withBody(body, headers);
+ try {
+ processor.processEvent(event);
+ this.addMetric(true, body.length, event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data will discard.", ex);
+ this.addMetric(false, body.length, event);
+ throw new ChannelException(
+ "Process Controller Event error can't write event to channel.");
+ }
}
}
+ SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
+ responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
+ } finally {
+ cb.release();
}
- responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- logger.error("exception caught", e.getCause());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exception caught cause = {}", cause);
monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
- }
-
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- logger.error("channel closed {}", ctx.getChannel());
- super.channelClosed(ctx, e);
- try {
- e.getChannel().disconnect();
- e.getChannel().close();
- } catch (Exception ex) {
- //
- }
- allChannels.remove(e.getChannel());
+ ctx.fireExceptionCaught(cause);
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 162d614..ba7778b 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -17,15 +17,10 @@
package org.apache.inlong.dataproxy.source;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import java.util.Map;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.MessageEvent;
-
-/**
- * decoder interface definition
- */
public interface ServiceDecoder {
int HEAD_LENGTH = 4;
@@ -37,5 +32,5 @@ public interface ServiceDecoder {
* @return
* @throws
*/
- Map<String, Object> extractData(ChannelBuffer cb, Channel channel, MessageEvent e) throws Exception;
+ Map<String, Object> extractData(ByteBuf cb, Channel channel) throws Exception;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index f00f19f..caae85e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -22,6 +22,12 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -50,15 +56,6 @@ import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.Constants;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +66,7 @@ import com.google.common.base.Splitter;
* Server message handler
*
*/
-public class SimpleMessageHandler extends SimpleChannelHandler {
+public class SimpleMessageHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(SimpleMessageHandler.class);
@@ -105,7 +102,6 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
private final ServiceDecoder serviceProcessor;
private final String defaultTopic;
private String defaultMXAttr = "m=3";
- private final ChannelBuffer heartbeatBuffer;
private final String protocolType;
private final DataProxyMetricItemSet metricItemSet;
@@ -138,7 +134,6 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
this.filterEmptyMsg = filterEmptyMsg;
this.isCompressed = isCompressed;
- this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
this.maxConnections = maxCons;
this.protocolType = protocolType;
if (source instanceof SimpleTcpSource) {
@@ -150,7 +145,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
private String getRemoteIp(Channel channel) {
String strRemoteIp = DEFAULT_REMOTE_IP_VALUE;
- SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+ SocketAddress remoteSocketAddress = channel.remoteAddress();
if (null != remoteSocketAddress) {
strRemoteIp = remoteSocketAddress.toString();
try {
@@ -223,18 +218,18 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (allChannels.size() - 1 >= maxConnections) {
logger.warn("refuse to connect , and connections=" + (allChannels.size() - 1)
+ ", maxConnections="
- + maxConnections + ",channel is " + e.getChannel());
- e.getChannel().disconnect();
- e.getChannel().close();
+ + maxConnections + ",channel is " + ctx.channel());
+ ctx.channel().disconnect();
+ ctx.channel().close();
}
- if (!checkBlackIp(e.getChannel())) {
+ if (!checkBlackIp(ctx.channel())) {
logger.info("connections={},maxConnections={}", allChannels.size() - 1, maxConnections);
- allChannels.add(e.getChannel());
- super.channelOpen(ctx, e);
+ allChannels.add(ctx.channel());
+ ctx.fireChannelActive();
}
}
@@ -507,7 +502,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
backBody = new byte[]{50};
}
int backTotalLen = 1 + 4 + backBody.length + 4 + backAttr.length;
- ChannelBuffer buffer = ChannelBuffers.buffer(4 + backTotalLen);
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
buffer.writeInt(backTotalLen);
buffer.writeByte(msgType.getValue());
buffer.writeInt(backBody.length);
@@ -515,13 +510,14 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
buffer.writeInt(backAttr.length);
buffer.writeBytes(backAttr);
if (remoteChannel.isWritable()) {
- remoteChannel.write(buffer, remoteSocketAddress);
+ remoteChannel.write(buffer);
} else {
String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
logger.warn(
"the send buffer1 is full, so disconnect it!please check remote client"
+ "; Connection info:"
+ remoteChannel + ";attr is " + backAttrStr);
+ buffer.release();
throw new Exception(new Throwable(
"the send buffer1 is full, so disconnect it!please check remote client"
+
@@ -540,7 +536,7 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
binTotalLen += backattrs.length();
}
- ChannelBuffer binBuffer = ChannelBuffers.buffer(4 + binTotalLen);
+ ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
binBuffer.writeInt(binTotalLen);
binBuffer.writeByte(msgType.getValue());
@@ -561,12 +557,13 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
binBuffer.writeShort(0xee01);
if (remoteChannel.isWritable()) {
- remoteChannel.write(binBuffer, remoteSocketAddress);
+ remoteChannel.write(binBuffer);
} else {
logger.warn(
"the send buffer2 is full, so disconnect it!please check remote client"
+ "; Connection info:" + remoteChannel + ";attr is "
+ backattrs);
+ binBuffer.release();
throw new Exception(new Throwable(
"the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
+ remoteChannel + ";attr is " + backattrs));
@@ -576,121 +573,122 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent msgEvent) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("message received");
- if (msgEvent == null) {
+ if (msg == null) {
logger.error("get null messageevent, just skip");
this.addMetric(false, 0, null);
return;
}
- ChannelBuffer cb = ((ChannelBuffer) msgEvent.getMessage());
- String strRemoteIP = getRemoteIp(msgEvent.getChannel());
- SocketAddress remoteSocketAddress = msgEvent.getRemoteAddress();
- int len = cb.readableBytes();
- if (len == 0 && this.filterEmptyMsg) {
- logger.warn("skip empty msg.");
- cb.clear();
- this.addMetric(false, 0, null);
- return;
- }
-
- Channel remoteChannel = msgEvent.getChannel();
- Map<String, Object> resultMap = null;
+ Channel remoteChannel = ctx.channel();
+ ByteBuf cb = (ByteBuf) msg;
try {
- resultMap = serviceProcessor.extractData(cb, remoteChannel, msgEvent);
- } catch (MessageIDException ex) {
- this.addMetric(false, 0, null);
- throw new IOException(ex.getCause());
- }
+ int len = cb.readableBytes();
+ if (len == 0 && this.filterEmptyMsg) {
+ logger.warn("skip empty msg.");
+ cb.clear();
+ this.addMetric(false, 0, null);
+ return;
+ }
+ Map<String, Object> resultMap = null;
+ try {
+ resultMap = serviceProcessor.extractData(cb, remoteChannel);
+ } catch (MessageIDException ex) {
+ this.addMetric(false, 0, null);
+ throw new IOException(ex.getCause());
+ }
- if (resultMap == null) {
- logger.info("result is null");
- this.addMetric(false, 0, null);
- return;
- }
+ if (resultMap == null) {
+ logger.info("result is null");
+ this.addMetric(false, 0, null);
+ return;
+ }
- MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
- if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
- remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
- this.addMetric(false, 0, null);
- return;
- }
+ MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
+ if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
+ ByteBuf heartbeatBuffer = ByteBufAllocator.DEFAULT.buffer(5);
+ heartbeatBuffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
+ remoteChannel.write(heartbeatBuffer);
+ this.addMetric(false, 0, null);
+ return;
+ }
- if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-// ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
-// remoteChannel.write(binBuffer, remoteSocketAddress);
- this.addMetric(false, 0, null);
- return;
- }
+ if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
+ this.addMetric(false, 0, null);
+ return;
+ }
- Map<String, String> commonAttrMap = (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
- if (commonAttrMap == null) {
- commonAttrMap = new HashMap<String, String>();
- }
+ Map<String, String> commonAttrMap = (Map<String, String>) resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+ if (commonAttrMap == null) {
+ commonAttrMap = new HashMap<String, String>();
+ }
- List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
- if (msgList != null
- && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
- && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size());
-
- updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
-
- formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
-
- } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-// logger.info("i am in FILE_CHECK_DATA ");
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "filestatus");
- headers.put(ConfigConstants.FILE_CHECK_DATA,
- "true");
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
-// logger.info("data:"+new String(body));
- Event event = EventBuilder.withBody(body, headers);
- try {
- processor.processEvent(event);
- this.addMetric(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
+ List<ProxyMessage> msgList = (List<ProxyMessage>) resultMap.get(ConfigConstants.MSG_LIST);
+ if (msgList != null
+ && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
+ && !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new HashMap<>(msgList.size());
+ String strRemoteIP = getRemoteIp(remoteChannel);
+ updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
+
+ formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
+
+ } else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("msgtype", "filestatus");
+ headers.put(ConfigConstants.FILE_CHECK_DATA,
+ "true");
+ for (ProxyMessage message : msgList) {
+ byte[] body = message.getData();
+ Event event = EventBuilder.withBody(body, headers);
+ try {
+ processor.processEvent(event);
+ this.addMetric(true, body.length, event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data will discard.", ex);
+ this.addMetric(false, body.length, event);
+ throw new ChannelException(
+ "Process Controller Event error can't write event to channel.");
+ }
}
- }
- } else if (msgList != null && commonAttrMap
- .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- logger.info("i am in MINUTE_CHECK_DATA");
- Map<String, String> headers = new HashMap<String, String>();
- headers.put("msgtype", "measure");
- headers.put(ConfigConstants.FILE_CHECK_DATA,
- "true");
- for (ProxyMessage message : msgList) {
- byte[] body = message.getData();
-// logger.info("data:"+new String(body));
- Event event = EventBuilder.withBody(body, headers);
- try {
- processor.processEvent(event);
- this.addMetric(true, body.length, event);
- } catch (Throwable ex) {
- logger.error("Error writing to controller,data will discard.", ex);
- this.addMetric(false, body.length, event);
- throw new ChannelException(
- "Process Controller Event error can't write event to channel.");
+ } else if (msgList != null && commonAttrMap
+ .containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
+ logger.info("i am in MINUTE_CHECK_DATA");
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("msgtype", "measure");
+ headers.put(ConfigConstants.FILE_CHECK_DATA,
+ "true");
+ for (ProxyMessage message : msgList) {
+ byte[] body = message.getData();
+ Event event = EventBuilder.withBody(body, headers);
+ try {
+ processor.processEvent(event);
+ this.addMetric(true, body.length, event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data will discard.", ex);
+ this.addMetric(false, body.length, event);
+ throw new ChannelException(
+ "Process Controller Event error can't write event to channel.");
+ }
}
}
+ SocketAddress remoteSocketAddress = remoteChannel.remoteAddress();
+ responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
+ } finally {
+ cb.release();
}
- responsePackage(commonAttrMap, resultMap, remoteChannel, remoteSocketAddress, msgType);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- logger.error("exception caught", e.getCause());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exception caught cause = {}", cause);
+ ctx.fireExceptionCaught(cause);
}
@Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- logger.error("channel closed {}", ctx.getChannel());
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ logger.error("channel inactive {}", ctx.channel());
+ ctx.fireChannelInactive();
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index d861c8c..dd36012 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -18,6 +18,13 @@
package org.apache.inlong.dataproxy.source;
import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -28,24 +35,14 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
-import java.util.concurrent.Executors;
import org.apache.commons.io.IOUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.commons.config.metrics.MetricRegister;
-import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.apache.inlong.dataproxy.utils.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,45 +63,31 @@ public class SimpleTcpSource extends BaseSource
private static int TRAFFIC_CLASS_TYPE_96 = 96;
- private static int BUFFER_SIZE_MUST_THAN = 0;
-
private static int HIGH_WATER_MARK_DEFAULT_VALUE = 64 * 1024;
- private static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
-
- private static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
-
- private static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
-
- private static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
-
private static int DEFAULT_SLEEP_TIME_MS = 5 * 1000;
private static long propsLastModified;
private CheckBlackListThread checkBlackListThread;
- private int maxThreads = 32;
-
private boolean tcpNoDelay = true;
private boolean keepAlive = true;
- private int receiveBufferSize;
-
private int highWaterMark;
- private int sendBufferSize;
-
private int trafficClass;
protected String topic;
+ private ServerBootstrap bootstrap;
+
private DataProxyMetricItemSet metricItemSet;
public SimpleTcpSource() {
super();
- allChannels = new DefaultChannelGroup();
+
}
/**
@@ -118,7 +101,7 @@ public class SimpleTcpSource extends BaseSource
while (it.hasNext()) {
Channel channel = it.next();
String strRemoteIP = null;
- SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+ SocketAddress remoteSocketAddress = channel.remoteAddress();
if (null != remoteSocketAddress) {
strRemoteIP = remoteSocketAddress.toString();
try {
@@ -203,43 +186,48 @@ public class SimpleTcpSource extends BaseSource
MetricRegister.register(metricItemSet);
checkBlackListThread = new CheckBlackListThread();
checkBlackListThread.start();
- ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
- ChannelFactory factory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(
- new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")), 1,
- Executors.newCachedThreadPool(
- new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")), maxThreads);
+// ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+
logger.info("Set max workers : {} ;", maxThreads);
- serverBootstrap = new ServerBootstrap(factory);
- serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
- serverBootstrap.setOption("child.keepAlive", keepAlive);
- serverBootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
- serverBootstrap.setOption("child.sendBufferSize", sendBufferSize);
- serverBootstrap.setOption("child.trafficClass", trafficClass);
- serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
+ acceptorThreadFactory = new DefaultThreadFactory("tcpSource-nettyBoss-threadGroup");
+
+ this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
+ acceptorThreads, false, acceptorThreadFactory);
+
+ this.workerGroup = EventLoopUtil
+ .newEventLoopGroup(maxThreads, enableBusyWait,
+ new DefaultThreadFactory("tcpSource-nettyWorker-threadGroup"));
+
+ bootstrap = new ServerBootstrap();
+
+ bootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
+ bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
+ bootstrap.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
+ bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
+ bootstrap.childOption(ChannelOption.SO_SNDBUF, sendBufferSize);
+// serverBootstrap.childOption("child.trafficClass", trafficClass);
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark);
+ bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
+ EventLoopUtil.enableTriggeredMode(bootstrap);
+ bootstrap.group(acceptorGroup, workerGroup);
logger.info("load msgFactory=" + msgFactoryName
+ " and serviceDecoderName=" + serviceDecoderName);
- ChannelPipelineFactory fac = this.getChannelPiplineFactory();
- serverBootstrap.setPipelineFactory(fac);
-
+ ChannelInitializer fac = this.getChannelInitializerFactory();
+ bootstrap.childHandler(fac);
try {
if (host == null) {
- nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
+ channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
} else {
- nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
+ channelFuture = bootstrap.bind(new InetSocketAddress(host, port)).sync();
}
} catch (Exception e) {
- logger.error("Simple TCP Source error bind host {} port {},program will exit!", host, port);
+ logger.error("Simple TCP Source error bind host {} port {},program will exit! e = {}",
+ host, port, e);
System.exit(-1);
}
-
- allChannels.add(nettyChannel);
-
logger.info("Simple TCP Source started at host {}, port {}", host, port);
-
}
@Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
index 7c09c30..5c17451 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -19,19 +19,15 @@
package org.apache.inlong.dataproxy.source;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
-import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.ThreadRenamingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +40,8 @@ public class SimpleUdpSource
private static int UPD_BUFFER_DEFAULT_SIZE = 8192;
+ private Bootstrap bootstrap;
+
public SimpleUdpSource() {
super();
}
@@ -51,24 +49,20 @@ public class SimpleUdpSource
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void startSource() {
- ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
// setup Netty server
- serverBootstrap = new ConnectionlessBootstrap(
- new NioDatagramChannelFactory(Executors
- .newCachedThreadPool(new NamedThreadFactory("udpSource-nettyWorker"
- + "-threadGroup")), maxThreads));
+ bootstrap = new Bootstrap();
logger.info("Set max workers : {} ;",maxThreads);
- serverBootstrap.setOption("receiveBufferSizePredictorFactory",
- new FixedReceiveBufferSizePredictorFactory(UPD_BUFFER_DEFAULT_SIZE));
- ChannelPipelineFactory fac = this.getChannelPiplineFactory();
- serverBootstrap.setPipelineFactory(fac);
+ bootstrap.channel(NioDatagramChannel.class);
+ bootstrap.option(ChannelOption.SO_RCVBUF, receiveBufferSize);
+ bootstrap.option(ChannelOption.SO_SNDBUF, sendBufferSize);
+ ChannelInitializer fac = this.getChannelInitializerFactory();
+ bootstrap.handler(fac);
try {
if (host == null) {
- nettyChannel =
- ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
+ channelFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
} else {
- nettyChannel =
- ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
+
+ channelFuture = bootstrap.bind(new InetSocketAddress(host, port)).sync();
}
} catch (Exception e) {
logger.error("Simple UDP Source error bind host {} port {}, program will exit!",
@@ -76,7 +70,6 @@ public class SimpleUdpSource
System.exit(-1);
//throw new FlumeException(e.getMessage());
}
- allChannels.add(nettyChannel);
logger.info("Simple UDP Source started at host {}, port {}", host, port);
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/EventLoopUtil.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/EventLoopUtil.java
new file mode 100644
index 0000000..679a30b
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/EventLoopUtil.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollMode;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
+
+public class EventLoopUtil {
+
+ public EventLoopUtil() {
+ }
+
+ public static EventLoopGroup newEventLoopGroup(int nThreads, boolean enableBusyWait, ThreadFactory threadFactory) {
+ if (!Epoll.isAvailable()) {
+ return new NioEventLoopGroup(nThreads, threadFactory);
+ } else if (!enableBusyWait) {
+ return new EpollEventLoopGroup(nThreads, threadFactory);
+ } else {
+ EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory, () -> {
+ return (selectSupplier, hasTasks) -> {
+ return -3;
+ };
+ });
+ return eventLoopGroup;
+ }
+ }
+
+ public static Class<? extends SocketChannel> getClientSocketChannelClass(EventLoopGroup eventLoopGroup) {
+ return eventLoopGroup instanceof EpollEventLoopGroup
+ ? EpollSocketChannel.class : NioSocketChannel.class;
+ }
+
+ public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
+ return eventLoopGroup instanceof EpollEventLoopGroup
+ ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+ }
+
+ public static Class<? extends DatagramChannel> getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
+ return eventLoopGroup instanceof EpollEventLoopGroup
+ ? EpollDatagramChannel.class : NioDatagramChannel.class;
+ }
+
+ public static void enableTriggeredMode(ServerBootstrap bootstrap) {
+ if (Epoll.isAvailable()) {
+ bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
+ }
+
+ }
+
+ public static CompletableFuture<Void> shutdownGracefully(EventLoopGroup eventLoopGroup) {
+ return toCompletableFutureVoid(eventLoopGroup.shutdownGracefully());
+ }
+
+ public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> future) {
+ Objects.requireNonNull(future, "future cannot be null");
+
+ CompletableFuture<Void> adapter = new CompletableFuture<>();
+ if (future.isDone()) {
+ if (future.isSuccess()) {
+ adapter.complete(null);
+ } else {
+ adapter.completeExceptionally(future.cause());
+ }
+ } else {
+ future.addListener(f -> {
+ if (f.isSuccess()) {
+ adapter.complete(null);
+ } else {
+ adapter.completeExceptionally(f.cause());
+ }
+ });
+ }
+ return adapter;
+ }
+}
diff --git a/inlong-dataproxy/pom.xml b/inlong-dataproxy/pom.xml
index 78feae2..3d0e413 100644
--- a/inlong-dataproxy/pom.xml
+++ b/inlong-dataproxy/pom.xml
@@ -42,7 +42,7 @@
<maven.compiler.target>8</maven.compiler.target>
<flume.version>1.9.0</flume.version>
<plugin.assembly.version>3.2.0</plugin.assembly.version>
- <netty.version>3.8.0.Final</netty.version>
+ <netty.version>4.1.72.Final</netty.version>
<codec.version>1.15</codec.version>
<servlet.version>2.5-20110124</servlet.version>
<inlong-common.version>1.3.4</inlong-common.version>
@@ -52,6 +52,8 @@
<guava.version>19.0</guava.version>
<powermock.version>2.0.9</powermock.version>
<simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
+ <log4j2.version>2.17.1</log4j2.version>
+ <slf4j.version>1.7.36</slf4j.version>
</properties>
<dependencies>
@@ -64,6 +66,12 @@
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-node</artifactId>
<version>${flume.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
@@ -77,7 +85,27 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-transport</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
@@ -94,6 +122,12 @@
<groupId>org.apache.inlong</groupId>
<artifactId>tubemq-client</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
@@ -144,6 +178,26 @@
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file