You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/10 10:17:14 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #359] Split handler from controller (#359) (#360)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new e54c1bf  [ISSUE #359] Split handler from controller (#359) (#360)
e54c1bf is described below

commit e54c1bfbb7c88b5a92d9b8de5eeead23c2af0321
Author: Wenjun Ruan <86...@qq.com>
AuthorDate: Thu Jun 10 18:17:06 2021 +0800

    [ISSUE #359] Split handler from controller (#359) (#360)
    
    * [ISSUE #359] Split handler from controller (#359)
    
    * add license header
    
    * add ut
---
 .../admin/controller/ClientManageController.java   | 748 +--------------------
 .../handler/EventMeshMsgDownStreamHandler.java     | 166 +++++
 .../handler/RedirectClientByIpPortHandler.java     | 111 +++
 .../admin/handler/RedirectClientByPathHandler.java | 112 +++
 .../handler/RedirectClientBySubSystemHandler.java  | 113 ++++
 .../admin/handler/RejectAllClientHandler.java      |  96 +++
 .../admin/handler/RejectClientByIpPortHandler.java | 107 +++
 .../handler/RejectClientBySubSystemHandler.java    | 113 ++++
 .../handler/ShowClientBySystemAndDcnHandler.java   |  92 +++
 .../runtime/admin/handler/ShowClientHandler.java   |  84 +++
 .../handler/ShowListenClientByTopicHandler.java    |  92 +++
 .../apache/eventmesh/runtime/util/NetUtils.java    |  72 ++
 .../handler/RedirectClientByIpPortHandlerTest.java |  40 ++
 13 files changed, 1217 insertions(+), 729 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
index f93e1af..0544373 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
@@ -17,36 +17,21 @@
 
 package org.apache.eventmesh.runtime.admin.controller;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import com.sun.net.httpserver.HttpExchange;
-import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpServer;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.admin.handler.RedirectClientByIpPortHandler;
+import org.apache.eventmesh.runtime.admin.handler.RedirectClientByPathHandler;
+import org.apache.eventmesh.runtime.admin.handler.RedirectClientBySubSystemHandler;
+import org.apache.eventmesh.runtime.admin.handler.RejectAllClientHandler;
+import org.apache.eventmesh.runtime.admin.handler.RejectClientByIpPortHandler;
+import org.apache.eventmesh.runtime.admin.handler.RejectClientBySubSystemHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemAndDcnHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowClientHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowListenClientByTopicHandler;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,713 +48,18 @@ public class ClientManageController {
     public void start() throws IOException {
         int port = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerAdminPort;
         HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
-        server.createContext("/clientManage/showClient", new ShowClientHandler());
-        server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler());
-        server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler());
-        server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler());
-        server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler());
-        server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler());
-        server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler());
-        server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler());
-//        server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler());
-        server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler());
+        server.createContext("/clientManage/showClient", new ShowClientHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler(eventMeshTCPServer));
+//        server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler(eventMeshTCPServer));
+        server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler(eventMeshTCPServer));
 
         server.start();
         logger.info("ClientManageController start success, port:{}", port);
     }
