You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/28 02:41:53 UTC
[incubator-eventmesh] branch master updated: ISSUE(1708): add NPE exception check
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a290e7e8 ISSUE(1708): add NPE exception check
new ff46266d Merge pull request #1978 from NewtonVan/dev-1708
a290e7e8 is described below
commit a290e7e86fb602ca4f7a617690e1c05a300debdb
Author: idi0tn3 <26...@qq.com>
AuthorDate: Thu Oct 27 20:34:46 2022 +0800
ISSUE(1708): add NPE exception check
- Most of the refactoring adopts a "fail fast" style.
- The exception check on line 252 is handled differently: a null cgw no longer
causes an NPE; instead `flag` evaluates to false, so the existing exception is still thrown.
---
.../client/group/ClientSessionGroupMapping.java | 46 +++++++++++-----------
1 file changed, 23 insertions(+), 23 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 8a184d50..9ca76165 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -249,7 +249,7 @@ public class ClientSessionGroupMapping {
logger.info("readySession session[{}]", session);
ClientGroupWrapper cgw = session.getClientGroupWrapper().get();
- boolean flag = cgw.addGroupConsumerSession(session);
+ boolean flag = cgw != null && cgw.addGroupConsumerSession(session);
if (!flag) {
throw new Exception("addGroupConsumerSession fail");
}
@@ -266,13 +266,13 @@ public class ClientSessionGroupMapping {
private void cleanClientGroupWrapperByCloseSub(Session session) throws Exception {
cleanSubscriptionInSession(session);
- session.getClientGroupWrapper().get().removeGroupConsumerSession(session);
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).removeGroupConsumerSession(session);
handleUnackMsgsInSession(session);
cleanClientGroupWrapperCommon(session);
}
private void cleanClientGroupWrapperByClosePub(Session session) throws Exception {
- session.getClientGroupWrapper().get().removeGroupProducerSession(session);
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).removeGroupProducerSession(session);
cleanClientGroupWrapperCommon(session);
}
@@ -283,9 +283,9 @@ public class ClientSessionGroupMapping {
*/
private void cleanSubscriptionInSession(Session session) throws Exception {
for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
- session.getClientGroupWrapper().get().removeSubscription(item, session);
- if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) {
- session.getClientGroupWrapper().get().unsubscribe(item);
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).removeSubscription(item, session);
+ if (!Objects.requireNonNull(session.getClientGroupWrapper().get()).hasSubscription(item.getTopic())) {
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).unsubscribe(item);
}
}
}
@@ -297,7 +297,7 @@ public class ClientSessionGroupMapping {
*/
private void handleUnackMsgsInSession(Session session) {
ConcurrentHashMap<String /** seq */, DownStreamMsgContext> unAckMsg = session.getPusher().getUnAckMsg();
- if (unAckMsg.size() > 0 && session.getClientGroupWrapper().get().getGroupConsumerSessions().size() > 0) {
+ if (unAckMsg.size() > 0 && Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() > 0) {
for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
DownStreamMsgContext downStreamMsgContext = entry.getValue();
if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
@@ -306,8 +306,8 @@ public class ClientSessionGroupMapping {
session.getClient());
continue;
}
- Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy()
- .select(session.getClientGroupWrapper().get().getGroup(),
+ Session reChooseSession = Objects.requireNonNull(session.getClientGroupWrapper().get()).getDownstreamDispatchStrategy()
+ .select(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup(),
downStreamMsgContext.event.getSubject(),
Objects.requireNonNull(session.getClientGroupWrapper().get()).groupConsumerSessions);
if (reChooseSession != null) {
@@ -326,37 +326,37 @@ public class ClientSessionGroupMapping {
private void cleanClientGroupWrapperCommon(Session session) throws Exception {
logger.info("GroupConsumerSessions size:{}",
- session.getClientGroupWrapper().get().getGroupConsumerSessions().size());
- if (session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0) {
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size());
+ if (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0) {
shutdownClientGroupConsumer(session);
}
logger.info("GroupProducerSessions size:{}",
- session.getClientGroupWrapper().get().getGroupProducerSessions().size());
- if ((session.getClientGroupWrapper().get().getGroupConsumerSessions().size() == 0)
- && (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) {
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size());
+ if ((Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0)
+ && (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size() == 0)) {
shutdownClientGroupProducer(session);
- clientGroupMap.remove(session.getClientGroupWrapper().get().getGroup());
- lockMap.remove(session.getClientGroupWrapper().get().getGroup());
- logger.info("remove clientGroupWrapper group[{}]", session.getClientGroupWrapper().get().getGroup());
+ clientGroupMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
+ lockMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
+ logger.info("remove clientGroupWrapper group[{}]", Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
}
}
private void shutdownClientGroupConsumer(Session session) throws Exception {
- if (session.getClientGroupWrapper().get().started4Broadcast.get() == Boolean.TRUE) {
- session.getClientGroupWrapper().get().shutdownBroadCastConsumer();
+ if (Objects.requireNonNull(session.getClientGroupWrapper().get()).started4Broadcast.get() == Boolean.TRUE) {
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).shutdownBroadCastConsumer();
}
- if (session.getClientGroupWrapper().get().started4Persistent.get() == Boolean.TRUE) {
- session.getClientGroupWrapper().get().shutdownPersistentConsumer();
+ if (Objects.requireNonNull(session.getClientGroupWrapper().get()).started4Persistent.get() == Boolean.TRUE) {
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).shutdownPersistentConsumer();
}
}
private void shutdownClientGroupProducer(Session session) throws Exception {
- if (session.getClientGroupWrapper().get().producerStarted.get() == Boolean.TRUE) {
- session.getClientGroupWrapper().get().shutdownProducer();
+ if (Objects.requireNonNull(session.getClientGroupWrapper().get()).producerStarted.get() == Boolean.TRUE) {
+ Objects.requireNonNull(session.getClientGroupWrapper().get()).shutdownProducer();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org