You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/01/01 03:50:44 UTC

[incubator-eventmesh] branch master updated: simplify code

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a304bdd9 simplify code
     new db9db3eee Merge pull request #2753 from weihubeats/ClientGroupWrapper
7a304bdd9 is described below

commit 7a304bdd973a853143f22c768295d4c4e3fefe4a
Author: weihu <we...@163.com>
AuthorDate: Sat Dec 31 15:45:33 2022 +0800

    simplify code
---
 .../tcp/client/group/ClientGroupWrapper.java       | 147 ++++++++++-----------
 1 file changed, 71 insertions(+), 76 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 944dc3b0c..4e91a3d2d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -47,8 +47,6 @@ import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
-
-
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -62,28 +60,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
 import io.opentelemetry.api.trace.Span;
 
 import com.google.common.base.Preconditions;
 
+import lombok.extern.slf4j.Slf4j;
 
 
+@Slf4j
 public class ClientGroupWrapper {
-
-    public static final Logger LOGGER = LoggerFactory.getLogger(ClientGroupWrapper.class);
-
-    private String sysId;
+    
+    private final String sysId;
 
     private String group;
 
     private EventMeshTCPConfiguration eventMeshTCPConfiguration;
 
-    private EventMeshTCPServer eventMeshTCPServer;
+    private final EventMeshTCPServer eventMeshTCPServer;
 
     private EventMeshTcpRetryer eventMeshTcpRetryer;
 
@@ -109,14 +104,14 @@ public class ClientGroupWrapper {
 
     private MQConsumerWrapper broadCastMsgConsumer;
 
-    private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
+    private final ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
             new ConcurrentHashMap<String, Set<Session>>();
 
-    private ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();
 
     public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
 
-    private MQProducerWrapper mqProducerWrapper;
+    private final MQProducerWrapper mqProducerWrapper;
 
     public ClientGroupWrapper(String sysId, String group,
                               EventMeshTCPServer eventMeshTCPServer,
@@ -148,7 +143,7 @@ public class ClientGroupWrapper {
             this.groupLock.readLock().lockInterruptibly();
             has = topic2sessionInGroupMapping.containsKey(topic);
         } catch (Exception e) {
-            LOGGER.error("hasSubscription error! topic[{}]", topic);
+            log.error("hasSubscription error! topic[{}]", topic);
         } finally {
             this.groupLock.readLock().unlock();
         }
@@ -179,7 +174,7 @@ public class ClientGroupWrapper {
             public void onException(OnExceptionContext context) {
                 String bizSeqNo = (String) upStreamMsgContext.getEvent()
                         .getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
-                LOGGER.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
+                log.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
                         upStreamMsgContext.getEvent().getSubject(), bizSeqNo,
                         upStreamMsgContext.getSession().getClient(), context.getException());
             }
@@ -194,17 +189,17 @@ public class ClientGroupWrapper {
     public boolean addSubscription(SubscriptionItem subscriptionItem, Session session)
             throws Exception {
         if (subscriptionItem == null) {
-            LOGGER.error("addSubscription param error,subscriptionItem is null", session);
+            log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
             return false;
         }
         String topic = subscriptionItem.getTopic();
         if (session == null || !StringUtils.equalsIgnoreCase(group,
                 EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
-            LOGGER.error("addSubscription param error,topic:{},session:{}", topic, session);
+            log.error("addSubscription param error,topic:{},session:{}", topic, session);
             return false;
         }
 
-        boolean r = false;
+        boolean r;
         try {
             this.groupLock.writeLock().lockInterruptibly();
             if (!topic2sessionInGroupMapping.containsKey(topic)) {
@@ -214,20 +209,20 @@ public class ClientGroupWrapper {
             r = topic2sessionInGroupMapping.get(topic).add(session);
             if (r) {
 
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("addSubscription success, group:{} topic:{} client:{}", group,
+                if (log.isInfoEnabled()) {
+                    log.info("addSubscription success, group:{} topic:{} client:{}", group,
                             topic, session.getClient());
                 }
             } else {
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
+                if (log.isWarnEnabled()) {
+                    log.warn("addSubscription fail, group:{} topic:{} client:{}", group, topic,
                             session.getClient());
                 }
             }
 
             subscriptions.putIfAbsent(topic, subscriptionItem);
         } catch (Exception e) {
-            LOGGER.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+            log.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
             throw new Exception("addSubscription fail");
         } finally {
             this.groupLock.writeLock().unlock();
@@ -237,14 +232,14 @@ public class ClientGroupWrapper {
 
     public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) {
         if (subscriptionItem == null) {
-            LOGGER.error("addSubscription param error,subscriptionItem is null", session);
+            log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
             return false;
         }
         String topic = subscriptionItem.getTopic();
         if (session == null
                 || !StringUtils.equalsIgnoreCase(group,
                 EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
-            LOGGER.error("removeSubscription param error,topic:{},session:{}", topic, session);
+            log.error("removeSubscription param error,topic:{},session:{}", topic, session);
             return false;
         }
 
@@ -255,14 +250,14 @@ public class ClientGroupWrapper {
                 r = topic2sessionInGroupMapping.get(topic).remove(session);
                 if (r) {
 
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info(
+                    if (log.isInfoEnabled()) {
+                        log.info(
                                 "removeSubscription remove session success, group:{} topic:{} client:{}",
                                 group, topic, session.getClient());
                     }
                 } else {
-                    if (LOGGER.isWarnEnabled()) {
-                        LOGGER.warn(
+                    if (log.isWarnEnabled()) {
+                        log.warn(
                                 "removeSubscription remove session failed, group:{} topic:{} client:{}",
                                 group, topic, session.getClient());
                     }
@@ -272,11 +267,11 @@ public class ClientGroupWrapper {
                 topic2sessionInGroupMapping.remove(topic);
                 subscriptions.remove(topic);
 
-                LOGGER.info("removeSubscription remove topic success, group:{} topic:{}",
+                log.info("removeSubscription remove topic success, group:{} topic:{}",
                         group, topic);
             }
         } catch (Exception e) {
-            LOGGER.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
+            log.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
                     e);
         } finally {
             this.groupLock.writeLock().unlock();
@@ -301,7 +296,7 @@ public class ClientGroupWrapper {
         mqProducerWrapper.init(keyValue);
         mqProducerWrapper.start();
         producerStarted.compareAndSet(false, true);
-        LOGGER.info("starting producer success, group:{}", group);
+        log.info("starting producer success, group:{}", group);
     }
 
     public synchronized void shutdownProducer() throws Exception {
@@ -310,7 +305,7 @@ public class ClientGroupWrapper {
         }
         mqProducerWrapper.shutdown();
         producerStarted.compareAndSet(true, false);
-        LOGGER.info("shutdown producer success for group:{}", group);
+        log.info("shutdown producer success for group:{}", group);
     }
 
     public String getGroup() {
@@ -326,7 +321,7 @@ public class ClientGroupWrapper {
                 || !StringUtils.equalsIgnoreCase(group,
                 EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
 
-            LOGGER.error("addGroupConsumerSession param error,session:{}", session);
+            log.error("addGroupConsumerSession param error,session:{}", session);
             return false;
         }
 
@@ -336,13 +331,13 @@ public class ClientGroupWrapper {
             r = groupConsumerSessions.add(session);
             if (r) {
 
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("addGroupConsumerSession success, group:{} client:{}", group,
+                if (log.isInfoEnabled()) {
+                    log.info("addGroupConsumerSession success, group:{} client:{}", group,
                             session.getClient());
                 }
             }
         } catch (Exception e) {
-            LOGGER.error("addGroupConsumerSession error! group:{} client:{}", group,
+            log.error("addGroupConsumerSession error! group:{} client:{}", group,
                     session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
@@ -355,7 +350,7 @@ public class ClientGroupWrapper {
                 || !StringUtils.equalsIgnoreCase(group,
                 EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
 
-            LOGGER.error("addGroupProducerSession param error,session:{}", session);
+            log.error("addGroupProducerSession param error,session:{}", session);
             return false;
         }
 
@@ -365,11 +360,11 @@ public class ClientGroupWrapper {
             r = groupProducerSessions.add(session);
             if (r) {
 
-                LOGGER.info("addGroupProducerSession success, group:{} client:{}", group,
+                log.info("addGroupProducerSession success, group:{} client:{}", group,
                         session.getClient());
             }
         } catch (Exception e) {
-            LOGGER.error("addGroupProducerSession error! group:{} client:{}", group,
+            log.error("addGroupProducerSession error! group:{} client:{}", group,
                     session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
@@ -382,7 +377,7 @@ public class ClientGroupWrapper {
                 || !StringUtils.equalsIgnoreCase(group,
                 EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
 
-            LOGGER.error("removeGroupConsumerSession param error,session:{}", session);
+            log.error("removeGroupConsumerSession param error,session:{}", session);
             return false;
         }
 
@@ -392,13 +387,13 @@ public class ClientGroupWrapper {
             r = groupConsumerSessions.remove(session);
             if (r) {
 
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("removeGroupConsumerSession success, group:{} client:{}", group,
+                if (log.isInfoEnabled()) {
+                    log.info("removeGroupConsumerSession success, group:{} client:{}", group,
                             session.getClient());
                 }
             }
         } catch (Exception e) {
-            LOGGER.error("removeGroupConsumerSession error! group:{} client:{}", group,
+            log.error("removeGroupConsumerSession error! group:{} client:{}", group,
                     session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
@@ -410,7 +405,7 @@ public class ClientGroupWrapper {
         if (session == null
                 || !StringUtils.equalsIgnoreCase(group,
                 EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
-            LOGGER.error("removeGroupProducerSession param error,session:{}", session);
+            log.error("removeGroupProducerSession param error,session:{}", session);
             return false;
         }
 
@@ -420,11 +415,11 @@ public class ClientGroupWrapper {
             r = groupProducerSessions.remove(session);
             if (r) {
 
-                LOGGER.info("removeGroupProducerSession success, group:{} client:{}", group,
+                log.info("removeGroupProducerSession success, group:{} client:{}", group,
                         session.getClient());
             }
         } catch (Exception e) {
-            LOGGER.error("removeGroupProducerSession error! group:{} client:{}", group,
+            log.error("removeGroupProducerSession error! group:{} client:{}", group,
                     session.getClient(), e);
         } finally {
             this.groupLock.writeLock().unlock();
@@ -486,7 +481,7 @@ public class ClientGroupWrapper {
                                     EventMeshConstants.EVENTMESH_SEND_BACK_IP);
                         }
 
-                        LOGGER.error(
+                        log.error(
                                 "found no session to downstream msg,groupName:{}, topic:{}, "
                                         + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
                                 group, topic, bizSeqNo, sendBackTimes,
@@ -494,7 +489,7 @@ public class ClientGroupWrapper {
 
                         if (sendBackTimes >= eventMeshTCPServer
                                 .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
-                            LOGGER.error(
+                            log.error(
                                     "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
                                             + "bizSeqNo:{}", eventMeshTCPServer
                                             .getEventMeshTCPConfiguration()
@@ -510,7 +505,7 @@ public class ClientGroupWrapper {
                             sendMsgBackToBroker(event, bizSeqNo);
                         }
                     } catch (Exception e) {
-                        LOGGER.warn("handle msg exception when no session found", e);
+                        log.warn("handle msg exception when no session found", e);
                     }
 
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
@@ -533,8 +528,8 @@ public class ClientGroupWrapper {
         persistentMsgConsumer.registerEventListener(listener);
 
         inited4Persistent.compareAndSet(false, true);
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("init persistentMsgConsumer success, group:{}", group);
+        if (log.isInfoEnabled()) {
+            log.info("init persistentMsgConsumer success, group:{}", group);
         }
     }
 
@@ -544,8 +539,8 @@ public class ClientGroupWrapper {
         }
         persistentMsgConsumer.start();
         started4Persistent.compareAndSet(false, true);
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("starting persistentMsgConsumer success, group:{}", group);
+        if (log.isInfoEnabled()) {
+            log.info("starting persistentMsgConsumer success, group:{}", group);
         }
     }
 
@@ -583,8 +578,8 @@ public class ClientGroupWrapper {
                 EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
                         (EventMeshAsyncConsumeContext) context;
                 if (CollectionUtils.isEmpty(groupConsumerSessions)) {
-                    if (LOGGER.isWarnEnabled()) {
-                        LOGGER.warn("found no session to downstream broadcast msg");
+                    if (log.isWarnEnabled()) {
+                        log.warn("found no session to downstream broadcast msg");
                     }
                     eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                     return;
@@ -602,8 +597,8 @@ public class ClientGroupWrapper {
                     Session session = sessionsItr.next();
 
                     if (!session.isAvailable(topic)) {
-                        if (LOGGER.isWarnEnabled()) {
-                            LOGGER.warn("downstream broadcast msg,session is not available,client:{}",
+                        if (log.isWarnEnabled()) {
+                            log.warn("downstream broadcast msg,session is not available,client:{}",
                                     session.getClient());
                         }
                         continue;
@@ -632,8 +627,8 @@ public class ClientGroupWrapper {
         broadCastMsgConsumer.registerEventListener(listener);
 
         inited4Broadcast.compareAndSet(false, true);
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("init broadCastMsgConsumer success, group:{}", group);
+        if (log.isInfoEnabled()) {
+            log.info("init broadCastMsgConsumer success, group:{}", group);
         }
     }
 
@@ -643,7 +638,7 @@ public class ClientGroupWrapper {
         }
         broadCastMsgConsumer.start();
         started4Broadcast.compareAndSet(false, true);
-        LOGGER.info("starting broadCastMsgConsumer success, group:{}", group);
+        log.info("starting broadCastMsgConsumer success, group:{}", group);
     }
 
     public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
@@ -665,8 +660,8 @@ public class ClientGroupWrapper {
     public synchronized void shutdownBroadCastConsumer() throws Exception {
         if (started4Broadcast.get()) {
             broadCastMsgConsumer.shutdown();
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("broadcast consumer group:{} shutdown...", group);
+            if (log.isInfoEnabled()) {
+                log.info("broadcast consumer group:{} shutdown...", group);
             }
         }
         started4Broadcast.compareAndSet(true, false);
@@ -678,8 +673,8 @@ public class ClientGroupWrapper {
 
         if (started4Persistent.get()) {
             persistentMsgConsumer.shutdown();
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("persistent consumer group:{} shutdown...", group);
+            if (log.isInfoEnabled()) {
+                log.info("persistent consumer group:{} shutdown...", group);
             }
         }
         started4Persistent.compareAndSet(true, false);
@@ -739,8 +734,8 @@ public class ClientGroupWrapper {
         HttpTinyClient.HttpResult result = null;
 
         try {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl, msg);
+            if (log.isInfoEnabled()) {
+                log.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl, msg);
             }
             List<String> paramValues = new ArrayList<String>();
             paramValues.add(EventMeshConstants.MANAGE_MSG);
@@ -755,7 +750,7 @@ public class ClientGroupWrapper {
                     StandardCharsets.UTF_8.name(),
                     3000);
         } catch (Exception e) {
-            LOGGER.error("httpPost " + targetUrl + " is fail,", e);
+            log.error("httpPost " + targetUrl + " is fail,", e);
             throw e;
         }
 
@@ -775,8 +770,8 @@ public class ClientGroupWrapper {
     private void sendMsgBackToBroker(CloudEvent event, String bizSeqNo) throws Exception {
         try {
             String topic = event.getSubject();
-            if (LOGGER.isWarnEnabled()) {
-                LOGGER.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
+            if (log.isWarnEnabled()) {
+                log.warn("send msg back to broker, bizSeqno:{}, topic:{}", bizSeqNo, topic);
             }
 
             long startTime = System.currentTimeMillis();
@@ -786,8 +781,8 @@ public class ClientGroupWrapper {
                         @Override
                         public void onSuccess(SendResult sendResult) {
 
-                            if (LOGGER.isInfoEnabled()) {
-                                LOGGER.info(
+                            if (log.isInfoEnabled()) {
+                                log.info(
                                         "group:{} consume fail, sendMessageBack success, bizSeqno:{}, "
                                                 + "topic:{}", group, bizSeqNo, topic);
                             }
@@ -795,8 +790,8 @@ public class ClientGroupWrapper {
 
                         @Override
                         public void onException(OnExceptionContext context) {
-                            if (LOGGER.isWarnEnabled()) {
-                                LOGGER.warn(
+                            if (log.isWarnEnabled()) {
+                                log.warn(
                                         "group:{} consume fail, sendMessageBack fail, bizSeqno:{},"
                                                 + " topic:{}", group, bizSeqNo, topic);
                             }
@@ -805,8 +800,8 @@ public class ClientGroupWrapper {
                     });
             eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
         } catch (Exception e) {
-            if (LOGGER.isWarnEnabled()) {
-                LOGGER.warn("try send msg back to broker failed");
+            if (log.isWarnEnabled()) {
+                log.warn("try send msg back to broker failed");
             }
             throw e;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org