You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/10 10:17:14 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #359] Split handler from controller (#359) (#360)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new e54c1bf [ISSUE #359] Split handler from controller (#359) (#360)
e54c1bf is described below
commit e54c1bfbb7c88b5a92d9b8de5eeead23c2af0321
Author: Wenjun Ruan <86...@qq.com>
AuthorDate: Thu Jun 10 18:17:06 2021 +0800
[ISSUE #359] Split handler from controller (#359) (#360)
* [ISSUE #359] Split handler from controller (#359)
* add license header
* add ut
---
.../admin/controller/ClientManageController.java | 748 +--------------------
.../handler/EventMeshMsgDownStreamHandler.java | 166 +++++
.../handler/RedirectClientByIpPortHandler.java | 111 +++
.../admin/handler/RedirectClientByPathHandler.java | 112 +++
.../handler/RedirectClientBySubSystemHandler.java | 113 ++++
.../admin/handler/RejectAllClientHandler.java | 96 +++
.../admin/handler/RejectClientByIpPortHandler.java | 107 +++
.../handler/RejectClientBySubSystemHandler.java | 113 ++++
.../handler/ShowClientBySystemAndDcnHandler.java | 92 +++
.../runtime/admin/handler/ShowClientHandler.java | 84 +++
.../handler/ShowListenClientByTopicHandler.java | 92 +++
.../apache/eventmesh/runtime/util/NetUtils.java | 72 ++
.../handler/RedirectClientByIpPortHandlerTest.java | 40 ++
13 files changed, 1217 insertions(+), 729 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
index f93e1af..0544373 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
@@ -17,36 +17,21 @@
package org.apache.eventmesh.runtime.admin.controller;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import com.sun.net.httpserver.HttpExchange;
-import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.admin.handler.RedirectClientByIpPortHandler;
+import org.apache.eventmesh.runtime.admin.handler.RedirectClientByPathHandler;
+import org.apache.eventmesh.runtime.admin.handler.RedirectClientBySubSystemHandler;
+import org.apache.eventmesh.runtime.admin.handler.RejectAllClientHandler;
+import org.apache.eventmesh.runtime.admin.handler.RejectClientByIpPortHandler;
+import org.apache.eventmesh.runtime.admin.handler.RejectClientBySubSystemHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemAndDcnHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowClientHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowListenClientByTopicHandler;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.constants.EventMeshConstants;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
-import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,713 +48,18 @@ public class ClientManageController {
public void start() throws IOException {
int port = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerAdminPort;
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
- server.createContext("/clientManage/showClient", new ShowClientHandler());
- server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler());
- server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler());
- server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler());
- server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler());
- server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler());
- server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler());
- server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler());
-// server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler());
- server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler());
+ server.createContext("/clientManage/showClient", new ShowClientHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler(eventMeshTCPServer));
+// server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler(eventMeshTCPServer));
server.start();
logger.info("ClientManageController start success, port:{}", port);
}
-
- private Map<String, Object> parsePostParameters(HttpExchange exchange)
- throws IOException {
- Map<String, Object> parameters = new HashMap<>();
- if ("post".equalsIgnoreCase(exchange.getRequestMethod())) {
- InputStreamReader isr =
- new InputStreamReader(exchange.getRequestBody(), "utf-8");
- BufferedReader br = new BufferedReader(isr);
- String query = br.readLine();
- parseQuery(query, parameters);
- }
- return parameters;
- }
-
- @SuppressWarnings("unchecked")
- private void parseQuery(String query, Map<String, Object> parameters)
- throws UnsupportedEncodingException {
-
- if (query != null) {
- String pairs[] = query.split("&");
-
- for (String pair : pairs) {
- String param[] = pair.split("=");
-
- String key = null;
- String value = null;
- if (param.length > 0) {
- key = URLDecoder.decode(param[0], "UTF-8");
- }
-
- if (param.length > 1) {
- value = URLDecoder.decode(param[1], "UTF-8");
- }
-
- if (parameters.containsKey(key)) {
- Object obj = parameters.get(key);
- if (obj instanceof List<?>) {
- List<String> values = (List<String>) obj;
- values.add(value);
- } else if (obj instanceof String) {
- List<String> values = new ArrayList<String>();
- values.add((String) obj);
- values.add(value);
- parameters.put(key, values);
- }
- } else {
- parameters.put(key, value);
- }
- }
- }
- }
-
- /**
- * 打印本eventMesh上所有客户端信息
- *
- * @return
- */
- class ShowClientHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String newLine = System.getProperty("line.separator");
- logger.info("showAllClient=================");
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- Map<String, AtomicInteger> dcnSystemMap = clientSessionGroupMapping.statDCNSystemInfo();
- if (!dcnSystemMap.isEmpty()) {
- List<Map.Entry<String, AtomicInteger>> list = new ArrayList<>();
- ValueComparator vc = new ValueComparator();
- for (Map.Entry<String, AtomicInteger> entry : dcnSystemMap.entrySet()) {
- list.add(entry);
- }
- Collections.sort(list, vc);
- for (Map.Entry<String, AtomicInteger> entry : list) {
- result += String.format("System=%s | ClientNum=%d", entry.getKey(), entry.getValue().intValue()) +
- newLine;
- }
- }
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("ShowClientHandler fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
- class ValueComparator implements Comparator<Map.Entry<String, AtomicInteger>> {
- @Override
- public int compare(Map.Entry<String, AtomicInteger> x, Map.Entry<String, AtomicInteger> y) {
- return x.getValue().intValue() - y.getValue().intValue();
- }
- }
-
- /**
- * print clientInfo by subsys and dcn
- *
- * @return
- */
- class ShowClientBySystemAndDcnHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
- String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
-
- String newLine = System.getProperty("line.separator");
- logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- if (!sessionMap.isEmpty()) {
- for (Session session : sessionMap.values()) {
- if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
- UserAgent userAgent = session.getClient();
- result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent
- .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine;
- }
- }
- }
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("ShowClientBySystemAndDcnHandler fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
-
- /**
- * query client subscription by topic
- */
- class ShowListenClientByTopicHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String topic = queryStringInfo.get(EventMeshConstants.MANAGE_TOPIC);
-
- String newLine = System.getProperty("line.separator");
- logger.info("showListeningClientByTopic,topic:{}=================", topic);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<String, ClientGroupWrapper> clientGroupMap = clientSessionGroupMapping.getClientGroupMap();
- if (!clientGroupMap.isEmpty()) {
- for (ClientGroupWrapper cgw : clientGroupMap.values()) {
- Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
- if (listenSessionSet != null && listenSessionSet.size() > 0) {
- result += String.format("group:%s", cgw.getGroupName()) + newLine;
- for (Session session : listenSessionSet) {
- UserAgent userAgent = session.getClient();
- result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent
- .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getVersion()) + newLine;
- }
- }
- }
- }
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("ShowListenClientByTopicHandler fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
-
- /**
- * remove all clients accessed by eventMesh
- *
- * @return
- */
- class RejectAllClientHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
- try {
- logger.info("rejectAllClient in admin====================");
- if (!sessionMap.isEmpty()) {
- for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
- InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
- if (addr != null) {
- successRemoteAddrs.add(addr);
- }
- }
- }
- } catch (Exception e) {
- logger.error("clientManage|rejectAllClient|fail", e);
- result = String.format("rejectAllClient fail! sessionMap size {%d}, had reject {%s}, errorMsg : %s",
- sessionMap.size(), printClients(successRemoteAddrs), e.getMessage());
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- return;
- }
- result = String.format("rejectAllClient success! sessionMap size {%d}, had reject {%s}", sessionMap.size
- (), printClients(successRemoteAddrs));
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("rejectAllClient fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
- /**
- * remove c client by ip and port
- *
- * @return
- */
- class RejectClientByIpPortHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
- String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
-
- if (StringUtils.isBlank(ip) || StringUtils.isBlank(port)) {
- httpExchange.sendResponseHeaders(200, 0);
- result = "params illegal!";
- out.write(result.getBytes());
- return;
- }
- logger.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
- try {
- if (!sessionMap.isEmpty()) {
- for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
- if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) {
- InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
- if (addr != null) {
- successRemoteAddrs.add(addr);
- }
- }
- }
- }
- } catch (Exception e) {
- logger.error("clientManage|rejectClientByIpPort|fail|ip={}|port={},errMsg={}", ip, port, e);
- result = String.format("rejectClientByIpPort fail! {ip=%s port=%s}, had reject {%s}, errorMsg : %s", ip,
- port, printClients(successRemoteAddrs), e.getMessage());
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- return;
- }
-
- result = String.format("rejectClientByIpPort success! {ip=%s port=%s}, had reject {%s}", ip, port, printClients
- (successRemoteAddrs));
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("rejectClientByIpPort fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
-
- /**
- * remove c client by dcn and susysId
- *
- * @return
- */
- class RejectClientBySubSystemHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
- String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
-
- if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) {
- httpExchange.sendResponseHeaders(200, 0);
- result = "params illegal!";
- out.write(result.getBytes());
- return;
- }
-
- logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
- try {
- if (!sessionMap.isEmpty()) {
- for (Session session : sessionMap.values()) {
- if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
- InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping);
- if (addr != null) {
- successRemoteAddrs.add(addr);
- }
- }
- }
- }
- } catch (Exception e) {
- logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={},errMsg={}", dcn, subSystem, e);
- result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%d} , {dcn=%s " +
- "port=%s}, errorMsg : %s", sessionMap.size(), printClients(successRemoteAddrs), dcn,
- subSystem, e.getMessage());
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- return;
- }
- result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
- "port=%s}", sessionMap.size(), printClients(successRemoteAddrs), dcn, subSystem);
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("rejectClientBySubSystem fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
- /**
- * redirect subsystem for subsys and dcn
- *
- * @return
- */
- class RedirectClientBySubSystemHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
- String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
- String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
- String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
-
- if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem)
- || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
- || !StringUtils.isNumeric(destEventMeshPort)) {
- httpExchange.sendResponseHeaders(200, 0);
- result = "params illegal!";
- out.write(result.getBytes());
- return;
- }
- logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- String redirectResult = "";
- try {
- if (!sessionMap.isEmpty()) {
- for (Session session : sessionMap.values()) {
- if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
- redirectResult += "|";
- redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
- session, clientSessionGroupMapping);
- }
- }
- }
- } catch (Exception e) {
- logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" +
- "={}|destEventMeshPort={},errMsg={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e);
- result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
- "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
- sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e
- .getMessage());
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- return;
- }
- result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " +
- "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
- sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult);
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("redirectClientBySubSystem fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
- /**
- * redirect subsystem for path
- *
- * @return
- */
- class RedirectClientByPathHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String path = queryStringInfo.get(EventMeshConstants.MANAGE_PATH);
- String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
- String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
-
- if (StringUtils.isBlank(path) || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) ||
- !StringUtils.isNumeric(destEventMeshPort)) {
- httpExchange.sendResponseHeaders(200, 0);
- result = "params illegal!";
- out.write(result.getBytes());
- return;
- }
- logger.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- String redirectResult = "";
- try {
- if (!sessionMap.isEmpty()) {
- for (Session session : sessionMap.values()) {
- if (session.getClient().getPath().contains(path)) {
- redirectResult += "|";
- redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
- session, clientSessionGroupMapping);
- }
- }
- }
- } catch (Exception e) {
- logger.error("clientManage|redirectClientByPath|fail|path={}|destEventMeshIp" +
- "={}|destEventMeshPort={},errMsg={}", path, destEventMeshIp, destEventMeshPort, e);
- result = String.format("redirectClientByPath fail! sessionMap size {%d}, {path=%s " +
- "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
- sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult, e
- .getMessage());
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- return;
- }
- result = String.format("redirectClientByPath success! sessionMap size {%d}, {path=%s " +
- "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
- sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult);
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("redirectClientByPath fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
- /**
- * redirect subsystem for ip and port
- *
- * @return
- */
- class RedirectClientByIpPortHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "";
- OutputStream out = httpExchange.getResponseBody();
- try {
- String queryString = httpExchange.getRequestURI().getQuery();
- Map<String, String> queryStringInfo = formData2Dic(queryString);
- String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
- String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
- String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
- String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
-
- if (StringUtils.isBlank(ip) || !StringUtils.isNumeric(port)
- || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
- || !StringUtils.isNumeric(destEventMeshPort)) {
- httpExchange.sendResponseHeaders(200, 0);
- result = "params illegal!";
- out.write(result.getBytes());
- return;
- }
- logger.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort);
- ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- String redirectResult = "";
- try {
- if (!sessionMap.isEmpty()) {
- for (Session session : sessionMap.values()) {
- if (session.getClient().getHost().equals(ip) && String.valueOf(session.getClient().getPort()).equals(port)) {
- redirectResult += "|";
- redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
- session, clientSessionGroupMapping);
- }
- }
- }
- } catch (Exception e) {
- logger.error("clientManage|redirectClientByIpPort|fail|ip={}|port={}|destEventMeshIp" +
- "={}|destEventMeshPort={},errMsg={}", ip, port, destEventMeshIp, destEventMeshPort, e);
- result = String.format("redirectClientByIpPort fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
- "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
- sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult, e
- .getMessage());
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- return;
- }
- result = String.format("redirectClientByIpPort success! sessionMap size {%d}, {ip=%s port=%s " +
- "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
- sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult);
- httpExchange.sendResponseHeaders(200, 0);
- out.write(result.getBytes());
- } catch (Exception e) {
- logger.error("redirectClientByIpPort fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
-
- private String printClients(List<InetSocketAddress> clients) {
- if (clients.isEmpty()) {
- return "no session had been closed";
- }
- StringBuilder sb = new StringBuilder();
- for (InetSocketAddress addr : clients) {
- sb.append(addr).append("|");
- }
- return sb.toString();
- }
-
- private Map<String, String> formData2Dic(String formData) {
- Map<String, String> result = new HashMap<>();
- if (formData == null || formData.trim().length() == 0) {
- return result;
- }
- final String[] items = formData.split("&");
- Arrays.stream(items).forEach(item -> {
- final String[] keyAndVal = item.split("=");
- if (keyAndVal.length == 2) {
- try {
- final String key = URLDecoder.decode(keyAndVal[0], "utf8");
- final String val = URLDecoder.decode(keyAndVal[1], "utf8");
- result.put(key, val);
- } catch (UnsupportedEncodingException e) {
- logger.warn("formData2Dic:param decode failed...", e);
- }
- }
- });
- return result;
- }
-
- class EventMeshMsgDownStreamHandler implements HttpHandler {
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "false";
- OutputStream out = httpExchange.getResponseBody();
- try {
-// Map<String, Object> queryStringInfo = parsePostParameters(httpExchange);
-// String msgStr = (String)queryStringInfo.get("msg");
-// String groupName = (String)queryStringInfo.get("group");
-// logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr);
-// if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) {
-// logger.warn("msg or groupName is null");
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-// return;
-// }
-// MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class);
-// String topic = messageExt.getTopic();
-//
-// if (!EventMeshUtil.isValidRMBTopic(topic)) {
-// logger.warn("msg topic is illegal");
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-// return;
-// }
-//
-// DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy();
-// Set<Session> groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions();
-// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
-//
-// if(session == null){
-// logger.error("DownStream msg,retry other eventMesh found no session again");
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-// return;
-// }
-//
-// DownStreamMsgContext downStreamMsgContext =
-// new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true);
-// eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
-//
-// if (session.isCanDownStream()) {
-// session.downstreamMsg(downStreamMsgContext);
-// httpExchange.sendResponseHeaders(200, 0);
-// result = "true";
-// out.write(result.getBytes());
-// return;
-// }
-//
-// logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq);
-// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-// downStreamMsgContext.delay(delayTime);
-// eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
-// result = "true";
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-
- } catch (Exception e) {
- logger.error("EventMeshMsgDownStreamHandler handle fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
-
- }
- }
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java
new file mode 100644
index 0000000..cdc086e
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Admin handler for messages forwarded by another EventMesh node. The dispatch logic in
+ * {@link #handle} is currently commented out, so every request is answered with "false".
+ */
+public class EventMeshMsgDownStreamHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(EventMeshMsgDownStreamHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public EventMeshMsgDownStreamHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Replies with HTTP 200 and the current result ("false" while dispatch is disabled).
+     * Failures are logged rather than rethrown; the response stream is always closed.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "false";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+// Map<String, Object> queryStringInfo = parsePostParameters(httpExchange);
+// String msgStr = (String)queryStringInfo.get("msg");
+// String groupName = (String)queryStringInfo.get("group");
+// logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr);
+// if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) {
+// logger.warn("msg or groupName is null");
+// httpExchange.sendResponseHeaders(200, 0);
+// out.write(result.getBytes());
+// return;
+// }
+// MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class);
+// String topic = messageExt.getTopic();
+//
+// if (!EventMeshUtil.isValidRMBTopic(topic)) {
+// logger.warn("msg topic is illegal");
+// httpExchange.sendResponseHeaders(200, 0);
+// out.write(result.getBytes());
+// return;
+// }
+//
+// DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy();
+// Set<Session> groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions();
+// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
+//
+// if(session == null){
+// logger.error("DownStream msg,retry other eventMesh found no session again");
+// httpExchange.sendResponseHeaders(200, 0);
+// out.write(result.getBytes());
+// return;
+// }
+//
+// DownStreamMsgContext downStreamMsgContext =
+// new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true);
+// eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
+//
+// if (session.isCanDownStream()) {
+// session.downstreamMsg(downStreamMsgContext);
+// httpExchange.sendResponseHeaders(200, 0);
+// result = "true";
+// out.write(result.getBytes());
+// return;
+// }
+//
+// logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq);
+// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
+// downStreamMsgContext.delay(delayTime);
+// eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
+// result = "true";
+// httpExchange.sendResponseHeaders(200, 0);
+// out.write(result.getBytes());
+            // Dispatch is disabled above; still complete the exchange so the caller gets a reply.
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("EventMeshMsgDownStreamHandler handle fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+
+    /** Parses the first line of a POST body as a URL-encoded form; other methods yield an empty map. */
+    private Map<String, Object> parsePostParameters(HttpExchange exchange) throws IOException {
+        Map<String, Object> parameters = new HashMap<>();
+        if ("post".equalsIgnoreCase(exchange.getRequestMethod())) {
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(exchange.getRequestBody(), "utf-8"))) {
+                parseQuery(br.readLine(), parameters);
+            }
+        }
+        return parameters;
+    }
+
+    /**
+     * Adds the URL-decoded {@code key=value} pairs of {@code query} to {@code parameters}. Only the
+     * first '=' splits a pair, so values may contain '='; a pair without '=' maps its key to
+     * {@code null}. A repeated key collects all of its values, in order, in a {@code List<String>}.
+     */
+    @SuppressWarnings("unchecked")
+    private void parseQuery(String query, Map<String, Object> parameters)
+            throws UnsupportedEncodingException {
+        if (query == null) {
+            return;
+        }
+        for (String pair : query.split("&")) {
+            String[] param = pair.split("=", 2);
+            String key = URLDecoder.decode(param[0], "UTF-8");
+            String value = param.length > 1 ? URLDecoder.decode(param[1], "UTF-8") : null;
+            Object existing = parameters.get(key);
+            if (existing instanceof List<?>) {
+                ((List<String>) existing).add(value);
+            } else if (parameters.containsKey(key)) {
+                // Promote the earlier scalar (possibly null) to a list holding both values.
+                List<String> values = new ArrayList<>();
+                values.add((String) existing);
+                values.add(value);
+                parameters.put(key, values);
+            } else {
+                parameters.put(key, value);
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java
new file mode 100644
index 0000000..79ba4bd
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin handler that redirects the TCP client sessions whose host and port equal the
+ * ip and port given in the query string to another EventMesh instance.
+ */
+public class RedirectClientByIpPortHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RedirectClientByIpPortHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RedirectClientByIpPortHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Validates the query parameters, redirects every matching session and writes a plain-text
+     * summary. Every response it sends uses status 200; the body reports the outcome.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
+            String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
+            String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
+            String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
+            if (StringUtils.isBlank(ip) || !StringUtils.isNumeric(port) || StringUtils.isBlank(destEventMeshIp) || !StringUtils.isNumeric(destEventMeshPort)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+            logger.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            StringBuilder redirectResult = new StringBuilder();
+            try {
+                int destPort = Integer.parseInt(destEventMeshPort); // parsed once; out-of-range is reported below
+                for (Session session : sessionMap.values()) {
+                    if (session.getClient().getHost().equals(ip) && String.valueOf(session.getClient().getPort()).equals(port)) {
+                        redirectResult.append("|").append(EventMeshTcp2Client.redirectClient2NewEventMesh(
+                                eventMeshTCPServer, destEventMeshIp, destPort, session, clientSessionGroupMapping));
+                    }
+                }
+            } catch (Exception e) {
+                // errMsg={} takes the message; the exception stays last so its stack trace is logged.
+                logger.error("clientManage|redirectClientByIpPort|fail|ip={}|port={}|destEventMeshIp" +
+                        "={}|destEventMeshPort={},errMsg={}", ip, port, destEventMeshIp, destEventMeshPort, e.getMessage(), e);
+                result = String.format("redirectClientByIpPort fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
+                        "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult, e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("redirectClientByIpPort success! sessionMap size {%d}, {ip=%s port=%s " +
+                    "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult);
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("redirectClientByIpPort fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java
new file mode 100644
index 0000000..b605a1c
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin handler that redirects the TCP client sessions whose path contains the path
+ * given in the query string to another EventMesh instance.
+ */
+public class RedirectClientByPathHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RedirectClientByPathHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RedirectClientByPathHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Validates the query parameters, redirects every matching session and writes a plain-text
+     * summary. Every response it sends uses status 200; the body reports the outcome.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String path = queryStringInfo.get(EventMeshConstants.MANAGE_PATH);
+            String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
+            String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
+
+            if (StringUtils.isBlank(path) || StringUtils.isBlank(destEventMeshIp) || !StringUtils.isNumeric(destEventMeshPort)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+            logger.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            StringBuilder redirectResult = new StringBuilder();
+            try {
+                // Parsed once up front; a numeric but out-of-range port is reported by the catch below.
+                int destPort = Integer.parseInt(destEventMeshPort);
+                for (Session session : sessionMap.values()) {
+                    if (session.getClient().getPath().contains(path)) {
+                        redirectResult.append("|").append(EventMeshTcp2Client.redirectClient2NewEventMesh(
+                                eventMeshTCPServer, destEventMeshIp, destPort, session, clientSessionGroupMapping));
+                    }
+                }
+            } catch (Exception e) {
+                // errMsg={} takes the message; the exception stays last so its stack trace is logged.
+                logger.error("clientManage|redirectClientByPath|fail|path={}|destEventMeshIp" +
+                        "={}|destEventMeshPort={},errMsg={}", path, destEventMeshIp, destEventMeshPort, e.getMessage(), e);
+                result = String.format("redirectClientByPath fail! sessionMap size {%d}, {path=%s " +
+                        "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult, e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("redirectClientByPath success! sessionMap size {%d}, {path=%s " +
+                    "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult);
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("redirectClientByPath fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
new file mode 100644
index 0000000..fa03e72
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin handler that redirects the TCP client sessions whose dcn and subsystem equal the
+ * values given in the query string to another EventMesh instance.
+ */
+public class RedirectClientBySubSystemHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RedirectClientBySubSystemHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RedirectClientBySubSystemHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Validates the query parameters, redirects every matching session and writes a plain-text
+     * summary. Every response it sends uses status 200; the body reports the outcome.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
+            String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
+            String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
+            String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
+
+            if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem) || StringUtils.isBlank(destEventMeshIp) || !StringUtils.isNumeric(destEventMeshPort)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+            logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            StringBuilder redirectResult = new StringBuilder();
+            try {
+                // Parsed once up front; a numeric but out-of-range port is reported by the catch below.
+                int destPort = Integer.parseInt(destEventMeshPort);
+                for (Session session : sessionMap.values()) {
+                    if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+                        redirectResult.append("|").append(EventMeshTcp2Client.redirectClient2NewEventMesh(
+                                eventMeshTCPServer, destEventMeshIp, destPort, session, clientSessionGroupMapping));
+                    }
+                }
+            } catch (Exception e) {
+                // errMsg={} takes the message; the exception stays last so its stack trace is logged.
+                logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" +
+                        "={}|destEventMeshPort={},errMsg={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e.getMessage(), e);
+                result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {dcn=%s subSystem=%s " +
+                        "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " +
+                    "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult);
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("redirectClientBySubSystem fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java
new file mode 100644
index 0000000..6314c48
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin handler that rejects every TCP client session currently registered on this server.
+ */
+public class RejectAllClientHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RejectAllClientHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RejectAllClientHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Calls {@code serverGoodby2Client} for each session and replies with the addresses it returned.
+     * @param httpExchange exchange that receives the plain-text summary with status 200
+     * @throws IOException declared by {@link HttpHandler}; failures in here are logged instead
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String summary = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            ClientSessionGroupMapping mapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessions = mapping.getSessionMap();
+            final List<InetSocketAddress> rejected = new ArrayList<>();
+            try {
+                logger.info("rejectAllClient in admin====================");
+                for (Session session : sessions.values()) {
+                    InetSocketAddress remote = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, mapping);
+                    if (remote != null) {
+                        rejected.add(remote);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("clientManage|rejectAllClient|fail", e);
+                summary = String.format("rejectAllClient fail! sessionMap size {%d}, had reject {%s}, errorMsg : %s",
+                        sessions.size(), NetUtils.addressToString(rejected), e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(summary.getBytes());
+                return;
+            }
+            summary = String.format("rejectAllClient success! sessionMap size {%d}, had reject {%s}", sessions.size(),
+                    NetUtils.addressToString(rejected));
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(summary.getBytes());
+        } catch (Exception e) {
+            logger.error("rejectAllClient fail...", e);
+        } finally {
+            try {
+                if (out != null) {
+                    out.close();
+                }
+            } catch (IOException closeFailure) {
+                logger.warn("out close failed...", closeFailure);
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java
new file mode 100644
index 0000000..3ddd45e
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Admin handler that rejects the TCP client sessions whose remote address equals the requested ip and port. */
+public class RejectClientByIpPortHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(RejectClientByIpPortHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public RejectClientByIpPortHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Rejects every session keyed by the requested address and writes a plain-text summary.
+     * Every response it sends uses status 200; the body reports the outcome.
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        String result = "";
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String queryString = httpExchange.getRequestURI().getQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP);
+            String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT);
+            if (StringUtils.isBlank(ip) || StringUtils.isBlank(port)) {
+                httpExchange.sendResponseHeaders(200, 0);
+                result = "params illegal!";
+                out.write(result.getBytes());
+                return;
+            }
+            logger.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+            final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
+            try {
+                for (Map.Entry<InetSocketAddress, Session> entry : sessionMap.entrySet()) {
+                    if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) {
+                        InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping);
+                        if (addr != null) {
+                            successRemoteAddrs.add(addr);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                // errMsg={} takes the message; the exception stays last so its stack trace is logged.
+                logger.error("clientManage|rejectClientByIpPort|fail|ip={}|port={},errMsg={}", ip, port, e.getMessage(), e);
+                result = String.format("rejectClientByIpPort fail! {ip=%s port=%s}, had reject {%s}, errorMsg : %s", ip,
+                        port, NetUtils.addressToString(successRemoteAddrs), e.getMessage());
+                httpExchange.sendResponseHeaders(200, 0);
+                out.write(result.getBytes());
+                return;
+            }
+            result = String.format("rejectClientByIpPort success! {ip=%s port=%s}, had reject {%s}", ip, port, NetUtils.addressToString(successRemoteAddrs));
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(result.getBytes());
+        } catch (Exception e) {
+            logger.error("rejectClientByIpPort fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
new file mode 100644
index 0000000..5b1e841
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RejectClientBySubSystemHandler implements HttpHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(RejectClientBySubSystemHandler.class);
+
+ private EventMeshTCPServer eventMeshTCPServer;
+
+ public RejectClientBySubSystemHandler(EventMeshTCPServer eventMeshTCPServer) {
+ this.eventMeshTCPServer = eventMeshTCPServer;
+ }
+
+ /**
+ * remove c client by dcn and susysId
+ * @param httpExchange
+ * @throws IOException
+ */
+ @Override
+ public void handle(HttpExchange httpExchange) throws IOException {
+ String result = "";
+ OutputStream out = httpExchange.getResponseBody();
+ try {
+ String queryString = httpExchange.getRequestURI().getQuery();
+ Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+ String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
+ String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
+
+ if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) {
+ httpExchange.sendResponseHeaders(200, 0);
+ result = "params illegal!";
+ out.write(result.getBytes());
+ return;
+ }
+
+ logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn);
+ ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+ ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+ final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
+ try {
+ if (!sessionMap.isEmpty()) {
+ for (Session session : sessionMap.values()) {
+ if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+ InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping);
+ if (addr != null) {
+ successRemoteAddrs.add(addr);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={},errMsg={}", dcn, subSystem, e);
+ result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
+ "port=%s}, errorMsg : %s", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn,
+ subSystem, e.getMessage());
+ httpExchange.sendResponseHeaders(200, 0);
+ out.write(result.getBytes());
+ return;
+ }
+ result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
+ "port=%s}", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn, subSystem);
+ httpExchange.sendResponseHeaders(200, 0);
+ out.write(result.getBytes());
+ } catch (Exception e) {
+ logger.error("rejectClientBySubSystem fail...", e);
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ logger.warn("out close failed...", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java
new file mode 100644
index 0000000..9ccd547
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ShowClientBySystemAndDcnHandler implements HttpHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(ShowClientBySystemAndDcnHandler.class);
+
+ private final EventMeshTCPServer eventMeshTCPServer;
+
+ public ShowClientBySystemAndDcnHandler(EventMeshTCPServer eventMeshTCPServer) {
+ this.eventMeshTCPServer = eventMeshTCPServer;
+ }
+
+ /**
+ * print clientInfo by subsys and dcn
+ *
+ * @param httpExchange
+ * @throws IOException
+ */
+ @Override
+ public void handle(HttpExchange httpExchange) throws IOException {
+ String result = "";
+ OutputStream out = httpExchange.getResponseBody();
+ try {
+ String queryString = httpExchange.getRequestURI().getQuery();
+ Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+ String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
+ String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
+
+ String newLine = System.getProperty("line.separator");
+ logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn);
+ ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+ ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+ if (!sessionMap.isEmpty()) {
+ for (Session session : sessionMap.values()) {
+ if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+ UserAgent userAgent = session.getClient();
+ result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent
+ .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine;
+ }
+ }
+ }
+ httpExchange.sendResponseHeaders(200, 0);
+ out.write(result.getBytes());
+ } catch (Exception e) {
+ logger.error("ShowClientBySystemAndDcnHandler fail...", e);
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ logger.warn("out close failed...", e);
+ }
+ }
+ }
+ }
+
+
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
new file mode 100644
index 0000000..314b2e5
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Admin HTTP handler that reports how many clients are connected per system, as returned
+ * by {@code ClientSessionGroupMapping.statDCNSystemInfo()}.
+ */
+public class ShowClientHandler implements HttpHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ShowClientHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public ShowClientHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Writes one {@code System=... | ClientNum=...} line per statistics entry, ordered by
+     * ascending client count, with HTTP status 200. The body is empty when there are no
+     * statistics.
+     *
+     * @param httpExchange the exchange providing the response body stream
+     * @throws IOException declared by {@link HttpHandler}; failures raised inside the handler
+     *                     body are caught and logged rather than propagated
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            String newLine = System.getProperty("line.separator");
+            logger.info("showAllClient=================");
+            ClientSessionGroupMapping mapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            Map<String, AtomicInteger> dcnSystemMap = mapping.statDCNSystemInfo();
+
+            StringBuilder report = new StringBuilder();
+            if (!dcnSystemMap.isEmpty()) {
+                // Copy the entries so they can be ordered without touching the source map.
+                List<Map.Entry<String, AtomicInteger>> stats = new ArrayList<>(dcnSystemMap.entrySet());
+                stats.sort(Comparator.comparingInt(stat -> stat.getValue().intValue()));
+                for (Map.Entry<String, AtomicInteger> stat : stats) {
+                    report.append(String.format("System=%s | ClientNum=%d", stat.getKey(), stat.getValue().intValue()))
+                            .append(newLine);
+                }
+            }
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(report.toString().getBytes());
+        } catch (Exception e) {
+            logger.error("ShowClientHandler fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
new file mode 100644
index 0000000..62d76dd
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.constants.EventMeshConstants;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Admin HTTP handler that lists, per client group, the sessions registered for a topic.
+ */
+public class ShowListenClientByTopicHandler implements HttpHandler {
+
+    // Shared, immutable logger, declared the same way as in the sibling admin handlers.
+    private static final Logger logger = LoggerFactory.getLogger(ShowListenClientByTopicHandler.class);
+
+    private final EventMeshTCPServer eventMeshTCPServer;
+
+    public ShowListenClientByTopicHandler(EventMeshTCPServer eventMeshTCPServer) {
+        this.eventMeshTCPServer = eventMeshTCPServer;
+    }
+
+    /**
+     * Reads the query parameter named by {@link EventMeshConstants#MANAGE_TOPIC} and, for
+     * every client group that has sessions mapped to that topic, writes a {@code group:...}
+     * line followed by one {@code pid | ip | port | path | version} line per session.
+     * The response always uses HTTP status 200; the body is empty when nothing matches or
+     * when no topic was supplied.
+     *
+     * @param httpExchange the exchange carrying the request URI and the response body stream
+     * @throws IOException declared by {@link HttpHandler}; failures raised inside the handler
+     *                     body are caught and logged rather than propagated
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+        OutputStream out = httpExchange.getResponseBody();
+        try {
+            // Use the raw (still percent-encoded) query: formData2Dic URL-decodes each key and
+            // value itself, so the already-decoded getQuery() would be decoded twice.
+            String queryString = httpExchange.getRequestURI().getRawQuery();
+            Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
+            String topic = queryStringInfo.get(EventMeshConstants.MANAGE_TOPIC);
+
+            String newLine = System.getProperty("line.separator");
+            logger.info("showListeningClientByTopic,topic:{}=================", topic);
+            ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
+            ConcurrentHashMap<String, ClientGroupWrapper> clientGroupMap = clientSessionGroupMapping.getClientGroupMap();
+
+            StringBuilder report = new StringBuilder();
+            // Without a topic parameter there is nothing to look up; skipping the scan also
+            // avoids handing a null key to the topic mapping.
+            if (topic != null) {
+                for (ClientGroupWrapper cgw : clientGroupMap.values()) {
+                    Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
+                    if (listenSessionSet == null || listenSessionSet.isEmpty()) {
+                        continue;
+                    }
+                    report.append(String.format("group:%s", cgw.getGroupName())).append(newLine);
+                    for (Session session : listenSessionSet) {
+                        UserAgent userAgent = session.getClient();
+                        report.append(String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s",
+                                userAgent.getPid(), userAgent.getHost(), userAgent.getPort(),
+                                userAgent.getPath(), userAgent.getVersion()))
+                                .append(newLine);
+                    }
+                }
+            }
+            httpExchange.sendResponseHeaders(200, 0);
+            out.write(report.toString().getBytes());
+        } catch (Exception e) {
+            logger.error("ShowListenClientByTopicHandler fail...", e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    logger.warn("out close failed...", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java
new file mode 100644
index 0000000..a2563bc
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers for parsing URL-encoded query strings and formatting socket addresses.
+ */
+public class NetUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
+
+    /**
+     * Transforms a URL-encoded form/query string ({@code k1=v1&k2=v2}) into a map.
+     *
+     * <p>Each {@code &}-separated item is split at its <em>first</em> {@code =}, so a value
+     * may itself contain {@code =}. Keys and values are URL-decoded as UTF-8. Items without
+     * {@code =} or with an empty value are skipped, as are items that cannot be decoded
+     * (a warning is logged for those). When a key occurs more than once the last
+     * occurrence wins.
+     *
+     * @param formData the still-encoded form data; may be {@code null} or blank
+     * @return a new mutable map of decoded keys to decoded values; empty when
+     *         {@code formData} is {@code null} or blank, never {@code null}
+     */
+    public static Map<String, String> formData2Dic(String formData) {
+        Map<String, String> result = new HashMap<>();
+        if (formData == null || formData.trim().length() == 0) {
+            return result;
+        }
+        for (String item : formData.split("&")) {
+            // Limit 2: split on the first '=' only, keeping any further '=' in the value.
+            final String[] keyAndVal = item.split("=", 2);
+            if (keyAndVal.length != 2 || keyAndVal[1].isEmpty()) {
+                continue;
+            }
+            try {
+                final String key = URLDecoder.decode(keyAndVal[0], "utf8");
+                final String val = URLDecoder.decode(keyAndVal[1], "utf8");
+                result.put(key, val);
+            } catch (UnsupportedEncodingException | IllegalArgumentException e) {
+                // IllegalArgumentException covers malformed %-escapes: skip just this pair
+                // instead of failing the whole parse.
+                logger.warn("formData2Dic:param decode failed...", e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Renders the given addresses as a single string, each address followed by {@code |}.
+     *
+     * @param clients the addresses to render; must not be {@code null}
+     * @return the concatenated addresses, or {@code "no session had been closed"} when the
+     *         list is empty
+     */
+    public static String addressToString(List<InetSocketAddress> clients) {
+        if (clients.isEmpty()) {
+            return "no session had been closed";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (InetSocketAddress addr : clients) {
+            sb.append(addr).append("|");
+        }
+        return sb.toString();
+    }
+}
diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java
new file mode 100644
index 0000000..2d0ae03
--- /dev/null
+++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java
@@ -0,0 +1,40 @@
+package org.apache.eventmesh.runtime.admin.handler;
+
+import com.sun.net.httpserver.HttpExchange;
+import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+
+/**
+ * Unit test for the parameter validation of {@link RedirectClientByIpPortHandler}.
+ */
+public class RedirectClientByIpPortHandlerTest {
+
+    private RedirectClientByIpPortHandler redirectClientByIpPortHandler;
+
+    @Before
+    public void init() {
+        EventMeshTCPServer mockServer = PowerMockito.mock(EventMeshTCPServer.class);
+        redirectClientByIpPortHandler = new RedirectClientByIpPortHandler(mockServer);
+    }
+
+    /**
+     * A request whose {@code desteventMeshPort} parameter is empty must be answered with
+     * {@code "params illegal!"}.
+     */
+    @Test
+    public void testHandleParamIllegal() throws IOException {
+        OutputStream outputStream = new ByteArrayOutputStream();
+        // The '?' is essential: without it java.net.URI treats the whole string as a path and
+        // getQuery() is null, so the handler would see no parameters at all rather than the
+        // blank destination port this test is about. The path part itself is arbitrary here.
+        URI uri = URI.create("/clientManage/redirectClientByIpPort"
+                + "?ip=127.0.0.1&port=1234&desteventMeshIp=127.0.0.1&desteventMeshPort=");
+
+        HttpExchange mockExchange = PowerMockito.mock(HttpExchange.class);
+        PowerMockito.when(mockExchange.getResponseBody()).thenReturn(outputStream);
+        PowerMockito.when(mockExchange.getRequestURI()).thenReturn(uri);
+
+        redirectClientByIpPortHandler.handle(mockExchange);
+
+        String response = outputStream.toString();
+        Assert.assertEquals("params illegal!", response);
+
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org