-
-    private Map<String, Object> parsePostParameters(HttpExchange exchange)
-            throws IOException {
-        Map<String, Object> parameters = new HashMap<>();
-        if ("post".equalsIgnoreCase(exchange.getRequestMethod())) {
-            InputStreamReader isr =
-                    new InputStreamReader(exchange.getRequestBody(), "utf-8");
-            BufferedReader br = new BufferedReader(isr);
-            String query = br.readLine();
-            parseQuery(query, parameters);
-        }
-        return parameters;
-    }
-
-    @SuppressWarnings("unchecked")
-    private void parseQuery(String query, Map<String, Object> parameters)
-            throws UnsupportedEncodingException {
-
-        if (query != null) {
-            String pairs[] = query.split("&");
-
-            for (String pair : pairs) {
-                String param[] = pair.split("=");
-
-                String key = null;
-                String value = null;
-                if (param.length > 0) {
-                    key = URLDecoder.decode(param[0], "UTF-8");
-                }
-
-                if (param.length > 1) {
-                    value = URLDecoder.decode(param[1], "UTF-8");
-                }
-
-                if (parameters.containsKey(key)) {
-                    Object obj = parameters.get(key);
-                    if (obj instanceof List<?>) {
-                        List<String> values = (List<String>) obj;
-                        values.add(value);
-                    } else if (obj instanceof String) {
-                        List<String> values = new ArrayList<String>();
-                        values.add((String) obj);
-                        values.add(value);
-                        parameters.put(key, values);
-                    }
-                } else {
-                    parameters.put(key, value);
-                }
-            }
-        }
-    }
-
-    /**
-     * 打印本eventMesh上所有客户端信息
-     *
-     * @return
-     */
-    class ShowClientHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String newLine = System.getProperty("line.separator");
-                logger.info("showAllClient=================");
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                Map<String, AtomicInteger> dcnSystemMap = clientSessionGroupMapping.statDCNSystemInfo();
-                if (!dcnSystemMap.isEmpty()) {
-                    List<Map.Entry<String, AtomicInteger>> list = new ArrayList<>();
-                    ValueComparator vc = new ValueComparator();
-                    for (Map.Entry<String, AtomicInteger> entry : dcnSystemMap.entrySet()) {
-                        list.add(entry);
-                    }
-                    Collections.sort(list, vc);
-                    for (Map.Entry<String, AtomicInteger> entry : list) {
-                        result += String.format("System=%s | ClientNum=%d", entry.getKey(), entry.getValue().intValue()) +
-                                newLine;
-                    }
-                }
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("ShowClientHandler fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-    class ValueComparator implements Comparator<Map.Entry<String, AtomicInteger>> {
-        @Override
-        public int compare(Map.Entry<String, AtomicInteger> x, Map.Entry<String, AtomicInteger> y) {
-            return x.getValue().intValue() - y.getValue().intValue();
-        }
-    }
-
-    /**
-     * print clientInfo by subsys and dcn
-     *
-     * @return
-     */
-    class ShowClientBySystemAndDcnHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
-                String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
-
-                String newLine = System.getProperty("line.separator");
-                logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                if (!sessionMap.isEmpty()) {
-                    for (Session session : sessionMap.values()) {
-                        if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
-                            UserAgent userAgent = session.getClient();
-                            result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent
-                                    .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine;
-                        }
-                    }
-                }
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("ShowClientBySystemAndDcnHandler fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-
-    /**
-     * query client subscription by topic
-     */
-    class ShowListenClientByTopicHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String topic = queryStringInfo.get(EventMeshConstants.MANAGE_TOPIC);
-
-                String newLine = System.getProperty("line.separator");
-                logger.info("showListeningClientByTopic,topic:{}=================", topic);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<String, ClientGroupWrapper> clientGroupMap = clientSessionGroupMapping.getClientGroupMap();
-                if (!clientGroupMap.isEmpty()) {
-                    for (ClientGroupWrapper cgw : clientGroupMap.values()) {
-                        Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
-                        if (listenSessionSet != null && listenSessionSet.size() > 0) {
-                            result += String.format("group:%s", cgw.getGroupName()) + newLine;
-                            for (Session session : listenSessionSet) {
-                                UserAgent userAgent = session.getClient();
-                                result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent
-                                        .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getVersion()) + newLine;
-                            }
-                        }
-                    }
-                }
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("ShowListenClientByTopicHandler fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-
-    /**
-     * remove all clients accessed by eventMesh
-     *
-     * @return
-     */
-    class RejectAllClientHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
-                try {
-                    logger.info("rejectAllClient in admin====================");
-                    if (!sessionMap.isEmpty()) {
-                        for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
-                            InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
-                            if (addr != null) {
-                                successRemoteAddrs.add(addr);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("clientManage|rejectAllClient|fail", e);
-                    result = String.format("rejectAllClient fail! sessionMap size {%d}, had reject {%s}, errorMsg : %s",
-                            sessionMap.size(), printClients(successRemoteAddrs), e.getMessage());
-                    httpExchange.sendResponseHeaders(200, 0);
-                    out.write(result.getBytes());
-                    return;
-                }
-                result = String.format("rejectAllClient success! sessionMap size {%d}, had reject {%s}", sessionMap.size
-                        (), printClients(successRemoteAddrs));
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("rejectAllClient fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-    /**
-     * remove c client by ip and port
-     *
-     * @return
-     */
-    class RejectClientByIpPortHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
-                String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
-
-                if (StringUtils.isBlank(ip) || StringUtils.isBlank(port)) {
-                    httpExchange.sendResponseHeaders(200, 0);
-                    result = "params illegal!";
-                    out.write(result.getBytes());
-                    return;
-                }
-                logger.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
-                try {
-                    if (!sessionMap.isEmpty()) {
-                        for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
-                            if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) {
-                                InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
-                                if (addr != null) {
-                                    successRemoteAddrs.add(addr);
-                                }
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("clientManage|rejectClientByIpPort|fail|ip={}|port={},errMsg={}", ip, port, e);
-                    result = String.format("rejectClientByIpPort fail! {ip=%s port=%s}, had reject {%s}, errorMsg : %s", ip,
-                            port, printClients(successRemoteAddrs), e.getMessage());
-                    httpExchange.sendResponseHeaders(200, 0);
-                    out.write(result.getBytes());
-                    return;
-                }
-
-                result = String.format("rejectClientByIpPort success! {ip=%s port=%s}, had reject {%s}", ip, port, printClients
-                        (successRemoteAddrs));
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("rejectClientByIpPort fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-
-    /**
-     * remove c client by dcn and susysId
-     *
-     * @return
-     */
-    class RejectClientBySubSystemHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
-                String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
-
-                if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) {
-                    httpExchange.sendResponseHeaders(200, 0);
-                    result = "params illegal!";
-                    out.write(result.getBytes());
-                    return;
-                }
-
-                logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
-                try {
-                    if (!sessionMap.isEmpty()) {
-                        for (Session session : sessionMap.values()) {
-                            if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
-                                InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping);
-                                if (addr != null) {
-                                    successRemoteAddrs.add(addr);
-                                }
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={},errMsg={}", dcn, subSystem, e);
-                    result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%d} , {dcn=%s " +
-                                    "port=%s}, errorMsg : %s", sessionMap.size(), printClients(successRemoteAddrs), dcn,
-                            subSystem, e.getMessage());
-                    httpExchange.sendResponseHeaders(200, 0);
-                    out.write(result.getBytes());
-                    return;
-                }
-                result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
-                        "port=%s}", sessionMap.size(), printClients(successRemoteAddrs), dcn, subSystem);
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("rejectClientBySubSystem fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-    /**
-     * redirect subsystem for subsys and dcn
-     *
-     * @return
-     */
-    class RedirectClientBySubSystemHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
-                String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
-                String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
-                String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
-
-                if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem)
-                        || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
-                        || !StringUtils.isNumeric(destEventMeshPort)) {
-                    httpExchange.sendResponseHeaders(200, 0);
-                    result = "params illegal!";
-                    out.write(result.getBytes());
-                    return;
-                }
-                logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                String redirectResult = "";
-                try {
-                    if (!sessionMap.isEmpty()) {
-                        for (Session session : sessionMap.values()) {
-                            if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
-                                redirectResult += "|";
-                                redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
-                                        session, clientSessionGroupMapping);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" +
-                            "={}|destEventMeshPort={},errMsg={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e);
-                    result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
-                                    "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
-                            sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e
-                                    .getMessage());
-                    httpExchange.sendResponseHeaders(200, 0);
-                    out.write(result.getBytes());
-                    return;
-                }
-                result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " +
-                                "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
-                        sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult);
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("redirectClientBySubSystem fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-    /**
-     * redirect subsystem for path
-     *
-     * @return
-     */
-    class RedirectClientByPathHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String path = queryStringInfo.get(EventMeshConstants.MANAGE_PATH);
-                String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
-                String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
-
-                if (StringUtils.isBlank(path) || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) ||
-                        !StringUtils.isNumeric(destEventMeshPort)) {
-                    httpExchange.sendResponseHeaders(200, 0);
-                    result = "params illegal!";
-                    out.write(result.getBytes());
-                    return;
-                }
-                logger.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                String redirectResult = "";
-                try {
-                    if (!sessionMap.isEmpty()) {
-                        for (Session session : sessionMap.values()) {
-                            if (session.getClient().getPath().contains(path)) {
-                                redirectResult += "|";
-                                redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
-                                        session, clientSessionGroupMapping);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("clientManage|redirectClientByPath|fail|path={}|destEventMeshIp" +
-                            "={}|destEventMeshPort={},errMsg={}", path, destEventMeshIp, destEventMeshPort, e);
-                    result = String.format("redirectClientByPath fail! sessionMap size {%d}, {path=%s " +
-                                    "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
-                            sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult, e
-                                    .getMessage());
-                    httpExchange.sendResponseHeaders(200, 0);
-                    out.write(result.getBytes());
-                    return;
-                }
-                result = String.format("redirectClientByPath success! sessionMap size {%d}, {path=%s " +
-                                "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
-                        sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult);
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("redirectClientByPath fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-    /**
-     * redirect subsystem for ip and port
-     *
-     * @return
-     */
-    class RedirectClientByIpPortHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-                String queryString = httpExchange.getRequestURI().getQuery();
-                Map<String, String> queryStringInfo = formData2Dic(queryString);
-                String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
-                String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
-                String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
-                String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
-
-                if (StringUtils.isBlank(ip) || !StringUtils.isNumeric(port)
-                        || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
-                        || !StringUtils.isNumeric(destEventMeshPort)) {
-                    httpExchange.sendResponseHeaders(200, 0);
-                    result = "params illegal!";
-                    out.write(result.getBytes());
-                    return;
-                }
-                logger.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort);
-                ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
-                ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
-                String redirectResult = "";
-                try {
-                    if (!sessionMap.isEmpty()) {
-                        for (Session session : sessionMap.values()) {
-                            if (session.getClient().getHost().equals(ip) && String.valueOf(session.getClient().getPort()).equals(port)) {
-                                redirectResult += "|";
-                                redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
-                                        session, clientSessionGroupMapping);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("clientManage|redirectClientByIpPort|fail|ip={}|port={}|destEventMeshIp" +
-                            "={}|destEventMeshPort={},errMsg={}", ip, port, destEventMeshIp, destEventMeshPort, e);
-                    result = String.format("redirectClientByIpPort fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
-                                    "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
-                            sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult, e
-                                    .getMessage());
-                    httpExchange.sendResponseHeaders(200, 0);
-                    out.write(result.getBytes());
-                    return;
-                }
-                result = String.format("redirectClientByIpPort success! sessionMap size {%d}, {ip=%s port=%s " +
-                                "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
-                        sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult);
-                httpExchange.sendResponseHeaders(200, 0);
-                out.write(result.getBytes());
-            } catch (Exception e) {
-                logger.error("redirectClientByIpPort fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
-
-    private String printClients(List<InetSocketAddress> clients) {
-        if (clients.isEmpty()) {
-            return "no session had been closed";
-        }
-        StringBuilder sb = new StringBuilder();
-        for (InetSocketAddress addr : clients) {
-            sb.append(addr).append("|");
-        }
-        return sb.toString();
-    }
-
-    private Map<String, String> formData2Dic(String formData) {
-        Map<String, String> result = new HashMap<>();
-        if (formData == null || formData.trim().length() == 0) {
-            return result;
-        }
-        final String[] items = formData.split("&");
-        Arrays.stream(items).forEach(item -> {
-            final String[] keyAndVal = item.split("=");
-            if (keyAndVal.length == 2) {
-                try {
-                    final String key = URLDecoder.decode(keyAndVal[0], "utf8");
-                    final String val = URLDecoder.decode(keyAndVal[1], "utf8");
-                    result.put(key, val);
-                } catch (UnsupportedEncodingException e) {
-                    logger.warn("formData2Dic:param decode failed...", e);
-                }
-            }
-        });
-        return result;
-    }
-
-    class EventMeshMsgDownStreamHandler implements HttpHandler {
-        @Override
-        public void handle(HttpExchange httpExchange) throws IOException {
-            String result = "false";
-            OutputStream out = httpExchange.getResponseBody();
-            try {
-//                Map<String, Object> queryStringInfo =  parsePostParameters(httpExchange);
-//                String msgStr = (String)queryStringInfo.get("msg");
-//                String groupName = (String)queryStringInfo.get("group");
-//                logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr);
-//                if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) {
-//                    logger.warn("msg or groupName is null");
-//                    httpExchange.sendResponseHeaders(200, 0);
-//                    out.write(result.getBytes());
-//                    return;
-//                }
-//                MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class);
-//                String topic = messageExt.getTopic();
-//
-//                if (!EventMeshUtil.isValidRMBTopic(topic)) {
-//                    logger.warn("msg topic is illegal");
-//                    httpExchange.sendResponseHeaders(200, 0);
-//                    out.write(result.getBytes());
-//                    return;
-//                }
-//
-//                DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy();
-//                Set<Session> groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions();
-//                Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
-//
-//                if(session == null){
-//                    logger.error("DownStream msg,retry other eventMesh found no session again");
-//                    httpExchange.sendResponseHeaders(200, 0);
-//                    out.write(result.getBytes());
-//                    return;
-//                }
-//
-//                DownStreamMsgContext downStreamMsgContext =
-//                        new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true);
-//                eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
-//
-//                if (session.isCanDownStream()) {
-//                    session.downstreamMsg(downStreamMsgContext);
-//                    httpExchange.sendResponseHeaders(200, 0);
-//                    result = "true";
-//                    out.write(result.getBytes());
-//                    return;
-//                }
-//
-//                logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq);
-//                long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-//                downStreamMsgContext.delay(delayTime);
-//                eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
-//                result = "true";
-//                httpExchange.sendResponseHeaders(200, 0);
-//                out.write(result.getBytes());
-
-            } catch (Exception e) {
-                logger.error("EventMeshMsgDownStreamHandler handle fail...", e);
-            } finally {
-                if (out != null) {
-                    try {
-                        out.close();
-                    } catch (IOException e) {
-                        logger.warn("out close failed...", e);
-                    }
-                }
-            }
-
-        }
-    }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java
new file mode 100644
index 0000000..cdc086e
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Admin HTTP handler for messages pushed down from another EventMesh instance.
+ * The dispatch logic is disabled (kept as commented-out code), so the reply is always "false".
+ */
+public class EventMeshMsgDownStreamHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(EventMeshMsgDownStreamHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public EventMeshMsgDownStreamHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /** Replies with HTTP 200 and the body "false"; failures are logged instead of rethrown. */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "false";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+//                Map<String, Object> queryStringInfo =  parsePostParameters(httpExchange);
+//                String msgStr = (String)queryStringInfo.get("msg");
+//                String groupName = (String)queryStringInfo.get("group");
+//                logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr);
+//                if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) {
+//                    logger.warn("msg or groupName is null");
+//                    httpExchange.sendResponseHeaders(200, 0);
+//                    out.write(result.getBytes());
+//                    return;
+//                }
+//                MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class);
+//                String topic = messageExt.getTopic();
+//
+//                if (!EventMeshUtil.isValidRMBTopic(topic)) {
+//                    logger.warn("msg topic is illegal");
+//                    httpExchange.sendResponseHeaders(200, 0);
+//                    out.write(result.getBytes());
+//                    return;
+//                }
+//
+//                DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy();
+//                Set<Session> groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions();
+//                Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
+//
+//                if(session == null){
+//                    logger.error("DownStream msg,retry other eventMesh found no session again");
+//                    httpExchange.sendResponseHeaders(200, 0);
+//                    out.write(result.getBytes());
+//                    return;
+//                }
+//
+//                DownStreamMsgContext downStreamMsgContext =
+//                        new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true);
+//                eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
+//
+//                if (session.isCanDownStream()) {
+//                    session.downstreamMsg(downStreamMsgContext);
+//                    httpExchange.sendResponseHeaders(200, 0);
+//                    result = "true";
+//                    out.write(result.getBytes());
+//                    return;
+//                }
+//
+//                logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq);
+//                long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
+//                downStreamMsgContext.delay(delayTime);
+//                eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
+//                result = "true";
+//                httpExchange.sendResponseHeaders(200, 0);
+//                out.write(result.getBytes());
+            // Dispatch is disabled, but the exchange still needs a reply or the caller keeps waiting.
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            logger.error("EventMeshMsgDownStreamHandler handle fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+
+    /** Parses the first line of a POST body as form data; other methods give an empty map. */
+    private Map<String, Object> parsePostParameters(HttpExchange exchange) throws IOException {
+        Map<String, Object> parameters = new HashMap<>();
+        if ("post".equalsIgnoreCase(exchange.getRequestMethod())) {
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(
+                    exchange.getRequestBody(), java.nio.charset.StandardCharsets.UTF_8))) {
+                parseQuery(br.readLine(), parameters);
+            }
+        }
+        return parameters;
+    }
+
+    /** Adds the URL-decoded pairs of {@code query} to {@code parameters}; repeated keys become a list. */
+    @SuppressWarnings("unchecked")
+    private void parseQuery(String query, Map<String, Object> parameters)
+            throws UnsupportedEncodingException {
+        if (query == null) {
+            return;
+        }
+        for (String pair : query.split("&")) {
+            if (pair.isEmpty()) {
+                continue; // an empty body or "a=1&&b=2" carries no parameter at this position
+            }
+            // limit 2: only the first "=" separates key from value, so "k=a=b" keeps "a=b"
+            String[] param = pair.split("=", 2);
+            String key = URLDecoder.decode(param[0], "UTF-8");
+            String value = param.length > 1 ? URLDecoder.decode(param[1], "UTF-8") : null;
+            if (!parameters.containsKey(key)) {
+                parameters.put(key, value);
+                continue;
+            }
+            Object existing = parameters.get(key);
+            if (existing instanceof List<?>) {
+                ((List<String>) existing).add(value);
+            } else if (existing == null || existing instanceof String) {
+                // a first occurrence without a value (null) must not swallow later values
+                List<String> values = new ArrayList<>();
+                values.add((String) existing);
+                values.add(value);
+                parameters.put(key, values);
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java
new file mode 100644
index 0000000..79ba4bd
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that redirects the client sessions matching an ip and port to another
+ * EventMesh instance.
+ */
+public class RedirectClientByIpPortHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RedirectClientByIpPortHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RedirectClientByIpPortHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Performs the redirect described by the query string and answers with HTTP 200 and a
+     * plain-text report. A reply is also sent when processing fails unexpectedly, so the
+     * caller is not left waiting; I/O failures while replying are logged, not rethrown.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result;
+        try {
+            result = redirect(NetUtils.formData2Dic(httpExchange.getRequestURI().getQuery()));
+        } catch (Exception e) {
+            logger.error("redirectClientByIpPort fail...", e);
+            result = "redirectClientByIpPort fail! errorMsg : " + e.getMessage();
+        }
+        try (OutputStream out = httpExchange.getResponseBody()) {
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            logger.error("redirectClientByIpPort fail...", e);
+        }
+    }
+
+    /** Validates the parameters, redirects every matching session and returns the report text. */
+    private String redirect(Map<String, String> queryStringInfo) {
+        String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
+        String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
+        String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
+        String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
+
+        if (StringUtils.isBlank(ip) || !StringUtils.isNumeric(port)
+                || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
+                || !StringUtils.isNumeric(destEventMeshPort)) {
+            return "params illegal!";
+        }
+        logger.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort);
+        ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+        ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+        StringBuilder redirectResult = new StringBuilder();
+        try {
+            for (Session session : sessionMap.values()) {
+                // ip and port are validated non-null above, so comparing from their side tolerates a null host
+                if (ip.equals(session.getClient().getHost()) && port.equals(String.valueOf(session.getClient().getPort()))) {
+                    redirectResult.append("|").append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,
+                            destEventMeshIp, Integer.parseInt(destEventMeshPort), session, clientSessionGroupMapping));
+                }
+            }
+        } catch (Exception e) {
+            // the message fills errMsg={}; the exception goes last so SLF4J also logs its stack trace
+            logger.error("clientManage|redirectClientByIpPort|fail|ip={}|port={}|destEventMeshIp" +
+                    "={}|destEventMeshPort={},errMsg={}", ip, port, destEventMeshIp, destEventMeshPort, e.getMessage(), e);
+            return String.format("redirectClientByIpPort fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
+                            "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
+                    sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult,
+                    e.getMessage());
+        }
+        return String.format("redirectClientByIpPort success! sessionMap size {%d}, {ip=%s port=%s " +
+                        "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
+                sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult);
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java
new file mode 100644
index 0000000..b605a1c
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that redirects the client sessions whose path contains the requested
+ * value to another EventMesh instance.
+ */
+public class RedirectClientByPathHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RedirectClientByPathHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RedirectClientByPathHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Performs the redirect described by the query string and answers with HTTP 200 and a
+     * plain-text report. A reply is also sent when processing fails unexpectedly, so the
+     * caller is not left waiting; I/O failures while replying are logged, not rethrown.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result;
+        try {
+            result = redirect(NetUtils.formData2Dic(httpExchange.getRequestURI().getQuery()));
+        } catch (Exception e) {
+            logger.error("redirectClientByPath fail...", e);
+            result = "redirectClientByPath fail! errorMsg : " + e.getMessage();
+        }
+        try (OutputStream out = httpExchange.getResponseBody()) {
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            logger.error("redirectClientByPath fail...", e);
+        }
+    }
+
+    /**
+     * Validates the parameters, redirects every matching session and returns the report text.
+     */
+    private String redirect(Map<String, String> queryStringInfo) {
+        String path = queryStringInfo.get(EventMeshConstants.MANAGE_PATH);
+        String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
+        String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
+
+        if (StringUtils.isBlank(path) || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) ||
+                !StringUtils.isNumeric(destEventMeshPort)) {
+            return "params illegal!";
+        }
+        logger.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort);
+        ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+        ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+        StringBuilder redirectResult = new StringBuilder();
+        try {
+            for (Session session : sessionMap.values()) {
+                // defensive: a session that reported no path is skipped instead of aborting the whole redirect
+                String clientPath = session.getClient().getPath();
+                if (clientPath != null && clientPath.contains(path)) {
+                    redirectResult.append("|").append(EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,
+                            destEventMeshIp, Integer.parseInt(destEventMeshPort), session, clientSessionGroupMapping));
+                }
+            }
+        } catch (Exception e) {
+            // the message fills errMsg={}; the exception goes last so SLF4J also logs its stack trace
+            logger.error("clientManage|redirectClientByPath|fail|path={}|destEventMeshIp" +
+                    "={}|destEventMeshPort={},errMsg={}", path, destEventMeshIp, destEventMeshPort, e.getMessage(), e);
+            return String.format("redirectClientByPath fail! sessionMap size {%d}, {path=%s " +
+                            "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
+                    sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult,
+                    e.getMessage());
+        }
+        return String.format("redirectClientByPath success! sessionMap size {%d}, {path=%s " +
+                        "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
+                sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult);
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
new file mode 100644
index 0000000..fa03e72
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that redirects the TCP clients of one subsystem and DCN to another EventMesh instance.
+ */
+public class RedirectClientBySubSystemHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RedirectClientBySubSystemHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RedirectClientBySubSystemHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Redirects every session whose client matches the requested dcn and subsystem to the destination
+     * ip and port, then writes a plain-text summary to the response body.
+     * Illegal parameters, failure and success are all answered with HTTP status 200.
+     *
+     * @param httpExchange exchange providing the query string and the response body
+     * @throws IOException declared by {@link HttpHandler}; exceptions raised in this method are caught and logged
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
+            String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
+            String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
+            String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
+
+            if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem)
+                    || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
+                    || !StringUtils.isNumeric(destEventMeshPort)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+            logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            // collects "|<redirect result>" for every matching session
+            StringBuilder redirectResult = new StringBuilder();
+            try {
+                for (Session session : sessionMap.values()) {
+                    // dcn and subSystem passed the validation above and are non-null, so equals is invoked
+                    // on them rather than on the client's values, which are not checked here
+                    if (dcn.equals(session.getClient().getDcn()) && subSystem.equals(session.getClient().getSubsystem())) {
+                        redirectResult.append("|").append(EventMeshTcp2Client.redirectClient2NewEventMesh(
+                                eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
+                                session, clientSessionGroupMapping));
+                    }
+                }
+            } catch (Exception e) {
+                // the exception is passed as the last argument without a placeholder so that SLF4J
+                // treats it as the throwable and logs its stack trace
+                logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" +
+                        "={}|destEventMeshPort={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e);
+                result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {dcn=%s subSystem=%s " +
+                                "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
+                        sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult,
+                        e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " +
+                            "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
+                    sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult);
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("redirectClientBySubSystem fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java
new file mode 100644
index 0000000..6314c48
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RejectAllClientHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RejectAllClientHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RejectAllClientHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * remove all clients accessed by eventMesh
+     *
+     * @param httpExchange
+     * @throws IOException
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
+            try {
+                logger.info("rejectAllClient in admin====================");
+                if (!sessionMap.isEmpty()) {
+                    for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
+                        InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
+                        if (addr != null) {
+                            successRemoteAddrs.add(addr);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("clientManage|rejectAllClient|fail", e);
+                result = String.format("rejectAllClient fail! sessionMap size {%d}, had reject {%s}, errorMsg : %s",
+                        sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("rejectAllClient success! sessionMap size {%d}, had reject {%s}", sessionMap.size(),
+                    NetUtils.addressToString(successRemoteAddrs));
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("rejectAllClient fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java
new file mode 100644
index 0000000..3ddd45e
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that rejects the TCP client registered under a given ip and port.
+ */
+public class RejectClientByIpPortHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RejectClientByIpPortHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RejectClientByIpPortHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Invokes serverGoodby2Client for each session whose session-map key has the requested host string
+     * and port, then writes a plain-text summary listing the non-null addresses those calls returned.
+     * Blank parameters, failure and success are all answered with HTTP status 200.
+     *
+     * @param httpExchange exchange providing the query string and the response body
+     * @throws IOException declared by {@link HttpHandler}; exceptions raised in this method are caught and logged
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
+            String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
+
+            if (StringUtils.isBlank(ip) || StringUtils.isBlank(port)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+            logger.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
+            try {
+                for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
+                    // the port is compared as text because the request parameter is not parsed to a number
+                    if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) {
+                        InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
+                        if (addr != null) {
+                            successRemoteAddrs.add(addr);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // the exception is passed as the last argument without a placeholder so that SLF4J
+                // treats it as the throwable and logs its stack trace
+                logger.error("clientManage|rejectClientByIpPort|fail|ip={}|port={}", ip, port, e);
+                result = String.format("rejectClientByIpPort fail! {ip=%s port=%s}, had reject {%s}, errorMsg : %s", ip,
+                        port, NetUtils.addressToString(successRemoteAddrs), e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+
+            result = String.format("rejectClientByIpPort success! {ip=%s port=%s}, had reject {%s}", ip, port,
+                    NetUtils.addressToString(successRemoteAddrs));
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("rejectClientByIpPort fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
new file mode 100644
index 0000000..5b1e841
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that rejects the TCP clients belonging to a given DCN and subsystem.
+ */
+public class RejectClientBySubSystemHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RejectClientBySubSystemHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RejectClientBySubSystemHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Removes the clients matching the requested dcn and subsystem id: serverGoodby2Client is invoked for
+     * each matching session and the non-null addresses it returns are listed in the plain-text response.
+     * Blank parameters, failure and success are all answered with HTTP status 200.
+     *
+     * @param httpExchange exchange providing the query string and the response body
+     * @throws IOException declared by {@link HttpHandler}; exceptions raised in this method are caught and logged
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
+            String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
+
+            if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+
+            logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
+            try {
+                for (Session session : sessionMap.values()) {
+                    // dcn and subSystem are non-blank after the check above, so equals is invoked on them
+                    // rather than on the client's values, which are not checked here
+                    if (dcn.equals(session.getClient().getDcn()) && subSystem.equals(session.getClient().getSubsystem())) {
+                        InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping);
+                        if (addr != null) {
+                            successRemoteAddrs.add(addr);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // the exception is passed as the last argument without a placeholder so that SLF4J
+                // treats it as the throwable and logs its stack trace
+                logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={}", dcn, subSystem, e);
+                result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
+                                "subSystem=%s}, errorMsg : %s", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn,
+                        subSystem, e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
+                    "subSystem=%s}", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn, subSystem);
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("rejectClientBySubSystem fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java
new file mode 100644
index 0000000..9ccd547
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ShowClientBySystemAndDcnHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ShowClientBySystemAndDcnHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public ShowClientBySystemAndDcnHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * print clientInfo by subsys and dcn
+     *
+     * @param httpExchange
+     * @throws IOException
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
+            String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
+
+            String newLine = System.getProperty("line.separator");
+            logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            if (!sessionMap.isEmpty()) {
+                for (Session session : sessionMap.values()) {
+                    if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+                        UserAgent userAgent = session.getClient();
+                        result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent
+                                .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine;
+                    }
+                }
+            }
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("ShowClientBySystemAndDcnHandler fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+
+
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
new file mode 100644
index 0000000..314b2e5
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Admin HTTP handler that prints the client count of every system known to the session group mapping.
+ */
+public class ShowClientHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ShowClientHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public ShowClientHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Writes one "System=... | ClientNum=..." line per entry returned by statDCNSystemInfo(), ordered by
+     * ascending client count. The response status is always 200.
+     *
+     * @param httpExchange exchange whose response body receives the listing
+     * @throws IOException declared by {@link HttpHandler}
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String newLine = System.getProperty("line.separator");
+            logger.info("showAllClient=================");
+            ClientSessionGroupMapping groupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            Map<String, AtomicInteger> clientCounts = groupMapping.statDCNSystemInfo();
+
+            // copy the entries so they can be ordered without touching the source map
+            List<Map.Entry<String, AtomicInteger>> ordered = new ArrayList<>(clientCounts.entrySet());
+            Collections.sort(ordered, Comparator.comparingInt(stat -> stat.getValue().get()));
+
+            StringBuilder report = new StringBuilder();
+            for (Map.Entry<String, AtomicInteger> stat : ordered) {
+                String line = String.format("System=%s | ClientNum=%d", stat.getKey(), stat.getValue().intValue());
+                report.append(line).append(newLine);
+            }
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(report.toString().getBytes());
+        } catch (Exception e) {
+            logger.error("ShowClientHandler fail...", e);
+        } finally {
+            try {
+                if (out != null) {
+                    out.close();
+                }
+            } catch (IOException e) {
+                logger.warn("out close failed...", e);
+            }
+        }
+
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
new file mode 100644
index 0000000..62d76dd
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that lists, per client group, the sessions listening on a given topic.
+ * The topic is taken from the {@code EventMeshConstants.MANAGE_TOPIC} query parameter.
+ */
+public class ShowListenClientByTopicHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ShowListenClientByTopicHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public ShowListenClientByTopicHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Writes a "group:" line for every group with listeners on the topic, then one line per session.
+     * Failures are logged and answered with HTTP 500 when the response headers are still unsent.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        try {
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(httpExchange.getRequestURI().getQuery());
+            String topic = queryStringInfo.get(EventMeshConstants.MANAGE_TOPIC);
+            logger.info("showListeningClientByTopic,topic:{}=================", topic);
+            String newLine = System.lineSeparator();
+            StringBuilder result = new StringBuilder();
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<String, ClientGroupWrapper> clientGroupMap = clientSessionGroupMapping.getClientGroupMap();
+            for (ClientGroupWrapper cgw : clientGroupMap.values()) {
+                Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
+                if (listenSessionSet != null && !listenSessionSet.isEmpty()) {
+                    result.append(String.format("group:%s", cgw.getGroupName())).append(newLine);
+                    for (Session session : listenSessionSet) {
+                        UserAgent userAgent = session.getClient();
+                        result.append(String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s",
+                                userAgent.getPid(), userAgent.getHost(), userAgent.getPort(), userAgent.getPath(),
+                                userAgent.getVersion())).append(newLine);
+                    }
+                }
+            }
+            httpExchange.sendResponseHeaders(200, 0);
+            // Encode explicitly so the output does not depend on the JVM default charset.
+            httpExchange.getResponseBody().write(result.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            logger.error("ShowListenClientByTopicHandler fail...", e);
+            try {
+                httpExchange.sendResponseHeaders(500, -1);
+            } catch (IOException sendFailure) {
+                logger.warn("unable to send error status...", sendFailure);
+            }
+        } finally {
+            // Closes both request and response streams so the exchange always terminates.
+            httpExchange.close();
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java
new file mode 100644
index 0000000..a2563bc
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NetUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
+
+    /**
+     * Parses a URL-encoded form/query string into a map, splitting each pair at its first '='
+     * so values may contain '='. Pairs with no '=', an empty value or a bad escape are skipped.
+     *
+     * @param formData raw string such as {@code a=1&b=2}; may be null or blank
+     * @return mutable map of decoded keys to decoded values, empty when nothing parses
+     */
+    public static Map<String, String> formData2Dic(String formData) {
+        Map<String, String> result = new HashMap<>();
+        if (formData == null || formData.trim().length() == 0) {
+            return result;
+        }
+        for (String item : formData.split("&")) {
+            final String[] keyAndVal = item.split("=", 2); // limit 2: later '=' stay in the value
+            if (keyAndVal.length != 2 || keyAndVal[1].isEmpty()) {
+                continue;
+            }
+            try {
+                result.put(URLDecoder.decode(keyAndVal[0], "utf8"), URLDecoder.decode(keyAndVal[1], "utf8"));
+            } catch (UnsupportedEncodingException | IllegalArgumentException e) { // bad charset or %-escape
+                logger.warn("formData2Dic:param decode failed...", e);
+            }
+        }
+        return result;
+    }
+
+    /** Joins the addresses, each followed by '|'; null or empty input yields a fixed notice. */
+    public static String addressToString(List<InetSocketAddress> clients) {
+        if (clients == null || clients.isEmpty()) {
+            return "no session had been closed";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (InetSocketAddress addr : clients) {
+            sb.append(addr).append("|");
+        }
+        return sb.toString();
+    }
+}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java
new file mode 100644
index 0000000..2d0ae03
--- /dev/null
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java
@@ -0,0 +1,40 @@
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+
+public class RedirectClientByIpPortHandlerTest {
+
+    private RedirectClientByIpPortHandler redirectClientByIpPortHandler;
+
+    @Before
+    public void init() {
+        EventMeshTCPServer mockServer = PowerMockito.mock(EventMeshTCPServer.class);
+        redirectClientByIpPortHandler = new RedirectClientByIpPortHandler(mockServer);
+    }
+
+    /** Expects the "params illegal!" body when the destination port parameter is left empty. */
+    @Test
+    public void testHandleParamIllegal() throws IOException {
+        OutputStream outputStream = new ByteArrayOutputStream();
+        // The '?' is required: without it URI treats the whole string as a path and getQuery() is null.
+        URI uri = URI.create("/redirectClientByIpPort?ip=127.0.0.1&port=1234&desteventMeshIp=127.0.0.1&desteventMeshPort=");
+        Assert.assertNotNull(uri.getQuery());
+
+        HttpExchange mockExchange = PowerMockito.mock(HttpExchange.class);
+        PowerMockito.when(mockExchange.getResponseBody()).thenReturn(outputStream);
+        PowerMockito.when(mockExchange.getRequestURI()).thenReturn(uri);
+
+        redirectClientByIpPortHandler.handle(mockExchange);
+        Assert.assertEquals("params illegal!", outputStream.toString());
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org