You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/09 06:32:18 UTC

[incubator-eventmesh] branch master updated: fix issue2489

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d830c2d7 fix issue2489
     new 5c26f0916 Merge pull request #2490 from jonyangx/issue2489
2d830c2d7 is described below

commit 2d830c2d700e2c97fece35a05bc3ed922ca1390d
Author: jonyangx <jo...@gmail.com>
AuthorDate: Wed Dec 7 12:18:42 2022 +0800

    fix issue2489
---
 .../eventmesh/runtime/demo/AsyncPubClient.java     | 30 ++++++++++---------
 .../eventmesh/runtime/demo/AsyncSubClient.java     | 30 ++++++++++---------
 .../eventmesh/runtime/demo/BroadCastPubClient.java | 14 +++++----
 .../eventmesh/runtime/demo/BroadCastSubClient.java | 31 +++++++++++---------
 .../apache/eventmesh/runtime/demo/CCPubClient.java | 10 ++++---
 .../apache/eventmesh/runtime/demo/CCSubClient.java | 34 ++++++++++++----------
 .../apache/eventmesh/runtime/demo/CClientDemo.java | 19 +++++-------
 .../eventmesh/runtime/demo/SyncPubClient.java      | 23 +++++++++------
 .../eventmesh/runtime/demo/SyncSubClient.java      | 28 ++++++++++--------
 9 files changed, 122 insertions(+), 97 deletions(-)

diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java
index 762cce2ef..afda4a105 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncPubClient.java
@@ -32,22 +32,26 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class AsyncPubClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(AsyncPubClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPubClient.class);
 
     public static void main(String[] args) throws Exception {
-        PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
-        pubClient.init();
-        pubClient.heartbeat();
-        pubClient.registerBusiHandler(new ReceiveMsgHook() {
-            @Override
-            public void handle(Package msg, ChannelHandlerContext ctx) {
-                logger.error("server good by request: {}", msg);
+        try (PubClientImpl pubClient =
+                     new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+            pubClient.init();
+            pubClient.heartbeat();
+            pubClient.registerBusiHandler(new ReceiveMsgHook() {
+                @Override
+                public void handle(Package msg, ChannelHandlerContext ctx) {
+                    if (LOGGER.isInfoEnabled()) {
+                        LOGGER.info("server good by request: {}", msg);
+                    }
+                }
+            });
+
+            for (int i = 0; i < 1; i++) {
+                ThreadUtils.randomSleep(0, 500);
+                pubClient.broadcast(MessageUtils.asyncMessage(ClientConstants.ASYNC_TOPIC, i), 5000);
             }
-        });
-
-        for (int i = 0; i < 1; i++) {
-            ThreadUtils.randomSleep(0, 500);
-            pubClient.broadcast(MessageUtils.asyncMessage(ClientConstants.ASYNC_TOPIC, i), 5000);
         }
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java
index 2783e136e..ba04f62b5 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/AsyncSubClient.java
@@ -33,21 +33,25 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class AsyncSubClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(AsyncSubClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSubClient.class);
 
     public static void main(String[] args) throws Exception {
-        SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
-        client.init();
-        client.heartbeat();
-        client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
-        client.registerBusiHandler(new ReceiveMsgHook() {
-            @Override
-            public void handle(Package msg, ChannelHandlerContext ctx) {
-                if (msg.getBody() instanceof EventMeshMessage) {
-                    String body = ((EventMeshMessage) msg.getBody()).getBody();
-                    logger.error("receive message -------------------------------" + body);
+        try (SubClientImpl client =
+                     new SubClientImpl("localhost", 10002, MessageUtils.generateSubServer())) {
+            client.init();
+            client.heartbeat();
+            client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+            client.registerBusiHandler(new ReceiveMsgHook() {
+                @Override
+                public void handle(Package msg, ChannelHandlerContext ctx) {
+                    if (msg.getBody() instanceof EventMeshMessage) {
+                        String body = ((EventMeshMessage) msg.getBody()).getBody();
+                        if (LOGGER.isInfoEnabled()) {
+                            LOGGER.info("receive message -------------------------------" + body);
+                        }
+                    }
                 }
-            }
-        });
+            });
+        }
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java
index 5f3fa158b..7c3da75e9 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastPubClient.java
@@ -25,12 +25,14 @@ import org.apache.eventmesh.runtime.client.impl.PubClientImpl;
 
 public class BroadCastPubClient {
     public static void main(String[] args) throws Exception {
-        PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
-        pubClient.init();
-        pubClient.heartbeat();
-        for (int i = 0; i < 10000; i++) {
-            ThreadUtils.randomSleep(0, 500);
-            pubClient.broadcast(MessageUtils.broadcastMessage(ClientConstants.BROADCAST_TOPIC, i), 5000);
+        try (PubClientImpl pubClient =
+                     new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+            pubClient.init();
+            pubClient.heartbeat();
+            for (int i = 0; i < 10000; i++) {
+                ThreadUtils.randomSleep(0, 500);
+                pubClient.broadcast(MessageUtils.broadcastMessage(ClientConstants.BROADCAST_TOPIC, i), 5000);
+            }
         }
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java
index 8e00541dc..14acd1dff 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/BroadCastSubClient.java
@@ -34,23 +34,26 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class BroadCastSubClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(BroadCastSubClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(BroadCastSubClient.class);
 
     public static void main(String[] args) throws Exception {
-        SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
-        client.init();
-        client.heartbeat();
-        client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
-        client.registerBusiHandler(new ReceiveMsgHook() {
-            @Override
-            public void handle(Package msg, ChannelHandlerContext ctx) {
-                if (msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
-                    if (msg.getBody() instanceof EventMeshMessage) {
-                        String body = ((EventMeshMessage) msg.getBody()).getBody();
-                        logger.error("receive message -------------------------------" + body);
+        try (SubClientImpl client = new SubClientImpl("localhost", 10000, MessageUtils.generateSubServer())) {
+            client.init();
+            client.heartbeat();
+            client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
+            client.registerBusiHandler(new ReceiveMsgHook() {
+                @Override
+                public void handle(Package msg, ChannelHandlerContext ctx) {
+                    if (msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
+                        if (msg.getBody() instanceof EventMeshMessage) {
+                            String body = ((EventMeshMessage) msg.getBody()).getBody();
+                            if (LOGGER.isInfoEnabled()) {
+                                LOGGER.info("receive message -------------------------------" + body);
+                            }
+                        }
                     }
                 }
-            }
-        });
+            });
+        }
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java
index a05a24ab3..e6f68f37e 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCPubClient.java
@@ -25,11 +25,13 @@ import org.apache.eventmesh.runtime.client.impl.PubClientImpl;
 public class CCPubClient {
 
     public static void main(String[] args) throws Exception {
-        PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
-        pubClient.init();
-        pubClient.heartbeat();
+        try (PubClientImpl pubClient =
+                     new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+            pubClient.init();
+            pubClient.heartbeat();
 
-        pubClient.broadcast(MessageUtils.rrMesssage(ClientConstants.ASYNC_TOPIC, 0), 5000);
+            pubClient.broadcast(MessageUtils.rrMesssage(ClientConstants.ASYNC_TOPIC, 0), 5000);
 
+        }
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java
index b23bc4b9b..e380e1e20 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CCSubClient.java
@@ -33,23 +33,27 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class CCSubClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(CCSubClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(CCSubClient.class);
 
     public static void main(String[] args) throws Exception {
-        SubClientImpl subClient = new SubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
-        subClient.init();
-        subClient.heartbeat();
-        subClient.listen();
-        subClient.justSubscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
-        subClient.registerBusiHandler(new ReceiveMsgHook() {
-            @Override
-            public void handle(Package msg, ChannelHandlerContext ctx) {
-                logger.error("Received message: {}", msg);
-                if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
-                    Package rrResponse = MessageUtils.rrResponse(msg);
-                    ctx.writeAndFlush(rrResponse);
+        try (SubClientImpl subClient =
+                     new SubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+            subClient.init();
+            subClient.heartbeat();
+            subClient.listen();
+            subClient.justSubscribe("TEST-TOPIC-TCP-SYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
+            subClient.registerBusiHandler(new ReceiveMsgHook() {
+                @Override
+                public void handle(Package msg, ChannelHandlerContext ctx) {
+                    if (LOGGER.isInfoEnabled()) {
+                        LOGGER.info("Received message: {}", msg);
+                    }
+                    if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
+                        Package rrResponse = MessageUtils.rrResponse(msg);
+                        ctx.writeAndFlush(rrResponse);
+                    }
                 }
-            }
-        });
+            });
+        }
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
index ff323a0f3..c77b74a4e 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java
@@ -35,14 +35,13 @@ import io.netty.channel.ChannelHandlerContext;
  */
 public class CClientDemo {
 
-    public static final Logger logger = LoggerFactory.getLogger(CClientDemo.class);
+    public static final Logger LOGGER = LoggerFactory.getLogger(CClientDemo.class);
 
-    private static final String SYNC_TOPIC = "TEST-TOPIC-TCP-SYNC";
     private static final String ASYNC_TOPIC = "TEST-TOPIC-TCP-ASYNC";
     private static final String BROADCAST_TOPIC = "TEST-TOPIC-TCP-BROADCAST";
 
     public static void main(String[] args) throws Exception {
-        EventMeshClientImpl client = new EventMeshClientImpl("127.0.0.1", 10000);
+        EventMeshClientImpl client = new EventMeshClientImpl("localhost", 10000);
         client.init();
         client.heartbeat();
         client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
@@ -51,22 +50,20 @@ public class CClientDemo {
         client.registerSubBusiHandler(new ReceiveMsgHook() {
             @Override
             public void handle(Package msg, ChannelHandlerContext ctx) {
-                if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
-                    logger.error("receive message-------------------------------------" + msg);
+                if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT
+                        || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) {
+
+                    if (LOGGER.isInfoEnabled()) {
+                        LOGGER.info("receive message-------------------------------------" + msg);
+                    }
                 }
             }
         });
         for (int i = 0; i < 10000; i++) {
-            //ThreadUtil.randomSleep(0,200);
             //broadcast message
             client.broadcast(MessageUtils.broadcastMessage("TEST-TOPIC-TCP-BROADCAST", i), 5000);
             //asynchronous message
             client.publish(MessageUtils.asyncMessage(ASYNC_TOPIC, i), 5000);
         }
-        //
-        //Thread.sleep(10000);
-        //client.close();
-
-
     }
 }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java
index 172c7a269..40d7d2b4d 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncPubClient.java
@@ -28,18 +28,23 @@ import org.slf4j.LoggerFactory;
 
 public class SyncPubClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(SyncPubClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SyncPubClient.class);
 
     public static void main(String[] args) throws Exception {
-        PubClientImpl pubClient = new PubClientImpl("127.0.0.1", 10000, UserAgentUtils.createUserAgent());
-        pubClient.init();
-        pubClient.heartbeat();
+        try (PubClientImpl pubClient =
+                     new PubClientImpl("localhost", 10000, UserAgentUtils.createUserAgent())) {
+            pubClient.init();
+            pubClient.heartbeat();
 
-        for (int i = 0; i < 100; i++) {
-            Package rr = pubClient.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC", i), 3000);
-            if (rr.getBody() instanceof EventMeshMessage) {
-                String body = ((EventMeshMessage) rr.getBody()).getBody();
-                logger.error("rrMessage: " + body + "             " + "rr-reply-------------------------------------------------" + rr);
+            for (int i = 0; i < 100; i++) {
+                Package rr = pubClient.rr(MessageUtils.rrMesssage("TEST-TOPIC-TCP-SYNC", i), 3000);
+                if (rr.getBody() instanceof EventMeshMessage) {
+                    String body = ((EventMeshMessage) rr.getBody()).getBody();
+                    if (LOGGER.isInfoEnabled()) {
+                        LOGGER.info("rrMessage: " + body + "             "
+                                + "rr-reply-------------------------------------------------" + rr);
+                    }
+                }
             }
         }
     }
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java
index f7d2f12df..44ec2ec85 100644
--- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/SyncSubClient.java
@@ -33,20 +33,24 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class SyncSubClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(SyncSubClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SyncSubClient.class);
 
     public static void main(String[] args) throws Exception {
-        SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
-        client.init();
-        client.heartbeat();
-        client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
-        client.registerBusiHandler(new ReceiveMsgHook() {
-            @Override
-            public void handle(Package msg, ChannelHandlerContext ctx) {
-                if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
-                    logger.error("receive message -------------------------------" + msg);
+        try (SubClientImpl client =
+                     new SubClientImpl("localhost", 10000, MessageUtils.generateSubServer())) {
+            client.init();
+            client.heartbeat();
+            client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
+            client.registerBusiHandler(new ReceiveMsgHook() {
+                @Override
+                public void handle(Package msg, ChannelHandlerContext ctx) {
+                    if (msg.getHeader().getCommand() == Command.REQUEST_TO_CLIENT) {
+                        if ((LOGGER.isInfoEnabled())) {
+                            LOGGER.info("receive message -------------------------------" + msg);
+                        }
+                    }
                 }
-            }
-        });
+            });
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org