You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/21 07:40:46 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #366] remove
custom concept [dcn&®ion] (#390)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new da05fe3 [ISSUE #366] remove custom concept [dcn&®ion] (#390)
da05fe3 is described below
commit da05fe30ab7fb0fff71360eeda0a0664319dce84
Author: nanoxiong <xi...@163.com>
AuthorDate: Mon Jun 21 15:40:38 2021 +0800
[ISSUE #366] remove custom concept [dcn&®ion] (#390)
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
close #366
---
.../org/apache/eventmesh/common/Constants.java | 6 -
.../eventmesh/common/command/HttpCommand.java | 15 +-
.../common/config/CommonConfiguration.java | 14 --
.../http/body/client/HeartbeatRequestBody.java | 14 ++
.../http/body/client/SubscribeRequestBody.java | 21 ++-
.../http/body/client/UnSubscribeRequestBody.java | 17 ++-
.../http/body/message/ReplyMessageRequestBody.java | 15 +-
.../body/message/SendMessageBatchRequestBody.java | 14 ++
.../message/SendMessageBatchV2RequestBody.java | 14 ++
.../http/body/message/SendMessageRequestBody.java | 15 +-
.../common/protocol/http/common/ProtocolKey.java | 4 -
.../http/header/client/HeartbeatRequestHeader.java | 30 +---
.../header/client/HeartbeatResponseHeader.java | 30 +---
.../http/header/client/RegRequestHeader.java | 26 ----
.../http/header/client/RegResponseHeader.java | 31 +---
.../http/header/client/SubscribeRequestHeader.java | 26 ----
.../header/client/SubscribeResponseHeader.java | 30 +---
.../http/header/client/UnRegRequestHeader.java | 28 ----
.../http/header/client/UnRegResponseHeader.java | 29 +---
.../header/client/UnSubscribeRequestHeader.java | 26 ----
.../header/client/UnSubscribeResponseHeader.java | 29 +---
.../header/message/PushMessageRequestHeader.java | 26 ----
.../header/message/PushMessageResponseHeader.java | 32 +---
.../header/message/ReplyMessageRequestHeader.java | 28 ----
.../header/message/ReplyMessageResponseHeader.java | 32 +---
.../message/SendMessageBatchRequestHeader.java | 28 ----
.../message/SendMessageBatchResponseHeader.java | 32 +---
.../message/SendMessageBatchV2RequestHeader.java | 28 ----
.../message/SendMessageBatchV2ResponseHeader.java | 32 +---
.../header/message/SendMessageRequestHeader.java | 28 ----
.../header/message/SendMessageResponseHeader.java | 32 +---
.../eventmesh/common/protocol/tcp/UserAgent.java | 30 ++--
.../connector/rocketmq/common/Constants.java | 5 +-
.../rocketmq/common/EventMeshConstants.java | 5 -
.../rocketmq/consumer/RocketMQConsumerImpl.java | 4 +-
eventmesh-runtime/conf/eventmesh.properties | 2 -
eventmesh-runtime/scripts/client_manage.sh | 22 ++-
eventmesh-runtime/scripts/session.sh | 5 +-
.../admin/controller/ClientManageController.java | 5 +-
.../handler/EventMeshMsgDownStreamHandler.java | 166 ---------------------
.../handler/RedirectClientBySubSystemHandler.java | 19 ++-
.../handler/RejectClientBySubSystemHandler.java | 31 ++--
...Handler.java => ShowClientBySystemHandler.java} | 15 +-
.../runtime/admin/handler/ShowClientHandler.java | 32 ++--
.../handler/ShowListenClientByTopicHandler.java | 2 +-
.../runtime/constants/EventMeshConstants.java | 7 -
.../protocol/http/consumer/EventMeshConsumer.java | 6 +-
.../http/processor/BatchSendMessageProcessor.java | 11 +-
.../processor/BatchSendMessageV2Processor.java | 10 +-
.../http/processor/HeartBeatProcessor.java | 10 +-
.../http/processor/ReplyMessageProcessor.java | 8 +-
.../http/processor/SendAsyncMessageProcessor.java | 8 +-
.../http/processor/SendSyncMessageProcessor.java | 8 +-
.../http/processor/SubscribeProcessor.java | 11 +-
.../http/processor/UnSubscribeProcessor.java | 12 +-
.../core/protocol/http/processor/inf/Client.java | 4 -
.../protocol/http/producer/EventMeshProducer.java | 3 +-
.../protocol/http/push/AsyncHTTPPushRequest.java | 2 -
.../tcp/client/group/ClientGroupWrapper.java | 111 +++++++-------
.../client/group/ClientSessionGroupMapping.java | 84 +++--------
.../core/protocol/tcp/client/session/Session.java | 2 +-
.../session/push/retry/EventMeshTcpRetryer.java | 2 +-
.../core/protocol/tcp/client/task/HelloTask.java | 10 ++
.../eventmesh/runtime/util/EventMeshUtil.java | 48 +-----
.../src/test/java/client/common/MessageUtils.java | 2 -
.../test/java/client/common/UserAgentUtils.java | 3 -
.../eventmesh/client/http/RemotingServer.java | 3 +-
.../client/http/conf/LiteClientConfig.java | 46 +++---
.../client/http/consumer/LiteConsumer.java | 18 ++-
.../http/consumer/context/LiteConsumeContext.java | 28 +---
.../client/http/producer/LiteProducer.java | 9 +-
.../eventmesh/client/tcp/common/MessageUtils.java | 2 -
.../client/http/demo/AsyncPublishInstance.java | 2 +-
.../client/http/demo/AsyncSyncRequestInstance.java | 2 +-
.../client/http/demo/SyncRequestInstance.java | 2 +-
.../client/tcp/common/EventMeshTestUtils.java | 6 +-
.../eventmesh/http/demo/AsyncPublishInstance.java | 2 +-
.../http/demo/AsyncSyncRequestInstance.java | 2 +-
.../eventmesh/http/demo/SyncRequestInstance.java | 2 +-
.../http/demo/sub/service/SubService.java | 3 +-
.../eventmesh/tcp/common/EventMeshTestUtils.java | 7 +-
.../apache/eventmesh/tcp/demo/SyncResponse.java | 1 +
82 files changed, 403 insertions(+), 1169 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index 937859a..c2457ad 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -21,10 +21,6 @@ public class Constants {
public static final String DEFAULT_CHARSET = "UTF-8";
- public static final String TARGET_EVENTMESH_REGION = "TARGET_EVENTMESH_REGION";
-
- public static final String CONSTANTS_DEFAULT_REGION_KEY = "default";
-
public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
public static final String LANGUAGE_JAVA = "JAVA";
@@ -47,8 +43,6 @@ public class Constants {
public static final String CONSTANTS_INSTANCE_DESC_IDC = "idc";
- public static final String CONSTANTS_INSTANCE_DESC_DCN = "dcn";
-
public static final String CONSTANTS_INSTANCE_DESC_SYSID = "sysId";
public static final String CONSTANTS_INSTANCE_DESC_IP = "ip";
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
index 3117956..d36a1ae 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
@@ -17,19 +17,9 @@
package org.apache.eventmesh.common.command;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
import com.alibaba.fastjson.JSON;
-
import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-
+import io.netty.handler.codec.http.*;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
@@ -38,6 +28,9 @@ import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader;
import org.apache.eventmesh.common.protocol.http.header.Header;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
public class HttpCommand {
private static AtomicLong requestId = new AtomicLong(0);
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 931f41b..08a44cb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -31,9 +31,7 @@ import org.apache.commons.lang3.StringUtils;
public class CommonConfiguration {
public String eventMeshEnv = "P";
- public String eventMeshRegion = "";
public String eventMeshIDC = "FT";
- public String eventMeshDCN = "1C0";
public String eventMeshCluster = "LS";
public String eventMeshName = "";
public String sysID = "5477";
@@ -68,10 +66,6 @@ public class CommonConfiguration {
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshEnvStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ENV));
eventMeshEnv = StringUtils.deleteWhitespace(eventMeshEnvStr);
- String eventMeshRegionStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_REGION);
- Preconditions.checkState(StringUtils.isNotEmpty(eventMeshRegionStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_REGION));
- eventMeshRegion = StringUtils.deleteWhitespace(eventMeshRegionStr);
-
String sysIdStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SYSID);
Preconditions.checkState(StringUtils.isNotEmpty(sysIdStr) && StringUtils.isNumeric(sysIdStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_SYSID));
sysID = StringUtils.deleteWhitespace(sysIdStr);
@@ -88,10 +82,6 @@ public class CommonConfiguration {
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshIDCStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_IDC));
eventMeshIDC = StringUtils.deleteWhitespace(eventMeshIDCStr);
- String eventMeshDCNStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_DCN);
- Preconditions.checkState(StringUtils.isNotEmpty(eventMeshDCNStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_DCN));
- eventMeshDCN = StringUtils.deleteWhitespace(eventMeshDCNStr);
-
eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
eventMeshServerIp = getLocalAddr();
@@ -102,12 +92,8 @@ public class CommonConfiguration {
static class ConfKeys {
public static String KEYS_EVENTMESH_ENV = "eventMesh.server.env";
- public static String KEYS_EVENTMESH_REGION = "eventMesh.server.region";
-
public static String KEYS_EVENTMESH_IDC = "eventMesh.server.idc";
- public static String KEYS_EVENTMESH_DCN = "eventMesh.server.dcn";
-
public static String KEYS_EVENTMESH_SYSID = "eventMesh.sysid";
public static String KEYS_EVENTMESH_SERVER_CLUSTER = "eventMesh.server.cluster";
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
index ace4d71..751b632 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
@@ -31,6 +31,9 @@ public class HeartbeatRequestBody extends Body {
public static final String CLIENTTYPE = "clientType";
public static final String HEARTBEATENTITIES = "heartbeatEntities";
+ public static final String CONSUMERGROUP = "consumerGroup";
+
+ private String consumerGroup;
private String clientType;
@@ -52,9 +55,18 @@ public class HeartbeatRequestBody extends Body {
this.heartbeatEntities = heartbeatEntities;
}
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
public static HeartbeatRequestBody buildBody(Map<String, Object> bodyParam) {
HeartbeatRequestBody body = new HeartbeatRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
+ body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
body.setHeartbeatEntities(JSONArray.parseArray(MapUtils.getString(bodyParam, HEARTBEATENTITIES), HeartbeatEntity.class));
return body;
}
@@ -63,6 +75,7 @@ public class HeartbeatRequestBody extends Body {
public Map<String, Object> toMap() {
Map<String, Object> map = new HashMap<String, Object>();
map.put(CLIENTTYPE, clientType);
+ map.put(CONSUMERGROUP, consumerGroup);
map.put(HEARTBEATENTITIES, JSON.toJSONString(heartbeatEntities));
return map;
}
@@ -89,6 +102,7 @@ public class HeartbeatRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("heartbeatRequestBody={")
+ .append("consumerGroup=").append(consumerGroup).append(",")
.append("clientType=").append(clientType).append("}");
return sb.toString();
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
index 991b834..72e2e91 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
@@ -34,8 +34,22 @@ public class SubscribeRequestBody extends Body {
public static final String URL = "url";
+ public static final String CONSUMERGROUP = "consumerGroup";
+
private List<SubscriptionItem> topics;
+ private String url;
+
+ private String consumerGroup;
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
public List<SubscriptionItem> getTopics() {
return topics;
}
@@ -44,8 +58,6 @@ public class SubscribeRequestBody extends Body {
this.topics = topics;
}
- private String url;
-
public String getUrl() {
return url;
}
@@ -58,6 +70,7 @@ public class SubscribeRequestBody extends Body {
SubscribeRequestBody body = new SubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
+ body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
return body;
}
@@ -66,13 +79,15 @@ public class SubscribeRequestBody extends Body {
Map<String, Object> map = new HashMap<String, Object>();
map.put(URL, url);
map.put(TOPIC, JSON.toJSONString(topics));
+ map.put(CONSUMERGROUP, consumerGroup);
return map;
}
@Override
public String toString() {
return "subscribeBody{" +
- "url='" + url + '\'' +
+ "consumerGroup='" + consumerGroup + '\'' +
+ ", url='" + url + '\'' +
", topics=" + topics +
'}';
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
index e690f7e..756f1a9 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
@@ -33,10 +33,14 @@ public class UnSubscribeRequestBody extends Body {
public static final String URL = "url";
+ public static final String CONSUMERGROUP = "consumerGroup";
+
private List<String> topics;
private String url;
+ private String consumerGroup;
+
public List<String> getTopics() {
return topics;
}
@@ -53,10 +57,19 @@ public class UnSubscribeRequestBody extends Body {
this.url = url;
}
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
public static UnSubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
UnSubscribeRequestBody body = new UnSubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
+ body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
return body;
}
@@ -65,13 +78,15 @@ public class UnSubscribeRequestBody extends Body {
Map<String, Object> map = new HashMap<String, Object>();
map.put(URL, url);
map.put(TOPIC, JSON.toJSONString(topics));
+ map.put(CONSUMERGROUP, consumerGroup);
return map;
}
@Override
public String toString() {
return "unSubscribeRequestBody{" +
- "url='" + url + '\'' +
+ "consumerGroup='" + consumerGroup + '\'' +
+ ", url='" + url + '\'' +
", topics=" + topics +
'}';
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
index ddd29e5..e2de019 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
@@ -29,12 +29,12 @@ import org.apache.eventmesh.common.protocol.http.body.Body;
public class ReplyMessageRequestBody extends Body {
-
public static final String ORIGTOPIC = "origTopic";
public static final String BIZSEQNO = "bizSeqNo";
public static final String UNIQUEID = "uniqueId";
public static final String CONTENT = "content";
public static final String EXTFIELDS = "extFields";
+ public static final String PRODUCERGROUP = "producerGroup";
private String bizSeqNo;
@@ -46,6 +46,8 @@ public class ReplyMessageRequestBody extends Body {
private HashMap<String, String> extFields;
+ private String producerGroup;
+
public String getOrigTopic() {
return origTopic;
}
@@ -86,6 +88,14 @@ public class ReplyMessageRequestBody extends Body {
this.extFields = extFields;
}
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
@SuppressWarnings("unchecked")
public static ReplyMessageRequestBody buildBody(Map<String, Object> bodyParam) {
ReplyMessageRequestBody body = new ReplyMessageRequestBody();
@@ -97,6 +107,7 @@ public class ReplyMessageRequestBody extends Body {
if (StringUtils.isNotBlank(extFields)) {
body.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
}
+ body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
return body;
}
@@ -108,6 +119,7 @@ public class ReplyMessageRequestBody extends Body {
.append("uniqueId=").append(uniqueId).append(",")
.append("origTopic=").append(origTopic).append(",")
.append("content=").append(content).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
.append("extFields=").append(extFields).append("}");
return sb.toString();
}
@@ -120,6 +132,7 @@ public class ReplyMessageRequestBody extends Body {
map.put(UNIQUEID, uniqueId);
map.put(CONTENT, content);
map.put(EXTFIELDS, JSON.toJSONString(extFields));
+ map.put(PRODUCERGROUP, producerGroup);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
index ced6bef..576e352 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
@@ -33,6 +33,7 @@ public class SendMessageBatchRequestBody extends Body {
public static final String BATCHID = "batchId";
public static final String CONTENTS = "contents";
public static final String SIZE = "size";
+ public static final String PRODUCERGROUP = "producerGroup";
private String batchId;
@@ -40,6 +41,8 @@ public class SendMessageBatchRequestBody extends Body {
private String size;
+ private String producerGroup;
+
public SendMessageBatchRequestBody() {
}
@@ -67,12 +70,21 @@ public class SendMessageBatchRequestBody extends Body {
this.size = size;
}
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("sendMessageBatchRequestBody={")
.append("batchId=").append(batchId).append(",")
.append("size=").append(size).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
.append("contents=").append(JSON.toJSONString(contents)).append("}");
return sb.toString();
}
@@ -111,6 +123,7 @@ public class SendMessageBatchRequestBody extends Body {
body.setContents(JSONArray.parseArray(contents, BatchMessageEntity.class));
}
body.setSize(size);
+ body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
return body;
}
@@ -120,6 +133,7 @@ public class SendMessageBatchRequestBody extends Body {
map.put(BATCHID, batchId);
map.put(SIZE, size);
map.put(CONTENTS, contents);
+ map.put(PRODUCERGROUP, producerGroup);
return map;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2RequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2RequestBody.java
index dc46adc..8f97cf1 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2RequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchV2RequestBody.java
@@ -30,6 +30,7 @@ public class SendMessageBatchV2RequestBody extends Body {
public static final String MSG = "msg";
public static final String TAG = "tag";
public static final String TTL = "ttl";
+ public static final String PRODUCERGROUP = "producerGroup";
private String bizSeqNo;
@@ -41,6 +42,8 @@ public class SendMessageBatchV2RequestBody extends Body {
private String ttl;
+ private String producerGroup;
+
public String getBizSeqNo() {
return bizSeqNo;
}
@@ -81,6 +84,14 @@ public class SendMessageBatchV2RequestBody extends Body {
this.ttl = ttl;
}
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
public static SendMessageBatchV2RequestBody buildBody(final Map<String, Object> bodyParam) {
String bizSeqno = MapUtils.getString(bodyParam,
BIZSEQNO);
@@ -98,6 +109,7 @@ public class SendMessageBatchV2RequestBody extends Body {
body.setTag(tag);
body.setTtl(ttl);
body.setTopic(topic);
+ body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
return body;
}
@@ -109,6 +121,7 @@ public class SendMessageBatchV2RequestBody extends Body {
map.put(MSG, msg);
map.put(TAG, tag);
map.put(TTL, ttl);
+ map.put(PRODUCERGROUP, producerGroup);
return map;
}
@@ -120,6 +133,7 @@ public class SendMessageBatchV2RequestBody extends Body {
.append("topic=").append(topic).append(",")
.append("tag=").append(tag).append(",")
.append("ttl=").append(ttl).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
.append("msg=").append(msg).append("}");
return sb.toString();
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
index 840e2f7..5c302e8 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
@@ -35,7 +35,7 @@ public class SendMessageRequestBody extends Body {
public static final String TTL = "ttl";
public static final String TAG = "tag";
public static final String EXTFIELDS = "extFields";
-
+ public static final String PRODUCERGROUP = "producerGroup";
private String topic;
@@ -51,6 +51,8 @@ public class SendMessageRequestBody extends Body {
private HashMap<String, String> extFields;
+ private String producerGroup;
+
public String getTopic() {
return topic;
}
@@ -107,6 +109,14 @@ public class SendMessageRequestBody extends Body {
this.tag = tag;
}
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
@SuppressWarnings("unchecked")
public static SendMessageRequestBody buildBody(Map<String, Object> bodyParam) {
SendMessageRequestBody body = new SendMessageRequestBody();
@@ -120,6 +130,7 @@ public class SendMessageRequestBody extends Body {
if (StringUtils.isNotBlank(extFields)) {
body.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
}
+ body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
return body;
}
@@ -133,6 +144,7 @@ public class SendMessageRequestBody extends Body {
map.put(TAG, tag);
map.put(CONTENT, content);
map.put(EXTFIELDS, extFields);
+ map.put(PRODUCERGROUP, producerGroup);
return map;
}
@@ -146,6 +158,7 @@ public class SendMessageRequestBody extends Body {
.append("content=").append(content).append(",")
.append("ttl=").append(ttl).append(",")
.append("tag=").append(tag).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
.append("extFields=").append(extFields).append("}");
return sb.toString();
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
index c6cb15d..12e79ad 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/ProtocolKey.java
@@ -26,9 +26,7 @@ public class ProtocolKey {
public static class ClientInstanceKey {
////////////////////////////////////协议层请求方描述///////////
public static final String ENV = "Env";
- public static final String REGION = "Region";
public static final String IDC = "Idc";
- public static final String DCN = "Dcn";
public static final String SYS = "Sys";
public static final String PID = "Pid";
public static final String IP = "Ip";
@@ -42,9 +40,7 @@ public class ProtocolKey {
public static final String EVENTMESHCLUSTER = "EventMeshCluster";
public static final String EVENTMESHIP = "EventMeshIp";
public static final String EVENTMESHENV = "EventMeshEnv";
- public static final String EVENTMESHREGION = "EventMeshRegion";
public static final String EVENTMESHIDC = "EventMeshIdc";
- public static final String EVENTMESHDCN = "EventMeshDcn";
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java
index e189823..5cd0724 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeader.java
@@ -41,15 +41,9 @@ public class HeartbeatRequestHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -59,6 +53,8 @@ public class HeartbeatRequestHeader extends Header {
//请求方的IP
private String ip;
+ private String producerGroup;
+
//请求方的USERNAME
private String username = "username";
@@ -97,14 +93,6 @@ public class HeartbeatRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -113,14 +101,6 @@ public class HeartbeatRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -168,9 +148,7 @@ public class HeartbeatRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -187,9 +165,7 @@ public class HeartbeatRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
@@ -206,9 +182,7 @@ public class HeartbeatRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeader.java
index 3863498..e0d6e99 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeader.java
@@ -34,12 +34,8 @@ public class HeartbeatResponseHeader extends Header {
private String eventMeshEnv;
- private String eventMeshRegion;
-
private String eventMeshIdc;
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -72,14 +68,6 @@ public class HeartbeatResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -88,24 +76,14 @@ public class HeartbeatResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static HeartbeatResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv,
+ String eventMeshIDC) {
HeartbeatResponseHeader heartbeatResponseHeader = new HeartbeatResponseHeader();
heartbeatResponseHeader.setCode(requestCode);
heartbeatResponseHeader.setEventMeshCluster(eventMeshCluster);
- heartbeatResponseHeader.setEventMeshDcn(eventMeshDcn);
heartbeatResponseHeader.setEventMeshIp(eventMeshIp);
heartbeatResponseHeader.setEventMeshEnv(eventMeshEnv);
- heartbeatResponseHeader.setEventMeshRegion(eventMeshRegion);
heartbeatResponseHeader.setEventMeshIdc(eventMeshIDC);
return heartbeatResponseHeader;
}
@@ -116,9 +94,7 @@ public class HeartbeatResponseHeader extends Header {
sb.append("heartbeatResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -131,9 +107,7 @@ public class HeartbeatResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, eventMeshCluster);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeader.java
index 2dbcd6f..0ee7cab 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeader.java
@@ -37,12 +37,8 @@ public class RegRequestHeader extends Header {
private String env;
- private String region;
-
private String idc;
- private String dcn;
-
private String sys;
private String pid;
@@ -61,9 +57,7 @@ public class RegRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -104,14 +98,6 @@ public class RegRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -120,14 +106,6 @@ public class RegRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -175,9 +153,7 @@ public class RegRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -194,9 +170,7 @@ public class RegRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java
index d0257cc..503a527 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeader.java
@@ -38,15 +38,9 @@ public class RegResponseHeader extends Header {
//处理该次Request请求的eventMesh所在的环境编号
private String eventMeshEnv;
- //处理该次Request请求的eventMesh所在区域
- private String eventMeshRegion;
-
//处理该次Request请求的eventMesh所在IDC
private String eventMeshIdc;
- //处理该次Request请求的eventMesh所在DCN
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -79,14 +73,6 @@ public class RegResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -95,24 +81,13 @@ public class RegResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static RegResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv, String eventMeshIDC) {
RegResponseHeader regResponseHeader = new RegResponseHeader();
regResponseHeader.setCode(requestCode);
regResponseHeader.setEventMeshCluster(eventMeshCluster);
- regResponseHeader.setEventMeshDcn(eventMeshDcn);
regResponseHeader.setEventMeshIp(eventMeshIp);
regResponseHeader.setEventMeshEnv(eventMeshEnv);
- regResponseHeader.setEventMeshRegion(eventMeshRegion);
regResponseHeader.setEventMeshIdc(eventMeshIDC);
return regResponseHeader;
}
@@ -123,9 +98,7 @@ public class RegResponseHeader extends Header {
sb.append("regResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -138,9 +111,7 @@ public class RegResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, eventMeshCluster);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeader.java
index 9fffa22..6143e4b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeader.java
@@ -37,12 +37,8 @@ public class SubscribeRequestHeader extends Header {
private String env;
- private String region;
-
private String idc;
- private String dcn;
-
private String sys;
private String pid;
@@ -61,9 +57,7 @@ public class SubscribeRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -104,14 +98,6 @@ public class SubscribeRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -120,14 +106,6 @@ public class SubscribeRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -175,9 +153,7 @@ public class SubscribeRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -194,9 +170,7 @@ public class SubscribeRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeader.java
index dbd9bb0..f21fbce 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeader.java
@@ -33,12 +33,8 @@ public class SubscribeResponseHeader extends Header {
private String eventMeshEnv;
- private String eventMeshRegion;
-
private String eventMeshIdc;
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -71,14 +67,6 @@ public class SubscribeResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -87,24 +75,14 @@ public class SubscribeResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static SubscribeResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv,
+ String eventMeshIDC) {
SubscribeResponseHeader subscribeResponseHeader = new SubscribeResponseHeader();
subscribeResponseHeader.setCode(requestCode);
subscribeResponseHeader.setEventMeshCluster(eventMeshCluster);
- subscribeResponseHeader.setEventMeshDcn(eventMeshDcn);
subscribeResponseHeader.setEventMeshIp(eventMeshIp);
subscribeResponseHeader.setEventMeshEnv(eventMeshEnv);
- subscribeResponseHeader.setEventMeshRegion(eventMeshRegion);
subscribeResponseHeader.setEventMeshIdc(eventMeshIDC);
return subscribeResponseHeader;
}
@@ -115,9 +93,7 @@ public class SubscribeResponseHeader extends Header {
sb.append("subscribeResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -131,8 +107,6 @@ public class SubscribeResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java
index 9e040d2..09c3a6d 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeader.java
@@ -41,15 +41,9 @@ public class UnRegRequestHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -73,9 +67,7 @@ public class UnRegRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -116,14 +108,6 @@ public class UnRegRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -132,14 +116,6 @@ public class UnRegRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -187,9 +163,7 @@ public class UnRegRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -206,9 +180,7 @@ public class UnRegRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeader.java
index d009228..4338c03 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeader.java
@@ -34,12 +34,8 @@ public class UnRegResponseHeader extends Header {
private String eventMeshEnv;
- private String eventMeshRegion;
-
private String eventMeshIdc;
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -72,14 +68,6 @@ public class UnRegResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -88,24 +76,13 @@ public class UnRegResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static UnRegResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv, String eventMeshIDC) {
UnRegResponseHeader regResponseHeader = new UnRegResponseHeader();
regResponseHeader.setCode(requestCode);
regResponseHeader.setEventMeshCluster(eventMeshCluster);
- regResponseHeader.setEventMeshDcn(eventMeshDcn);
regResponseHeader.setEventMeshIp(eventMeshIp);
regResponseHeader.setEventMeshEnv(eventMeshEnv);
- regResponseHeader.setEventMeshRegion(eventMeshRegion);
regResponseHeader.setEventMeshIdc(eventMeshIDC);
return regResponseHeader;
}
@@ -116,9 +93,7 @@ public class UnRegResponseHeader extends Header {
sb.append("nnRegResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -132,8 +107,6 @@ public class UnRegResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeader.java
index 0c505ee..f5b3456 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeader.java
@@ -37,12 +37,8 @@ public class UnSubscribeRequestHeader extends Header {
private String env;
- private String region;
-
private String idc;
- private String dcn;
-
private String sys;
private String pid;
@@ -61,9 +57,7 @@ public class UnSubscribeRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -104,14 +98,6 @@ public class UnSubscribeRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -120,14 +106,6 @@ public class UnSubscribeRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -175,9 +153,7 @@ public class UnSubscribeRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -194,9 +170,7 @@ public class UnSubscribeRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeader.java
index f6e15be..f263823 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeader.java
@@ -33,12 +33,8 @@ public class UnSubscribeResponseHeader extends Header {
private String eventMeshEnv;
- private String eventMeshRegion;
-
private String eventMeshIdc;
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -71,14 +67,6 @@ public class UnSubscribeResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -87,24 +75,13 @@ public class UnSubscribeResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static UnSubscribeResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv, String eventMeshIDC) {
UnSubscribeResponseHeader unSubscribeResponseHeader = new UnSubscribeResponseHeader();
unSubscribeResponseHeader.setCode(requestCode);
unSubscribeResponseHeader.setEventMeshCluster(eventMeshCluster);
- unSubscribeResponseHeader.setEventMeshDcn(eventMeshDcn);
unSubscribeResponseHeader.setEventMeshIp(eventMeshIp);
unSubscribeResponseHeader.setEventMeshEnv(eventMeshEnv);
- unSubscribeResponseHeader.setEventMeshRegion(eventMeshRegion);
unSubscribeResponseHeader.setEventMeshIdc(eventMeshIDC);
return unSubscribeResponseHeader;
}
@@ -115,9 +92,7 @@ public class UnSubscribeResponseHeader extends Header {
sb.append("unSubscribeResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -131,8 +106,6 @@ public class UnSubscribeResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java
index e0b3fa9..18f6cfc 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageRequestHeader.java
@@ -44,12 +44,8 @@ public class PushMessageRequestHeader extends Header {
private String eventMeshEnv;
- private String eventMeshRegion;
-
private String eventMeshIdc;
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -82,14 +78,6 @@ public class PushMessageRequestHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -98,14 +86,6 @@ public class PushMessageRequestHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public String getLanguage() {
return language;
}
@@ -129,9 +109,7 @@ public class PushMessageRequestHeader extends Header {
pushMessageRequestHeader.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
pushMessageRequestHeader.setEventMeshCluster(MapUtils.getString(headerParam, ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER));
pushMessageRequestHeader.setEventMeshIp(MapUtils.getString(headerParam, ProtocolKey.EventMeshInstanceKey.EVENTMESHIP));
- pushMessageRequestHeader.setEventMeshDcn(MapUtils.getString(headerParam, ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN));
pushMessageRequestHeader.setEventMeshEnv(MapUtils.getString(headerParam, ProtocolKey.EventMeshInstanceKey.EVENTMESHENV));
- pushMessageRequestHeader.setEventMeshRegion(MapUtils.getString(headerParam, ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION));
pushMessageRequestHeader.setEventMeshIdc(MapUtils.getString(headerParam, ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC));
return pushMessageRequestHeader;
}
@@ -145,9 +123,7 @@ public class PushMessageRequestHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, eventMeshCluster);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
@@ -159,9 +135,7 @@ public class PushMessageRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version.getVersion()).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java
index 9571ba5..2f8fc63 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/PushMessageResponseHeader.java
@@ -40,15 +40,9 @@ public class PushMessageResponseHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -112,14 +106,6 @@ public class PushMessageResponseHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -128,14 +114,6 @@ public class PushMessageResponseHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -160,16 +138,14 @@ public class PushMessageResponseHeader extends Header {
this.ip = ip;
}
- public static PushMessageResponseHeader buildHeader(int requestCode, String clientEnv, String clientRegion, String clientIDC,
- String clientDCN, String clientSysId, String clientPid, String clientIP) {
+ public static PushMessageResponseHeader buildHeader(int requestCode, String clientEnv, String clientIDC,
+ String clientSysId, String clientPid, String clientIP) {
PushMessageResponseHeader pushMessageResponseHeader = new PushMessageResponseHeader();
pushMessageResponseHeader.setCode(requestCode);
pushMessageResponseHeader.setVersion(ProtocolVersion.V1);
pushMessageResponseHeader.setLanguage(Constants.LANGUAGE_JAVA);
pushMessageResponseHeader.setEnv(clientEnv);
- pushMessageResponseHeader.setRegion(clientRegion);
pushMessageResponseHeader.setIdc(clientIDC);
- pushMessageResponseHeader.setDcn(clientDCN);
pushMessageResponseHeader.setSys(clientSysId);
pushMessageResponseHeader.setPid(clientPid);
pushMessageResponseHeader.setIp(clientIP);
@@ -184,9 +160,7 @@ public class PushMessageResponseHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
@@ -202,9 +176,7 @@ public class PushMessageResponseHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java
index 3a25164..e7d347e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageRequestHeader.java
@@ -41,15 +41,9 @@ public class ReplyMessageRequestHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -113,14 +107,6 @@ public class ReplyMessageRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -129,14 +115,6 @@ public class ReplyMessageRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -169,9 +147,7 @@ public class ReplyMessageRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -187,9 +163,7 @@ public class ReplyMessageRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -206,9 +180,7 @@ public class ReplyMessageRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java
index fb2f42f..33fd373 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/ReplyMessageResponseHeader.java
@@ -38,15 +38,9 @@ public class ReplyMessageResponseHeader extends Header {
//处理该次Request请求的eventMesh所在的环境编号
private String eventMeshEnv;
- //处理该次Request请求的eventMesh所在区域
- private String eventMeshRegion;
-
//处理该次Request请求的eventMesh所在IDC
private String eventMeshIdc;
- //处理该次Request请求的eventMesh所在DCN
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -79,14 +73,6 @@ public class ReplyMessageResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -95,24 +81,14 @@ public class ReplyMessageResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static ReplyMessageResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv,
+ String eventMeshIDC) {
ReplyMessageResponseHeader replyMessageResponseHeader = new ReplyMessageResponseHeader();
replyMessageResponseHeader.setCode(requestCode);
replyMessageResponseHeader.setEventMeshCluster(eventMeshCluster);
- replyMessageResponseHeader.setEventMeshDcn(eventMeshDcn);
replyMessageResponseHeader.setEventMeshIp(eventMeshIp);
replyMessageResponseHeader.setEventMeshEnv(eventMeshEnv);
- replyMessageResponseHeader.setEventMeshRegion(eventMeshRegion);
replyMessageResponseHeader.setEventMeshIdc(eventMeshIDC);
return replyMessageResponseHeader;
}
@@ -123,9 +99,7 @@ public class ReplyMessageResponseHeader extends Header {
sb.append("replyMessageResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -139,8 +113,6 @@ public class ReplyMessageResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java
index fd62cce..f981ceb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchRequestHeader.java
@@ -42,15 +42,9 @@ public class SendMessageBatchRequestHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -114,14 +108,6 @@ public class SendMessageBatchRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -130,14 +116,6 @@ public class SendMessageBatchRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -170,9 +148,7 @@ public class SendMessageBatchRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -188,9 +164,7 @@ public class SendMessageBatchRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -207,9 +181,7 @@ public class SendMessageBatchRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java
index c008ed3..d06660e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchResponseHeader.java
@@ -38,15 +38,9 @@ public class SendMessageBatchResponseHeader extends Header {
//处理该次Request请求的eventMesh所在的环境编号
private String eventMeshEnv;
- //处理该次Request请求的eventMesh所在区域
- private String eventMeshRegion;
-
//处理该次Request请求的eventMesh所在IDC
private String eventMeshIdc;
- //处理该次Request请求的eventMesh所在DCN
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -79,14 +73,6 @@ public class SendMessageBatchResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -95,23 +81,13 @@ public class SendMessageBatchResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static SendMessageBatchResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv,
+ String eventMeshIDC) {
SendMessageBatchResponseHeader sendMessageBatchResponseHeader = new SendMessageBatchResponseHeader();
sendMessageBatchResponseHeader.setCode(requestCode);
sendMessageBatchResponseHeader.setEventMeshCluster(eventMeshCluster);
- sendMessageBatchResponseHeader.setEventMeshDcn(eventMeshDcn);
sendMessageBatchResponseHeader.setEventMeshEnv(eventMeshEnv);
- sendMessageBatchResponseHeader.setEventMeshRegion(eventMeshRegion);
sendMessageBatchResponseHeader.setEventMeshIdc(eventMeshIDC);
sendMessageBatchResponseHeader.setEventMeshIp(eventMeshIp);
return sendMessageBatchResponseHeader;
@@ -123,9 +99,7 @@ public class SendMessageBatchResponseHeader extends Header {
sb.append("sendMessageBatchResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -138,9 +112,7 @@ public class SendMessageBatchResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, eventMeshCluster);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java
index 8722d68..1e7468e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2RequestHeader.java
@@ -41,15 +41,9 @@ public class SendMessageBatchV2RequestHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -113,14 +107,6 @@ public class SendMessageBatchV2RequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -129,14 +115,6 @@ public class SendMessageBatchV2RequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -169,9 +147,7 @@ public class SendMessageBatchV2RequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -187,9 +163,7 @@ public class SendMessageBatchV2RequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -206,9 +180,7 @@ public class SendMessageBatchV2RequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java
index c284fc0..1544381 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageBatchV2ResponseHeader.java
@@ -37,15 +37,9 @@ public class SendMessageBatchV2ResponseHeader extends Header {
//处理该次Request请求的eventMesh所在的环境编号
private String eventMeshEnv;
- //处理该次Request请求的eventMesh所在区域
- private String eventMeshRegion;
-
//处理该次Request请求的eventMesh所在IDC
private String eventMeshIdc;
- //处理该次Request请求的eventMesh所在DCN
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -78,14 +72,6 @@ public class SendMessageBatchV2ResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -94,23 +80,13 @@ public class SendMessageBatchV2ResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static SendMessageBatchV2ResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv,
+ String eventMeshIDC) {
SendMessageBatchV2ResponseHeader header = new SendMessageBatchV2ResponseHeader();
header.setCode(requestCode);
header.setEventMeshCluster(eventMeshCluster);
- header.setEventMeshDcn(eventMeshDcn);
header.setEventMeshEnv(eventMeshEnv);
- header.setEventMeshRegion(eventMeshRegion);
header.setEventMeshIdc(eventMeshIDC);
header.setEventMeshIp(eventMeshIp);
return header;
@@ -122,9 +98,7 @@ public class SendMessageBatchV2ResponseHeader extends Header {
sb.append("sendMessageBatchV2ResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -137,9 +111,7 @@ public class SendMessageBatchV2ResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, eventMeshCluster);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java
index 3a1778f..8512529 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageRequestHeader.java
@@ -41,15 +41,9 @@ public class SendMessageRequestHeader extends Header {
//请求方所在环境编号
private String env;
- //请求方所在区域编码
- private String region;
-
//请求方所在IDC
private String idc;
- //请求方所在DCN
- private String dcn;
-
//请求方的子系统
private String sys;
@@ -113,14 +107,6 @@ public class SendMessageRequestHeader extends Header {
this.env = env;
}
- public String getRegion() {
- return region;
- }
-
- public void setRegion(String region) {
- this.region = region;
- }
-
public String getIdc() {
return idc;
}
@@ -129,14 +115,6 @@ public class SendMessageRequestHeader extends Header {
this.idc = idc;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getSys() {
return sys;
}
@@ -169,9 +147,7 @@ public class SendMessageRequestHeader extends Header {
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
header.setEnv(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.ENV));
- header.setRegion(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.REGION));
header.setIdc(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IDC));
- header.setDcn(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.DCN));
header.setSys(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.SYS));
header.setPid(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.PID));
header.setIp(MapUtils.getString(headerParam, ProtocolKey.ClientInstanceKey.IP));
@@ -187,9 +163,7 @@ public class SendMessageRequestHeader extends Header {
map.put(ProtocolKey.LANGUAGE, language);
map.put(ProtocolKey.VERSION, version);
map.put(ProtocolKey.ClientInstanceKey.ENV, env);
- map.put(ProtocolKey.ClientInstanceKey.REGION, region);
map.put(ProtocolKey.ClientInstanceKey.IDC, idc);
- map.put(ProtocolKey.ClientInstanceKey.DCN, dcn);
map.put(ProtocolKey.ClientInstanceKey.SYS, sys);
map.put(ProtocolKey.ClientInstanceKey.PID, pid);
map.put(ProtocolKey.ClientInstanceKey.IP, ip);
@@ -206,9 +180,7 @@ public class SendMessageRequestHeader extends Header {
.append("language=").append(language).append(",")
.append("version=").append(version).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
.append("sys=").append(sys).append(",")
.append("pid=").append(pid).append(",")
.append("ip=").append(ip).append(",")
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java
index 2f14bd6..caa5eed 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/header/message/SendMessageResponseHeader.java
@@ -38,15 +38,9 @@ public class SendMessageResponseHeader extends Header {
//处理该次Request请求的eventMesh所在的环境编号
private String eventMeshEnv;
- //处理该次Request请求的eventMesh所在区域
- private String eventMeshRegion;
-
//处理该次Request请求的eventMesh所在IDC
private String eventMeshIdc;
- //处理该次Request请求的eventMesh所在DCN
- private String eventMeshDcn;
-
public int getCode() {
return code;
}
@@ -79,14 +73,6 @@ public class SendMessageResponseHeader extends Header {
this.eventMeshEnv = eventMeshEnv;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshIdc() {
return eventMeshIdc;
}
@@ -95,24 +81,14 @@ public class SendMessageResponseHeader extends Header {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public static SendMessageResponseHeader buildHeader(Integer requestCode, String eventMeshCluster,
- String eventMeshIp, String eventMeshEnv, String eventMeshRegion,
- String eventMeshDcn, String eventMeshIDC) {
+ String eventMeshIp, String eventMeshEnv,
+ String eventMeshIDC) {
SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader();
sendMessageResponseHeader.setCode(requestCode);
sendMessageResponseHeader.setEventMeshCluster(eventMeshCluster);
- sendMessageResponseHeader.setEventMeshDcn(eventMeshDcn);
sendMessageResponseHeader.setEventMeshIp(eventMeshIp);
sendMessageResponseHeader.setEventMeshEnv(eventMeshEnv);
- sendMessageResponseHeader.setEventMeshRegion(eventMeshRegion);
sendMessageResponseHeader.setEventMeshIdc(eventMeshIDC);
return sendMessageResponseHeader;
}
@@ -123,9 +99,7 @@ public class SendMessageResponseHeader extends Header {
sb.append("sendMessageResponseHeader={")
.append("code=").append(code).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
.append("eventMeshIp=").append(eventMeshIp).append("}");
return sb.toString();
@@ -138,9 +112,7 @@ public class SendMessageResponseHeader extends Header {
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, eventMeshCluster);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, eventMeshIp);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, eventMeshEnv);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, eventMeshRegion);
map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, eventMeshIdc);
- map.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, eventMeshDcn);
return map;
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
index 27ba693..14b5058 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/UserAgent.java
@@ -21,7 +21,6 @@ public class UserAgent {
private String env;
private String subsystem;
- private String dcn;
private String path;
private int pid;
private String host;
@@ -30,12 +29,30 @@ public class UserAgent {
private String username;
private String password;
private String idc;
+ private String producerGroup;
+ private String consumerGroup;
private String purpose;
private int unack = 0;
public UserAgent() {
}
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
public String getEnv() {
return env;
}
@@ -60,14 +77,6 @@ public class UserAgent {
this.subsystem = subsystem;
}
- public String getDcn() {
- return dcn;
- }
-
- public void setDcn(String dcn) {
- this.dcn = dcn;
- }
-
public String getPath() {
return path;
}
@@ -145,7 +154,6 @@ public class UserAgent {
return "UserAgent{" +
"env='" + env + '\'' +
"subsystem='" + subsystem + '\'' +
- ", dcn='" + dcn + '\'' +
", path='" + path + '\'' +
", pid=" + pid +
", host='" + host + '\'' +
@@ -168,7 +176,6 @@ public class UserAgent {
if (port != userAgent.port) return false;
if (unack != userAgent.unack) return false;
if (subsystem != null ? !subsystem.equals(userAgent.subsystem) : userAgent.subsystem != null) return false;
- if (dcn != null ? !dcn.equals(userAgent.dcn) : userAgent.dcn != null) return false;
if (path != null ? !path.equals(userAgent.path) : userAgent.path != null) return false;
if (host != null ? !host.equals(userAgent.host) : userAgent.host != null) return false;
if (purpose != null ? !purpose.equals(userAgent.purpose) : userAgent.purpose != null) return false;
@@ -182,7 +189,6 @@ public class UserAgent {
@Override
public int hashCode() {
int result = subsystem != null ? subsystem.hashCode() : 0;
- result = 31 * result + (dcn != null ? dcn.hashCode() : 0);
result = 31 * result + (path != null ? path.hashCode() : 0);
result = 31 * result + pid;
result = 31 * result + (host != null ? host.hashCode() : 0);
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
index 9d54a2e..fc55a68 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/Constants.java
@@ -18,11 +18,8 @@
package org.apache.eventmesh.connector.rocketmq.common;
public class Constants {
- public static final String BROADCAST_PREFIX = "broadcast-";
-
- public final static String CONSUMER_GROUP_NAME_PREFIX = "ConsumerGroup-";
- public final static String PRODUCER_GROUP_NAME_PREFIX = "ProducerGroup-";
+ public static final String BROADCAST_PREFIX = "broadcast-";
public static final String PROPERTY_MESSAGE_TIMEOUT = "TIMEOUT";
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
index cce1645..69fb8c1 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/common/EventMeshConstants.java
@@ -29,10 +29,6 @@ public class EventMeshConstants {
public static final String BROADCAST_PREFIX = "broadcast-";
- public final static String CONSUMER_GROUP_NAME_PREFIX = "ConsumerGroup-";
-
- public final static String PRODUCER_GROUP_NAME_PREFIX = "ProducerGroup-";
-
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));
@@ -85,7 +81,6 @@ public class EventMeshConstants {
public static final String TAG = "TAG";
- public static final String MANAGE_DCN = "dcn";
public static final String MANAGE_SUBSYSTEM = "subSystem";
public static final String MANAGE_IP = "ip";
public static final String MANAGE_PORT = "port";
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
index 9eb6379..853c394 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java
@@ -71,9 +71,7 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {
if (isBroadcast) {
- consumerGroup = Constants.CONSUMER_GROUP_NAME_PREFIX + Constants.BROADCAST_PREFIX + consumerGroup;
- } else {
- consumerGroup = Constants.CONSUMER_GROUP_NAME_PREFIX + consumerGroup;
+ consumerGroup = Constants.BROADCAST_PREFIX + consumerGroup;
}
String omsNamesrv = clientConfiguration.namesrvAddr;
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 7377ac1..035b950 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -17,10 +17,8 @@
###############################EVNETMESH-runtime ENV#################################
eventMesh.server.idc=DEFAULT
eventMesh.server.env=PRD
-eventMesh.server.region=region1
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
-eventMesh.server.dcn=010
eventMesh.sysid=0000
eventMesh.server.http.port=10105
########################## eventMesh tcp configuration ############################
diff --git a/eventmesh-runtime/scripts/client_manage.sh b/eventmesh-runtime/scripts/client_manage.sh
index f9367a0..0913cc5 100644
--- a/eventmesh-runtime/scripts/client_manage.sh
+++ b/eventmesh-runtime/scripts/client_manage.sh
@@ -17,8 +17,8 @@
# specific language governing permissions and limitations
# under the License.
i_eg="sh client_manage.sh -i 10.255.34.160 24591"
-s_eg="sh client_manage.sh -s FT0 5319"
-r_eg="sh client_manage.sh -r FT0 9876 10.255.1.143 10000"
+s_eg="sh client_manage.sh -s 5319"
+r_eg="sh client_manage.sh -r 9876 10.255.1.143 10000"
a_eg="sh client_manage.sh -a"
x_eg="sh client_manage.sh -x 10.255.34.160 24591 10.255.1.143 10000"
y_eg="sh client_manage.sh -y bq-bypass 10.255.1.143 10000"
@@ -27,8 +27,8 @@ function printEg() {
echo "param error."
echo "reject client by ip_port, eg : ${i_eg}"
echo "reject all clients, eg : ${a_eg}"
- echo "reject clients by dcn_systemid, eg : ${s_eg}"
- echo "redirect client by dcn_systemid, eg : ${r_eg}"
+ echo "reject clients by systemid, eg : ${s_eg}"
+ echo "redirect client by systemid, eg : ${r_eg}"
echo "redirect client by ip port, eg : ${x_eg}"
echo "redirect client by path, eg : ${y_eg}"
}
@@ -52,9 +52,8 @@ do
CLIENT_PORT=$5
msg=`curl "http://${ADDR}/clientManage/rejectClientByIpPort?ip=${CLIENT_IP}&port=${CLIENT_PORT}"`;echo ${msg};break;;
-s|--subsystem)
- DCN=$4
- SUB_SYSTEM=$5
- msg=`curl "http://${ADDR}/clientManage/rejectClientBySubSystem?dcn=${DCN}&subSystem=${SUB_SYSTEM}"`;echo ${msg};break;;
+ SUB_SYSTEM=$4
+ msg=`curl "http://${ADDR}/clientManage/rejectClientBySubSystem?subSystem=${SUB_SYSTEM}"`;echo ${msg};break;;
-x|--redirectbyip)
CLIENT_IP=$4
CLIENT_PORT=$5
@@ -67,11 +66,10 @@ do
DEST_PROXY_PORT=$6
msg=`curl "http://${ADDR}/clientManage/redirectClientByPath?path=${CLIENT_PATH}&destProxyIp=${DEST_PROXY_IP}&destProxyPort=${DEST_PROXY_PORT}"`;echo ${msg};break;;
-r|--redirect)
- DCN=$4
- SUB_SYSTEM=$5
- DEST_PROXY_IP=$6
- DEST_PROXY_PORT=$7
- msg=`curl "http://${ADDR}/clientManage/redirectClientBySubSystem?dcn=${DCN}&subSystem=${SUB_SYSTEM}&destProxyIp=${DEST_PROXY_IP}&destProxyPort=${DEST_PROXY_PORT}"`;echo ${msg};break;;
+ SUB_SYSTEM=$4
+ DEST_PROXY_IP=$5
+ DEST_PROXY_PORT=$6
+ msg=`curl "http://${ADDR}/clientManage/redirectClientBySubSystem?subSystem=${SUB_SYSTEM}&destProxyIp=${DEST_PROXY_IP}&destProxyPort=${DEST_PROXY_PORT}"`;echo ${msg};break;;
--)
shift;
break;;
diff --git a/eventmesh-runtime/scripts/session.sh b/eventmesh-runtime/scripts/session.sh
index 86042c0..3540cf8 100644
--- a/eventmesh-runtime/scripts/session.sh
+++ b/eventmesh-runtime/scripts/session.sh
@@ -24,7 +24,6 @@ then
TOPIC=$1
curl -s "http://127.0.0.1:10106/clientManage/showListenClientByTopic?topic=${TOPIC}"
else
- CLIENT_DCN=$1
- CLIENT_SYSTEM=$2
- curl -s "http://127.0.0.1:10106/clientManage/showClientBySystemAndDcn?dcn=${CLIENT_DCN}&subSystem=${CLIENT_SYSTEM}"
+ CLIENT_SYSTEM=$1
+ curl -s "http://127.0.0.1:10106/clientManage/showClientBySystem?subSystem=${CLIENT_SYSTEM}"
fi
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
index 0544373..573eb02 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java
@@ -28,7 +28,7 @@ import org.apache.eventmesh.runtime.admin.handler.RedirectClientBySubSystemHandl
import org.apache.eventmesh.runtime.admin.handler.RejectAllClientHandler;
import org.apache.eventmesh.runtime.admin.handler.RejectClientByIpPortHandler;
import org.apache.eventmesh.runtime.admin.handler.RejectClientBySubSystemHandler;
-import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemAndDcnHandler;
+import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemHandler;
import org.apache.eventmesh.runtime.admin.handler.ShowClientHandler;
import org.apache.eventmesh.runtime.admin.handler.ShowListenClientByTopicHandler;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
@@ -49,14 +49,13 @@ public class ClientManageController {
int port = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerAdminPort;
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
server.createContext("/clientManage/showClient", new ShowClientHandler(eventMeshTCPServer));
- server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler(eventMeshTCPServer));
+ server.createContext("/clientManage/showClientBySystem", new ShowClientBySystemHandler(eventMeshTCPServer));
server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler(eventMeshTCPServer));
server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler(eventMeshTCPServer));
server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler(eventMeshTCPServer));
server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler(eventMeshTCPServer));
server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler(eventMeshTCPServer));
server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler(eventMeshTCPServer));
-// server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler(eventMeshTCPServer));
server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler(eventMeshTCPServer));
server.start();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java
deleted file mode 100644
index cdc086e..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.runtime.admin.handler;
-
-import com.sun.net.httpserver.HttpExchange;
-import com.sun.net.httpserver.HttpHandler;
-import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class EventMeshMsgDownStreamHandler implements HttpHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(EventMeshMsgDownStreamHandler.class);
-
- private final EventMeshTCPServer eventMeshTCPServer;
-
- public EventMeshMsgDownStreamHandler(EventMeshTCPServer eventMeshTCPServer) {
- this.eventMeshTCPServer = eventMeshTCPServer;
- }
-
- @Override
- public void handle(HttpExchange httpExchange) throws IOException {
- String result = "false";
- OutputStream out = httpExchange.getResponseBody();
- try {
-// Map<String, Object> queryStringInfo = parsePostParameters(httpExchange);
-// String msgStr = (String)queryStringInfo.get("msg");
-// String groupName = (String)queryStringInfo.get("group");
-// logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr);
-// if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) {
-// logger.warn("msg or groupName is null");
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-// return;
-// }
-// MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class);
-// String topic = messageExt.getTopic();
-//
-// if (!EventMeshUtil.isValidRMBTopic(topic)) {
-// logger.warn("msg topic is illegal");
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-// return;
-// }
-//
-// DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy();
-// Set<Session> groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions();
-// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
-//
-// if(session == null){
-// logger.error("DownStream msg,retry other eventMesh found no session again");
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-// return;
-// }
-//
-// DownStreamMsgContext downStreamMsgContext =
-// new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true);
-// eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
-//
-// if (session.isCanDownStream()) {
-// session.downstreamMsg(downStreamMsgContext);
-// httpExchange.sendResponseHeaders(200, 0);
-// result = "true";
-// out.write(result.getBytes());
-// return;
-// }
-//
-// logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq);
-// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-// downStreamMsgContext.delay(delayTime);
-// eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
-// result = "true";
-// httpExchange.sendResponseHeaders(200, 0);
-// out.write(result.getBytes());
-
- } catch (Exception e) {
- logger.error("EventMeshMsgDownStreamHandler handle fail...", e);
- } finally {
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- logger.warn("out close failed...", e);
- }
- }
- }
- }
-
- private Map<String, Object> parsePostParameters(HttpExchange exchange)
- throws IOException {
- Map<String, Object> parameters = new HashMap<>();
- if ("post".equalsIgnoreCase(exchange.getRequestMethod())) {
- InputStreamReader isr =
- new InputStreamReader(exchange.getRequestBody(), "utf-8");
- BufferedReader br = new BufferedReader(isr);
- String query = br.readLine();
- parseQuery(query, parameters);
- }
- return parameters;
- }
-
- @SuppressWarnings("unchecked")
- private void parseQuery(String query, Map<String, Object> parameters)
- throws UnsupportedEncodingException {
-
- if (query != null) {
- String pairs[] = query.split("&");
-
- for (String pair : pairs) {
- String param[] = pair.split("=");
-
- String key = null;
- String value = null;
- if (param.length > 0) {
- key = URLDecoder.decode(param[0], "UTF-8");
- }
-
- if (param.length > 1) {
- value = URLDecoder.decode(param[1], "UTF-8");
- }
-
- if (parameters.containsKey(key)) {
- Object obj = parameters.get(key);
- if (obj instanceof List<?>) {
- List<String> values = (List<String>) obj;
- values.add(value);
- } else if (obj instanceof String) {
- List<String> values = new ArrayList<String>();
- values.add((String) obj);
- values.add(value);
- parameters.put(key, values);
- }
- } else {
- parameters.put(key, value);
- }
- }
- }
- }
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
index fa03e72..400c8c0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java
@@ -55,12 +55,11 @@ public class RedirectClientBySubSystemHandler implements HttpHandler {
try {
String queryString = httpExchange.getRequestURI().getQuery();
Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
- String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP);
String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT);
- if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem)
+ if (!StringUtils.isNumeric(subSystem)
|| StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort)
|| !StringUtils.isNumeric(destEventMeshPort)) {
httpExchange.sendResponseHeaders(200, 0);
@@ -68,14 +67,14 @@ public class RedirectClientBySubSystemHandler implements HttpHandler {
out.write(result.getBytes());
return;
}
- logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort);
+ logger.info("redirectClientBySubSystem in admin,subsys:{},destIp:{},destPort:{}====================", subSystem, destEventMeshIp, destEventMeshPort);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
String redirectResult = "";
try {
if (!sessionMap.isEmpty()) {
for (Session session : sessionMap.values()) {
- if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+ if (session.getClient().getSubsystem().equals(subSystem)) {
redirectResult += "|";
redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort),
session, clientSessionGroupMapping);
@@ -83,19 +82,19 @@ public class RedirectClientBySubSystemHandler implements HttpHandler {
}
}
} catch (Exception e) {
- logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" +
- "={}|destEventMeshPort={},errMsg={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e);
- result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " +
+ logger.error("clientManage|redirectClientBySubSystem|fail|subSystem={}|destEventMeshIp" +
+ "={}|destEventMeshPort={},errMsg={}", subSystem, destEventMeshIp, destEventMeshPort, e);
+ result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {subSystem=%s " +
"destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s",
- sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e
+ sessionMap.size(), subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e
.getMessage());
httpExchange.sendResponseHeaders(200, 0);
out.write(result.getBytes());
return;
}
- result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " +
+ result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {subSystem=%s " +
"destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ",
- sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult);
+ sessionMap.size(), subSystem, destEventMeshIp, destEventMeshPort, redirectResult);
httpExchange.sendResponseHeaders(200, 0);
out.write(result.getBytes());
} catch (Exception e) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
index 5b1e841..e5208c6 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java
@@ -47,6 +47,17 @@ public class RejectClientBySubSystemHandler implements HttpHandler {
this.eventMeshTCPServer = eventMeshTCPServer;
}
+ private String printClients(List<InetSocketAddress> clients) {
+ if (clients.isEmpty()) {
+ return "no session had been closed";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (InetSocketAddress addr : clients) {
+ sb.append(addr).append("|");
+ }
+ return sb.toString();
+ }
+
/**
* remove c client by dcn and susysId
* @param httpExchange
@@ -59,24 +70,23 @@ public class RejectClientBySubSystemHandler implements HttpHandler {
try {
String queryString = httpExchange.getRequestURI().getQuery();
Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
- String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
- if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) {
+ if (StringUtils.isBlank(subSystem)) {
httpExchange.sendResponseHeaders(200, 0);
result = "params illegal!";
out.write(result.getBytes());
return;
}
- logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn);
+ logger.info("rejectClientBySubSystem in admin,subsys:{}====================", subSystem);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
- final List<InetSocketAddress> successRemoteAddrs = new ArrayList<>();
+ final List<InetSocketAddress> successRemoteAddrs = new ArrayList<InetSocketAddress>();
try {
if (!sessionMap.isEmpty()) {
for (Session session : sessionMap.values()) {
- if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+ if (session.getClient().getSubsystem().equals(subSystem)) {
InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping);
if (addr != null) {
successRemoteAddrs.add(addr);
@@ -85,16 +95,15 @@ public class RejectClientBySubSystemHandler implements HttpHandler {
}
}
} catch (Exception e) {
- logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={},errMsg={}", dcn, subSystem, e);
- result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
- "port=%s}, errorMsg : %s", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn,
- subSystem, e.getMessage());
+ logger.error("clientManage|rejectClientBySubSystem|fail|subSystemId={},errMsg={}", subSystem, e);
+ result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%d} , {" +
+ "subSystemId=%s}, errorMsg : %s", sessionMap.size(), printClients(successRemoteAddrs), subSystem, e.getMessage());
httpExchange.sendResponseHeaders(200, 0);
out.write(result.getBytes());
return;
}
- result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " +
- "port=%s}", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn, subSystem);
+ result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {" +
+ "subSystemId=%s}", sessionMap.size(), printClients(successRemoteAddrs), subSystem);
httpExchange.sendResponseHeaders(200, 0);
out.write(result.getBytes());
} catch (Exception e) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java
similarity index 84%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java
rename to eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java
index 9ccd547..07cbbb2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemHandler.java
@@ -34,18 +34,18 @@ import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class ShowClientBySystemAndDcnHandler implements HttpHandler {
+public class ShowClientBySystemHandler implements HttpHandler {
- private static final Logger logger = LoggerFactory.getLogger(ShowClientBySystemAndDcnHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(ShowClientBySystemHandler.class);
private final EventMeshTCPServer eventMeshTCPServer;
- public ShowClientBySystemAndDcnHandler(EventMeshTCPServer eventMeshTCPServer) {
+ public ShowClientBySystemHandler(EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
}
/**
- * print clientInfo by subsys and dcn
+ * print clientInfo by subsys
*
* @param httpExchange
* @throws IOException
@@ -57,16 +57,15 @@ public class ShowClientBySystemAndDcnHandler implements HttpHandler {
try {
String queryString = httpExchange.getRequestURI().getQuery();
Map<String, String> queryStringInfo = NetUtils.formData2Dic(queryString);
- String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN);
String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM);
String newLine = System.getProperty("line.separator");
- logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn);
+ logger.info("showClientBySubsys,subsys:{}=================", subSystem);
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
ConcurrentHashMap<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
if (!sessionMap.isEmpty()) {
for (Session session : sessionMap.values()) {
- if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) {
+ if (session.getClient().getSubsystem().equals(subSystem)) {
UserAgent userAgent = session.getClient();
result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent
.getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine;
@@ -76,7 +75,7 @@ public class ShowClientBySystemAndDcnHandler implements HttpHandler {
httpExchange.sendResponseHeaders(200, 0);
out.write(result.getBytes());
} catch (Exception e) {
- logger.error("ShowClientBySystemAndDcnHandler fail...", e);
+ logger.error("ShowClientBySystemAndHandler fail...", e);
} finally {
if (out != null) {
try {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
index 314b2e5..64d5c5d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java
@@ -19,18 +19,18 @@ package org.apache.eventmesh.runtime.admin.handler;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping;
+import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
+import java.net.InetSocketAddress;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -54,18 +54,26 @@ public class ShowClientHandler implements HttpHandler {
String newLine = System.getProperty("line.separator");
logger.info("showAllClient=================");
ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping();
- Map<String, AtomicInteger> dcnSystemMap = clientSessionGroupMapping.statDCNSystemInfo();
- if (!dcnSystemMap.isEmpty()) {
- List<Map.Entry<String, AtomicInteger>> list = new ArrayList<>();
- for (Map.Entry<String, AtomicInteger> entry : dcnSystemMap.entrySet()) {
- list.add(entry);
+
+ HashMap<String, AtomicInteger> statMap = new HashMap<String, AtomicInteger>();
+
+ Map<InetSocketAddress, Session> sessionMap = clientSessionGroupMapping.getSessionMap();
+ if (!sessionMap.isEmpty()) {
+ for (Session session : sessionMap.values()) {
+ String key = session.getClient().getSubsystem();
+ if (!statMap.containsKey(key)) {
+ statMap.put(key, new AtomicInteger(1));
+ } else {
+ statMap.get(key).incrementAndGet();
+ }
}
- Collections.sort(list, Comparator.comparingInt(x -> x.getValue().intValue()));
- for (Map.Entry<String, AtomicInteger> entry : list) {
+
+ for (Map.Entry<String, AtomicInteger> entry : statMap.entrySet()) {
result += String.format("System=%s | ClientNum=%d", entry.getKey(), entry.getValue().intValue()) +
newLine;
}
}
+
httpExchange.sendResponseHeaders(200, 0);
out.write(result.getBytes());
} catch (Exception e) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
index 62d76dd..6d8cbe4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java
@@ -65,7 +65,7 @@ public class ShowListenClientByTopicHandler implements HttpHandler {
for (ClientGroupWrapper cgw : clientGroupMap.values()) {
Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
if (listenSessionSet != null && listenSessionSet.size() > 0) {
- result += String.format("group:%s", cgw.getGroupName()) + newLine;
+ result += String.format("group:%s", cgw.getConsumerGroup()) + newLine;
for (Session session : listenSessionSet) {
UserAgent userAgent = session.getClient();
result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java
index d498305..6a5d35d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java
@@ -27,12 +27,6 @@ public class EventMeshConstants {
public static final String PROTOCOL_TCP = "tcp";
- public static final String BROADCAST_PREFIX = "broadcast-";
-
- public final static String CONSUMER_GROUP_NAME_PREFIX = "ConsumerGroup-";
-
- public final static String PRODUCER_GROUP_NAME_PREFIX = "ProducerGroup-";
-
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));
@@ -85,7 +79,6 @@ public class EventMeshConstants {
public static final String TAG = "TAG";
- public static final String MANAGE_DCN = "dcn";
public static final String MANAGE_SUBSYSTEM = "subSystem";
public static final String MANAGE_IP = "ip";
public static final String MANAGE_PORT = "port";
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index e39afb2..8a1049b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -84,7 +84,6 @@ public class EventMeshConsumer {
keyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup());
keyValue.put("eventMeshIDC", eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster));
persistentMqConsumer.init(keyValue);
@@ -94,7 +93,6 @@ public class EventMeshConsumer {
broadcastKeyValue.put("consumerGroup", consumerGroupConf.getConsumerGroup());
broadcastKeyValue.put("eventMeshIDC", eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
broadcastKeyValue.put("instanceName", EventMeshUtil.buildMeshClientID(consumerGroupConf.getConsumerGroup(),
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster));
broadcastMqConsumer.init(broadcastKeyValue);
inited4Persistent.compareAndSet(false, true);
@@ -103,7 +101,6 @@ public class EventMeshConsumer {
}
public synchronized void start() throws Exception {
-
persistentMqConsumer.start();
started4Persistent.compareAndSet(false, true);
broadcastMqConsumer.start();
@@ -255,8 +252,7 @@ public class EventMeshConsumer {
public void sendMessageBack(final Message msgBack, final String uniqueId, String bizSeqNo) throws Exception {
EventMeshProducer sendMessageBack
- = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(EventMeshConstants.PRODUCER_GROUP_NAME_PREFIX
- + consumerGroupConf.getConsumerGroup());
+ = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(consumerGroupConf.getConsumerGroup());
if (sendMessageBack == null) {
logger.warn("consumer:{} consume fail, sendMessageBack, bizSeqNo:{}, uniqueId:{}", consumerGroupConf.getConsumerGroup(), bizSeqNo, uniqueId);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 156cac2..df3b2af 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -78,8 +78,7 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
SendMessageBatchResponseHeader sendMessageBatchResponseHeader =
SendMessageBatchResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
if (StringUtils.isBlank(sendMessageBatchRequestHeader.getPid())
|| !StringUtils.isNumeric(sendMessageBatchRequestHeader.getPid())
@@ -93,6 +92,7 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
if (CollectionUtils.isEmpty(sendMessageBatchRequestBody.getContents())
|| StringUtils.isBlank(sendMessageBatchRequestBody.getBatchId())
+ || StringUtils.isBlank(sendMessageBatchRequestBody.getProducerGroup())
|| (Integer.valueOf(sendMessageBatchRequestBody.getSize()) != CollectionUtils.size(sendMessageBatchRequestBody.getContents()))) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchResponseHeader,
@@ -112,11 +112,8 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
return;
}
- if (StringUtils.isBlank(sendMessageBatchRequestHeader.getDcn())) {
- sendMessageBatchRequestHeader.setDcn("BATCH");
- }
- String producerGroup = EventMeshUtil.buildClientGroup(sendMessageBatchRequestHeader.getSys(),
- sendMessageBatchRequestHeader.getDcn());
+
+ String producerGroup = sendMessageBatchRequestBody.getProducerGroup();
EventMeshProducer batchEventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 5dff408..8d5f7b5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -73,8 +73,7 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
SendMessageBatchV2ResponseHeader sendMessageBatchV2ResponseHeader =
SendMessageBatchV2ResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
if (StringUtils.isBlank(sendMessageBatchV2RequestHeader.getPid())
|| !StringUtils.isNumeric(sendMessageBatchV2RequestHeader.getPid())
@@ -88,6 +87,7 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
if (StringUtils.isBlank(sendMessageBatchV2RequestBody.getBizSeqNo())
|| StringUtils.isBlank(sendMessageBatchV2RequestBody.getTopic())
+ || StringUtils.isBlank(sendMessageBatchV2RequestBody.getProducerGroup())
|| StringUtils.isBlank(sendMessageBatchV2RequestBody.getMsg())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchV2ResponseHeader,
@@ -107,11 +107,7 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
return;
}
- if (StringUtils.isBlank(sendMessageBatchV2RequestHeader.getDcn())) {
- sendMessageBatchV2RequestHeader.setDcn("BATCH");
- }
- String producerGroup = EventMeshUtil.buildClientGroup(sendMessageBatchV2RequestHeader.getSys(),
- sendMessageBatchV2RequestHeader.getDcn());
+ String producerGroup = sendMessageBatchV2RequestBody.getProducerGroup();
EventMeshProducer batchEventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
// batchEventMeshProducer.getMqProducerWrapper().getDefaultMQProducer().setRetryTimesWhenSendAsyncFailed(0);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index febfa91..9e4c41e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -68,13 +68,11 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
HeartbeatResponseHeader heartbeatResponseHeader =
HeartbeatResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(heartbeatRequestHeader.getIdc())
- || StringUtils.isBlank(heartbeatRequestHeader.getDcn())
|| StringUtils.isBlank(heartbeatRequestHeader.getPid())
|| !StringUtils.isNumeric(heartbeatRequestHeader.getPid())
|| StringUtils.isBlank(heartbeatRequestHeader.getSys())) {
@@ -87,6 +85,7 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
//validate body
if (StringUtils.isBlank(heartbeatRequestBody.getClientType())
+ || StringUtils.isBlank(heartbeatRequestBody.getConsumerGroup())
|| CollectionUtils.isEmpty(heartbeatRequestBody.getHeartbeatEntities())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
@@ -97,20 +96,17 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
}
ConcurrentHashMap<String, List<Client>> tmp = new ConcurrentHashMap<>();
String env = heartbeatRequestHeader.getEnv();
- String dcn = heartbeatRequestHeader.getDcn();
String idc = heartbeatRequestHeader.getIdc();
String sys = heartbeatRequestHeader.getSys();
String ip = heartbeatRequestHeader.getIp();
String pid = heartbeatRequestHeader.getPid();
- String consumerGroup = EventMeshUtil.buildClientGroup(heartbeatRequestHeader.getSys(),
- heartbeatRequestHeader.getDcn());
+ String consumerGroup = heartbeatRequestBody.getConsumerGroup();
List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = heartbeatRequestBody.getHeartbeatEntities();
for (HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : heartbeatEntities) {
String topic = heartbeatEntity.topic;
String url = heartbeatEntity.url;
Client client = new Client();
client.env = env;
- client.dcn = dcn;
client.idc = idc;
client.sys = sys;
client.ip = ip;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index 3c461ed..5d8745b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -78,12 +78,10 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
ReplyMessageResponseHeader replyMessageResponseHeader =
ReplyMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//HEADER校验
if (StringUtils.isBlank(replyMessageRequestHeader.getIdc())
- || StringUtils.isBlank(replyMessageRequestHeader.getDcn())
|| StringUtils.isBlank(replyMessageRequestHeader.getPid())
|| !StringUtils.isNumeric(replyMessageRequestHeader.getPid())
|| StringUtils.isBlank(replyMessageRequestHeader.getSys())) {
@@ -97,6 +95,7 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
//validate body
if (StringUtils.isBlank(replyMessageRequestBody.getBizSeqNo())
|| StringUtils.isBlank(replyMessageRequestBody.getUniqueId())
+ || StringUtils.isBlank(replyMessageRequestBody.getProducerGroup())
|| StringUtils.isBlank(replyMessageRequestBody.getContent())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
replyMessageResponseHeader,
@@ -105,8 +104,7 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
return;
}
- String producerGroup = EventMeshUtil.buildClientGroup(replyMessageRequestHeader.getSys(),
- replyMessageRequestHeader.getDcn());
+ String producerGroup = replyMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index 8fd2fad..cd3d4be 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -75,12 +75,10 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
SendMessageResponseHeader sendMessageResponseHeader =
SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(sendMessageRequestHeader.getIdc())
- || StringUtils.isBlank(sendMessageRequestHeader.getDcn())
|| StringUtils.isBlank(sendMessageRequestHeader.getPid())
|| !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
|| StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
@@ -94,6 +92,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
//validate body
if (StringUtils.isBlank(sendMessageRequestBody.getBizSeqNo())
|| StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
+ || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
|| StringUtils.isBlank(sendMessageRequestBody.getTopic())
|| StringUtils.isBlank(sendMessageRequestBody.getContent())
|| (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
@@ -105,8 +104,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
return;
}
- String producerGroup = EventMeshUtil.buildClientGroup(sendMessageRequestHeader.getSys(),
- sendMessageRequestHeader.getDcn());
+ String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index d2dacdb..5ee12d3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -79,11 +79,9 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
SendMessageResponseHeader sendMessageResponseHeader =
SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
if (StringUtils.isBlank(sendMessageRequestHeader.getIdc())
- || StringUtils.isBlank(sendMessageRequestHeader.getDcn())
|| StringUtils.isBlank(sendMessageRequestHeader.getPid())
|| !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
|| StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
@@ -96,6 +94,7 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
if (StringUtils.isBlank(sendMessageRequestBody.getBizSeqNo())
|| StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
+ || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
|| StringUtils.isBlank(sendMessageRequestBody.getTopic())
|| StringUtils.isBlank(sendMessageRequestBody.getContent())
|| (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
@@ -106,8 +105,7 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
return;
}
- String producerGroup = EventMeshUtil.buildClientGroup(sendMessageRequestHeader.getSys(),
- sendMessageRequestHeader.getDcn());
+ String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 2186944..e22a86c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -72,13 +72,11 @@ public class SubscribeProcessor implements HttpRequestProcessor {
SubscribeResponseHeader subscribeResponseHeader =
SubscribeResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(subscribeRequestHeader.getIdc())
- || StringUtils.isBlank(subscribeRequestHeader.getDcn())
|| StringUtils.isBlank(subscribeRequestHeader.getPid())
|| !StringUtils.isNumeric(subscribeRequestHeader.getPid())
|| StringUtils.isBlank(subscribeRequestHeader.getSys())) {
@@ -91,7 +89,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
//validate body
if (StringUtils.isBlank(subscribeRequestBody.getUrl())
- || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())) {
+ || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())
+ || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
subscribeResponseHeader,
@@ -102,8 +101,7 @@ public class SubscribeProcessor implements HttpRequestProcessor {
List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
String url = subscribeRequestBody.getUrl();
- String consumerGroup = EventMeshUtil.buildClientGroup(subscribeRequestHeader.getSys(),
- subscribeRequestHeader.getDcn());
+ String consumerGroup = subscribeRequestBody.getConsumerGroup();
synchronized (eventMeshHTTPServer.localClientInfoMapping) {
@@ -213,7 +211,6 @@ public class SubscribeProcessor implements HttpRequestProcessor {
for(SubscriptionItem item: subscriptionItems) {
Client client = new Client();
client.env = subscribeRequestHeader.getEnv();
- client.dcn = subscribeRequestHeader.getDcn();
client.idc = subscribeRequestHeader.getIdc();
client.sys = subscribeRequestHeader.getSys();
client.ip = subscribeRequestHeader.getIp();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index b723054..d457812 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -72,13 +72,11 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
UnSubscribeResponseHeader unSubscribeResponseHeader =
UnSubscribeResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshRegion,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshDCN, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(unSubscribeRequestHeader.getIdc())
- || StringUtils.isBlank(unSubscribeRequestHeader.getDcn())
|| StringUtils.isBlank(unSubscribeRequestHeader.getPid())
|| !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())
|| StringUtils.isBlank(unSubscribeRequestHeader.getSys())) {
@@ -91,7 +89,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
//validate body
if (StringUtils.isBlank(unSubscribeRequestBody.getUrl())
- || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())) {
+ || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())
+ || StringUtils.isBlank(unSubscribeRequestBody.getConsumerGroup())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
unSubscribeResponseHeader,
@@ -100,13 +99,11 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
return;
}
String env = unSubscribeRequestHeader.getEnv();
- String dcn = unSubscribeRequestHeader.getDcn();
String idc = unSubscribeRequestHeader.getIdc();
String sys = unSubscribeRequestHeader.getSys();
String ip = unSubscribeRequestHeader.getIp();
String pid = unSubscribeRequestHeader.getPid();
- String consumerGroup = EventMeshUtil.buildClientGroup(unSubscribeRequestHeader.getSys(),
- unSubscribeRequestHeader.getDcn());
+ String consumerGroup = unSubscribeRequestBody.getConsumerGroup();
String unSubscribeUrl = unSubscribeRequestBody.getUrl();
List<String> unSubTopicList = unSubscribeRequestBody.getTopics();
@@ -244,7 +241,6 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
for(String topic: topicList) {
Client client = new Client();
client.env = unSubscribeRequestHeader.getEnv();
- client.dcn = unSubscribeRequestHeader.getDcn();
client.idc = unSubscribeRequestHeader.getIdc();
client.sys = unSubscribeRequestHeader.getSys();
client.ip = unSubscribeRequestHeader.getIp();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
index 4387509..1b834d9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
@@ -28,8 +28,6 @@ public class Client {
public String env;
- public String dcn;
-
public String idc;
public String consumerGroup;
@@ -64,7 +62,6 @@ public class Client {
client.topic = StringUtils.trim(jsonObject.getString("topic"));
client.url = StringUtils.trim(jsonObject.getString("url"));
client.sys = StringUtils.trim(jsonObject.getString("sys"));
- client.dcn = StringUtils.trim(jsonObject.getString("dcn"));
client.idc = StringUtils.trim(jsonObject.getString("idc"));
client.ip = StringUtils.trim(jsonObject.getString("ip"));
client.pid = StringUtils.trim(jsonObject.getString("pid"));
@@ -79,7 +76,6 @@ public class Client {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("endPoint={env=").append(env)
- .append(",dcn=").append(dcn)
.append(",idc=").append(idc)
.append(",consumerGroup=").append(consumerGroup)
.append(",topic=").append(topic)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index c5806da..cf41ca2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -81,8 +81,7 @@ public class EventMeshProducer {
Properties keyValue = new Properties();
keyValue.put("producerGroup", producerGroupConfig.getGroupName());
- keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(producerGroupConfig.getGroupName(),
- eventMeshHttpConfiguration.eventMeshRegion, eventMeshHttpConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil.buildMeshClientID(producerGroupConfig.getGroupName(), eventMeshHttpConfiguration.eventMeshCluster));
//TODO for defibus
keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 831296a..3ab5649 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -100,9 +100,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshCluster);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtil.getLocalAddress());
- builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHDCN, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshDCN);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
- builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHREGION, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshRegion);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
handleMsgContext.getMsg().getUserProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 2c3a46f..a297935 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -65,11 +65,12 @@ public class ClientGroupWrapper {
public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
- private String groupName;
+ private String producerGroup;
- private String sysId;
+ private String consumerGroup;
- private String dcn;
+ //can be sysid + ext(eg dcn)
+ private String sysId;
private EventMeshTCPConfiguration eventMeshTCPConfiguration;
@@ -103,16 +104,16 @@ public class ClientGroupWrapper {
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
- public ClientGroupWrapper(String sysId, String dcn,
+ public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
this.sysId = sysId;
- this.dcn = dcn;
+ this.producerGroup = producerGroup;
+ this.consumerGroup = consumerGroup;
this.eventMeshTCPServer = eventMeshTCPServer;
this.eventMeshTCPConfiguration = eventMeshTCPServer.getEventMeshTCPConfiguration();
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
- this.groupName = EventMeshUtil.buildClientGroup(sysId, dcn);
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
}
@@ -169,8 +170,7 @@ public class ClientGroupWrapper {
}
public boolean addSubscription(String topic, Session session) throws Exception {
- if (session == null
- || !StringUtils.equalsIgnoreCase(groupName, EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn()))) {
+ if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("addSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -184,9 +184,9 @@ public class ClientGroupWrapper {
}
r = topic2sessionInGroupMapping.get(topic).add(session);
if (r) {
- logger.info("addSubscription success, group:{} topic:{} client:{}", groupName, topic, session.getClient());
+ logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
} else {
- logger.warn("addSubscription fail, group:{} topic:{} client:{}", groupName, topic, session.getClient());
+ logger.warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
}
} catch (Exception e) {
logger.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
@@ -199,7 +199,7 @@ public class ClientGroupWrapper {
public boolean removeSubscription(String topic, Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(groupName, EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn()))) {
+ || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("removeSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -210,14 +210,14 @@ public class ClientGroupWrapper {
if (topic2sessionInGroupMapping.containsKey(topic)) {
r = topic2sessionInGroupMapping.get(topic).remove(session);
if (r) {
- logger.info("removeSubscription remove session success, group:{} topic:{} client:{}", groupName, topic, session.getClient());
+ logger.info("removeSubscription remove session success, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
} else {
- logger.warn("removeSubscription remove session failed, group:{} topic:{} client:{}", groupName, topic, session.getClient());
+ logger.warn("removeSubscription remove session failed, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
}
}
if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
topic2sessionInGroupMapping.remove(topic);
- logger.info("removeSubscription remove topic success, group:{} topic:{}", groupName, topic);
+ logger.info("removeSubscription remove topic success, group:{} topic:{}", consumerGroup, topic);
}
} catch (Exception e) {
logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(), e);
@@ -234,8 +234,8 @@ public class ClientGroupWrapper {
Properties keyValue = new Properties();
// KeyValue keyValue = OMS.newKeyValue();
- keyValue.put("producerGroup", groupName);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, dcn, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("producerGroup", producerGroup);
+ keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
//TODO for defibus
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
@@ -243,7 +243,7 @@ public class ClientGroupWrapper {
mqProducerWrapper.init(keyValue);
mqProducerWrapper.start();
producerStarted.compareAndSet(false, true);
- logger.info("starting producer success, group:{}", groupName);
+ logger.info("starting producer success, group:{}", producerGroup);
}
public synchronized void shutdownProducer() throws Exception {
@@ -252,17 +252,28 @@ public class ClientGroupWrapper {
}
mqProducerWrapper.shutdown();
producerStarted.compareAndSet(true, false);
- logger.info("shutdown producer success for group:{}", groupName);
+ logger.info("shutdown producer success for group:{}", producerGroup);
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
}
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
- public String getGroupName() {
- return groupName;
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
}
public boolean addGroupConsumerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(groupName, EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn()))) {
+ || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("addGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -272,10 +283,10 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.add(session);
if (r) {
- logger.info("addGroupConsumerSession success, group:{} client:{}", groupName, session.getClient());
+ logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup, session.getClient());
}
} catch (Exception e) {
- logger.error("addGroupConsumerSession error! group:{} client:{}", groupName, session.getClient(), e);
+ logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup, session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -284,7 +295,7 @@ public class ClientGroupWrapper {
public boolean addGroupProducerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(groupName, EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn()))) {
+ || !StringUtils.equalsIgnoreCase(producerGroup, EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
logger.error("addGroupProducerSession param error,session:{}", session);
return false;
}
@@ -294,10 +305,10 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.add(session);
if (r) {
- logger.info("addGroupProducerSession success, group:{} client:{}", groupName, session.getClient());
+ logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup, session.getClient());
}
} catch (Exception e) {
- logger.error("addGroupProducerSession error! group:{} client:{}", groupName, session.getClient(), e);
+ logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup, session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -306,7 +317,7 @@ public class ClientGroupWrapper {
public boolean removeGroupConsumerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(groupName, EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn()))) {
+ || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("removeGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -316,10 +327,10 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.remove(session);
if (r) {
- logger.info("removeGroupConsumerSession success, group:{} client:{}", groupName, session.getClient());
+ logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup, session.getClient());
}
} catch (Exception e) {
- logger.error("removeGroupConsumerSession error! group:{} client:{}", groupName, session.getClient(), e);
+ logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup, session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -328,7 +339,7 @@ public class ClientGroupWrapper {
public boolean removeGroupProducerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(groupName, EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn()))) {
+ || !StringUtils.equalsIgnoreCase(producerGroup, EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
logger.error("removeGroupProducerSession param error,session:{}", session);
return false;
}
@@ -338,10 +349,10 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.remove(session);
if (r) {
- logger.info("removeGroupProducerSession success, group:{} client:{}", groupName, session.getClient());
+ logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup, session.getClient());
}
} catch (Exception e) {
- logger.error("removeGroupProducerSession error! group:{} client:{}", groupName, session.getClient(), e);
+ logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup, session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -356,9 +367,9 @@ public class ClientGroupWrapper {
Properties keyValue = new Properties();
keyValue.put("isBroadcast", "false");
- keyValue.put("consumerGroup", groupName);
+ keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, dcn, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
persistentMsgConsumer.init(keyValue);
// persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
@@ -426,7 +437,7 @@ public class ClientGroupWrapper {
// }
// });
inited4Persistent.compareAndSet(false, true);
- logger.info("init persistentMsgConsumer success, group:{}", groupName);
+ logger.info("init persistentMsgConsumer success, group:{}", consumerGroup);
}
public synchronized void startClientGroupPersistentConsumer() throws Exception {
@@ -435,7 +446,7 @@ public class ClientGroupWrapper {
}
persistentMsgConsumer.start();
started4Persistent.compareAndSet(false, true);
- logger.info("starting persistentMsgConsumer success, group:{}", groupName);
+ logger.info("starting persistentMsgConsumer success, group:{}", consumerGroup);
}
public synchronized void initClientGroupBroadcastConsumer() throws Exception {
@@ -445,9 +456,9 @@ public class ClientGroupWrapper {
Properties keyValue = new Properties();
keyValue.put("isBroadcast", "true");
- keyValue.put("consumerGroup", groupName);
+ keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, dcn, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId,"SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
// broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
// @Override
@@ -501,7 +512,7 @@ public class ClientGroupWrapper {
// }
// });
inited4Broadcast.compareAndSet(false, true);
- logger.info("init broadCastMsgConsumer success, group:{}", groupName);
+ logger.info("init broadCastMsgConsumer success, group:{}", consumerGroup);
}
public synchronized void startClientGroupBroadcastConsumer() throws Exception {
@@ -510,7 +521,7 @@ public class ClientGroupWrapper {
}
broadCastMsgConsumer.start();
started4Broadcast.compareAndSet(false, true);
- logger.info("starting broadCastMsgConsumer success, group:{}", groupName);
+ logger.info("starting broadCastMsgConsumer success, group:{}", consumerGroup);
}
public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
@@ -574,7 +585,7 @@ public class ClientGroupWrapper {
message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
- Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
+ Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
if (session == null) {
try {
@@ -587,10 +598,10 @@ public class ClientGroupWrapper {
sendBackFromEventMeshIp = message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
}
- logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", groupName, topic, bizSeqNo, sendBackTimes, sendBackFromEventMeshIp);
+ logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", consumerGroup, topic, bizSeqNo, sendBackTimes, sendBackFromEventMeshIp);
if (sendBackTimes >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
- logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes, groupName, topic, bizSeqNo);
+ logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes, consumerGroup, topic, bizSeqNo);
} else {
sendBackTimes++;
message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, sendBackTimes.toString());
@@ -632,7 +643,7 @@ public class ClientGroupWrapper {
public synchronized void shutdownBroadCastConsumer() throws Exception {
if (started4Broadcast.get()) {
broadCastMsgConsumer.shutdown();
- logger.info("broadcast consumer group:{} shutdown...", groupName);
+ logger.info("broadcast consumer group:{} shutdown...", consumerGroup);
}
started4Broadcast.compareAndSet(true, false);
inited4Broadcast.compareAndSet(true, false);
@@ -643,7 +654,7 @@ public class ClientGroupWrapper {
if (started4Persistent.get()) {
persistentMsgConsumer.shutdown();
- logger.info("persistent consumer group:{} shutdown...", groupName);
+ logger.info("persistent consumer group:{} shutdown...", consumerGroup);
}
started4Persistent.compareAndSet(true, false);
inited4Persistent.compareAndSet(true, false);
@@ -658,10 +669,6 @@ public class ClientGroupWrapper {
return groupProducerSessions;
}
- public void setGroupName(String groupName) {
- this.groupName = groupName;
- }
-
public EventMeshTCPConfiguration getEventMeshTCPConfiguration() {
return eventMeshTCPConfiguration;
}
@@ -709,7 +716,7 @@ public class ClientGroupWrapper {
paramValues.add("msg");
paramValues.add(JSON.toJSONString(msg));
paramValues.add("group");
- paramValues.add(groupName);
+ paramValues.add(consumerGroup);
result = HttpTinyClient.httpPost(
targetUrl.toString(),
@@ -743,12 +750,12 @@ public class ClientGroupWrapper {
send(new UpStreamMsgContext(null, null, msg), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
- logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", groupName, bizSeqNo, topic);
+ logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
}
@Override
public void onException(OnExceptionContext context) {
- logger.warn("consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", groupName, bizSeqNo, topic);
+ logger.warn("consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
}
// @Override
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index c4cd170..a5a4b40 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -57,9 +56,9 @@ public class ClientSessionGroupMapping {
private ConcurrentHashMap<InetSocketAddress, Session> sessionTable = new ConcurrentHashMap<>();
- private ConcurrentHashMap<String /** groupName*/, ClientGroupWrapper> clientGroupMap = new ConcurrentHashMap<String, ClientGroupWrapper>();
+ private ConcurrentHashMap<String /** subsystem eg . 5109 or 5109-1A0 */, ClientGroupWrapper> clientGroupMap = new ConcurrentHashMap<String, ClientGroupWrapper>();
- private ConcurrentHashMap<String /** groupName*/, Object> lockMap = new ConcurrentHashMap<String, Object>();
+ private ConcurrentHashMap<String /** subsystem eg . 5109 or 5109-1A0 */, Object> lockMap = new ConcurrentHashMap<String, Object>();
private EventMeshTCPServer eventMeshTCPServer;
@@ -75,8 +74,8 @@ public class ClientSessionGroupMapping {
this.eventMeshTCPServer = eventMeshTCPServer;
}
- public ClientGroupWrapper getClientGroupWrapper(String groupName) {
- return MapUtils.getObject(clientGroupMap, groupName, null);
+ public ClientGroupWrapper getClientGroupWrapper(String sysId) {
+ return MapUtils.getObject(clientGroupMap, sysId, null);
}
public Session getSession(ChannelHandlerContext ctx) {
@@ -177,30 +176,28 @@ public class ClientSessionGroupMapping {
}
}
- private ClientGroupWrapper constructClientGroupWrapper(String sysId, String dcn,
+ private ClientGroupWrapper constructClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
- return new ClientGroupWrapper(sysId, dcn
- , eventMeshTCPServer, downstreamDispatchStrategy);
+ return new ClientGroupWrapper(sysId, producerGroup, consumerGroup, eventMeshTCPServer, downstreamDispatchStrategy);
}
private void initClientGroupWrapper(UserAgent user, Session session) throws Exception {
- final String clientGroup = EventMeshUtil.buildClientGroup(user.getSubsystem(), user.getDcn());
- if (!lockMap.containsKey(clientGroup)) {
- Object obj = lockMap.putIfAbsent(clientGroup, new Object());
+ if (!lockMap.containsKey(user.getSubsystem())) {
+ Object obj = lockMap.putIfAbsent(user.getSubsystem(), new Object());
if (obj == null) {
- logger.info("add lock to map for group:{}", clientGroup);
+ logger.info("add lock to map for subsystem:{}", user.getSubsystem());
}
}
- synchronized (lockMap.get(clientGroup)) {
- if (!clientGroupMap.containsKey(clientGroup)) {
- ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getDcn()
- , eventMeshTCPServer, new FreePriorityDispatchStrategy());
- clientGroupMap.put(clientGroup, cgw);
- logger.info("create new ClientGroupWrapper,group:{}", clientGroup);
+ synchronized (lockMap.get(user.getSubsystem())) {
+ if (!clientGroupMap.containsKey(user.getSubsystem())) {
+ ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getProducerGroup(),
+ user.getConsumerGroup(), eventMeshTCPServer, new FreePriorityDispatchStrategy());
+ clientGroupMap.put(user.getSubsystem(), cgw);
+ logger.info("create new ClientGroupWrapper, subsystem:{}", user.getSubsystem());
}
- ClientGroupWrapper cgw = clientGroupMap.get(clientGroup);
+ ClientGroupWrapper cgw = clientGroupMap.get(user.getSubsystem());
if (EventMeshConstants.PURPOSE_PUB.equals(user.getPurpose())) {
startClientGroupProducer(cgw, session);
@@ -241,11 +238,10 @@ public class ClientSessionGroupMapping {
}
private void startClientGroupConsumer(Session session) throws Exception {
- final String clientGroup = EventMeshUtil.buildClientGroup(session.getClient().getSubsystem(), session.getClient().getDcn());
- if (!lockMap.containsKey(clientGroup)) {
- lockMap.putIfAbsent(clientGroup, new Object());
+ if (!lockMap.containsKey(session.getClient().getSubsystem())) {
+ lockMap.putIfAbsent(session.getClient().getSubsystem(), new Object());
}
- synchronized (lockMap.get(clientGroup)) {
+ synchronized (lockMap.get(session.getClient().getSubsystem())) {
logger.info("readySession session[{}]", session);
ClientGroupWrapper cgw = session.getClientGroupWrapper().get();
@@ -304,7 +300,7 @@ public class ClientSessionGroupMapping {
logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt), session.getClient());
continue;
}
- Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy().select(session.getClientGroupWrapper().get().getGroupName()
+ Session reChooseSession = session.getClientGroupWrapper().get().getDownstreamDispatchStrategy().select(session.getClientGroupWrapper().get().getConsumerGroup()
, downStreamMsgContext.msgExt.getTopic()
, session.getClientGroupWrapper().get().groupConsumerSessions);
if(reChooseSession != null){
@@ -329,9 +325,9 @@ public class ClientSessionGroupMapping {
&& (session.getClientGroupWrapper().get().getGroupProducerSessions().size() == 0)) {
shutdownClientGroupProducer(session);
- clientGroupMap.remove(session.getClientGroupWrapper().get().getGroupName());
- lockMap.remove(session.getClientGroupWrapper().get().getGroupName());
- logger.info("remove clientGroupWrapper group[{}]", session.getClientGroupWrapper().get().getGroupName());
+ clientGroupMap.remove(session.getClientGroupWrapper().get().getSysId());
+ lockMap.remove(session.getClientGroupWrapper().get().getSysId());
+ logger.info("remove clientGroupWrapper subsystem[{}]", session.getClientGroupWrapper().get().getSysId());
}
}
@@ -429,40 +425,6 @@ public class ClientSessionGroupMapping {
return clientGroupMap;
}
- public HashMap<String, AtomicInteger> statDCNSystemInfo() {
- HashMap<String, AtomicInteger> result = new HashMap<String, AtomicInteger>();
- if (!sessionTable.isEmpty()) {
- for (Session session : sessionTable.values()) {
- String key = session.getClient().getDcn() + "|" + session.getClient().getSubsystem();
- if (!result.containsKey(key)) {
- result.put(key, new AtomicInteger(1));
- } else {
- result.get(key).incrementAndGet();
- }
- }
- }
- return result;
- }
-
- public HashMap<String, AtomicInteger> statDCNSystemInfoByPurpose(String purpose) {
- HashMap<String, AtomicInteger> result = new HashMap<String, AtomicInteger>();
- if (!sessionTable.isEmpty()) {
- for (Session session : sessionTable.values()) {
- if (!StringUtils.equals(session.getClient().getPurpose(), purpose)) {
- continue;
- }
-
- String key = session.getClient().getDcn() + "|" + session.getClient().getSubsystem() + "|" + purpose;
- if (!result.containsKey(key)) {
- result.put(key, new AtomicInteger(1));
- } else {
- result.get(key).incrementAndGet();
- }
- }
- }
- return result;
- }
-
public Map<String, Map<String, Integer>> prepareEventMeshClientDistributionData() {
Map<String, Map<String, Integer>> result = null;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index fca8d2a..8f4e72a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -229,7 +229,7 @@ public class Session {
@Override
public String toString() {
return "Session{" +
- "group=" + clientGroupWrapper.get().getGroupName() +
+ "sysId=" + clientGroupWrapper.get().getSysId() +
",remoteAddr=" + RemotingHelper.parseSocketAddressAddr(remoteAddress) +
",client=" + client +
",sessionState=" + sessionState +
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
index b8fcf0f..57165a9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
@@ -122,7 +122,7 @@ public class EventMeshTcpRetryer {
String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
rechoosen = downStreamMsgContext.session.getClientGroupWrapper()
- .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getGroupName()
+ .get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getSysId()
, topic
, downStreamMsgContext.session.getClientGroupWrapper().get().getGroupConsumerSessions());
} else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
index 53b97ec..161b1d7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
@@ -101,5 +101,15 @@ public class HelloTask extends AbstractTask {
if (!(StringUtils.equals(EventMeshConstants.PURPOSE_PUB, user.getPurpose()) || StringUtils.equals(EventMeshConstants.PURPOSE_SUB, user.getPurpose()))) {
throw new Exception("client purpose config is error");
}
+
+ if (StringUtils.equals(EventMeshConstants.PURPOSE_PUB, user.getPurpose())
+ && StringUtils.isBlank(user.getProducerGroup())) {
+ throw new Exception("client producerGroup cannot be null");
+ }
+
+ if (StringUtils.equals(EventMeshConstants.PURPOSE_SUB, user.getPurpose())
+ && StringUtils.isBlank(user.getConsumerGroup())) {
+ throw new Exception("client consumerGroup cannot be null");
+ }
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 0ecb375..eca44b9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -60,59 +60,23 @@ public class EventMeshUtil {
return StringUtils.rightPad(String.valueOf(System.currentTimeMillis()), 6) + String.valueOf(RandomStringUtils.randomNumeric(4));
}
- public static String buildMeshClientID(String clientGroup, String meshRegion, String meshCluster) {
+ public static String buildMeshClientID(String clientGroup, String meshCluster) {
return StringUtils.trim(clientGroup)
- + "-" + StringUtils.trim(meshRegion) + "(" + StringUtils.trim(meshCluster) + ")"
+ + "(" + StringUtils.trim(meshCluster) + ")"
+ "-" + EventMeshVersion.getCurrentVersionDesc()
+ "-" + ThreadUtil.getPID();
}
- public static String buildMeshTcpClientID(String clientSysId, String clientDcn, String purpose, String meshCluster) {
+ public static String buildMeshTcpClientID(String clientSysId, String purpose, String meshCluster) {
return StringUtils.trim(clientSysId)
- + "-" + StringUtils.trim(clientDcn)
+ "-" + StringUtils.trim(purpose)
+ "-" + StringUtils.trim(meshCluster)
+ "-" + EventMeshVersion.getCurrentVersionDesc()
+ "-" + ThreadUtil.getPID();
}
- public static String buildMeshTcpRRReplyerProducerGroup() {
- return "EventMesh-Tcp-RRReplyer";
- }
-
- public static String buildMeshTcpRRReplyerClientID(String meshSysId, String meshRegion, String meshDcn, String meshCluster) {
- return meshSysId
- + "-" + StringUtils.trim(meshRegion)
- + "-" + StringUtils.trim(meshDcn)
- + "-" + StringUtils.trim(meshCluster)
- + "-" + EventMeshVersion.getCurrentVersionDesc()
- + "-" + ThreadUtil.getPID()
- + "-RRReplyer";
- //return EventMeshVersion.getCurrentVersionDesc() + "-" + ThreadUtil.getPID() + "(" + meshConfiguration.meshCluster + ")";
- }
-
- public static String buildBroadcastClientConsumerGroup(String systemId, String dcn) {
- return EventMeshConstants.CONSUMER_GROUP_NAME_PREFIX + EventMeshConstants.BROADCAST_PREFIX + systemId + "-" + dcn;
- }
-
- public static String buildPersistentClientConsumerGroup(String systemId, String dcn) {
- return EventMeshConstants.CONSUMER_GROUP_NAME_PREFIX + systemId + "-" + dcn;
- }
-
- public static String buildClientGroup(String systemId, String dcn) {
- return systemId + "-" + dcn;
- }
-
- public static String buildClientProducerGroup(String systemId, String dcn) {
- return EventMeshConstants.PRODUCER_GROUP_NAME_PREFIX + systemId + "-" + dcn;
- }
-
- public static String buildCCAddr(String str) {
- return str + "/namesrvAddr";
- }
-
- public static String buildCCAddr(String str, String idc) {
- return str + "/namesrvAddr/" + idc;
+ public static String buildClientGroup(String systemId) {
+ return systemId;
}
/**
@@ -362,7 +326,7 @@ public class EventMeshUtil {
}
StringBuilder sb = new StringBuilder();
sb.append(client.getSubsystem()).append("-")
- .append(client.getDcn()).append("-")
+ .append("-")
.append(client.getPid()).append("-")
.append(client.getHost()).append(":").append(client.getPort());
return sb.toString();
diff --git a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
index c1cc984..2b18a62 100644
--- a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
+++ b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
@@ -141,7 +141,6 @@ public class MessageUtils {
public static UserAgent generatePubClient() {
UserAgent user = new UserAgent();
- user.setDcn("AC0");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
@@ -156,7 +155,6 @@ public class MessageUtils {
public static UserAgent generateSubServer() {
UserAgent user = new UserAgent();
- user.setDcn("FT0");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
diff --git a/eventmesh-runtime/src/test/java/client/common/UserAgentUtils.java b/eventmesh-runtime/src/test/java/client/common/UserAgentUtils.java
index eff23f1..fdcdff4 100644
--- a/eventmesh-runtime/src/test/java/client/common/UserAgentUtils.java
+++ b/eventmesh-runtime/src/test/java/client/common/UserAgentUtils.java
@@ -26,7 +26,6 @@ public class UserAgentUtils {
public static UserAgent createPubUserAgent() {
UserAgent userAgent = new UserAgent();
userAgent.setSubsystem("5023");
- userAgent.setDcn("AC0");
userAgent.setPid(32893);
userAgent.setVersion("2.0.11");
userAgent.setIdc("FT");
@@ -43,7 +42,6 @@ public class UserAgentUtils {
public static UserAgent createUserAgent() {
UserAgent userAgent = new UserAgent();
userAgent.setSubsystem("5123");
- userAgent.setDcn("WAC");
// userAgent.setPid(UtilAll.getPid());
// userAgent.setHost(RemotingUtil.getLocalAddress());
userAgent.setVersion("2.0.8");
@@ -55,7 +53,6 @@ public class UserAgentUtils {
public static UserAgent createSubUserAgent() {
UserAgent userAgent = new UserAgent();
userAgent.setSubsystem("5243");
- userAgent.setDcn("WAC");
// userAgent.setPid(UtilAll.getPid());
// userAgent.setHost(RemotingUtil.getLocalAddress());
userAgent.setPort(8888);
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java
index e85937e..c24198d 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/RemotingServer.java
@@ -258,8 +258,7 @@ public class RemotingServer {
final LiteConsumeContext eventMeshConsumeContext = new LiteConsumeContext(pushMessageRequestHeader.getEventMeshIp(),
pushMessageRequestHeader.getEventMeshEnv(), pushMessageRequestHeader.getEventMeshIdc(),
- pushMessageRequestHeader.getEventMeshRegion(),
- pushMessageRequestHeader.getEventMeshCluster(), pushMessageRequestHeader.getEventMeshDcn());
+ pushMessageRequestHeader.getEventMeshCluster());
final LiteMessage liteMessage = new LiteMessage(pushMessageRequestBody.getBizSeqNo(), pushMessageRequestBody.getUniqueId(),
topic, pushMessageRequestBody.getContent());
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
index 3688173..fc5368a 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
@@ -40,11 +40,11 @@ public class LiteClientConfig {
private String env;
- private String region;
+ private String consumerGroup = "DefaultConsumerGroup";
- private String idc;
+ private String producerGroup = "DefaultProducerGroup";
- private String dcn;
+ private String idc;
private String ip = "127.0.0.1";
@@ -103,15 +103,6 @@ public class LiteClientConfig {
return this;
}
- public String getRegion() {
- return region;
- }
-
- public LiteClientConfig setRegion(String region) {
- this.region = region;
- return this;
- }
-
public String getIdc() {
return idc;
}
@@ -121,15 +112,6 @@ public class LiteClientConfig {
return this;
}
- public String getDcn() {
- return dcn;
- }
-
- public LiteClientConfig setDcn(String dcn) {
- this.dcn = dcn;
- return this;
- }
-
public String getIp() {
return ip;
}
@@ -184,6 +166,24 @@ public class LiteClientConfig {
return this;
}
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public LiteClientConfig setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public LiteClientConfig setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ return this;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -193,9 +193,9 @@ public class LiteClientConfig {
.append("consumeThreadCore=").append(consumeThreadCore).append(",")
.append("consumeThreadMax=").append(consumeThreadMax).append(",")
.append("env=").append(env).append(",")
- .append("region=").append(region).append(",")
.append("idc=").append(idc).append(",")
- .append("dcn=").append(dcn).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
+ .append("consumerGroup=").append(consumerGroup).append(",")
.append("ip=").append(ip).append(",")
.append("pid=").append(pid).append(",")
.append("sys=").append(sys).append(",")
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index 55b7e16..7841a67 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -17,6 +17,10 @@
package org.apache.eventmesh.client.http.consumer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -47,6 +51,7 @@ import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
+import org.apache.eventmesh.common.protocol.http.body.client.UnSubscribeRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientType;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
@@ -155,9 +160,7 @@ public class LiteConsumer extends AbstractLiteClient {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.SUBSCRIBE.getRequestCode()))
.addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.REGION, eventMeshClientConfig.getRegion())
.addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.DCN, eventMeshClientConfig.getDcn())
.addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
.addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
.addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
@@ -167,6 +170,7 @@ public class LiteConsumer extends AbstractLiteClient {
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
.addBody(SubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
+ .addBody(SubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
.addBody(SubscribeRequestBody.URL, url);
return requestParam;
}
@@ -183,9 +187,7 @@ public class LiteConsumer extends AbstractLiteClient {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.HEARTBEAT.getRequestCode()))
.addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.REGION, eventMeshClientConfig.getRegion())
.addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.DCN, eventMeshClientConfig.getDcn())
.addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
.addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
.addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
@@ -195,6 +197,7 @@ public class LiteConsumer extends AbstractLiteClient {
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
.addBody(HeartbeatRequestBody.CLIENTTYPE, ClientType.SUB.name())
+ .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
.addBody(HeartbeatRequestBody.HEARTBEATENTITIES, JSON.toJSONString(heartbeatEntities));
return requestParam;
}
@@ -271,9 +274,7 @@ public class LiteConsumer extends AbstractLiteClient {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.UNSUBSCRIBE.getRequestCode()))
.addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.REGION, eventMeshClientConfig.getRegion())
.addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.DCN, eventMeshClientConfig.getDcn())
.addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
.addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
.addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
@@ -282,8 +283,9 @@ public class LiteConsumer extends AbstractLiteClient {
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
- .addBody(SubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
- .addBody(SubscribeRequestBody.URL, url);
+ .addBody(UnSubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
+ .addBody(UnSubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+ .addBody(UnSubscribeRequestBody.URL, url);
return requestParam;
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/context/LiteConsumeContext.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/context/LiteConsumeContext.java
index 42739ca..91d84a5 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/context/LiteConsumeContext.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/context/LiteConsumeContext.java
@@ -28,26 +28,20 @@ public class LiteConsumeContext {
private String eventMeshIdc;
- private String eventMeshRegion;
-
private String eventMeshCluster;
- private String eventMeshDcn;
-
//本地RETRY次数
private int retryTimes = 0;
private long createTime = System.currentTimeMillis();
public LiteConsumeContext(String eventMeshIp, String eventMeshEnv,
- String eventMeshIdc, String eventMeshRegion,
- String eventMeshCluster, String eventMeshDcn) {
+ String eventMeshIdc,
+ String eventMeshCluster) {
this.eventMeshIp = eventMeshIp;
this.eventMeshEnv = eventMeshEnv;
this.eventMeshIdc = eventMeshIdc;
- this.eventMeshRegion = eventMeshRegion;
this.eventMeshCluster = eventMeshCluster;
- this.eventMeshDcn = eventMeshDcn;
}
@@ -75,14 +69,6 @@ public class LiteConsumeContext {
this.eventMeshIdc = eventMeshIdc;
}
- public String getEventMeshRegion() {
- return eventMeshRegion;
- }
-
- public void setEventMeshRegion(String eventMeshRegion) {
- this.eventMeshRegion = eventMeshRegion;
- }
-
public String getEventMeshCluster() {
return eventMeshCluster;
}
@@ -91,14 +77,6 @@ public class LiteConsumeContext {
this.eventMeshCluster = eventMeshCluster;
}
- public String getEventMeshDcn() {
- return eventMeshDcn;
- }
-
- public void setEventMeshDcn(String eventMeshDcn) {
- this.eventMeshDcn = eventMeshDcn;
- }
-
public int getRetryTimes() {
return retryTimes;
}
@@ -113,10 +91,8 @@ public class LiteConsumeContext {
sb.append("liteConsumeContext={")
.append("eventMeshIp=").append(eventMeshIp).append(",")
.append("eventMeshEnv=").append(eventMeshEnv).append(",")
- .append("eventMeshRegion=").append(eventMeshRegion).append(",")
.append("eventMeshIdc=").append(eventMeshIdc).append(",")
.append("eventMeshCluster=").append(eventMeshCluster).append(",")
- .append("eventMeshDcn=").append(eventMeshDcn).append(",")
.append("retryTimes=").append(retryTimes).append(",")
.append("createTime=").append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT))
.append("}");
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
index d16b8f9..a240b95 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
@@ -106,9 +106,7 @@ public class LiteProducer extends AbstractLiteClient {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))
.addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.REGION, liteClientConfig.getRegion())
.addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.DCN, liteClientConfig.getDcn())
.addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
.addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
.addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
@@ -117,6 +115,7 @@ public class LiteProducer extends AbstractLiteClient {
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+ .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.TOPIC, message.getTopic())
.addBody(SendMessageRequestBody.CONTENT, message.getContent())
.addBody(SendMessageRequestBody.TTL, message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
@@ -164,9 +163,7 @@ public class LiteProducer extends AbstractLiteClient {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
.addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.REGION, liteClientConfig.getRegion())
.addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.DCN, liteClientConfig.getDcn())
.addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
.addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
.addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
@@ -175,6 +172,7 @@ public class LiteProducer extends AbstractLiteClient {
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(timeout)
+ .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.TOPIC, message.getTopic())
.addBody(SendMessageRequestBody.CONTENT, message.getContent())
.addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
@@ -219,9 +217,7 @@ public class LiteProducer extends AbstractLiteClient {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
.addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.REGION, liteClientConfig.getRegion())
.addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.DCN, liteClientConfig.getDcn())
.addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
.addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
.addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
@@ -230,6 +226,7 @@ public class LiteProducer extends AbstractLiteClient {
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.setTimeout(timeout)
+ .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.TOPIC, message.getTopic())
.addBody(SendMessageRequestBody.CONTENT, message.getContent())
.addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 87ef68a..33cf434 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -99,7 +99,6 @@ public class MessageUtils {
public static UserAgent generateSubClient(UserAgent agent) {
UserAgent user = new UserAgent();
- user.setDcn(agent.getDcn());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
@@ -116,7 +115,6 @@ public class MessageUtils {
public static UserAgent generatePubClient(UserAgent agent) {
UserAgent user = new UserAgent();
- user.setDcn(agent.getDcn());
user.setHost(agent.getHost());
user.setPassword(agent.getPassword());
user.setUsername(agent.getUsername());
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncPublishInstance.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncPublishInstance.java
index 9eaf745..a86d17a 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncPublishInstance.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncPublishInstance.java
@@ -47,9 +47,9 @@ public class AsyncPublishInstance {
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setProducerGroup("EventMeshTest-producerGroup")
.setEnv("env")
.setIdc("idc")
- .setDcn("dcn")
.setIp(IPUtil.getLocalAddress())
.setSys("1234")
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncSyncRequestInstance.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncSyncRequestInstance.java
index a036185..cacf6b2 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncSyncRequestInstance.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/AsyncSyncRequestInstance.java
@@ -48,9 +48,9 @@ public class AsyncSyncRequestInstance {
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setProducerGroup("EventMeshTest-producerGroup")
.setEnv("env")
.setIdc("idc")
- .setDcn("dcn")
.setIp(IPUtil.getLocalAddress())
.setSys("1234")
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/SyncRequestInstance.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/SyncRequestInstance.java
index 22e3f40..b62d294 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/SyncRequestInstance.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/demo/SyncRequestInstance.java
@@ -46,9 +46,9 @@ public class SyncRequestInstance {
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setProducerGroup("EventMeshTest-producerGroup")
.setEnv("env")
.setIdc("idc")
- .setDcn("dcn")
.setIp(IPUtil.getLocalAddress())
.setSys("1234")
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
index a34f523..78e1016 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/common/EventMeshTestUtils.java
@@ -39,10 +39,11 @@ public class EventMeshTestUtils {
public static UserAgent generateClient1() {
UserAgent user = new UserAgent();
- user.setDcn("AC0");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
+ user.setConsumerGroup("EventmeshTest-ConsumerGroup");
+ user.setProducerGroup("EventmeshTest-ProducerGroup");
user.setPath("/data/app/umg_proxy");
user.setPort(8362);
user.setSubsystem("5023");
@@ -54,10 +55,11 @@ public class EventMeshTestUtils {
public static UserAgent generateClient2() {
UserAgent user = new UserAgent();
- user.setDcn("FT0");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
+ user.setConsumerGroup("EventmeshTest-ConsumerGroup");
+ user.setProducerGroup("EventmeshTest-ProducerGroup");
user.setPath("/data/app/umg_proxy");
user.setPort(9362);
user.setSubsystem("5017");
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java
index 558773f..be07624 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java
@@ -56,9 +56,9 @@ public class AsyncPublishInstance {
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setProducerGroup("EventMeshTest-producerGroup")
.setEnv("env")
.setIdc("idc")
- .setDcn("dcn")
.setIp(IPUtil.getLocalAddress())
.setSys("1234")
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncSyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncSyncRequestInstance.java
index 54b2f2c..7bdb6fe 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncSyncRequestInstance.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncSyncRequestInstance.java
@@ -54,9 +54,9 @@ public class AsyncSyncRequestInstance {
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setProducerGroup("EventMeshTest-producerGroup")
.setEnv("env")
.setIdc("idc")
- .setDcn("dcn")
.setIp(IPUtil.getLocalAddress())
.setSys("1234")
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
index 239beea..9d3af8f 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
@@ -46,9 +46,9 @@ public class SyncRequestInstance {
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setProducerGroup("EventMeshTest-producerGroup")
.setEnv("env")
.setIdc("idc")
- .setDcn("dcn")
.setIp(IPUtil.getLocalAddress())
.setSys("1234")
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
index 3fcfa3f..09b64b4 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
@@ -60,7 +60,6 @@ public class SubService implements InitializingBean {
final String url = "http://" + localIp + ":" + localPort + "/sub/test";
final String env = "P";
final String idc = "FT";
- final String dcn = "FT0";
final String subsys = "1234";
// CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher)
@@ -75,9 +74,9 @@ public class SubService implements InitializingBean {
}
LiteClientConfig eventMeshClientConfig = new LiteClientConfig();
eventMeshClientConfig.setLiteEventMeshAddr(eventMeshIPPort)
+ .setConsumerGroup("EventMeshTest-consumerGroup")
.setEnv(env)
.setIdc(idc)
- .setDcn(dcn)
.setIp(IPUtil.getLocalAddress())
.setSys(subsys)
.setPid(String.valueOf(ThreadUtil.getPID()));
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index dedce3a..88578f5 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -24,6 +24,7 @@ import static org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet.TOPIC_PR
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
@@ -35,10 +36,11 @@ public class EventMeshTestUtils {
public static UserAgent generateClient1() {
UserAgent user = new UserAgent();
- user.setDcn("AC0");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
+ user.setProducerGroup("EventmeshTest-ProducerGroup");
+ user.setConsumerGroup("EventmeshTest-ConsumerGroup");
user.setPath("/data/app/umg_proxy");
user.setPort(8362);
user.setSubsystem("5023");
@@ -50,10 +52,11 @@ public class EventMeshTestUtils {
public static UserAgent generateClient2() {
UserAgent user = new UserAgent();
- user.setDcn("FT0");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
+ user.setConsumerGroup("EventmeshTest-ConsumerGroup");
+ user.setProducerGroup("EventmeshTest-ProducerGroup");
user.setPath("/data/app/umg_proxy");
user.setPort(9362);
user.setSubsystem("5017");
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
index c4cb72e..ca03337 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
@@ -36,6 +36,7 @@ public class SyncResponse implements ReceiveMsgHook {
private static EventMeshClient client;
+
public static SyncResponse handler = new SyncResponse();
public static void main(String[] agrs) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org