You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/28 02:41:53 UTC

[incubator-eventmesh] branch master updated: ISSUE(1708): add NPE exception check

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a290e7e8 ISSUE(1708): add NPE exception check
     new ff46266d Merge pull request #1978 from NewtonVan/dev-1708
a290e7e8 is described below

commit a290e7e86fb602ca4f7a617690e1c05a300debdb
Author: idi0tn3 <26...@qq.com>
AuthorDate: Thu Oct 27 20:34:46 2022 +0800

    ISSUE(1708): add NPE exception check
    
    - Most of refactoring adapt "fail fast" style.
    - Exception check on line 252 is quite different, null cgw won't cause
      NPE, and it still throw the exception for making `flag` var be false.
---
 .../client/group/ClientSessionGroupMapping.java    | 46 +++++++++++-----------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 8a184d50..9ca76165 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -249,7 +249,7 @@ public class ClientSessionGroupMapping {
             logger.info("readySession session[{}]", session);
             ClientGroupWrapper cgw = session.getClientGroupWrapper().get();
 
-            boolean flag = cgw.addGroupConsumerSession(session);
+            boolean flag = cgw != null && cgw.addGroupConsumerSession(session);
             if (!flag) {
                 throw new Exception("addGroupConsumerSession fail");
             }
@@ -266,13 +266,13 @@ public class ClientSessionGroupMapping {
 
     private void cleanClientGroupWrapperByCloseSub(Session session) throws Exception {
         cleanSubscriptionInSession(session);
-        session.getClientGroupWrapper().get().removeGroupConsumerSession(session);
+        Objects.requireNonNull(session.getClientGroupWrapper().get()).removeGroupConsumerSession(session);
         handleUnackMsgsInSession(session);
         cleanClientGroupWrapperCommon(session);
     }
 
     private void cleanClientGroupWrapperByClosePub(Session session) throws Exception {
-        session.getClientGroupWrapper().get().removeGroupProducerSession(session);
+        Objects.requireNonNull(session.getClientGroupWrapper().get()).removeGroupProducerSession(session);
         cleanClientGroupWrapperCommon(session);
     }
 
@@ -283,9 +283,9 @@ public class ClientSessionGroupMapping {
      */
     private void cleanSubscriptionInSession(Session session) throws Exception {
         for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
-            session.getClientGroupWrapper().get().removeSubscription(item, session);
-            if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) {
-                session.getClientGroupWrapper().get().unsubscribe(item);
+            Objects.requireNonNull(session.getClientGroupWrapper().get()).removeSubscription(item, session);
+            if (!Objects.requireNonNull(session.getClientGroupWrapper().get()).hasSubscription(item.getTopic())) {
+                Objects.requireNonNull(session.getClientGroupWrapper().get()).unsubscribe(item);
             }
         }
     }
@@ -297,7 +297,7 @@ public class ClientSessionGroupMapping {
      */
     private void handleUnackMsgsInSession(Session session) {
         ConcurrentHashMap<String /** seq */, DownStreamMsgContext> unAckMsg = session.getPusher().getUnAckMsg();
-        if (unAckMsg.size() > 0 && session.getClientGroupWrapper().get().getGroupConsumerSessions().size() > 0) {
+        if (unAckMsg.size() > 0 && Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() > 0) {
             for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
                 DownStreamMsgContext downStreamMsgContext = entry.getValue();
                 if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
@@ -306,8 +306,8 @@ public class ClientSessionGroupMapping {
                             session.getClient());
                     continue;
                 }
-                Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy()
-                        .select(session.getClientGroupWrapper().get().getGroup(),
+                Session reChooseSession = Objects.requireNonNull(session.getClientGroupWrapper().get()).getDownstreamDispatchStrategy()
+                        .select(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup(),
                                 downStreamMsgContext.event.getSubject(),
                                 Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions);
                 if (reChooseSession != null) {
@@ -326,37 +326,37 @@ public class ClientSessionGroupMapping {
 
     private void cleanClientGroupWrapperCommon(Session session) throws Exception {
         logger.info("GroupConsumerSessions size:{}",
-                session.getClientGroupWrapper().get().getGroupConsumerSessions().size());
-        if (session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0) {
+                Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size());
+        if (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0) {
             shutdownClientGroupConsumer(session);
         }
 
         logger.info("GroupProducerSessions size:{}",
-                session.getClientGroupWrapper().get().getGroupProducerSessions().size());
-        if ((session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0)
-                && (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) {
+                Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size());
+        if ((Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0)
+                && (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size() == 0)) {
             shutdownClientGroupProducer(session);
 
-            clientGroupMap.remove(session.getClientGroupWrapper().get().getGroup());
-            lockMap.remove(session.getClientGroupWrapper().get().getGroup());
-            logger.info("remove clientGroupWrapper group[{}]", session.getClientGroupWrapper().get().getGroup());
+            clientGroupMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
+            lockMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
+            logger.info("remove clientGroupWrapper group[{}]", Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
         }
     }
 
     private void shutdownClientGroupConsumer(Session session) throws Exception {
-        if (session.getClientGroupWrapper().get().started4Broadcast.get() == Boolean.TRUE) {
-            session.getClientGroupWrapper().get().shutdownBroadCastConsumer();
+        if (Objects.requireNonNull(session.getClientGroupWrapper().get()).started4Broadcast.get() == Boolean.TRUE) {
+            Objects.requireNonNull(session.getClientGroupWrapper().get()).shutdownBroadCastConsumer();
         }
 
-        if (session.getClientGroupWrapper().get().started4Persistent.get() == Boolean.TRUE) {
-            session.getClientGroupWrapper().get().shutdownPersistentConsumer();
+        if (Objects.requireNonNull(session.getClientGroupWrapper().get()).started4Persistent.get() == Boolean.TRUE) {
+            Objects.requireNonNull(session.getClientGroupWrapper().get()).shutdownPersistentConsumer();
         }
     }
 
 
     private void shutdownClientGroupProducer(Session session) throws Exception {
-        if (session.getClientGroupWrapper().get().producerStarted.get() == Boolean.TRUE) {
-            session.getClientGroupWrapper().get().shutdownProducer();
+        if (Objects.requireNonNull(session.getClientGroupWrapper().get()).producerStarted.get() == Boolean.TRUE) {
+            Objects.requireNonNull(session.getClientGroupWrapper().get()).shutdownProducer();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org