You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/01 03:50:44 UTC
[incubator-eventmesh] branch master updated: simplify code
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 7a304bdd9 simplify code
new db9db3eee Merge pull request #2753 from weihubeats/ClientGroupWrapper
7a304bdd9 is described below
commit 7a304bdd973a853143f22c768295d4c4e3fefe4a
Author: weihu <we...@163.com>
AuthorDate: Sat Dec 31 15:45:33 2022 +0800
simplify code
---
.../tcp/client/group/ClientGroupWrapper.java | 147 ++++++++++-----------
1 file changed, 71 insertions(+), 76 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 944dc3b0c..4e91a3d2d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -47,8 +47,6 @@ import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-
-
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
@@ -62,28 +60,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class ClientGroupWrapper {
-
- public static final Logger LOGGER = LoggerFactory.getLogger(ClientGroupWrapper.class);
-
- private String sysId;
+
+ private final String sysId;
private String group;
private EventMeshTCPConfiguration eventMeshTCPConfiguration;
- private EventMeshTCPServer eventMeshTCPServer;
+ private final EventMeshTCPServer eventMeshTCPServer;
private EventMeshTcpRetryer eventMeshTcpRetryer;
@@ -109,14 +104,14 @@ public class ClientGroupWrapper {
private MQConsumerWrapper broadCastMsgConsumer;
- private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
+ private final ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
new ConcurrentHashMap<String, Set<Session>>();
- private ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
- private MQProducerWrapper mqProducerWrapper;
+ private final MQProducerWrapper mqProducerWrapper;
public ClientGroupWrapper(String sysId, String group,
EventMeshTCPServer eventMeshTCPServer,
@@ -148,7 +143,7 @@ public class ClientGroupWrapper {
this.groupLock.readLock().lockInterruptibly();
has = topic2sessionInGroupMapping.containsKey(topic);
} catch (Exception e) {
- LOGGER.error("hasSubscription error! topic[{}]", topic);
+ log.error("hasSubscription error! topic[{}]", topic);
} finally {
this.groupLock.readLock().unlock();
}
@@ -179,7 +174,7 @@ public class ClientGroupWrapper {
public void onException(OnExceptionContext context) {
String bizSeqNo = (String) upStreamMsgContext.getEvent()
.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
- LOGGER.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
+ log.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
upStreamMsgContext.getSession().getClient(), context.getException());
}
@@ -194,17 +189,17 @@ public class ClientGroupWrapper {
public boolean addSubscription(SubscriptionItem subscriptionItem, Session session)
throws Exception {
if (subscriptionItem == null) {
- LOGGER.error("addSubscription param error,subscriptionItem is null", session);
+ log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
return false;
}
String topic = subscriptionItem.getTopic();
if (session == null || !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
- LOGGER.error("addSubscription param error,topic:{},session:{}", topic, session);
+ log.error("addSubscription param error,topic:{},session:{}", topic, session);
return false;
}
- boolean r = false;
+ boolean r;
try {
this.groupLock.writeLock().lockInterruptibly();
if (!topic2sessionInGroupMapping.containsKey(topic)) {
@@ -214,20 +209,20 @@ public class ClientGroupWrapper {
r = topic2sessionInGroupMapping.get(topic).add(session);
if (r) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("addSubscription success, group:{} topic:{} client:{}", group,
+ if (log.isInfoEnabled()) {
+ log.info("addSubscription success, group:{} topic:{} client:{}", group,
topic, session.getClient());
}
} else {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
+ if (log.isWarnEnabled()) {
+ log.warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
session.getClient());
}
}
subscriptions.putIfAbsent(topic, subscriptionItem);
} catch (Exception e) {
- LOGGER.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+ log.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
throw new Exception("addSubscription fail");
} finally {
this.groupLock.writeLock().unlock();
@@ -237,14 +232,14 @@ public class ClientGroupWrapper {
public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) {
if (subscriptionItem == null) {
- LOGGER.error("addSubscription param error,subscriptionItem is null", session);
+ log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
return false;
}
String topic = subscriptionItem.getTopic();
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
- LOGGER.error("removeSubscription param error,topic:{},session:{}", topic, session);
+ log.error("removeSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -255,14 +250,14 @@ public class ClientGroupWrapper {
r = topic2sessionInGroupMapping.get(topic).remove(session);
if (r) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
+ if (log.isInfoEnabled()) {
+ log.info(
"removeSubscription remove session success, group:{} topic:{} client:{}",
group, topic, session.getClient());
}
} else {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn(
+ if (log.isWarnEnabled()) {
+ log.warn(
"removeSubscription remove session failed, group:{} topic:{} client:{}",
group, topic, session.getClient());
}
@@ -272,11 +267,11 @@ public class ClientGroupWrapper {
topic2sessionInGroupMapping.remove(topic);
subscriptions.remove(topic);
- LOGGER.info("removeSubscription remove topic success, group:{} topic:{}",
+ log.info("removeSubscription remove topic success, group:{} topic:{}",
group, topic);
}
} catch (Exception e) {
- LOGGER.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
+ log.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
e);
} finally {
this.groupLock.writeLock().unlock();
@@ -301,7 +296,7 @@ public class ClientGroupWrapper {
mqProducerWrapper.init(keyValue);
mqProducerWrapper.start();
producerStarted.compareAndSet(false, true);
- LOGGER.info("starting producer success, group:{}", group);
+ log.info("starting producer success, group:{}", group);
}
public synchronized void shutdownProducer() throws Exception {
@@ -310,7 +305,7 @@ public class ClientGroupWrapper {
}
mqProducerWrapper.shutdown();
producerStarted.compareAndSet(true, false);
- LOGGER.info("shutdown producer success for group:{}", group);
+ log.info("shutdown producer success for group:{}", group);
}
public String getGroup() {
@@ -326,7 +321,7 @@ public class ClientGroupWrapper {
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
- LOGGER.error("addGroupConsumerSession param error,session:{}", session);
+ log.error("addGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -336,13 +331,13 @@ public class ClientGroupWrapper {
r = groupConsumerSessions.add(session);
if (r) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("addGroupConsumerSession success, group:{} client:{}", group,
+ if (log.isInfoEnabled()) {
+ log.info("addGroupConsumerSession success, group:{} client:{}", group,
session.getClient());
}
}
} catch (Exception e) {
- LOGGER.error("addGroupConsumerSession error! group:{} client:{}", group,
+ log.error("addGroupConsumerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
@@ -355,7 +350,7 @@ public class ClientGroupWrapper {
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
- LOGGER.error("addGroupProducerSession param error,session:{}", session);
+ log.error("addGroupProducerSession param error,session:{}", session);
return false;
}
@@ -365,11 +360,11 @@ public class ClientGroupWrapper {
r = groupProducerSessions.add(session);
if (r) {
- LOGGER.info("addGroupProducerSession success, group:{} client:{}", group,
+ log.info("addGroupProducerSession success, group:{} client:{}", group,
session.getClient());
}
} catch (Exception e) {
- LOGGER.error("addGroupProducerSession error! group:{} client:{}", group,
+ log.error("addGroupProducerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
@@ -382,7 +377,7 @@ public class ClientGroupWrapper {
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
- LOGGER.error("removeGroupConsumerSession param error,session:{}", session);
+ log.error("removeGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -392,13 +387,13 @@ public class ClientGroupWrapper {
r = groupConsumerSessions.remove(session);
if (r) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("removeGroupConsumerSession success, group:{} client:{}", group,
+ if (log.isInfoEnabled()) {
+ log.info("removeGroupConsumerSession success, group:{} client:{}", group,
session.getClient());
}
}
} catch (Exception e) {
- LOGGER.error("removeGroupConsumerSession error! group:{} client:{}", group,
+ log.error("removeGroupConsumerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
@@ -410,7 +405,7 @@ public class ClientGroupWrapper {
if (session == null
|| !StringUtils.equalsIgnoreCase(group,
EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
- LOGGER.error("removeGroupProducerSession param error,session:{}", session);
+ log.error("removeGroupProducerSession param error,session:{}", session);
return false;
}
@@ -420,11 +415,11 @@ public class ClientGroupWrapper {
r = groupProducerSessions.remove(session);
if (r) {
- LOGGER.info("removeGroupProducerSession success, group:{} client:{}", group,
+ log.info("removeGroupProducerSession success, group:{} client:{}", group,
session.getClient());
}
} catch (Exception e) {
- LOGGER.error("removeGroupProducerSession error! group:{} client:{}", group,
+ log.error("removeGroupProducerSession error! group:{} client:{}", group,
session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
@@ -486,7 +481,7 @@ public class ClientGroupWrapper {
EventMeshConstants.EVENTMESH_SEND_BACK_IP);
}
- LOGGER.error(
+ log.error(
"found no session to downstream msg,groupName:{}, topic:{}, "
+ "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
group, topic, bizSeqNo, sendBackTimes,
@@ -494,7 +489,7 @@ public class ClientGroupWrapper {
if (sendBackTimes >= eventMeshTCPServer
.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
- LOGGER.error(
+ log.error(
"sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+ "bizSeqNo:{}", eventMeshTCPServer
.getEventMeshTCPConfiguration()
@@ -510,7 +505,7 @@ public class ClientGroupWrapper {
sendMsgBackToBroker(event, bizSeqNo);
}
} catch (Exception e) {
- LOGGER.warn("handle msg exception when no session found", e);
+ log.warn("handle msg exception when no session found", e);
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -533,8 +528,8 @@ public class ClientGroupWrapper {
persistentMsgConsumer.registerEventListener(listener);
inited4Persistent.compareAndSet(false, true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("init persistentMsgConsumer success, group:{}", group);
+ if (log.isInfoEnabled()) {
+ log.info("init persistentMsgConsumer success, group:{}", group);
}
}
@@ -544,8 +539,8 @@ public class ClientGroupWrapper {
}
persistentMsgConsumer.start();
started4Persistent.compareAndSet(false, true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("starting persistentMsgConsumer success, group:{}", group);
+ if (log.isInfoEnabled()) {
+ log.info("starting persistentMsgConsumer success, group:{}", group);
}
}
@@ -583,8 +578,8 @@ public class ClientGroupWrapper {
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
(EventMeshAsyncConsumeContext) context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("found no session to downstream broadcast msg");
+ if (log.isWarnEnabled()) {
+ log.warn("found no session to downstream broadcast msg");
}
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
@@ -602,8 +597,8 @@ public class ClientGroupWrapper {
Session session = sessionsItr.next();
if (!session.isAvailable(topic)) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("downstream broadcast msg,session is not available,client:{}",
+ if (log.isWarnEnabled()) {
+ log.warn("downstream broadcast msg,session is not available,client:{}",
session.getClient());
}
continue;
@@ -632,8 +627,8 @@ public class ClientGroupWrapper {
broadCastMsgConsumer.registerEventListener(listener);
inited4Broadcast.compareAndSet(false, true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("init broadCastMsgConsumer success, group:{}", group);
+ if (log.isInfoEnabled()) {
+ log.info("init broadCastMsgConsumer success, group:{}", group);
}
}
@@ -643,7 +638,7 @@ public class ClientGroupWrapper {
}
broadCastMsgConsumer.start();
started4Broadcast.compareAndSet(false, true);
- LOGGER.info("starting broadCastMsgConsumer success, group:{}", group);
+ log.info("starting broadCastMsgConsumer success, group:{}", group);
}
public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
@@ -665,8 +660,8 @@ public class ClientGroupWrapper {
public synchronized void shutdownBroadCastConsumer() throws Exception {
if (started4Broadcast.get()) {
broadCastMsgConsumer.shutdown();
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("broadcast consumer group:{} shutdown...", group);
+ if (log.isInfoEnabled()) {
+ log.info("broadcast consumer group:{} shutdown...", group);
}
}
started4Broadcast.compareAndSet(true, false);
@@ -678,8 +673,8 @@ public class ClientGroupWrapper {
if (started4Persistent.get()) {
persistentMsgConsumer.shutdown();
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("persistent consumer group:{} shutdown...", group);
+ if (log.isInfoEnabled()) {
+ log.info("persistent consumer group:{} shutdown...", group);
}
}
started4Persistent.compareAndSet(true, false);
@@ -739,8 +734,8 @@ public class ClientGroupWrapper {
HttpTinyClient.HttpResult result = null;
try {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl, msg);
+ if (log.isInfoEnabled()) {
+ log.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl, msg);
}
List<String> paramValues = new ArrayList<String>();
paramValues.add(EventMeshConstants.MANAGE_MSG);
@@ -755,7 +750,7 @@ public class ClientGroupWrapper {
StandardCharsets.UTF_8.name(),
3000);
} catch (Exception e) {
- LOGGER.error("httpPost " + targetUrl + " is fail,", e);
+ log.error("httpPost " + targetUrl + " is fail,", e);
throw e;
}
@@ -775,8 +770,8 @@ public class ClientGroupWrapper {
private void sendMsgBackToBroker(CloudEvent event, String bizSeqNo) throws Exception {
try {
String topic = event.getSubject();
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
+ if (log.isWarnEnabled()) {
+ log.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
}
long startTime = System.currentTimeMillis();
@@ -786,8 +781,8 @@ public class ClientGroupWrapper {
@Override
public void onSuccess(SendResult sendResult) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
+ if (log.isInfoEnabled()) {
+ log.info(
"group:{} consume fail, sendMessageBack success, bizSeqno:{}, "
+ "topic:{}", group, bizSeqNo, topic);
}
@@ -795,8 +790,8 @@ public class ClientGroupWrapper {
@Override
public void onException(OnExceptionContext context) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn(
+ if (log.isWarnEnabled()) {
+ log.warn(
"group:{} consume fail, sendMessageBack fail, bizSeqno:{},"
+ " topic:{}", group, bizSeqNo, topic);
}
@@ -805,8 +800,8 @@ public class ClientGroupWrapper {
});
eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
} catch (Exception e) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("try send msg back to broker failed");
+ if (log.isWarnEnabled()) {
+ log.warn("try send msg back to broker failed");
}
throw e;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org