You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/03 09:20:45 UTC
[incubator-eventmesh] branch master updated: fix issue2780
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 25450ad24 fix issue2780
new 26eab9911 Merge pull request #2781 from jonyangx/issue2780
25450ad24 is described below
commit 25450ad24d03243c12c38cdc4cc7949de6522abd
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Jan 1 14:38:48 2023 +0800
fix issue2780
---
.../protocol/http/consumer/EventMeshConsumer.java | 80 +++++------
.../eventmesh/runtime/util/EventMeshUtil.java | 152 +++++++++++----------
2 files changed, 121 insertions(+), 111 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 99da5630a..ffcbfba9d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -104,21 +104,21 @@ public class EventMeshConsumer {
EventListener clusterEventListener = (event, context) -> {
String protocolVersion =
- Objects.requireNonNull(event.getSpecVersion()).toString();
+ Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
- EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
- EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+ EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
String topic = event.getSubject();
String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO)).toString();
String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID)).toString();
event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
- .build();
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+ .build();
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|mq2eventMesh|topic={}|event={}", topic, event);
} else {
@@ -126,12 +126,12 @@ public class EventMeshConsumer {
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(),
- topic, null);
+ topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}",
- consumerGroupConf.getConsumerGroup(), topic);
+ consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -142,11 +142,13 @@ public class EventMeshConsumer {
}
SubscriptionItem subscriptionItem =
- consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
- HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
- consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
- consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+ consumerGroupConf.getConsumerGroupTopicConf().get(topic).getSubscriptionItem();
+ HandleMsgContext handleMsgContext = new HandleMsgContext(
+ EventMeshUtil.buildPushMsgSeqNo(),
+ consumerGroupConf.getConsumerGroup(),
+ EventMeshConsumer.this,
+ topic, event, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(),
+ consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -176,19 +178,19 @@ public class EventMeshConsumer {
EventListener broadcastEventListener = (event, context) -> {
String protocolVersion =
- Objects.requireNonNull(event.getSpecVersion()).toString();
+ Objects.requireNonNull(event.getSpecVersion()).toString();
Span span = TraceUtils.prepareServerSpan(
- EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
- EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
+ EventMeshTraceConstants.TRACE_DOWNSTREAM_EVENTMESH_SERVER_SPAN, false);
try {
event = CloudEventBuilder.from(event)
- .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
- .build();
+ .withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshServerIp())
+ .build();
String topic = event.getSubject();
String bizSeqNo = getEventExtension(event, ProtocolKey.ClientInstanceKey.BIZSEQNO, "");
@@ -198,18 +200,18 @@ public class EventMeshConsumer {
messageLogger.debug("message|mq2eventMesh|topic={}|msg={}", topic, event);
} else {
messageLogger.info("message|mq2eventMesh|topic={}|bizSeqNo={}|uniqueId={}",
- topic, bizSeqNo,
- uniqueId);
+ topic, bizSeqNo,
+ uniqueId);
}
ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(
- consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
+ consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
- (EventMeshAsyncConsumeContext) context;
+ (EventMeshAsyncConsumeContext) context;
if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}",
- consumerGroupConf.getConsumerGroup(), topic);
+ consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(event, uniqueId, bizSeqNo);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -220,15 +222,15 @@ public class EventMeshConsumer {
}
SubscriptionItem subscriptionItem =
- consumerGroupConf.getConsumerGroupTopicConf().get(topic)
- .getSubscriptionItem();
+ consumerGroupConf.getConsumerGroupTopicConf().get(topic)
+ .getSubscriptionItem();
HandleMsgContext handleMsgContext =
- new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
- consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, event, subscriptionItem,
- eventMeshAsyncConsumeContext.getAbstractContext(),
- consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
- currentTopicConfig);
+ new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(),
+ consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
+ topic, event, subscriptionItem,
+ eventMeshAsyncConsumeContext.getAbstractContext(),
+ consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId,
+ currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
@@ -310,16 +312,16 @@ public class EventMeshConsumer {
public void sendMessageBack(final CloudEvent event, final String uniqueId, String bizSeqNo) throws Exception {
EventMeshProducer sendMessageBack
- = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
+ = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
if (sendMessageBack == null) {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}",
- consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+ consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
return;
}
final SendMessageContext sendMessageBackContext = new SendMessageContext(bizSeqNo, event, sendMessageBack,
- eventMeshHTTPServer);
+ eventMeshHTTPServer);
sendMessageBack.send(sendMessageBackContext, new SendCallback() {
@Override
@@ -329,7 +331,7 @@ public class EventMeshConsumer {
@Override
public void onException(OnExceptionContext context) {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
- consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
+ consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
}
});
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 01ffb58d2..1303529aa 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -37,11 +37,11 @@ import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,29 +63,42 @@ public class EventMeshUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(EventMeshUtil.class);
- private static final Logger TCPLOGGER = LoggerFactory.getLogger("tcpMonitor");
-
public static String buildPushMsgSeqNo() {
- return StringUtils.rightPad(String.valueOf(System.currentTimeMillis()), 6)
- + RandomStringUtils.generateNum(4);
+ return new StringBuilder()
+ .append(StringUtils.rightPad(String.valueOf(System.currentTimeMillis()), 6))
+ .append(RandomStringUtils.generateNum(4))
+ .toString();
+
}
- public static String buildMeshClientID(String clientGroup, String meshCluster) {
- return StringUtils.trim(clientGroup)
- + "(" + StringUtils.trim(meshCluster) + ")"
- + "-" + EventMeshVersion.getCurrentVersionDesc()
- + "-" + ThreadUtils.getPID();
+ public static String buildMeshClientID(final String clientGroup, final String meshCluster) {
+ return new StringBuilder()
+ .append(StringUtils.trim(clientGroup))
+ .append('(')
+ .append(StringUtils.trim(meshCluster))
+ .append(')')
+ .append('-')
+ .append(EventMeshVersion.getCurrentVersionDesc())
+ .append('-')
+ .append(ThreadUtils.getPID())
+ .toString();
}
- public static String buildMeshTcpClientID(String clientSysId, String purpose, String meshCluster) {
- return StringUtils.trim(clientSysId)
- + "-" + StringUtils.trim(purpose)
- + "-" + StringUtils.trim(meshCluster)
- + "-" + EventMeshVersion.getCurrentVersionDesc()
- + "-" + ThreadUtils.getPID();
+ public static String buildMeshTcpClientID(final String clientSysId, final String purpose,
+ final String meshCluster) {
+ return new StringBuilder().append(StringUtils.trim(clientSysId))
+ .append('-')
+ .append(StringUtils.trim(purpose))
+ .append('-')
+ .append(StringUtils.trim(meshCluster))
+ .append('-')
+ .append(EventMeshVersion.getCurrentVersionDesc())
+ .append('-')
+ .append(ThreadUtils.getPID())
+ .toString();
}
- public static String buildClientGroup(String systemId) {
+ public static String buildClientGroup(final String systemId) {
return systemId;
}
@@ -95,24 +108,25 @@ public class EventMeshUtil {
* @param e
* @return stacktrace
*/
- public static String stackTrace(Throwable e) {
+ public static String stackTrace(final Throwable e) {
return stackTrace(e, 0);
}
- public static String stackTrace(Throwable e, int level) {
+ public static String stackTrace(final Throwable e, final int level) {
if (e == null) {
return null;
}
- StackTraceElement[] eles = e.getStackTrace();
- int localLevel = (level == 0) ? eles.length : level;
- StringBuilder sb = new StringBuilder();
+ final StackTraceElement[] eles = e.getStackTrace();
+ final int localLevel = (level == 0) ? eles.length : level;
+ final StringBuilder sb = new StringBuilder();
sb.append(e.getMessage()).append(System.lineSeparator());
int innerLevel = 0;
- for (StackTraceElement ele : eles) {
+ for (final StackTraceElement ele : eles) {
sb.append(ele).append(System.lineSeparator());
- if (++innerLevel >= localLevel) {
+ innerLevel++;
+ if (innerLevel >= localLevel) {
break;
}
}
@@ -120,12 +134,11 @@ public class EventMeshUtil {
}
public static ObjectMapper createJsoner() {
- ObjectMapper jsonMapper = new ObjectMapper();
- jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- jsonMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
- jsonMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
- jsonMapper.setTimeZone(TimeZone.getDefault());
- return jsonMapper;
+ return new ObjectMapper()
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+ .setTimeZone(TimeZone.getDefault());
}
@@ -135,8 +148,8 @@ public class EventMeshUtil {
* @param eventMeshMessage
* @return message string
*/
- public static String printMqMessage(EventMeshMessage eventMeshMessage) {
- Map<String, String> properties = eventMeshMessage.getProperties();
+ public static String printMqMessage(final EventMeshMessage eventMeshMessage) {
+ final Map<String, String> properties = eventMeshMessage.getProperties();
String keys = properties.get(EventMeshConstants.KEYS_UPPERCASE);
if (StringUtils.isBlank(keys)) {
@@ -147,8 +160,7 @@ public class EventMeshUtil {
properties.get(EventMeshConstants.TTL), properties.get(EventMeshConstants.RR_REQUEST_UNIQ_ID), keys);
}
- public static String getMessageBizSeq(CloudEvent event) {
-
+ public static String getMessageBizSeq(final CloudEvent event) {
String keys = (String) event.getExtension(EventMeshConstants.KEYS_UPPERCASE);
if (StringUtils.isBlank(keys)) {
keys = (String) event.getExtension(EventMeshConstants.KEYS_LOWERCASE);
@@ -157,10 +169,9 @@ public class EventMeshUtil {
return keys;
}
- public static Map<String, String> getEventProp(CloudEvent event) {
- Set<String> extensionSet = event.getExtensionNames();
- Map<String, String> propMap = new HashMap<>();
- for (String extensionKey : extensionSet) {
+ public static Map<String, String> getEventProp(final CloudEvent event) {
+ final Map<String, String> propMap = new HashMap<>();
+ for (final String extensionKey : event.getExtensionNames()) {
propMap.put(extensionKey, event.getExtension(extensionKey) == null ? ""
: event.getExtension(extensionKey).toString());
}
@@ -169,19 +180,18 @@ public class EventMeshUtil {
public static String getLocalAddr() {
//priority of networkInterface when generating client ip
- String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
+ final String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("networkInterface.priority: {}", priority);
}
- List<String> preferList = new ArrayList<String>();
- for (String eth : priority.split("<")) {
- preferList.add(eth);
- }
+ final List<String> preferList = new ArrayList<>();
+ Arrays.stream(priority.split("<")).forEach(preferList::add);
+
NetworkInterface preferNetworkInterface = null;
try {
- Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
+ final Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
while (enumeration1.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration1.nextElement();
if (!preferList.contains(networkInterface.getName())) {
@@ -196,8 +206,8 @@ public class EventMeshUtil {
}
// Traversal Network interface to get the first non-loopback and non-private address
- ArrayList<String> ipv4Result = new ArrayList<String>();
- ArrayList<String> ipv6Result = new ArrayList<String>();
+ final ArrayList<String> ipv4Result = new ArrayList<>();
+ final ArrayList<String> ipv6Result = new ArrayList<>();
if (preferNetworkInterface != null) {
if (LOGGER.isDebugEnabled()) {
@@ -209,7 +219,7 @@ public class EventMeshUtil {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("no preferNetworkInterface");
}
- Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
+ final Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
while (enumeration.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration.nextElement();
final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
@@ -219,12 +229,10 @@ public class EventMeshUtil {
// prefer ipv4
if (!ipv4Result.isEmpty()) {
- for (String ip : ipv4Result) {
- if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
- continue;
+ for (final String ip : ipv4Result) {
+ if (!ip.startsWith("127.0") && !ip.startsWith("192.168")) {
+ return ip;
}
-
- return ip;
}
return ipv4Result.get(ipv4Result.size() - 1);
@@ -248,8 +256,8 @@ public class EventMeshUtil {
}
}
- private static void getIpResult(List<String> ipv4Result, List<String> ipv6Result,
- Enumeration<InetAddress> en) {
+ private static void getIpResult(final List<String> ipv4Result, final List<String> ipv6Result,
+ final Enumeration<InetAddress> en) {
while (en.hasMoreElements()) {
final InetAddress address = en.nextElement();
if (address.isLoopbackAddress()) {
@@ -265,26 +273,26 @@ public class EventMeshUtil {
}
}
- public static String buildUserAgentClientId(UserAgent client) {
+ public static String buildUserAgentClientId(final UserAgent client) {
if (client == null) {
return null;
}
- StringBuilder sb = new StringBuilder();
- sb.append(client.getSubsystem())
- .append("-")
- .append("-")
+ return new StringBuilder()
+ .append(client.getSubsystem())
+ .append('-')
+ .append('-')
.append(client.getPid())
- .append("-")
+ .append('-')
.append(client.getHost())
- .append(":")
- .append(client.getPort());
- return sb.toString();
+ .append(':')
+ .append(client.getPort())
+ .toString();
}
- public static void printState(ThreadPoolExecutor scheduledExecutorService) {
- if (TCPLOGGER.isInfoEnabled()) {
- TCPLOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactoryImpl) scheduledExecutorService.getThreadFactory())
+ public static void printState(final ThreadPoolExecutor scheduledExecutorService) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("{} [{} {} {} {}]", ((EventMeshThreadFactoryImpl) scheduledExecutorService.getThreadFactory())
.getThreadNamePrefix(), scheduledExecutorService.getQueue().size(), scheduledExecutorService
.getPoolSize(), scheduledExecutorService.getActiveCount(), scheduledExecutorService
.getCompletedTaskCount());
@@ -300,7 +308,7 @@ public class EventMeshUtil {
* @throws ClassNotFoundException
*/
@SuppressWarnings("unchecked")
- public static <T> T cloneObject(T object) throws IOException, ClassNotFoundException {
+ public static <T> T cloneObject(final T object) throws IOException, ClassNotFoundException {
try (ByteArrayOutputStream byOut = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byOut)) {
@@ -315,14 +323,14 @@ public class EventMeshUtil {
}
- public static Map<String, Object> getCloudEventExtensionMap(String protocolVersion,
- CloudEvent cloudEvent) {
- EventMeshCloudEventWriter eventMeshCloudEventWriter = new EventMeshCloudEventWriter();
+ public static Map<String, Object> getCloudEventExtensionMap(final String protocolVersion,
+ final CloudEvent cloudEvent) {
+ final EventMeshCloudEventWriter eventMeshCloudEventWriter = new EventMeshCloudEventWriter();
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)
- && (cloudEvent instanceof CloudEventV1)) {
+ && cloudEvent instanceof CloudEventV1) {
((CloudEventV1) cloudEvent).readContext(eventMeshCloudEventWriter);
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)
- && (cloudEvent instanceof CloudEventV03)) {
+ && cloudEvent instanceof CloudEventV03) {
((CloudEventV03) cloudEvent).readContext(eventMeshCloudEventWriter);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org