You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/21 01:22:11 UTC
[incubator-inlong] branch master updated: [INLONG-1950][DataProxy] DataProxy add supporting to udp protocol for reporting data (#2185)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 897c895 [INLONG-1950][DataProxy] DataProxy add supporting to udp protocol for reporting data (#2185)
897c895 is described below
commit 897c8958385f33a9ebb25e5cfb8cc1d5388e9e38
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Jan 21 09:22:03 2022 +0800
[INLONG-1950][DataProxy] DataProxy add supporting to udp protocol for reporting data (#2185)
---
.../apache/inlong/dataproxy/source/BaseSource.java | 283 +++++++++++++++++++++
.../inlong/dataproxy/source/ConfStringUtils.java | 48 ++++
.../dataproxy/source/DefaultServiceDecoder.java | 46 +++-
.../dataproxy/source/ServerMessageFactory.java | 65 +++--
.../dataproxy/source/ServerMessageHandler.java | 195 ++++++++------
.../inlong/dataproxy/source/ServiceDecoder.java | 3 +-
.../dataproxy/source/SimpleMessageHandler.java | 14 +-
.../inlong/dataproxy/source/SimpleTcpSource.java | 196 ++++----------
.../inlong/dataproxy/source/SimpleUdpSource.java | 103 ++++++++
.../inlong/dataproxy/source/UdpSourceTest.java | 43 ++++
10 files changed, 740 insertions(+), 256 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
new file mode 100644
index 0000000..5ac6121
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.commons.monitor.MonitorIndex;
+import org.apache.inlong.commons.monitor.MonitorIndexExt;
+import org.jboss.netty.bootstrap.Bootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the netty-backed DataProxy sources.
+ *
+ * <p>{@link #configure(Context)} reads the shared settings, {@link #start()} creates the monitor
+ * indexes and calls {@link #startSource()}, and {@link #stop()} closes the tracked channels,
+ * releases the bootstrap and shuts the monitors down. Subclasses provide the protocol name and
+ * the protocol-specific start-up.
+ */
+public abstract class BaseSource
+        extends AbstractSource
+        implements EventDrivenSource, Configurable {
+    private static final Logger logger = LoggerFactory.getLogger(BaseSource.class);
+
+    private static final String CONNECTIONS = "connections";
+    private static final String HOST_DEFAULT_VALUE = "0.0.0.0";
+    private static final int MAX_MONITOR_CNT = 300000;
+    private static final int DEFAULT_MAX_CONNECTIONS = 5000;
+    private static final int STAT_INTERVAL_MUST_THAN = 0;
+    private static final int PKG_TIMEOUT_DEFAULT_SEC = 3;
+    private static final int MSG_MIN_LENGTH = 4;
+    private static final int MAX_MSG_DEFAULT_LENGTH = 1024 * 64;
+    private static final int INTERVAL_SEC = 60;
+
+    protected static int DEFAULT_MAX_THREADS = 32;
+
+    /** Flume context captured by the most recent {@link #configure(Context)} call. */
+    protected Context context;
+
+    protected int port;
+    protected String host = null;
+
+    /** Class name of the {@link ChannelPipelineFactory} to instantiate. */
+    protected String msgFactoryName;
+
+    /** Class name of the {@link ServiceDecoder} to instantiate. */
+    protected String serviceDecoderName;
+
+    /** Handler class name, passed through to the pipeline factory. */
+    protected String messageHandlerName;
+
+    protected int maxMsgLength;
+    protected boolean isCompressed;
+    protected String topic;
+    protected String attr;
+    protected boolean filterEmptyMsg;
+    private int statIntervalSec;
+    protected int pkgTimeoutSec;
+    protected int maxConnections = Integer.MAX_VALUE;
+    protected int maxThreads = 32;
+    protected boolean customProcessor = false;
+
+    // monitor
+    private MonitorIndex monitorIndex;
+    private MonitorIndexExt monitorIndexExt;
+
+    // netty server
+    protected Bootstrap serverBootstrap = null;
+    protected ChannelGroup allChannels;
+    protected Channel nettyChannel = null;
+
+    /** Creates the source with an empty channel group. */
+    public BaseSource() {
+        super();
+        allChannels = new DefaultChannelGroup();
+    }
+
+    /**
+     * Starts the Flume lifecycle, creates the monitor indexes and then calls
+     * {@link #startSource()}.
+     */
+    @Override
+    public synchronized void start() {
+        super.start();
+        monitorIndex = new MonitorIndex("Source", INTERVAL_SEC, MAX_MONITOR_CNT);
+        monitorIndexExt = new MonitorIndexExt("DataProxy_monitors#"
+                + this.getProtocolName(), INTERVAL_SEC, MAX_MONITOR_CNT);
+        startSource();
+    }
+
+    /**
+     * Closes every tracked channel, releases the bootstrap resources, stops the Flume
+     * lifecycle and shuts the monitor indexes down. A failure while closing channels or
+     * releasing the bootstrap is logged and does not abort the remaining steps.
+     */
+    @Override
+    public synchronized void stop() {
+        logger.info("[STOP {} SOURCE]{} stopping...", this.getProtocolName(), this.getName());
+        if (!allChannels.isEmpty()) {
+            try {
+                allChannels.close().awaitUninterruptibly();
+            } catch (Exception e) {
+                // Throwable passed last without a placeholder so SLF4J prints the stack trace.
+                logger.warn("Simple {} Source netty server stop ex", this.getProtocolName(), e);
+            } finally {
+                allChannels.clear();
+            }
+        }
+
+        if (serverBootstrap != null) {
+            try {
+                serverBootstrap.releaseExternalResources();
+            } catch (Exception e) {
+                logger.warn("Simple {} Source serverBootstrap stop ex", this.getProtocolName(), e);
+            } finally {
+                serverBootstrap = null;
+            }
+        }
+        super.stop();
+        if (monitorIndex != null) {
+            monitorIndex.shutDown();
+        }
+        if (monitorIndexExt != null) {
+            monitorIndexExt.shutDown();
+        }
+        logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(), this.getName());
+    }
+
+    /**
+     * Reads and validates the settings shared by all sources.
+     *
+     * @param context Flume context holding this source's properties
+     * @throws IllegalArgumentException if a value fails one of the precondition checks
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+
+        // Check the key before reading it: unboxing a missing port would otherwise fail with
+        // a bare NullPointerException instead of an error that names the missing key.
+        Configurables.ensureRequiredNonNull(context, ConfigConstants.CONFIG_PORT);
+        port = context.getInteger(ConfigConstants.CONFIG_PORT);
+        host = context.getString(ConfigConstants.CONFIG_HOST, HOST_DEFAULT_VALUE);
+
+        Preconditions.checkArgument(ConfStringUtils.isValidIp(host), "ip config not valid");
+        Preconditions.checkArgument(ConfStringUtils.isValidPort(port), "port config not valid");
+
+        msgFactoryName =
+                context.getString(ConfigConstants.MSG_FACTORY_NAME,
+                        "org.apache.inlong.dataproxy.source.ServerMessageFactory");
+        msgFactoryName = msgFactoryName.trim();
+        Preconditions.checkArgument(StringUtils.isNotBlank(msgFactoryName),
+                "msgFactoryName is empty");
+
+        serviceDecoderName =
+                context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
+                        "org.apache.inlong.dataproxy.source.DefaultServiceDecoder");
+        serviceDecoderName = serviceDecoderName.trim();
+        Preconditions.checkArgument(StringUtils.isNotBlank(serviceDecoderName),
+                "serviceProcessorName is empty");
+
+        messageHandlerName =
+                context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
+                        "org.apache.inlong.dataproxy.source.ServerMessageHandler");
+        messageHandlerName = messageHandlerName.trim();
+        Preconditions.checkArgument(StringUtils.isNotBlank(messageHandlerName),
+                "messageHandlerName is empty");
+
+        maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, MAX_MSG_DEFAULT_LENGTH);
+        Preconditions.checkArgument(
+                (maxMsgLength >= MSG_MIN_LENGTH && maxMsgLength <= ConfigConstants.MSG_MAX_LENGTH_BYTES),
+                "maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
+        isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
+
+        filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
+
+        Configurables.ensureRequiredNonNull(context, ConfigConstants.TOPIC, ConfigConstants.ATTR);
+        topic = context.getString(ConfigConstants.TOPIC).trim();
+        Preconditions.checkArgument(!topic.isEmpty(), "topic is empty");
+        attr = context.getString(ConfigConstants.ATTR).trim();
+        Preconditions.checkArgument(!attr.isEmpty(), "attr is empty");
+
+        statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, INTERVAL_SEC);
+        Preconditions.checkArgument((statIntervalSec >= STAT_INTERVAL_MUST_THAN), "statIntervalSec must be >= 0");
+
+        pkgTimeoutSec = context.getInteger(ConfigConstants.PACKAGE_TIMEOUT_SEC, PKG_TIMEOUT_DEFAULT_SEC);
+
+        try {
+            maxConnections = context.getInteger(CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
+        } catch (NumberFormatException e) {
+            // maxConnections keeps its previous value; the placeholder shows the rejected text.
+            logger.warn("BaseSource's \"connections\" property must specify an integer value. {}",
+                    context.getString(CONNECTIONS));
+        }
+
+        try {
+            maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, DEFAULT_MAX_THREADS);
+        } catch (NumberFormatException e) {
+            logger.warn("BaseSource's max-threads property must specify an integer value. {}",
+                    context.getString(ConfigConstants.MAX_THREADS));
+        }
+
+        this.customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false);
+    }
+
+    /**
+     * Builds the pipeline factory named by {@code msgFactoryName} through reflection.
+     *
+     * @return the factory instance
+     * @throws FlumeException if the decoder or the factory cannot be created; {@link #stop()}
+     *         is called before the exception is thrown
+     */
+    public ChannelPipelineFactory getChannelPiplineFactory() {
+        logger.info("load msgFactory={} and serviceDecoderName={}", msgFactoryName, serviceDecoderName);
+        try {
+            ServiceDecoder serviceDecoder = (ServiceDecoder) Class.forName(serviceDecoderName).newInstance();
+            Class<? extends ChannelPipelineFactory> clazz =
+                    Class.forName(msgFactoryName).asSubclass(ChannelPipelineFactory.class);
+            Constructor<? extends ChannelPipelineFactory> ctor = clazz.getConstructor(
+                    AbstractSource.class, ChannelGroup.class,
+                    String.class, ServiceDecoder.class, String.class, Integer.class,
+                    String.class, String.class, Boolean.class,
+                    Integer.class, Boolean.class, MonitorIndex.class,
+                    MonitorIndexExt.class, String.class);
+            logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
+            return ctor.newInstance(this, allChannels,
+                    this.getProtocolName(), serviceDecoder, messageHandlerName, maxMsgLength,
+                    topic, attr, filterEmptyMsg,
+                    maxConnections, isCompressed, monitorIndex,
+                    monitorIndexExt, this.getProtocolName());
+        } catch (Exception e) {
+            // The throwable is passed last so SLF4J logs its stack trace.
+            logger.error(
+                    "Simple {} Source start error, fail to construct ChannelPipelineFactory with name "
+                            + "{}", this.getProtocolName(), msgFactoryName, e);
+            stop();
+            // Keep the original message but chain the cause so the root failure is not lost.
+            throw new FlumeException(e.getMessage(), e);
+        }
+    }
+
+    /** Returns the context captured by the most recent {@link #configure(Context)} call. */
+    public Context getContext() {
+        return context;
+    }
+
+    /** Returns the protocol name used in logs, the monitor index name and the pipeline factory arguments. */
+    public abstract String getProtocolName();
+
+    /**
+     * Protocol-specific start-up hook, called at the end of {@link #start()}.
+     */
+    public abstract void startSource();
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ConfStringUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ConfStringUtils.java
new file mode 100644
index 0000000..618cde6
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ConfStringUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+/** Static validation helpers for IPv4 address strings and port numbers. */
+public class ConfStringUtils {
+    /**
+     * Checks whether {@code ip} is a dotted-decimal IPv4 address; surrounding blanks are ignored.
+     *
+     * @param ip candidate address, may be null
+     * @return true only for four 1-3 digit parts, each within 0..255
+     */
+    public static boolean isValidIp(String ip) {
+        if (ip == null || !ip.trim().matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
+            return false;
+        }
+        for (String part : ip.trim().split("\\.")) {
+            // 255 is a legal octet (255.255.255.255, 192.168.255.1); only 256..999 is out of range.
+            if (Integer.parseInt(part) > 255) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether {@code port} lies within 0..65535, both ends inclusive.
+     * @return true when {@code 0 <= port <= 65535}
+     */
+    public static boolean isValidPort(int port) {
+        return port >= 0 && port <= 65535;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index 28c4681..d2c7ff6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -36,6 +36,7 @@ import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
@@ -76,13 +77,14 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* @param resultMap
* @param cb
* @param channel
+ * @param msgEvent
* @param totalDataLen
* @return
* @throws
*/
private Map<String, Object> extractNewBinHB(Map<String, Object> resultMap,
- ChannelBuffer cb, Channel channel,
- int totalDataLen) throws Exception {
+ ChannelBuffer cb, Channel channel,
+ MessageEvent msgEvent, int totalDataLen) throws Exception {
int msgHeadPos = cb.readerIndex() - 5;
// check validation
@@ -95,7 +97,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
+ attrLen + BIN_HB_FORMAT_SIZE)) || (msgMagic != BIN_MSG_MAGIC)) {
LOG.error("err msg, bodyLen + attrLen > totalDataLen, "
- + "and bodyLen={},attrLen={},totalDataLen={},magic={};Connection info:{}",
+ + "and bodyLen={},attrLen={},totalDataLen={},magic={};Connection "
+ + "info:{}",
bodyLen, attrLen, totalDataLen, Integer.toHexString(msgMagic), channel.toString());
return resultMap;
@@ -116,7 +119,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
private void handleDateTime(Map<String, String> commonAttrMap, Channel channel,
- long uniq, long dataTime, int msgCount) {
+ MessageEvent msgEvent,
+ long uniq, long dataTime, int msgCount) {
commonAttrMap.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
String time = "";
if (commonAttrMap.containsKey(ConfigConstants.PKG_TIME_KEY)) {
@@ -126,7 +130,16 @@ public class DefaultServiceDecoder implements ServiceDecoder {
time = String.valueOf(dataTime);
}
StringBuilder sidBuilder = new StringBuilder();
- sidBuilder.append(channel.getRemoteAddress().toString()).append("#").append(time)
+ /*
+ * udp need use msgEvent get remote address
+ */
+ String remoteAddress = "";
+ if (channel != null && channel.getRemoteAddress() != null) {
+ remoteAddress = channel.getRemoteAddress().toString();
+ } else if (msgEvent != null && msgEvent.getRemoteAddress() != null) {
+ remoteAddress = msgEvent.getRemoteAddress().toString();
+ }
+ sidBuilder.append(remoteAddress).append("#").append(time)
.append("#").append(uniq);
commonAttrMap.put(AttributeConstants.SEQUENCE_ID, new String(sidBuilder));
@@ -224,14 +237,15 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* @param resultMap
* @param cb
* @param channel
+ * @param msgEvent
* @param totalDataLen
* @param msgType
* @return
* @throws Exception
*/
private Map<String, Object> extractNewBinData(Map<String, Object> resultMap,
- ChannelBuffer cb, Channel channel,
- int totalDataLen, MsgType msgType) throws Exception {
+ ChannelBuffer cb, Channel channel, MessageEvent msgEvent,
+ int totalDataLen, MsgType msgType) throws Exception {
int msgHeadPos = cb.readerIndex() - 5;
int bodyLen = cb.getInt(msgHeadPos + BIN_MSG_BODYLEN_OFFSET);
@@ -292,7 +306,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
try {
- handleDateTime(commonAttrMap, channel, uniq, dataTime, msgCount);
+ handleDateTime(commonAttrMap, channel, msgEvent, uniq, dataTime, msgCount);
final boolean index = handleExtMap(commonAttrMap, cb, resultMap, extendField, msgHeadPos);
ByteBuffer dataBuf = handleTrace(channel, cb, extendField, msgHeadPos,
totalDataLen, attrLen, strAttr, bodyLen);
@@ -330,6 +344,7 @@ public class DefaultServiceDecoder implements ServiceDecoder {
resultMap.put(ConfigConstants.MSG_LIST, msgList);
}
} catch (Exception ex) {
+ LOG.error("extractNewBinData has error! ex = {}", ex);
cb.clear();
throw new MessageIDException(uniq, ErrorCode.OTHER_ERROR, ex.getCause());
}
@@ -343,13 +358,15 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* @param cb
* @param channel
* @param totalDataLen
+ * @param msgEvent
* @param msgType
* @return
* @throws Exception
*/
private Map<String, Object> extractDefaultData(Map<String, Object> resultMap,
- ChannelBuffer cb, Channel channel,
- int totalDataLen, MsgType msgType) throws Exception {
+ ChannelBuffer cb, Channel channel,
+ MessageEvent msgEvent,
+ int totalDataLen, MsgType msgType) throws Exception {
int bodyLen = cb.readInt();
if (bodyLen == 0) {
throw new Exception(new Throwable("err msg, bodyLen is empty" + ";"
@@ -464,7 +481,8 @@ public class DefaultServiceDecoder implements ServiceDecoder {
* +--------+--------+--------+----------------+--------+----------------+------------------------+
*/
@Override
- public Map<String, Object> extractData(ChannelBuffer cb, Channel channel) throws Exception {
+ public Map<String, Object> extractData(ChannelBuffer cb, Channel channel,
+ MessageEvent msgEvent) throws Exception {
Map<String, Object> resultMap = new HashMap<String, Object>();
if (null == cb) {
LOG.error("cb == null");
@@ -491,14 +509,14 @@ public class DefaultServiceDecoder implements ServiceDecoder {
}
// if it's bin heart beat.
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
- return extractNewBinHB(resultMap, cb, channel, totalDataLen);
+ return extractNewBinHB(resultMap, cb, channel, msgEvent, totalDataLen);
}
if (msgType.getValue() >= MsgType.MSG_BIN_MULTI_BODY.getValue()) {
resultMap.put(ConfigConstants.COMPRESS_TYPE, (compressType != 0) ? "snappy" : "");
- return extractNewBinData(resultMap, cb, channel, totalDataLen, msgType);
+ return extractNewBinData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
} else {
- return extractDefaultData(resultMap, cb, channel, totalDataLen, msgType);
+ return extractDefaultData(resultMap, cb, channel, msgEvent, totalDataLen, msgType);
}
} else {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index 69c4856..fa42169 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -19,10 +19,11 @@ package org.apache.inlong.dataproxy.source;
import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
-
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.commons.monitor.MonitorIndex;
+import org.apache.inlong.commons.monitor.MonitorIndexExt;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
@@ -40,21 +41,47 @@ import org.slf4j.LoggerFactory;
public class ServerMessageFactory implements ChannelPipelineFactory {
private static final Logger LOG = LoggerFactory.getLogger(ServerMessageFactory.class);
+
private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
+
+ private static long MAX_CHANNEL_MEMORY_SIZE = 1024 * 1024;
+
+ private static long MAX_TOTAL_MEMORY_SIZE = 1024 * 1024;
+
+ private static int MSG_LENGTH_LEN = 4;
+
private AbstractSource source;
+
private ChannelProcessor processor;
+
private ChannelGroup allChannels;
+
private ExecutionHandler executionHandler;
+
private String protocolType;
- private ServiceDecoder serviceProcessor;
+
+ private ServiceDecoder serviceDecoder;
+
private String messageHandlerName;
+
private int maxConnections = Integer.MAX_VALUE;
+
private int maxMsgLength;
+
private boolean isCompressed;
+
private String name;
+
private String topic;
+
private String attr;
+
private boolean filterEmptyMsg;
+
+ private MonitorIndex monitorIndex;
+
+ private MonitorIndexExt monitorIndexExt;
+
private Timer timer = new HashedWheelTimer();
/**
@@ -63,21 +90,22 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
* @param source
* @param allChannels
* @param protocol
- * @param serProcessor
+ * @param serviceDecoder
* @param messageHandlerName
- * @param maxMsgLength
* @param topic
* @param attr
* @param filterEmptyMsg
* @param maxCons
* @param isCompressed
+ * @param monitorIndex
+ * @param monitorIndexExt
* @param name
*/
- public ServerMessageFactory(AbstractSource source,
- ChannelGroup allChannels, String protocol, ServiceDecoder serProcessor,
- String messageHandlerName, Integer maxMsgLength,
- String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
- Boolean isCompressed, String name) {
+ public ServerMessageFactory(AbstractSource source, ChannelGroup allChannels, String protocol,
+ ServiceDecoder serviceDecoder, String messageHandlerName, Integer maxMsgLength,
+ String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
+ Boolean isCompressed, MonitorIndex monitorIndex, MonitorIndexExt monitorIndexExt,
+ String name) {
this.source = source;
this.processor = source.getChannelProcessor();
this.allChannels = allChannels;
@@ -86,17 +114,18 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
this.filterEmptyMsg = filterEmptyMsg;
int cores = Runtime.getRuntime().availableProcessors();
this.protocolType = protocol;
- this.serviceProcessor = serProcessor;
+ this.serviceDecoder = serviceDecoder;
this.messageHandlerName = messageHandlerName;
this.name = name;
this.maxConnections = maxCons;
this.maxMsgLength = maxMsgLength;
this.isCompressed = isCompressed;
-
+ this.monitorIndex = monitorIndex;
+ this.monitorIndexExt = monitorIndexExt;
if (protocolType.equalsIgnoreCase(ConfigConstants.UDP_PROTOCOL)) {
this.executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(cores * 2,
- 1024 * 1024, 1024 * 1024));
+ MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE));
}
}
@@ -116,7 +145,7 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
if (this.protocolType
.equalsIgnoreCase(ConfigConstants.TCP_PROTOCOL)) {
cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
- this.maxMsgLength, 0, 4, 0, 0, true));
+ this.maxMsgLength, 0, MSG_LENGTH_LEN, 0, 0, true));
cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
}
@@ -128,12 +157,14 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
Constructor<?> ctor = clazz.getConstructor(
AbstractSource.class, ServiceDecoder.class, ChannelGroup.class,
- String.class, String.class, Boolean.class, Integer.class,
- Integer.class, Boolean.class, String.class);
+ String.class, String.class, Boolean.class,
+ Integer.class, Boolean.class, MonitorIndex.class,
+ MonitorIndexExt.class, String.class);
SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
- .newInstance(source, serviceProcessor, allChannels, topic, attr,
- filterEmptyMsg, maxMsgLength, maxConnections, isCompressed, protocolType
+ .newInstance(source, serviceDecoder, allChannels, topic, attr,
+ filterEmptyMsg, maxConnections,
+ isCompressed, monitorIndex, monitorIndexExt, protocolType
);
cp.addLast("messageHandler", messageHandler);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 8925916..2ab09f5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -22,6 +22,8 @@ import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA
import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -32,7 +34,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
@@ -48,6 +49,9 @@ import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.commons.monitor.MonitorIndex;
+import org.apache.inlong.commons.monitor.MonitorIndexExt;
+import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -60,9 +64,6 @@ import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-
/**
* Server message handler
*
@@ -71,10 +72,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
+
private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
+
private static final ConfigManager configManager = ConfigManager.getInstance();
+
private static final Joiner.MapJoiner mapJoiner = Joiner.on(AttributeConstants.SEPARATOR)
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
private static final Splitter.MapSplitter mapSplitter = Splitter
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
@@ -86,6 +91,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
return new SimpleDateFormat("yyyyMMddHHmm");
}
};
+
private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer =
new ThreadLocal<SimpleDateFormat>() {
@Override
@@ -94,40 +100,43 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
};
private AbstractSource source;
+
private final ChannelGroup allChannels;
+
private int maxConnections = Integer.MAX_VALUE;
+
private boolean filterEmptyMsg = false;
+
private final boolean isCompressed;
+
private final ChannelProcessor processor;
- private final ServiceDecoder serviceProcessor;
+
+ private final ServiceDecoder serviceDecoder;
+
private final String defaultTopic;
+
private String defaultMXAttr = "m=3";
+
private final ChannelBuffer heartbeatBuffer;
+
private final String protocolType;
+
+
+ private MonitorIndex monitorIndex;
+
+ private MonitorIndexExt monitorIndexExt;
+
//
private final DataProxyMetricItemSet metricItemSet;
- /**
- * ServerMessageHandler
- * @param source
- * @param serProcessor
- * @param allChannels
- * @param topic
- * @param attr
- * @param filterEmptyMsg
- * @param maxMsgLength
- * @param maxCons
- * @param isCompressed
- * @param protocolType
- */
- public ServerMessageHandler(AbstractSource source, ServiceDecoder serProcessor,
- ChannelGroup allChannels,
- String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength,
- Integer maxCons,
- Boolean isCompressed, String protocolType) {
+ public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
+ ChannelGroup allChannels,
+ String topic, String attr, Boolean filterEmptyMsg,
+ Integer maxCons, Boolean isCompressed, MonitorIndex monitorIndex,
+ MonitorIndexExt monitorIndexExt, String protocolType) {
this.source = source;
this.processor = source.getChannelProcessor();
- this.serviceProcessor = serProcessor;
+ this.serviceDecoder = serviceDecoder;
this.allChannels = allChannels;
this.defaultTopic = topic;
if (null != attr) {
@@ -144,11 +153,20 @@ public class ServerMessageHandler extends SimpleChannelHandler {
} else {
this.metricItemSet = new DataProxyMetricItemSet(this.toString());
}
+ this.monitorIndex = monitorIndex;
+ this.monitorIndexExt = monitorIndexExt;
}
private String getRemoteIp(Channel channel) {
+ return getRemoteIp(channel, null);
+ }
+
+ private String getRemoteIp(Channel channel, SocketAddress remoteAddress) {
String strRemoteIp = DEFAULT_REMOTE_IP_VALUE;
SocketAddress remoteSocketAddress = channel.getRemoteAddress();
+ if (remoteSocketAddress == null) {
+ remoteSocketAddress = remoteAddress;
+ }
if (null != remoteSocketAddress) {
strRemoteIp = remoteSocketAddress.toString();
try {
@@ -237,7 +255,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
private void checkGroupIdInfo(ProxyMessage message, Map<String, String> commonAttrMap,
- Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+ Map<String, String> attrMap, AtomicReference<String> topicInfo) {
String groupId = message.getGroupId();
String streamId = message.getStreamId();
if (null != groupId) {
@@ -245,10 +263,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
if ("dc".equals(from)) {
String dcInterfaceId = message.getStreamId();
if (StringUtils.isNotEmpty(dcInterfaceId)
- && configManager.getDcMappingProperties()
- .containsKey(dcInterfaceId.trim())) {
+ && configManager.getDcMappingProperties()
+ .containsKey(dcInterfaceId.trim())) {
groupId = configManager.getDcMappingProperties()
- .get(dcInterfaceId.trim()).trim();
+ .get(dcInterfaceId.trim()).trim();
message.setGroupId(groupId);
}
}
@@ -270,16 +288,16 @@ public class ServerMessageHandler extends SimpleChannelHandler {
String streamIdNum = commonAttrMap.get(AttributeConstants.STREAMID_NUM);
if (configManager.getGroupIdMappingProperties() != null
- && configManager.getStreamIdMappingProperties() != null) {
+ && configManager.getStreamIdMappingProperties() != null) {
groupId = configManager.getGroupIdMappingProperties().get(groupIdNum);
streamId = (configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
- ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+ ? null : configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
if (groupId != null && streamId != null) {
String enableTrans =
- (configManager.getGroupIdEnableMappingProperties() == null)
- ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+ (configManager.getGroupIdEnableMappingProperties() == null)
+ ? null : configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
- .equalsIgnoreCase(num2name))) {
+ .equalsIgnoreCase(num2name))) {
String extraAttr = "groupId=" + groupId + "&" + "streamId=" + streamId;
message.setData(newBinMsg(message.getData(), extraAttr));
}
@@ -299,8 +317,8 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
private void updateMsgList(List<ProxyMessage> msgList, Map<String, String> commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP, MsgType msgType) {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP, MsgType msgType) {
for (ProxyMessage message : msgList) {
Map<String, String> attrMap = message.getAttributeMap();
@@ -310,7 +328,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
checkGroupIdInfo(message, commonAttrMap, attrMap, topicInfo);
topic = topicInfo.get();
-// if(groupId==null)groupId="b_test";//default groupId
+ // if(groupId==null)groupId="b_test";//default groupId
message.setTopic(topic);
commonAttrMap.put(AttributeConstants.NODE_IP, strRemoteIP);
@@ -327,15 +345,15 @@ public class ServerMessageHandler extends SimpleChannelHandler {
if (groupId != null && streamId != null) {
String tubeSwtichKey = groupId + SEPARATOR + streamId;
if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) != null
- && "false".equals(configManager.getTubeSwitchProperties()
- .get(tubeSwtichKey).trim())) {
+ && "false".equals(configManager.getTubeSwitchProperties()
+ .get(tubeSwtichKey).trim())) {
continue;
}
}
if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
- && !MsgType.MSG_MULTI_BODY.equals(msgType)
- && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
+ && !MsgType.MSG_MULTI_BODY.equals(msgType)
+ && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
byte[] data = message.getData();
if (data[data.length - 1] == '\n') {
int tripDataLen = data.length - 1;
@@ -352,16 +370,16 @@ public class ServerMessageHandler extends SimpleChannelHandler {
streamId = "";
}
HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
- .computeIfAbsent(topic, k -> new HashMap<>());
+ .computeIfAbsent(topic, k -> new HashMap<>());
List<ProxyMessage> streamIdMsgList = streamIdMsgMap
- .computeIfAbsent(streamId, k -> new ArrayList<>());
+ .computeIfAbsent(streamId, k -> new ArrayList<>());
streamIdMsgList.add(message);
}
}
private void formatMessagesAndSend(Map<String, String> commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP, MsgType msgType) throws MessageIDException {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP, MsgType msgType) throws MessageIDException {
int tdMsgVer = 1;
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -419,7 +437,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
StringBuilder sidBuilder = new StringBuilder();
sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
- .append(SEPARATOR).append(sequenceId);
+ .append(SEPARATOR).append(sequenceId);
headers.put(ConfigConstants.SEQUENCE_ID, sidBuilder.toString());
}
@@ -432,19 +450,33 @@ public class ServerMessageHandler extends SimpleChannelHandler {
} catch (Exception e1) {
long uniqVal = Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
throw new MessageIDException(uniqVal,
- ErrorCode.DT_ERROR,
- new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
- + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
- + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
+ ErrorCode.DT_ERROR,
+ new Throwable("attribute dt=" + headers.get(AttributeConstants.DATA_TIME
+ + " has error, detail is: topic=" + topicEntry.getKey() + "&streamId="
+ + streamIdEntry.getKey() + "&NodeIP=" + strRemoteIP), e1));
}
dtten = dtten / 1000 / 60 / 10;
dtten = dtten * 1000 * 60 * 10;
+ StringBuilder newbase = new StringBuilder();
+ newbase.append(protocolType).append(SEPARATOR)
+ .append(topicEntry.getKey()).append(SEPARATOR)
+ .append(streamIdEntry.getKey()).append(SEPARATOR)
+ .append(strRemoteIP).append(SEPARATOR)
+ .append(NetworkUtils.getLocalIp()).append(SEPARATOR)
+ .append(new SimpleDateFormat("yyyyMMddHHmm")
+ .format(dtten)).append(SEPARATOR).append(pkgTimeStr);
try {
processor.processEvent(event);
+ monitorIndexExt.incrementAndGet("EVENT_SUCCESS");
this.addMetric(true, data.length);
+ monitorIndex.addAndGet(new String(newbase),
+ Integer.parseInt(proxyMetricMsgCnt), 1, data.length, 0);
} catch (Throwable ex) {
logger.error("Error writting to channel,data will discard.", ex);
+ monitorIndexExt.incrementAndGet("EVENT_DROPPED");
+ monitorIndex.addAndGet(new String(newbase), 0,0,0,
+ Integer.parseInt(proxyMetricMsgCnt));
this.addMetric(false, data.length);
throw new ChannelException("ProcessEvent error can't write event to channel.");
}
@@ -453,15 +485,15 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
private void responsePackage(Map<String, String> commonAttrMap,
- Map<String, Object> resultMap,
- Channel remoteChannel,
- SocketAddress remoteSocketAddress,
- MsgType msgType) throws Exception {
+ Map<String, Object> resultMap,
+ Channel remoteChannel,
+ SocketAddress remoteSocketAddress,
+ MsgType msgType) throws Exception {
if (!commonAttrMap.containsKey("isAck") || "true".equals(commonAttrMap.get("isAck"))) {
if (MsgType.MSG_ACK_SERVICE.equals(msgType) || MsgType.MSG_ORIGINAL_RETURN
- .equals(msgType)
- || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR
- .equals(msgType)) {
+ .equals(msgType)
+ || MsgType.MSG_MULTI_BODY.equals(msgType) || MsgType.MSG_MULTI_BODY_ATTR
+ .equals(msgType)) {
byte[] backAttr = mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
byte[] backBody = null;
@@ -486,14 +518,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
} else {
String backAttrStr = new String(backAttr, StandardCharsets.UTF_8);
logger.warn(
- "the send buffer1 is full, so disconnect it!please check remote client"
- + "; Connection info:"
- + remoteChannel + ";attr is " + backAttrStr);
+ "the send buffer1 is full, so disconnect it!please check remote client"
+ + "; Connection info:"
+ + remoteChannel + ";attr is " + backAttrStr);
throw new Exception(new Throwable(
- "the send buffer1 is full, so disconnect it!please check remote client"
- +
- "; Connection info:" + remoteChannel + ";attr is "
- + backAttrStr));
+ "the send buffer1 is full, so disconnect it!please check remote client"
+ +
+ "; Connection info:" + remoteChannel + ";attr is "
+ + backAttrStr));
}
}
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
@@ -531,12 +563,12 @@ public class ServerMessageHandler extends SimpleChannelHandler {
remoteChannel.write(binBuffer, remoteSocketAddress);
} else {
logger.warn(
- "the send buffer2 is full, so disconnect it!please check remote client"
- + "; Connection info:" + remoteChannel + ";attr is "
- + backattrs);
+ "the send buffer2 is full, so disconnect it!please check remote client"
+ + "; Connection info:" + remoteChannel + ";attr is "
+ + backattrs);
throw new Exception(new Throwable(
- "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
- + remoteChannel + ";attr is " + backattrs));
+ "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
+ + remoteChannel + ";attr is " + backattrs));
}
}
}
@@ -544,14 +576,14 @@ public class ServerMessageHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- logger.info("message received");
+ logger.debug("message received");
if (e == null) {
logger.error("get null messageevent, just skip");
this.addMetric(false, 0);
return;
}
ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
- String strRemoteIP = getRemoteIp(e.getChannel());
+ String strRemoteIP = getRemoteIp(e.getChannel(), e.getRemoteAddress());
SocketAddress remoteSocketAddress = e.getRemoteAddress();
int len = cb.readableBytes();
if (len == 0 && this.filterEmptyMsg) {
@@ -564,8 +596,9 @@ public class ServerMessageHandler extends SimpleChannelHandler {
Channel remoteChannel = e.getChannel();
Map<String, Object> resultMap = null;
try {
- resultMap = serviceProcessor.extractData(cb, remoteChannel);
+ resultMap = serviceDecoder.extractData(cb, remoteChannel, e);
} catch (MessageIDException ex) {
+ logger.error("MessageIDException ex = {}", ex);
this.addMetric(false, 0);
throw new IOException(ex.getCause());
}
@@ -584,8 +617,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
}
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
-// ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
-// remoteChannel.write(binBuffer, remoteSocketAddress);
this.addMetric(false, 0);
return;
}
@@ -602,21 +633,19 @@ public class ServerMessageHandler extends SimpleChannelHandler {
&& !commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
new HashMap<String, HashMap<String, List<ProxyMessage>>>(
- msgList.size());
+ msgList.size());
updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, msgType);
formatMessagesAndSend(commonAttrMap, messageMap, strRemoteIP, msgType);
} else if (msgList != null && commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)) {
-// logger.info("i am in FILE_CHECK_DATA ");
Map<String, String> headers = new HashMap<String, String>();
headers.put("msgtype", "filestatus");
headers.put(ConfigConstants.FILE_CHECK_DATA,
"true");
for (ProxyMessage message : msgList) {
byte[] body = message.getData();
-// logger.info("data:"+new String(body));
Event event = EventBuilder.withBody(body, headers);
try {
processor.processEvent(event);
@@ -637,7 +666,6 @@ public class ServerMessageHandler extends SimpleChannelHandler {
"true");
for (ProxyMessage message : msgList) {
byte[] body = message.getData();
-// logger.info("data:"+new String(body));
Event event = EventBuilder.withBody(body, headers);
try {
processor.processEvent(event);
@@ -656,11 +684,20 @@ public class ServerMessageHandler extends SimpleChannelHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
logger.error("exception caught", e.getCause());
+ monitorIndexExt.incrementAndGet("EVENT_OTHEREXP");
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
logger.error("channel closed {}", ctx.getChannel());
+ super.channelClosed(ctx, e);
+ try {
+ e.getChannel().disconnect();
+ e.getChannel().close();
+ } catch (Exception ex) {
+ //
+ }
+ allChannels.remove(e.getChannel());
}
/**
@@ -689,7 +726,11 @@ public class ServerMessageHandler extends SimpleChannelHandler {
/**
* addMetric
*
* @param result
* @param size
*/
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
index 520ae27..162d614 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServiceDecoder.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.MessageEvent;
/**
* decoder interface definition
@@ -36,5 +37,5 @@ public interface ServiceDecoder {
* @return
* @throws
*/
- Map<String, Object> extractData(ChannelBuffer cb, Channel channel) throws Exception;
+ Map<String, Object> extractData(ChannelBuffer cb, Channel channel, MessageEvent e) throws Exception;
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 4512db9..a0cbe05 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -576,16 +576,16 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent msgEvent) throws Exception {
logger.info("message received");
- if (e == null) {
+ if (msgEvent == null) {
logger.error("get null messageevent, just skip");
this.addMetric(false, 0);
return;
}
- ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
- String strRemoteIP = getRemoteIp(e.getChannel());
- SocketAddress remoteSocketAddress = e.getRemoteAddress();
+ ChannelBuffer cb = ((ChannelBuffer) msgEvent.getMessage());
+ String strRemoteIP = getRemoteIp(msgEvent.getChannel());
+ SocketAddress remoteSocketAddress = msgEvent.getRemoteAddress();
int len = cb.readableBytes();
if (len == 0 && this.filterEmptyMsg) {
logger.warn("skip empty msg.");
@@ -594,10 +594,10 @@ public class SimpleMessageHandler extends SimpleChannelHandler {
return;
}
- Channel remoteChannel = e.getChannel();
+ Channel remoteChannel = msgEvent.getChannel();
Map<String, Object> resultMap = null;
try {
- resultMap = serviceProcessor.extractData(cb, remoteChannel);
+ resultMap = serviceProcessor.extractData(cb, remoteChannel, msgEvent);
} catch (MessageIDException ex) {
this.addMetric(false, 0);
throw new IOException(ex.getCause());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index af7c8da..60db247 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -17,11 +17,11 @@
package org.apache.inlong.dataproxy.source;
+import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.text.SimpleDateFormat;
@@ -29,15 +29,10 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.Executors;
-
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
-import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.source.AbstractSource;
import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -54,45 +49,57 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
* Simple tcp source
*
*/
-public class SimpleTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
+public class SimpleTcpSource extends BaseSource
+ implements Configurable, EventDrivenSource {
private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
+
public static ArrayList<String> blacklist = new ArrayList<String>();
+
private static final String blacklistFilePath = "blacklist.properties";
+
+ private static int TRAFFIC_CLASS_TYPE_0 = 0;
+
+ private static int TRAFFIC_CLASS_TYPE_96 = 96;
+
+ private static int BUFFER_SIZE_MUST_THAN = 0;
+
+ private static int HIGH_WATER_MARK_DEFAULT_VALUE = 64 * 1024;
+
+ private static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+ private static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+ private static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+ private static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+ private static int DEFAULT_SLEEP_TIME_MS = 5 * 1000;
+
private static long propsLastModified;
- private static final String CONNECTIONS = "connections";
-
- protected int maxConnections = Integer.MAX_VALUE;
- private ServerBootstrap serverBootstrap = null;
- protected ChannelGroup allChannels;
- protected int port;
- protected String host = null;
- protected String msgFactoryName;
- protected String serviceDecoderName;
- protected String messageHandlerName;
- protected int maxMsgLength;
- protected boolean isCompressed;
+
private CheckBlackListThread checkBlackListThread;
+
private int maxThreads = 32;
private boolean tcpNoDelay = true;
+
private boolean keepAlive = true;
+
private int receiveBufferSize;
+
private int highWaterMark;
+
private int sendBufferSize;
+
private int trafficClass;
protected String topic;
- protected String attr;
- protected boolean filterEmptyMsg;
- private Channel nettyChannel = null;
//
private DataProxyMetricItemSet metricItemSet;
@@ -178,7 +185,7 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
logger.info("blacklist.properties:{}\n{}",
formator.format(new Date(blacklistFile.lastModified())), blacklist);
}
- Thread.sleep(5 * 1000);
+ Thread.sleep(DEFAULT_SLEEP_TIME_MS);
checkBlackList(blacklist, allChannels);
} catch (InterruptedException e) {
logger.info("ConfigReloader thread exit!");
@@ -191,13 +198,12 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
}
@Override
- public synchronized void start() {
+ public synchronized void startSource() {
logger.info("start " + this.getName());
this.metricItemSet = new DataProxyMetricItemSet(this.getName());
MetricRegister.register(metricItemSet);
checkBlackListThread = new CheckBlackListThread();
checkBlackListThread.start();
- super.start();
ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
ChannelFactory factory =
@@ -207,7 +213,6 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
Executors.newCachedThreadPool(
new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")), maxThreads);
logger.info("Set max workers : {} ;", maxThreads);
- ChannelPipelineFactory fac = null;
serverBootstrap = new ServerBootstrap(factory);
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
@@ -218,39 +223,15 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
logger.info("load msgFactory=" + msgFactoryName
+ " and serviceDecoderName=" + serviceDecoderName);
- try {
-
- ServiceDecoder serviceDecoder =
- (ServiceDecoder) Class.forName(serviceDecoderName).newInstance();
-
- Class<? extends ChannelPipelineFactory> clazz =
- (Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
-
- Constructor ctor =
- clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
- String.class, ServiceDecoder.class, String.class,
- Integer.class, String.class, String.class, Boolean.class,
- Integer.class, Boolean.class, String.class);
-
- logger.info("Using channel processor:{}", this.getClass().getName());
- fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
- "tcp", serviceDecoder, messageHandlerName,
- maxMsgLength, topic, attr, filterEmptyMsg, maxConnections, isCompressed, this.getName());
-
- } catch (Exception e) {
- logger.error("Simple Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
- msgFactoryName, e);
- stop();
- throw new FlumeException(e.getMessage());
- }
+ ChannelPipelineFactory fac = this.getChannelPiplineFactory();
serverBootstrap.setPipelineFactory(fac);
try {
if (host == null) {
- nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
+ nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
} else {
- nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
+ nettyChannel = ((ServerBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
}
} catch (Exception e) {
logger.error("Simple TCP Source error bind host {} port {},program will exit!", host, port);
@@ -265,59 +246,34 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
@Override
public synchronized void stop() {
- logger.info("[STOP SOURCE]{} stopping...", super.getName());
checkBlackListThread.shutdouwn();
- if (allChannels != null && !allChannels.isEmpty()) {
- try {
- allChannels.unbind().awaitUninterruptibly();
- allChannels.close().awaitUninterruptibly();
- } catch (Exception e) {
- logger.warn("Simple TCP Source netty server stop ex", e);
- } finally {
- allChannels.clear();
- // allChannels = null;
- }
- }
-
- if (serverBootstrap != null) {
- try {
-
- serverBootstrap.releaseExternalResources();
- } catch (Exception e) {
- logger.warn("Simple TCP Source serverBootstrap stop ex ", e);
- } finally {
- serverBootstrap = null;
- }
- }
-
super.stop();
- logger.info("[STOP SOURCE]{} stopped", super.getName());
}
@Override
public void configure(Context context) {
logger.info("context is {}", context);
- port = context.getInteger(ConfigConstants.CONFIG_PORT);
- host = context.getString(ConfigConstants.CONFIG_HOST, "0.0.0.0");
-
+ super.configure(context);
tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true);
-
keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true);
- highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, 64 * 1024);
- receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, 1024 * 64);
- if (receiveBufferSize > 16 * 1024 * 1024) {
- receiveBufferSize = 16 * 1024 * 1024;
+ highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, HIGH_WATER_MARK_DEFAULT_VALUE);
+ receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, RECEIVE_BUFFER_DEFAULT_SIZE);
+ if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
+ receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
}
- Preconditions.checkArgument(receiveBufferSize > 0, "receiveBufferSize must be > 0");
+ Preconditions.checkArgument(receiveBufferSize > BUFFER_SIZE_MUST_THAN,
+ "receiveBufferSize must be > 0");
- sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, 1024 * 64);
- if (sendBufferSize > 16 * 1024 * 1024) {
- sendBufferSize = 16 * 1024 * 1024;
+ sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, SEND_BUFFER_DEFAULT_SIZE);
+ if (sendBufferSize > SEND_BUFFER_MAX_SIZE) {
+ sendBufferSize = SEND_BUFFER_MAX_SIZE;
}
- Preconditions.checkArgument(sendBufferSize > 0, "sendBufferSize must be > 0");
+ Preconditions.checkArgument(sendBufferSize > BUFFER_SIZE_MUST_THAN,
+ "sendBufferSize must be > 0");
- trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, 0);
- Preconditions.checkArgument((trafficClass == 0 || trafficClass == 96),
+ trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, TRAFFIC_CLASS_TYPE_0);
+ Preconditions.checkArgument((trafficClass == TRAFFIC_CLASS_TYPE_0
+ || trafficClass == TRAFFIC_CLASS_TYPE_96),
"trafficClass must be == 0 or == 96");
try {
@@ -326,51 +282,6 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
logger.warn("Simple TCP Source max-threads property must specify an integer value. {}",
context.getString(ConfigConstants.MAX_THREADS));
}
-
- try {
- maxConnections = context.getInteger(CONNECTIONS, 5000);
- } catch (NumberFormatException e) {
- logger.warn("BaseSource\'s \"connections\" property must specify an integer value.",
- context.getString(CONNECTIONS));
- }
-
- topic = context.getString(ConfigConstants.TOPIC);
- attr = context.getString(ConfigConstants.ATTR);
- Configurables.ensureRequiredNonNull(context, ConfigConstants.TOPIC, ConfigConstants.ATTR);
-
- topic = topic.trim();
- Preconditions.checkArgument(!topic.isEmpty(), "topic is empty");
- attr = attr.trim();
- Preconditions.checkArgument(!attr.isEmpty(), "attr is empty");
-
- filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false);
-
- msgFactoryName =
- context.getString(ConfigConstants.MSG_FACTORY_NAME,
- "org.apache.inlong.dataproxy.source.ServerMessageFactory");
- msgFactoryName = msgFactoryName.trim();
- Preconditions.checkArgument(StringUtils.isNotBlank(msgFactoryName),
- "msgFactoryName is empty");
-
- serviceDecoderName =
- context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
- "org.apache.inlong.dataproxy.source.DefaultServiceDecoder");
- serviceDecoderName = serviceDecoderName.trim();
- Preconditions.checkArgument(StringUtils.isNotBlank(serviceDecoderName),
- "serviceProcessorName is empty");
-
- messageHandlerName =
- context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
- "org.apache.inlong.dataproxy.source.ServerMessageHandler");
- messageHandlerName = messageHandlerName.trim();
- Preconditions.checkArgument(StringUtils.isNotBlank(messageHandlerName),
- "messageHandlerName is empty");
-
- maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, 1024 * 64);
- Preconditions.checkArgument(
- (maxMsgLength >= 4 && maxMsgLength <= ConfigConstants.MSG_MAX_LENGTH_BYTES),
- "maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
- isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
}
/**
@@ -380,4 +291,9 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
public DataProxyMetricItemSet getMetricItemSet() {
return metricItemSet;
}
+
+ @Override
+ public String getProtocolName() {
+ return "tcp";
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
new file mode 100644
index 0000000..7c09c30
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleUdpSource.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.base.NamedThreadFactory;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flume source that accepts reported data over UDP.
+ *
+ * <p>On start it creates a Netty {@link ConnectionlessBootstrap}, installs the pipeline
+ * factory returned by {@code getChannelPiplineFactory()} and binds to the {@code host} /
+ * {@code port} fields.
+ */
+public class SimpleUdpSource
+        extends BaseSource
+        implements EventDrivenSource, Configurable {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(SimpleUdpSource.class);
+
+    /** Size handed to the fixed receive-buffer-size predictor of the datagram channel. */
+    private static final int UDP_BUFFER_DEFAULT_SIZE = 8192;
+
+    public SimpleUdpSource() {
+        super();
+    }
+
+    /**
+     * Creates the connectionless bootstrap, binds it and adds the bound channel to
+     * {@code allChannels}.
+     *
+     * <p>When {@code host} is {@code null} only the port is used for the bind address.
+     * A bind failure is logged together with its cause and then terminates the JVM via
+     * {@code System.exit(-1)}.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void startSource() {
+        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+        // setup Netty server
+        serverBootstrap = new ConnectionlessBootstrap(
+                new NioDatagramChannelFactory(Executors
+                        .newCachedThreadPool(new NamedThreadFactory("udpSource-nettyWorker"
+                                + "-threadGroup")), maxThreads));
+        logger.info("Set max workers : {} ;",maxThreads);
+        serverBootstrap.setOption("receiveBufferSizePredictorFactory",
+                new FixedReceiveBufferSizePredictorFactory(UDP_BUFFER_DEFAULT_SIZE));
+        ChannelPipelineFactory fac = this.getChannelPiplineFactory();
+        serverBootstrap.setPipelineFactory(fac);
+        try {
+            if (host == null) {
+                nettyChannel =
+                        ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(port));
+            } else {
+                nettyChannel =
+                        ((ConnectionlessBootstrap)serverBootstrap).bind(new InetSocketAddress(host, port));
+            }
+        } catch (Exception e) {
+            // The exception is passed as the trailing argument so that the logger records
+            // the root cause (with stack trace) before the process exits.
+            logger.error("Simple UDP Source error bind host {} port {}, program will exit!",
+                    new Object[] { host, port, e});
+            // NOTE(review): exits the JVM instead of throwing to the caller - confirm intended.
+            System.exit(-1);
+        }
+        allChannels.add(nettyChannel);
+        logger.info("Simple UDP Source started at host {}, port {}", host, port);
+    }
+
+    /**
+     * Stops the source by delegating to the parent implementation.
+     */
+    @Override
+    public void stop() {
+        super.stop();
+    }
+
+    /**
+     * Applies the parent configuration, then reads the max-threads property (default 32).
+     *
+     * <p>A value that is not an integer is logged as a warning and {@code maxThreads} is
+     * left unchanged.
+     *
+     * @param context the Flume configuration context
+     */
+    @Override
+    public void configure(Context context) {
+        super.configure(context);
+        try {
+            maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, 32);
+        } catch (NumberFormatException e) {
+            // The placeholder makes the offending raw value appear in the log line.
+            logger.warn("Simple UDP Source max-threads property must specify an integer value. {}",
+                    context.getString(ConfigConstants.MAX_THREADS));
+        }
+    }
+
+    /**
+     * @return the protocol name of this source, {@code "udp"}
+     */
+    @Override
+    public String getProtocolName() {
+        return "udp";
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
new file mode 100644
index 0000000..19d0ede
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/source/UdpSourceTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.source;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Configuration smoke test for {@link SimpleUdpSource}.
+ */
+public class UdpSourceTest {
+
+    /**
+     * Configures a UDP source from a minimal property map and checks that the
+     * max-threads value read back through the source's context equals 32.
+     */
+    @Test
+    public void configTest() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(ConfigConstants.CONFIG_HOST, "127.0.0.1");
+        properties.put(ConfigConstants.CONFIG_PORT, "8080");
+        properties.put(ConfigConstants.MAX_THREADS, "32");
+        properties.put(ConfigConstants.TOPIC, "topic");
+        properties.put(ConfigConstants.ATTR, "{}");
+
+        SimpleUdpSource source = new SimpleUdpSource();
+        source.configure(new Context(properties));
+
+        int configuredThreads = source.getContext().getInteger(ConfigConstants.MAX_THREADS);
+        Assert.assertEquals(configuredThreads, 32);
+    }
+}