You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/09 06:32:18 UTC
[incubator-eventmesh] branch master updated: fix issue2489
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 2d830c2d7 fix issue2489
new 5c26f0916 Merge pull request #2490 from jonyangx/issue2489
2d830c2d7 is described below
commit 2d830c2d700e2c97fece35a05bc3ed922ca1390d
Author: jonyangx <jo...@gmail.com>
AuthorDate: Wed Dec 7 12:18:42 2022 +0800
fix issue2489
---
.../eventmesh/runtime/demo/AsyncPubClient.java | 30 ++++++++++---------
.../eventmesh/runtime/demo/AsyncSubClient.java | 30 ++++++++++---------
.../eventmesh/runtime/demo/BroadCastPubClient.java | 14 +++++----
.../eventmesh/runtime/demo/BroadCastSubClient.java | 31 +++++++++++---------
.../apache/eventmesh/runtime/demo/CCPubClient.java | 10 ++++---
.../apache/eventmesh/runtime/demo/CCSubClient.java | 34 ++++++++++++----------
.../apache/eventmesh/runtime/demo/CClientDemo.java | 19 +++++-------
.../eventmesh/runtime/demo/SyncPubClient.java | 23 +++++++++------
.../eventmesh/runtime/demo/SyncSubClient.java | 28 ++++++++++--------
9 files changed, 122 insertions(+), 97 deletions(-)
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java
index 762cce2ef..afda4a105 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java
@@ -32,22 +32,26 @@ import io.netty.channel.ChannelHandlerContext;
public class AsyncPubClient {
- private static final Logger logger = LoggerFactory.getLogger(AsyncPubClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPubClient.class);
public static void main(String[] args) throws Exception {
- PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
- pubClient.init();
- pubClient.heartbeat();
- pubClient.registerBusiHandler(new ReceiveMsgHook() {
- @Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- logger.error("server good by request: {}", msg);
+ try (PubClientImpl pubClient =
+ new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+ pubClient.init();
+ pubClient.heartbeat();
+ pubClient.registerBusiHandler(new ReceiveMsgHook() {
+ @Override
+ public void handle(Package msg, ChannelHandlerContext ctx) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("server good by request: {}", msg);
+ }
+ }
+ });
+
+ for (int i = 0; i < 1; i++) {
+ ThreadUtils.randomSleep(0, 500);
+ pubClient.broadcast(MessageUtils.asyncMessage(ClientConstants.ASYNC_TOPIC, i), 5000);
}
- });
-
- for (int i = 0; i < 1; i++) {
- ThreadUtils.randomSleep(0, 500);
- pubClient.broadcast(MessageUtils.asyncMessage(ClientConstants.ASYNC_TOPIC, i), 5000);
}
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java
index 2783e136e..ba04f62b5 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java
@@ -33,21 +33,25 @@ import io.netty.channel.ChannelHandlerContext;
public class AsyncSubClient {
- private static final Logger logger = LoggerFactory.getLogger(AsyncSubClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSubClient.class);
public static void main(String[] args) throws Exception {
- SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
- client.init();
- client.heartbeat();
- client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
- client.registerBusiHandler(new ReceiveMsgHook() {
- @Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- if (msg.getBody() instanceof EventMeshMessage) {
- String body = ((EventMeshMessage) msg.getBody()).getBody();
- logger.error("receive message -------------------------------" + body);
+ try (SubClientImpl client =
+ new SubClientImpl("localhost", 10002, MessageUtils.generateSubServer())) {
+ client.init();
+ client.heartbeat();
+ client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+ client.registerBusiHandler(new ReceiveMsgHook() {
+ @Override
+ public void handle(Package msg, ChannelHandlerContext ctx) {
+ if (msg.getBody() instanceof EventMeshMessage) {
+ String body = ((EventMeshMessage) msg.getBody()).getBody();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("receive message -------------------------------" + body);
+ }
+ }
}
- }
- });
+ });
+ }
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java
index 5f3fa158b..7c3da75e9 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java
@@ -25,12 +25,14 @@ import org.apache.eventmesh.runtime.client.impl.PubClientImpl;
public class BroadCastPubClient {
public static void main(String[] args) throws Exception {
- PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
- pubClient.init();
- pubClient.heartbeat();
- for (int i = 0; i < 10000; i++) {
- ThreadUtils.randomSleep(0, 500);
- pubClient.broadcast(MessageUtils.broadcastMessage(ClientConstants.BROADCAST_TOPIC, i), 5000);
+ try (PubClientImpl pubClient =
+ new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+ pubClient.init();
+ pubClient.heartbeat();
+ for (int i = 0; i < 10000; i++) {
+ ThreadUtils.randomSleep(0, 500);
+ pubClient.broadcast(MessageUtils.broadcastMessage(ClientConstants.BROADCAST_TOPIC, i), 5000);
+ }
}
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java
index 8e00541dc..14acd1dff 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java
@@ -34,23 +34,26 @@ import io.netty.channel.ChannelHandlerContext;
public class BroadCastSubClient {
- private static final Logger logger = LoggerFactory.getLogger(BroadCastSubClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(BroadCastSubClient.class);
public static void main(String[] args) throws Exception {
- SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
- client.init();
- client.heartbeat();
- client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
- client.registerBusiHandler(new ReceiveMsgHook() {
- @Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- if (msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
- if (msg.getBody() instanceof EventMeshMessage) {
- String body = ((EventMeshMessage) msg.getBody()).getBody();
- logger.error("receive message -------------------------------" + body);
+ try (SubClientImpl client = new SubClientImpl("localhost", 10000, MessageUtils.generateSubServer())) {
+ client.init();
+ client.heartbeat();
+ client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
+ client.registerBusiHandler(new ReceiveMsgHook() {
+ @Override
+ public void handle(Package msg, ChannelHandlerContext ctx) {
+ if (msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
+ if (msg.getBody() instanceof EventMeshMessage) {
+ String body = ((EventMeshMessage) msg.getBody()).getBody();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("receive message -------------------------------" + body);
+ }
+ }
}
}
- }
- });
+ });
+ }
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java
index a05a24ab3..e6f68f37e 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java
@@ -25,11 +25,13 @@ import org.apache.eventmesh.runtime.client.impl.PubClientImpl;
public class CCPubClient {
public static void main(String[] args) throws Exception {
- PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
- pubClient.init();
- pubClient.heartbeat();
+ try (PubClientImpl pubClient =
+ new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+ pubClient.init();
+ pubClient.heartbeat();
- pubClient.broadcast(MessageUtils.rrMesssage(ClientConstants.ASYNC_TOPIC, 0), 5000);
+ pubClient.broadcast(MessageUtils.rrMesssage(ClientConstants.ASYNC_TOPIC, 0), 5000);
+ }
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java
index b23bc4b9b..e380e1e20 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java
@@ -33,23 +33,27 @@ import io.netty.channel.ChannelHandlerContext;
public class CCSubClient {
- private static final Logger logger = LoggerFactory.getLogger(CCSubClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(CCSubClient.class);
public static void main(String[] args) throws Exception {
- SubClientImpl subClient = new SubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
- subClient.init();
- subClient.heartbeat();
- subClient.listen();
- subClient.justSubscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
- subClient.registerBusiHandler(new ReceiveMsgHook() {
- @Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- logger.error("Received message: {}", msg);
- if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
- Package rrResponse = MessageUtils.rrResponse(msg);
- ctx.writeAndFlush(rrResponse);
+ try (SubClientImpl subClient =
+ new SubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+ subClient.init();
+ subClient.heartbeat();
+ subClient.listen();
+ subClient.justSubscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
+ subClient.registerBusiHandler(new ReceiveMsgHook() {
+ @Override
+ public void handle(Package msg, ChannelHandlerContext ctx) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Received message: {}", msg);
+ }
+ if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
+ Package rrResponse = MessageUtils.rrResponse(msg);
+ ctx.writeAndFlush(rrResponse);
+ }
}
- }
- });
+ });
+ }
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
index ff323a0f3..c77b74a4e 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
@@ -35,14 +35,13 @@ import io.netty.channel.ChannelHandlerContext;
*/
public class CClientDemo {
- public static final Logger logger = LoggerFactory.getLogger(CClientDemo.class);
+ public static final Logger LOGGER = LoggerFactory.getLogger(CClientDemo.class);
- private static final String SYNC_TOPIC = "TEST-TOPIC-TCP-SYNC";
private static final String ASYNC_TOPIC = "TEST-TOPIC-TCP-ASYNC";
private static final String BROADCAST_TOPIC = "TEST-TOPIC-TCP-BROADCAST";
public static void main(String[] args) throws Exception {
- EventMeshClientImpl client = new EventMeshClientImpl("127.0.0.1", 10000);
+ EventMeshClientImpl client = new EventMeshClientImpl("localhost", 10000);
client.init();
client.heartbeat();
client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
@@ -51,22 +50,20 @@ public class CClientDemo {
client.registerSubBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
- if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
- logger.error("receive message-------------------------------------" + msg);
+ if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT
+ || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("receive message-------------------------------------" + msg);
+ }
}
}
});
for (int i = 0; i < 10000; i++) {
- //ThreadUtil.randomSleep(0,200);
//broadcast message
client.broadcast(MessageUtils.broadcastMessage("TEST-TOPIC-TCP-BROADCAST", i), 5000);
//asynchronous message
client.publish(MessageUtils.asyncMessage(ASYNC_TOPIC, i), 5000);
}
- //
- //Thread.sleep(10000);
- //client.close();
-
-
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java
index 172c7a269..40d7d2b4d 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java
@@ -28,18 +28,23 @@ import org.slf4j.LoggerFactory;
public class SyncPubClient {
- private static final Logger logger = LoggerFactory.getLogger(SyncPubClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncPubClient.class);
public static void main(String[] args) throws Exception {
- PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
- pubClient.init();
- pubClient.heartbeat();
+ try (PubClientImpl pubClient =
+ new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+ pubClient.init();
+ pubClient.heartbeat();
- for (int i = 0; i < 100; i++) {
- Package rr = pubClient.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC", i), 3000);
- if (rr.getBody() instanceof EventMeshMessage) {
- String body = ((EventMeshMessage) rr.getBody()).getBody();
- logger.error("rrMessage: " + body + " " + "rr-reply-------------------------------------------------" + rr);
+ for (int i = 0; i < 100; i++) {
+ Package rr = pubClient.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC", i), 3000);
+ if (rr.getBody() instanceof EventMeshMessage) {
+ String body = ((EventMeshMessage) rr.getBody()).getBody();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("rrMessage: " + body + " "
+ + "rr-reply-------------------------------------------------" + rr);
+ }
+ }
}
}
}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java
index f7d2f12df..44ec2ec85 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java
@@ -33,20 +33,24 @@ import io.netty.channel.ChannelHandlerContext;
public class SyncSubClient {
- private static final Logger logger = LoggerFactory.getLogger(SyncSubClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyncSubClient.class);
public static void main(String[] args) throws Exception {
- SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
- client.init();
- client.heartbeat();
- client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
- client.registerBusiHandler(new ReceiveMsgHook() {
- @Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
- logger.error("receive message -------------------------------" + msg);
+ try (SubClientImpl client =
+ new SubClientImpl("localhost", 10000, MessageUtils.generateSubServer())) {
+ client.init();
+ client.heartbeat();
+ client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
+ client.registerBusiHandler(new ReceiveMsgHook() {
+ @Override
+ public void handle(Package msg, ChannelHandlerContext ctx) {
+ if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
+ if ((LOGGER.isInfoEnabled())) {
+ LOGGER.info("receive message -------------------------------" + msg);
+ }
+ }
}
- }
- });
+ });
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org