You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/25 10:31:33 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #397]Remove subscription session failed error (#398)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new ce7fc49  [ISSUE #397]Remove subscription session failed error (#398)
ce7fc49 is described below

commit ce7fc49ce0b33f69a1ad3bca9f2b96ed9a1b32d4
Author: mike_xwm <mi...@126.com>
AuthorDate: Fri Jun 25 18:31:27 2021 +0800

    [ISSUE #397]Remove subscription session failed error (#398)
    
    * [ISSUE #325]Update gradle configuration for publishing package to maven repository
    
    * update build.gradle
    
    * update build.gradle and gradle.properties
    
    * update build.gradle and gradle.properties for publish to maven repository
    
    * * update gradle version for instructions
    * fix: dist task exception
    
    * [ISSUE #329]Missing Log4j dependency
    
    * update eventmesh-runtime.png
    
    * support unsubscribe topics while delconsumer in http mode
    
    * [ISSUE #397]Remove subscription session failed error
    
    * [ISSUE #397]Remove subscription session failed error
    close #397
---
 .../core/protocol/tcp/client/group/ClientGroupWrapper.java   |  8 ++++----
 .../runtime/core/protocol/tcp/client/session/Session.java    | 12 ++++++------
 .../org/apache/eventmesh/client/tcp/common/MessageUtils.java |  2 ++
 .../org/apache/eventmesh/tcp/common/EventMeshTestUtils.java  |  2 ++
 4 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index bbd62c0..5829e49 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -369,7 +369,7 @@ public class ClientGroupWrapper {
         keyValue.put("isBroadcast", "false");
         keyValue.put("consumerGroup", consumerGroup);
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
-        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
+        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
 
         persistentMsgConsumer.init(keyValue);
 //        persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
@@ -458,7 +458,7 @@ public class ClientGroupWrapper {
         keyValue.put("isBroadcast", "true");
         keyValue.put("consumerGroup", consumerGroup);
         keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
-        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
+        keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
         broadCastMsgConsumer.init(keyValue);
 //        broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
 //            @Override
@@ -536,7 +536,7 @@ public class ClientGroupWrapper {
                     message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                     message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
 
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
                     if (CollectionUtils.isEmpty(groupConsumerSessions)) {
                         logger.warn("found no session to downstream broadcast msg");
 //                        context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
@@ -586,7 +586,7 @@ public class ClientGroupWrapper {
                     message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                     message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
 
-                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
+                    EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
                     Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
                     String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
                     if (session == null) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 280ff3b..ddfced0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -326,10 +326,10 @@ public class Session {
         return true;
     }
 
-    @Override
-    public int hashCode() {
-        int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
-                + (sessionState != null ? sessionState.hashCode() : 0);
-        return code;
-    }
+//    @Override
+//    public int hashCode() {
+//        int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
+//                + (sessionState != null ? sessionState.hashCode() : 0);
+//        return code;
+//    }
 }
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 950c06b..2d6c7ca 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -99,6 +99,7 @@ public class MessageUtils {
 
     public static UserAgent generateSubClient(UserAgent agent) {
         UserAgent user = new UserAgent();
+        user.setEnv(agent.getEnv());
         user.setHost(agent.getHost());
         user.setPassword(agent.getPassword());
         user.setUsername(agent.getUsername());
@@ -116,6 +117,7 @@ public class MessageUtils {
 
     public static UserAgent generatePubClient(UserAgent agent) {
         UserAgent user = new UserAgent();
+        user.setEnv(agent.getEnv());
         user.setHost(agent.getHost());
         user.setPassword(agent.getPassword());
         user.setUsername(agent.getUsername());
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index 88578f5..88f5016 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -36,6 +36,7 @@ public class EventMeshTestUtils {
 
     public static UserAgent generateClient1() {
         UserAgent user = new UserAgent();
+        user.setEnv("test");
         user.setHost("127.0.0.1");
         user.setPassword(generateRandomString(8));
         user.setUsername("PU4283");
@@ -52,6 +53,7 @@ public class EventMeshTestUtils {
 
     public static UserAgent generateClient2() {
         UserAgent user = new UserAgent();
+        user.setEnv("test");
         user.setHost("127.0.0.1");
         user.setPassword(generateRandomString(8));
         user.setUsername("PU4283");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org