You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/02 01:20:44 UTC
[incubator-inlong] branch master updated: [INLONG-2781][DataProxy] Update netty version to 4.x (#2782)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0d65c1f [INLONG-2781][DataProxy] Update netty version to 4.x (#2782)
0d65c1f is described below
commit 0d65c1f8019347e32d01c04383179f847bc7e0ea
Author: baomingyu <ba...@163.com>
AuthorDate: Wed Mar 2 09:20:37 2022 +0800
[INLONG-2781][DataProxy] Update netty version to 4.x (#2782)
---
inlong-dataproxy/conf/log4j2.xml | 18 ++
.../apache/inlong/dataproxy/sink/PulsarSink.java | 2 +-
.../inlong/dataproxy/source/SourceContext.java | 2 +-
.../source/tcp/InlongTcpChannelHandler.java | 194 +++++++++++----------
.../tcp/InlongTcpChannelPipelineFactory.java | 60 +++----
.../dataproxy/source/tcp/InlongTcpSource.java | 180 +++++--------------
6 files changed, 184 insertions(+), 272 deletions(-)
diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
index 34d2165..d4f70d4 100644
--- a/inlong-dataproxy/conf/log4j2.xml
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -27,6 +27,9 @@
<property name="info_fileName">${basePath}/info.log</property>
<property name="info_filePattern">${basePath}/info-%d{yyyy-MM-dd}-%i.log.gz</property>
<property name="info_max">10</property>
+ <property name="debug_fileName">${basePath}/debug.log</property>
+ <property name="debug_filePattern">${basePath}/debug-%d{yyyy-MM-dd}-%i.log.gz</property>
+ <property name="debug_max">10</property>
<property name="warn_fileName">${basePath}/warn.log</property>
<property name="warn_filePattern">${basePath}/warn-%d{yyyy-MM-dd}-%i.log.gz</property>
<property name="warn_max">10</property>
@@ -46,6 +49,17 @@
<PatternLayout pattern="${log_pattern}"/>
</Console>
+ <RollingFile name="DebugFile" fileName="${debug_fileName}" filePattern="${debug_filePattern}">
+ <PatternLayout pattern="${log_pattern}"/>
+ <SizeBasedTriggeringPolicy size="${every_file_size}"/>
+ <DefaultRolloverStrategy max="${debug_max}" />
+ <Filters>
+ <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="INFO" onMatch="DENY" onMismatch="NEUTRAL"/>
+ <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
+ </Filters>
+ </RollingFile>
+
<RollingFile name="InfoFile" fileName="${info_fileName}" filePattern="${info_filePattern}">
<PatternLayout pattern="${log_pattern}"/>
<SizeBasedTriggeringPolicy size="${every_file_size}"/>
@@ -104,8 +118,12 @@
<logger name="org.apache.inlong.common.monitor.MonitorIndex" level="info" additivity="false">
<appender-ref ref="IndexFile"/>
</logger>
+ <logger name="org.apache.pulsar" level="info" additivity="false">
+ <appender-ref ref="InfoFile"/>
+ </logger>
<root level="${output_log_level}">
<appender-ref ref="Console"/>
+ <appender-ref ref="DebugFile"/>
<appender-ref ref="InfoFile"/>
<appender-ref ref="WarnFile"/>
<appender-ref ref="ErrorFile"/>
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index e5a094b..965c41e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -24,6 +24,7 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
+import io.netty.handler.codec.TooLongFrameException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -53,7 +54,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
index 58abc2d..38bd7ae 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SourceContext.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.source;
+import io.netty.channel.group.ChannelGroup;
import java.util.Date;
import java.util.Map;
import java.util.Timer;
@@ -31,7 +32,6 @@ import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
index 56a34f5..5be334d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelHandler.java
@@ -17,6 +17,11 @@
package org.apache.inlong.dataproxy.source.tcp;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,21 +34,14 @@ import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResponseInfo;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.ResultCode;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* InlongTcpChannelHandler
*/
-public class InlongTcpChannelHandler extends SimpleChannelHandler {
+public class InlongTcpChannelHandler extends ChannelInboundHandlerAdapter {
public static final Logger LOG = LoggerFactory.getLogger(InlongTcpChannelHandler.class);
public static final int LENGTH_PARAM_OFFSET = 0;
@@ -66,78 +64,64 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
}
/**
- * channelOpen
- *
- * @param ctx
- * @param e
- * @throws Exception
- */
- @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- if (sourceContext.getAllChannels().size() - 1 >= sourceContext.getMaxConnections()) {
- LOG.warn("refuse to connect , and connections=" + (sourceContext.getAllChannels().size() - 1)
- + ", maxConnections="
- + sourceContext.getMaxConnections() + ",channel is " + e.getChannel());
- e.getChannel().disconnect();
- e.getChannel().close();
- }
- }
-
- /**
- * messageReceived
+ * channelRead
*
* @param ctx
- * @param e
+ * @param msg
* @throws Exception
*/
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
LOG.debug("message received");
- if (e == null) {
- LOG.error("get null messageevent, just skip");
- this.addMetric(false, 0, null);
- return;
- }
- ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
- int readableLength = cb.readableBytes();
- if (readableLength == 0) {
- LOG.warn("skip empty msg.");
- cb.clear();
+ if (msg == null) {
+ LOG.error("get null msg, just skip");
this.addMetric(false, 0, null);
return;
}
- if (readableLength > LENGTH_PARAM_LENGTH + VERSION_PARAM_LENGTH + sourceContext.getMaxMsgLength()) {
- this.addMetric(false, 0, null);
- throw new Exception("err msg, MSG_MAX_LENGTH_BYTES "
- + "< readableLength, and readableLength=" + readableLength + ", and MSG_MAX_LENGTH_BYTES="
- + sourceContext.getMaxMsgLength());
- }
- // save index, reset it if buffer is not satisfied.
- cb.markReaderIndex();
- int totalPackLength = cb.readInt();
- cb.resetReaderIndex();
- if (readableLength < totalPackLength + LENGTH_PARAM_LENGTH) {
- // reset index.
- this.addMetric(false, 0, null);
- throw new Exception("err msg, channel buffer is not satisfied, and readableLength="
- + readableLength + ", and totalPackLength=" + totalPackLength);
- }
-
- // read version
- int version = cb.readShort();
- switch (version) {
- case VERSION_1 :
- // decode version 1
- int bodyLength = totalPackLength - VERSION_PARAM_LENGTH;
- decodeVersion1(cb, bodyLength, e);
- break;
- default :
+ ByteBuf cb = (ByteBuf) msg;
+ try {
+ int readableLength = cb.readableBytes();
+ if (readableLength == 0) {
+ LOG.warn("skip empty msg.");
+ cb.clear();
this.addMetric(false, 0, null);
- throw new Exception("err version, unknown version:" + version);
+ return;
+ }
+ if (readableLength > LENGTH_PARAM_LENGTH + VERSION_PARAM_LENGTH + sourceContext.getMaxMsgLength()) {
+ this.addMetric(false, 0, null);
+ throw new Exception("err msg, MSG_MAX_LENGTH_BYTES "
+ + "< readableLength, and readableLength=" + readableLength + ", and MSG_MAX_LENGTH_BYTES="
+ + sourceContext.getMaxMsgLength());
+ }
+ // save index, reset it if buffer is not satisfied.
+ cb.markReaderIndex();
+ int totalPackLength = cb.readInt();
+ cb.resetReaderIndex();
+ if (readableLength < totalPackLength + LENGTH_PARAM_LENGTH) {
+ // reset index.
+ this.addMetric(false, 0, null);
+ throw new Exception("err msg, channel buffer is not satisfied, and readableLength="
+ + readableLength + ", and totalPackLength=" + totalPackLength);
+ }
+
+ // read version
+ int version = cb.readShort();
+ switch (version) {
+ case VERSION_1 :
+ // decode version 1
+ int bodyLength = totalPackLength - VERSION_PARAM_LENGTH;
+ decodeVersion1(ctx, cb, bodyLength);
+ break;
+ default :
+ this.addMetric(false, 0, null);
+ throw new Exception("err version, unknown version:" + version);
+ }
+ } finally {
+ cb.release();
}
}
- private void decodeVersion1(ChannelBuffer cb, int bodyLength, MessageEvent e) throws Exception {
+ private void decodeVersion1(ChannelHandlerContext ctx, ByteBuf cb, int bodyLength) throws Exception {
// read bytes
byte[] msgBytes = new byte[bodyLength];
cb.readBytes(msgBytes);
@@ -157,11 +141,11 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
} catch (Throwable ex) {
LOG.error("Process Controller Event error can't write event to channel.", ex);
this.addMetric(false, event.getBody().length, event);
- this.responsePackage(ResultCode.ERR_REJECT, e);
+ this.responsePackage(ctx, ResultCode.ERR_REJECT);
return;
}
}
- this.responsePackage(ResultCode.SUCCUSS, e);
+ this.responsePackage(ctx, ResultCode.SUCCUSS);
}
/**
@@ -191,12 +175,12 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
/**
* responsePackage
- *
+ *
+ * @param ctx
* @param code
- * @param e
* @throws Exception
*/
- private void responsePackage(ResultCode code, MessageEvent e)
+ private void responsePackage(ChannelHandlerContext ctx, ResultCode code)
throws Exception {
ResponseInfo.Builder builder = ResponseInfo.newBuilder();
builder.setResult(code);
@@ -204,15 +188,15 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
// encode
byte[] responseBytes = builder.build().toByteArray();
//
- ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(responseBytes);
-
- Channel remoteChannel = e.getChannel();
+ ByteBuf buffer = Unpooled.wrappedBuffer(responseBytes);
+ Channel remoteChannel = ctx.channel();
if (remoteChannel.isWritable()) {
- remoteChannel.write(buffer, e.getRemoteAddress());
+ remoteChannel.write(buffer);
} else {
LOG.warn(
"the send buffer2 is full, so disconnect it!please check remote client"
+ "; Connection info:" + remoteChannel);
+ buffer.release();
throw new Exception(
"the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
+ remoteChannel);
@@ -223,41 +207,59 @@ public class InlongTcpChannelHandler extends SimpleChannelHandler {
* exceptionCaught
*
* @param ctx
- * @param e
+ * @param cause
* @throws Exception
*/
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- LOG.error("exception caught", e.getCause());
- super.exceptionCaught(ctx, e);
- if (e.getChannel() != null) {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.error("exception caught cause = {}", cause);
+ ctx.fireExceptionCaught(cause);
+ if (ctx.channel() != null) {
try {
- e.getChannel().disconnect();
- e.getChannel().close();
+ ctx.channel().disconnect();
+ ctx.channel().close();
} catch (Exception ex) {
LOG.error("Close connection error!", ex);
}
- sourceContext.getAllChannels().remove(e.getChannel());
+ sourceContext.getAllChannels().remove(ctx.channel());
}
}
/**
- * channelClosed
- *
- * @param ctx
- * @param e
+ * channelInactive
+ *
+ * @param ctx
* @throws Exception
*/
@Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("Connection to {} disconnected.", e.getChannel());
- super.channelClosed(ctx, e);
+ public void channelInactive(ChannelHandlerContext ctx) {
+ LOG.debug("Connection to {} disconnected.", ctx.channel());
+ ctx.fireChannelInactive();
try {
- e.getChannel().disconnect();
- e.getChannel().close();
+ ctx.channel().disconnect();
+ ctx.channel().close();
} catch (Exception ex) {
- //
+ LOG.error("channelInactive has exception e = {}", ex);
+ }
+ sourceContext.getAllChannels().remove(ctx.channel());
+
+ }
+
+ /**
+ * channelActive
+ *
+ * @param ctx
+ * @throws Exception
+ */
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ if (sourceContext.getAllChannels().size() - 1 >= sourceContext.getMaxConnections()) {
+ LOG.warn("refuse to connect , and connections="
+ + (sourceContext.getAllChannels().size() - 1)
+ + ", maxConnections="
+ + sourceContext.getMaxConnections() + ",channel is " + ctx.channel());
+ ctx.channel().disconnect();
+ ctx.channel().close();
}
- sourceContext.getAllChannels().remove(e.getChannel());
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java
index 77333a2..30a3bb7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpChannelPipelineFactory.java
@@ -17,6 +17,11 @@
package org.apache.inlong.dataproxy.source.tcp;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.ReadTimeoutHandler;
import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
@@ -25,14 +30,6 @@ import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.source.SourceContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +38,8 @@ import com.google.common.base.Preconditions;
/**
* InlongTcpChannelPipelineFactory
*/
-public class InlongTcpChannelPipelineFactory implements ChannelPipelineFactory, Configurable {
+public class InlongTcpChannelPipelineFactory extends ChannelInitializer<SocketChannel>
+ implements Configurable {
public static final Logger LOG = LoggerFactory.getLogger(InlongTcpChannelPipelineFactory.class);
public static final int DEFAULT_LENGTH_FIELD_OFFSET = 0;
@@ -52,54 +50,48 @@ public class InlongTcpChannelPipelineFactory implements ChannelPipelineFactory,
private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
private SourceContext sourceContext;
private String messageHandlerName;
- private Timer timer = new HashedWheelTimer();
+ private String protocolType;
/**
* get server factory
*
* @param sourceContext
*/
- public InlongTcpChannelPipelineFactory(SourceContext sourceContext) {
+ public InlongTcpChannelPipelineFactory(SourceContext sourceContext, String protocolType) {
this.sourceContext = sourceContext;
+ this.protocolType = protocolType;
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline cp = Channels.pipeline();
- return addMessageHandlersTo(cp);
- }
+ protected void initChannel(SocketChannel ch) {
- /**
- * get message handlers
- *
- * @param cp
- * @return
- */
- @SuppressWarnings("unchecked")
- public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
- cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
- sourceContext.getMaxMsgLength(), DEFAULT_LENGTH_FIELD_OFFSET, DEFAULT_LENGTH_FIELD_LENGTH,
- DEFAULT_LENGTH_ADJUSTMENT, DEFAULT_INITIAL_BYTES_TO_STRIP, DEFAULT_FAIL_FAST));
- cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
- DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+ if (StringUtils.isEmpty(protocolType) || this.protocolType
+ .equalsIgnoreCase(ConfigConstants.TCP_PROTOCOL)) {
+ ch.pipeline().addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
+ sourceContext.getMaxMsgLength(), DEFAULT_LENGTH_FIELD_OFFSET,
+ DEFAULT_LENGTH_FIELD_LENGTH,
+ DEFAULT_LENGTH_ADJUSTMENT, DEFAULT_INITIAL_BYTES_TO_STRIP, DEFAULT_FAIL_FAST));
+ ch.pipeline().addLast("readTimeoutHandler",
+ new ReadTimeoutHandler(DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+ }
if (sourceContext.getSource().getChannelProcessor() != null) {
try {
- Class<? extends SimpleChannelHandler> clazz = (Class<? extends SimpleChannelHandler>) Class
+ Class<? extends ChannelInboundHandlerAdapter> clazz =
+ (Class<? extends ChannelInboundHandlerAdapter>) Class
.forName(messageHandlerName);
Constructor<?> ctor = clazz.getConstructor(SourceContext.class);
- SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
+ ChannelInboundHandlerAdapter messageHandler = (ChannelInboundHandlerAdapter) ctor
.newInstance(sourceContext);
- cp.addLast("messageHandler", messageHandler);
+ ch.pipeline().addLast("messageHandler", messageHandler);
} catch (Exception e) {
- LOG.error("SimpleChannelHandler.newInstance has error:" + sourceContext.getSource().getName(), e);
+ LOG.error("SimpleChannelHandler.newInstance has error:"
+ + sourceContext.getSource().getName(), e);
}
}
-
- return cp;
}
@Override
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java
index d6658d0..ed9d98a 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.java
@@ -17,28 +17,17 @@
package org.apache.inlong.dataproxy.source.tcp;
+import io.netty.channel.ChannelInitializer;
import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
-import org.apache.flume.source.AbstractSource;
-import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.source.SimpleTcpSource;
import org.apache.inlong.dataproxy.source.SourceContext;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.ThreadRenamingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,32 +36,16 @@ import com.google.common.base.Preconditions;
/**
* Inlong tcp source
*/
-public class InlongTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
+public class InlongTcpSource extends SimpleTcpSource
+ implements Configurable, EventDrivenSource {
public static final Logger LOG = LoggerFactory.getLogger(InlongTcpSource.class);
- public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 64 * 1024;
- public static final int MAX_RECEIVE_BUFFER_SIZE = 16 * 1024 * 1024;
- public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
- public static final int MAX_SEND_BUFFER_SIZE = 16 * 1024 * 1024;
- protected ChannelGroup allChannels;
protected SourceContext sourceContext;
- protected ServerBootstrap serverBootstrap = null;
- protected int port;
- protected String host = null;
protected String msgFactoryName;
protected String messageHandlerName;
- private boolean tcpNoDelay = true;
- private boolean keepAlive = true;
- private int receiveBufferSize;
- private int highWaterMark;
- private int sendBufferSize;
- private int trafficClass;
-
- private Channel nettyChannel = null;
-
private Configurable pipelineFactoryConfigurable = null;
/**
@@ -80,7 +53,6 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
*/
public InlongTcpSource() {
super();
- allChannels = new DefaultChannelGroup();
}
/**
@@ -88,71 +60,8 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public synchronized void start() {
- try {
- LOG.info("start " + this.getName());
-
- ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
- ChannelFactory factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(
- new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")),
- 1,
- Executors.newCachedThreadPool(
- new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")),
- this.sourceContext.getMaxThreads());
- LOG.info("Set max workers : {} ;", this.sourceContext.getMaxThreads());
- ChannelPipelineFactory pipelineFactory = null;
-
- serverBootstrap = new ServerBootstrap(factory);
- serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
- serverBootstrap.setOption("child.keepAlive", keepAlive);
- serverBootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
- serverBootstrap.setOption("child.sendBufferSize", sendBufferSize);
- serverBootstrap.setOption("child.trafficClass", trafficClass);
- serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
- LOG.info("load msgFactory=" + msgFactoryName
- + " and messageHandlerName=" + messageHandlerName);
- try {
- Class<? extends ChannelPipelineFactory> clazz = (Class<? extends ChannelPipelineFactory>) Class
- .forName(msgFactoryName);
-
- Constructor ctor = clazz.getConstructor(SourceContext.class);
-
- LOG.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
- pipelineFactory = (ChannelPipelineFactory) ctor.newInstance(sourceContext);
- if (pipelineFactory instanceof Configurable) {
- this.pipelineFactoryConfigurable = ((Configurable) pipelineFactory);
- this.pipelineFactoryConfigurable.configure(sourceContext.getParentContext());
- }
- } catch (Exception e) {
- LOG.error(
- "Inlong Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
- msgFactoryName, e);
- stop();
- throw new FlumeException(e.getMessage(), e);
- }
-
- serverBootstrap.setPipelineFactory(pipelineFactory);
-
- try {
- if (host == null) {
- nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
- } else {
- nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
- }
- } catch (Exception e) {
- LOG.error("Inlong TCP Source error bind host {} port {},program will exit!", host,
- port);
- System.exit(-1);
- }
-
- allChannels.add(nettyChannel);
-
- LOG.info("Inlong TCP Source started at host {}, port {}", host, port);
- super.start();
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- }
+ public synchronized void startSource() {
+ super.startSource();
}
/**
@@ -161,28 +70,6 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
@Override
public synchronized void stop() {
LOG.info("[STOP SOURCE]{} stopping...", super.toString());
- if (allChannels != null && !allChannels.isEmpty()) {
- try {
- allChannels.unbind().awaitUninterruptibly();
- allChannels.close().awaitUninterruptibly();
- } catch (Exception e) {
- LOG.warn("Inlong TCP Source netty server stop ex", e);
- } finally {
- allChannels.clear();
- }
- }
-
- if (serverBootstrap != null) {
- try {
- serverBootstrap.releaseExternalResources();
- } catch (Exception e) {
- LOG.warn("Inlong TCP Source serverBootstrap stop ex ", e);
- } finally {
- serverBootstrap = null;
- }
- }
-
- LOG.info("[STOP SOURCE]{} stopped", super.getName());
super.stop();
}
@@ -195,32 +82,13 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
public void configure(Context context) {
try {
LOG.info("context is {}", context);
+ super.configure(context);
this.sourceContext = new SourceContext(this, allChannels, context);
// start
this.sourceContext.start();
- tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true);
- keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true);
- highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, DEFAULT_RECEIVE_BUFFER_SIZE);
- receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, DEFAULT_RECEIVE_BUFFER_SIZE);
- if (receiveBufferSize > MAX_RECEIVE_BUFFER_SIZE) {
- receiveBufferSize = MAX_RECEIVE_BUFFER_SIZE;
- }
- Preconditions.checkArgument(receiveBufferSize > 0, "receiveBufferSize must be > 0");
-
- sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, DEFAULT_SEND_BUFFER_SIZE);
- if (sendBufferSize > MAX_SEND_BUFFER_SIZE) {
- sendBufferSize = MAX_SEND_BUFFER_SIZE;
- }
- Preconditions.checkArgument(sendBufferSize > 0, "sendBufferSize must be > 0");
-
- trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, 0);
- Preconditions.checkArgument((trafficClass == 0 || trafficClass == 96),
- "trafficClass must be == 0 or == 96");
-
msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME,
- InlongTcpChannelPipelineFactory.class.getName());
- msgFactoryName = msgFactoryName.trim();
+ InlongTcpChannelPipelineFactory.class.getName()).trim();
Preconditions.checkArgument(StringUtils.isNotBlank(msgFactoryName),
"msgFactoryName is empty");
@@ -237,4 +105,36 @@ public class InlongTcpSource extends AbstractSource implements Configurable, Eve
LOG.error(t.getMessage(), t);
}
}
+
+ public ChannelInitializer getChannelInitializerFactory() {
+ LOG.info(new StringBuffer("load msgFactory=").append(msgFactoryName)
+ .append(" and serviceDecoderName=").append(serviceDecoderName).toString());
+
+ ChannelInitializer fac = null;
+ try {
+ Class<? extends ChannelInitializer> clazz =
+ (Class<? extends ChannelInitializer>) Class.forName(msgFactoryName);
+
+ Constructor ctor = clazz.getConstructor(SourceContext.class, String.class);
+
+ LOG.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
+ fac = (ChannelInitializer) ctor.newInstance(sourceContext,
+ this.getProtocolName());
+ if (fac instanceof Configurable) {
+ this.pipelineFactoryConfigurable = ((Configurable) fac);
+ this.pipelineFactoryConfigurable.configure(sourceContext.getParentContext());
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Inlong Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
+ msgFactoryName, e);
+ stop();
+ throw new FlumeException(e.getMessage(), e);
+ }
+ return fac;
+ }
+
+ public String getProtocolName() {
+ return "tcp";
+ }
}