You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/25 10:31:33 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #397]Remove subscription session failed error (#398)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new ce7fc49 [ISSUE #397]Remove subscription session failed error (#398)
ce7fc49 is described below
commit ce7fc49ce0b33f69a1ad3bca9f2b96ed9a1b32d4
Author: mike_xwm <mi...@126.com>
AuthorDate: Fri Jun 25 18:31:27 2021 +0800
[ISSUE #397]Remove subscription session failed error (#398)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* [ISSUE #329]Missing Log4j dependency
* update eventmesh-runtime.png
* support unsubscribe topics while delconsumer in http mode
* [ISSUE #397]Remove subscription session failed error
* [ISSUE #397]Remove subscription session failed error
close #397
---
.../core/protocol/tcp/client/group/ClientGroupWrapper.java | 8 ++++----
.../runtime/core/protocol/tcp/client/session/Session.java | 12 ++++++------
.../org/apache/eventmesh/client/tcp/common/MessageUtils.java | 2 ++
.../org/apache/eventmesh/tcp/common/EventMeshTestUtils.java | 2 ++
4 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index bbd62c0..5829e49 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -369,7 +369,7 @@ public class ClientGroupWrapper {
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
persistentMsgConsumer.init(keyValue);
// persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
@@ -458,7 +458,7 @@ public class ClientGroupWrapper {
keyValue.put("isBroadcast", "true");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
// broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
// @Override
@@ -536,7 +536,7 @@ public class ClientGroupWrapper {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
logger.warn("found no session to downstream broadcast msg");
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
@@ -586,7 +586,7 @@ public class ClientGroupWrapper {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
if (session == null) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 280ff3b..ddfced0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -326,10 +326,10 @@ public class Session {
return true;
}
- @Override
- public int hashCode() {
- int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
- + (sessionState != null ? sessionState.hashCode() : 0);
- return code;
- }
+// @Override
+// public int hashCode() {
+// int code = 37 + (client != null ? client.hashCode() : 0) + (context != null ? context.hashCode() : 0)
+// + (sessionState != null ? sessionState.hashCode() : 0);
+// return code;
+// }
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 950c06b..2d6c7ca 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -99,6 +99,7 @@ public class MessageUtils {
public static UserAgent generateSubClient(UserAgent agent) {
UserAgent user = new UserAgent();
+ user.setEnv(agent.getEnv());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
@@ -116,6 +117,7 @@ public class MessageUtils {
public static UserAgent generatePubClient(UserAgent agent) {
UserAgent user = new UserAgent();
+ user.setEnv(agent.getEnv());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index 88578f5..88f5016 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -36,6 +36,7 @@ public class EventMeshTestUtils {
public static UserAgent generateClient1() {
UserAgent user = new UserAgent();
+ user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
@@ -52,6 +53,7 @@ public class EventMeshTestUtils {
public static UserAgent generateClient2() {
UserAgent user = new UserAgent();
+ user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org