You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/06/17 07:09:31 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #366 ] remove
custom-format topic concept (#388)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ffe74e [ISSUE #366 ] remove custom-format topic concept (#388)
0ffe74e is described below
commit 0ffe74e660368db1cab6d9d9c55cca206c29384f
Author: nanoxiong <xi...@163.com>
AuthorDate: Thu Jun 17 15:08:26 2021 +0800
[ISSUE #366 ] remove custom-format topic concept (#388)
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
---
.../java/org/apache/eventmesh/common/IPUtil.java | 2 +-
.../eventmesh/common/protocol/SubcriptionType.java | 26 +++++++-
.../Subscription.java => SubscriptionItem.java} | 51 ++++++++++-----
.../common/protocol/SubscriptionMode.java | 28 ++++++++-
.../http/body/client/HeartbeatRequestBody.java | 1 -
.../protocol/http/body/client/RegRequestBody.java | 9 +--
.../http/body/client/SubscribeRequestBody.java | 15 +++--
.../common/protocol/tcp/Subscription.java | 12 ++--
.../runtime/boot/EventMeshHTTPServer.java | 4 +-
.../eventmesh/runtime/boot/EventMeshServer.java | 1 -
.../runtime/constants/DeFiBusConstant.java | 72 ----------------------
.../core/consumergroup/ConsumerGroupTopicConf.java | 18 +++++-
.../http/consumer/ConsumerGroupManager.java | 6 +-
.../protocol/http/consumer/ConsumerManager.java | 1 +
.../protocol/http/consumer/EventMeshConsumer.java | 18 +++---
.../protocol/http/consumer/HandleMsgContext.java | 18 +++++-
.../http/processor/SubscribeProcessor.java | 23 ++++---
.../http/processor/UnSubscribeProcessor.java | 1 +
.../protocol/http/push/AsyncHTTPPushRequest.java | 3 +-
.../tcp/client/group/ClientGroupWrapper.java | 22 ++++---
.../client/group/ClientSessionGroupMapping.java | 12 ++--
.../core/protocol/tcp/client/session/Session.java | 33 +++++-----
.../tcp/client/session/SessionContext.java | 5 +-
.../client/session/push/DownStreamMsgContext.java | 8 ++-
.../tcp/client/session/push/SessionPusher.java | 13 ++--
.../session/push/retry/EventMeshTcpRetryer.java | 7 ++-
.../tcp/client/task/MessageTransferTask.java | 1 +
.../protocol/tcp/client/task/SubscribeTask.java | 21 +++----
.../protocol/tcp/client/task/UnSubscribeTask.java | 8 ++-
.../eventmesh/runtime/util/EventMeshUtil.java | 58 -----------------
.../src/test/java/client/EventMeshClient.java | 10 ++-
.../src/test/java/client/SubClient.java | 10 ++-
.../src/test/java/client/common/MessageUtils.java | 38 ++++++------
.../test/java/client/impl/EventMeshClientImpl.java | 17 ++---
.../src/test/java/client/impl/SubClientImpl.java | 37 +++++------
.../src/test/java/demo/AsyncSubClient.java | 4 +-
.../src/test/java/demo/BroadCastSubClient.java | 4 +-
.../src/test/java/demo/CCSubClient.java | 4 +-
.../src/test/java/demo/CClientDemo.java | 6 +-
.../src/test/java/demo/SyncSubClient.java | 4 +-
.../client/http/consumer/LiteConsumer.java | 29 ++++++---
.../eventmesh/client/tcp/EventMeshClient.java | 4 +-
.../eventmesh/client/tcp/SimpleSubClient.java | 4 +-
.../eventmesh/client/tcp/common/MessageUtils.java | 19 +++---
.../client/tcp/impl/DefaultEventMeshClient.java | 6 +-
.../client/tcp/impl/SimpleSubClientImpl.java | 22 ++++---
.../eventmesh/client/tcp/demo/AsyncSubscribe.java | 4 +-
.../client/tcp/demo/AsyncSubscribeBroadcast.java | 4 +-
.../eventmesh/client/tcp/demo/SyncResponse.java | 4 +-
.../http/demo/sub/service/SubService.java | 12 +++-
.../apache/eventmesh/tcp/demo/AsyncSubscribe.java | 4 +-
.../tcp/demo/AsyncSubscribeBroadcast.java | 4 +-
.../apache/eventmesh/tcp/demo/SyncResponse.java | 4 +-
53 files changed, 382 insertions(+), 369 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java
index 4cd19e4..5165deb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/IPUtil.java
@@ -35,7 +35,7 @@ public class IPUtil {
public static String getLocalAddress() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
- String dockerHostIp = System.getenv("webank_docker_host_ip");
+ String dockerHostIp = System.getenv("docker_host_ip");
if (dockerHostIp != null && !"".equals(dockerHostIp))
return dockerHostIp;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java
similarity index 67%
copy from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java
index f06ba02..6b6cb16 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubcriptionType.java
@@ -15,7 +15,29 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.consumergroup.event;
+package org.apache.eventmesh.common.protocol;
-public class ConsumerGroupInstanceChangeEvent {
+public enum SubcriptionType {
+ /**
+ * SYNC
+ */
+ SYNC("SYNC"),
+ /**
+ * ASYNC
+ */
+ ASYNC("ASYNC");
+
+ private String type;
+
+ SubcriptionType(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java
similarity index 50%
copy from eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java
index 83b0dc9..abcc22c 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionItem.java
@@ -15,36 +15,57 @@
* limitations under the License.
*/
-package org.apache.eventmesh.common.protocol.tcp;
+package org.apache.eventmesh.common.protocol;
-import java.util.LinkedList;
-import java.util.List;
+public class SubscriptionItem {
-public class Subscription {
+ private String topic;
- private List<String> topicList = new LinkedList<>();
+ private SubscriptionMode mode;
- public Subscription() {
+ private SubcriptionType type;
+
+ public SubscriptionItem() {
+ }
+
+ public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
+ this.topic = topic;
+ this.mode = mode;
+ this.type = type;
}
- public Subscription(List<String> topicList) {
- this.topicList = topicList;
+ public SubcriptionType getType() {
+ return type;
}
- public List<String> getTopicList() {
- return topicList;
+ public void setType(SubcriptionType type) {
+ this.type = type;
}
- public void setTopicList(List<String> topicList) {
- this.topicList = topicList;
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public SubscriptionMode getMode() {
+ return mode;
+ }
+
+ public void setMode(SubscriptionMode mode) {
+ this.mode = mode;
}
@Override
public String toString() {
- return "Subscription{" +
- "topicList=" + topicList +
+ return "SubscriptionItem{" +
+ "topic=" + topic +
+ ", mode=" + mode +
+ ", type=" + type +
'}';
}
+}
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java
similarity index 64%
rename from eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java
index f06ba02..ad4b751 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/event/ConsumerGroupInstanceChangeEvent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/SubscriptionMode.java
@@ -15,7 +15,31 @@
* limitations under the License.
*/
-package org.apache.eventmesh.runtime.core.consumergroup.event;
+package org.apache.eventmesh.common.protocol;
+
+public enum SubscriptionMode {
+
+ /**
+ * broadcast
+ */
+ BROADCASTING("BROADCASTING"),
+ /**
+ * clustering
+ */
+ CLUSTERING("CLUSTERING");
+
+ private String mode;
+
+ SubscriptionMode(String mode) {
+ this.mode = mode;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
-public class ConsumerGroupInstanceChangeEvent {
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
index 5c3d1ea..ace4d71 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
@@ -32,7 +32,6 @@ public class HeartbeatRequestBody extends Body {
public static final String CLIENTTYPE = "clientType";
public static final String HEARTBEATENTITIES = "heartbeatEntities";
-
private String clientType;
private List<HeartbeatEntity> heartbeatEntities;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
index 41c5c5a..78bb684 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
@@ -25,6 +25,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.apache.commons.collections4.MapUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;
public class RegRequestBody extends Body {
@@ -39,13 +40,13 @@ public class RegRequestBody extends Body {
private String endPoint;
- private List<String> topics;
+ private List<SubscriptionItem> topics;
- public List<String> getTopics() {
+ public List<SubscriptionItem> getTopics() {
return topics;
}
- public void setTopics(List<String> topics) {
+ public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}
@@ -69,7 +70,7 @@ public class RegRequestBody extends Body {
RegRequestBody body = new RegRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
- body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), String.class));
+ body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
return body;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
index 6a37cc5..991b834 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
@@ -25,6 +25,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.apache.commons.collections4.MapUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;
public class SubscribeRequestBody extends Body {
@@ -33,20 +34,18 @@ public class SubscribeRequestBody extends Body {
public static final String URL = "url";
- private List<String> topics;
+ private List<SubscriptionItem> topics;
- private String url;
-
- private String topic;
-
- public List<String> getTopics() {
+ public List<SubscriptionItem> getTopics() {
return topics;
}
- public void setTopics(List<String> topics) {
+ public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}
+ private String url;
+
public String getUrl() {
return url;
}
@@ -58,7 +57,7 @@ public class SubscribeRequestBody extends Body {
public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
SubscribeRequestBody body = new SubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
- body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
+ body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
return body;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
index 83b0dc9..3fb74b9 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Subscription.java
@@ -17,25 +17,27 @@
package org.apache.eventmesh.common.protocol.tcp;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+
import java.util.LinkedList;
import java.util.List;
public class Subscription {
- private List<String> topicList = new LinkedList<>();
+ private List<SubscriptionItem> topicList = new LinkedList<>();
public Subscription() {
}
- public Subscription(List<String> topicList) {
+ public Subscription(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}
- public List<String> getTopicList() {
+ public List<SubscriptionItem> getTopicList() {
return topicList;
}
- public void setTopicList(List<String> topicList) {
+ public void setTopicList(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}
@@ -45,6 +47,4 @@ public class Subscription {
"topicList=" + topicList +
'}';
}
-
-
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 80540c0..84a2ab7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -54,9 +54,9 @@ public class EventMeshHTTPServer extends AbrstractHTTPServer {
private EventMeshHTTPConfiguration eventMeshHttpConfiguration;
- public final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
+ public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
- public final ConcurrentHashMap<String, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
+ public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
public EventMeshHTTPServer(EventMeshServer eventMeshServer,
EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index 61bfb89..19d6953 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -54,7 +54,6 @@ public class EventMeshServer {
String eventstore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV));
logger.info("eventstore : {}", eventstore);
-// logger.info("load custom {} class for eventMesh", ConsumeMessageConcurrentlyService.class.getCanonicalName());
serviceState = ServiceState.INITED;
logger.info("server state:{}", serviceState);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java
deleted file mode 100644
index 898854d..0000000
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/DeFiBusConstant.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to Apache Software Foundation (ASF) under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Apache Software Foundation (ASF) licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.eventmesh.runtime.constants;
-
-//TODO
-public class DeFiBusConstant {
- public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; //requester clientId
-
- public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID";
-
- public static final String PROPERTY_MESSAGE_TTL = "TTL"; //timeout for request-response
-
- public static final String PROPERTY_MESSAGE_CLUSTER = "CLUSTER"; //cluster name
-
- public static final String PROPERTY_MESSAGE_BROKER = "BROKER"; //broker name where message stored
-
- public static final String REDIRECT = "REDIRECT";
-
- public static final String REDIRECT_FLAG = "REDIRECT_FLAG";
-
- public static final String PLUGIN_CLASS_NAME = "org.apache.defibus.broker.plugin.DeFiPluginMessageStore";
-
- public static final String RR_REPLY_TOPIC = "rr-reply-topic"; //post fix for reply topic
-
- public static final String KEY = "msgType";
-
- public static final String DEFAULT_TTL = "14400000";
-
- public static final String EXT_CONSUMER_GROUP = "ExtConsumerGroup";
-
- public static final String RMQ_SYS = "RMQ_SYS_";
-
- /**
- * msgType1: indicate the msg is broadcast message
- */
- public static final String DIRECT = "direct";
-
- /**
- * msgType2: msg of type except broadcast and reply
- */
- public static final String PERSISTENT = "persistent";
-
- /**
- * msgType3: indicate the msg is which consumer reply to producer
- */
- public static final String REPLY = "reply";
-
- public static final String INSTANCE_NAME_SEPERATER = "#";
-
- public static final String IDC_SEPERATER = "-";
-
- public static final String LEAVE_TIME = "LEAVE_TIME"; //leaveBrokerTime
- public static final String ARRIVE_TIME = "ARRIVE_TIME";
- public static final String STORE_TIME = "STORE_TIME";
-
-}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
index 5a16c4c..f29d90a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
@@ -25,6 +25,7 @@ import java.util.Set;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,11 @@ public class ConsumerGroupTopicConf {
private String topic;
/**
+ * @see org.apache.eventmesh.common.protocol.SubscriptionItem
+ */
+ private SubscriptionItem subscriptionItem;
+
+ /**
* PUSH URL
*/
private Map<String /** IDC*/, List<String> /** IDC内URL列表*/> idcUrls = Maps.newConcurrentMap();
@@ -53,12 +59,13 @@ public class ConsumerGroupTopicConf {
ConsumerGroupTopicConf that = (ConsumerGroupTopicConf) o;
return consumerGroup.equals(that.consumerGroup) &&
Objects.equals(topic, that.topic) &&
+ Objects.equals(subscriptionItem, that.subscriptionItem) &&
Objects.equals(idcUrls, that.idcUrls);
}
@Override
public int hashCode() {
- return Objects.hash(consumerGroup, topic, idcUrls);
+ return Objects.hash(consumerGroup, topic, subscriptionItem, idcUrls);
}
@Override
@@ -66,6 +73,7 @@ public class ConsumerGroupTopicConf {
StringBuilder sb = new StringBuilder();
sb.append("consumeTopicConfig={consumerGroup=").append(consumerGroup)
.append(",topic=").append(topic)
+ .append(",subscriptionMode=").append(subscriptionItem)
.append(",idcUrls=").append(idcUrls).append("}");
return sb.toString();
}
@@ -86,6 +94,14 @@ public class ConsumerGroupTopicConf {
this.topic = topic;
}
+ public SubscriptionItem getSubscriptionItem() {
+ return subscriptionItem;
+ }
+
+ public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
+ this.subscriptionItem = subscriptionItem;
+ }
+
public Map<String, List<String>> getIdcUrls() {
return idcUrls;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java
index 76381e3..c73523b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerGroupManager.java
@@ -17,10 +17,12 @@
package org.apache.eventmesh.runtime.core.protocol.http.consumer;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
public class ConsumerGroupManager {
@@ -52,8 +54,8 @@ public class ConsumerGroupManager {
}
private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception {
- for (String topic : consumerGroupConfig.getConsumerGroupTopicConf().keySet()) {
- eventMeshConsumer.subscribe(topic);
+ for (Map.Entry<String, ConsumerGroupTopicConf> conf : consumerGroupConfig.getConsumerGroupTopicConf().entrySet()) {
+ eventMeshConsumer.subscribe(conf.getKey(), conf.getValue().getSubscriptionItem());
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
index 53e8a3c..14486bb 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
@@ -113,6 +113,7 @@ public class ConsumerManager {
ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(topic);
+ latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);
latestTopicConf.setIdcUrls(idcUrls);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 36dee3c..e39afb2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -33,6 +33,8 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -108,9 +110,9 @@ public class EventMeshConsumer {
started4Broadcast.compareAndSet(false, true);
}
- public void subscribe(String topic) throws Exception {
+ public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception {
AsyncMessageListener listener = null;
- if (!EventMeshUtil.isBroadcast(topic)) {
+ if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
listener = new AsyncMessageListener() {
@Override
public void consume(Message message, AsyncConsumeContext context) {
@@ -139,7 +141,7 @@ public class EventMeshConsumer {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+ topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
@@ -188,7 +190,7 @@ public class EventMeshConsumer {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
- topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
+ topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
@@ -210,8 +212,8 @@ public class EventMeshConsumer {
}
}
- public void unsubscribe(String topic) throws Exception {
- if (EventMeshUtil.isBroadcast(topic)) {
+ public void unsubscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
+ if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
broadcastMqConsumer.unsubscribe(topic);
} else {
persistentMqConsumer.unsubscribe(topic);
@@ -234,8 +236,8 @@ public class EventMeshConsumer {
started4Broadcast.compareAndSet(true, false);
}
- public void updateOffset(String topic, List<Message> msgs, AbstractContext context) {
- if (EventMeshUtil.isBroadcast(topic)) {
+ public void updateOffset(String topic, SubscriptionMode subscriptionMode, List<Message> msgs, AbstractContext context) {
+ if (SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
broadcastMqConsumer.updateOffset(msgs, context);
} else {
persistentMqConsumer.updateOffset(msgs, context);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
index 01768ca..2d19174 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/HandleMsgContext.java
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -50,6 +52,8 @@ public class HandleMsgContext {
private String topic;
+ private SubscriptionItem subscriptionItem;
+
private Message msg;
private int ttl;
@@ -67,7 +71,7 @@ public class HandleMsgContext {
private Map<String, String> props;
public HandleMsgContext(String msgRandomNo, String consumerGroup, EventMeshConsumer eventMeshConsumer,
- String topic, Message msg,
+ String topic, Message msg, SubscriptionItem subscriptionItem,
AbstractContext context, ConsumerGroupConf consumerGroupConfig,
EventMeshHTTPServer eventMeshHTTPServer, String bizSeqNo, String uniqueId, ConsumerGroupTopicConf consumeTopicConfig) {
this.msgRandomNo = msgRandomNo;
@@ -75,6 +79,7 @@ public class HandleMsgContext {
this.eventMeshConsumer = eventMeshConsumer;
this.topic = topic;
this.msg = msg;
+ this.subscriptionItem = subscriptionItem;
this.context = context;
this.consumerGroupConfig = consumerGroupConfig;
this.eventMeshHTTPServer = eventMeshHTTPServer;
@@ -152,6 +157,14 @@ public class HandleMsgContext {
this.msg = msg;
}
+ public SubscriptionItem getSubscriptionItem() {
+ return subscriptionItem;
+ }
+
+ public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
+ this.subscriptionItem = subscriptionItem;
+ }
+
public long getCreateTime() {
return createTime;
}
@@ -188,7 +201,7 @@ public class HandleMsgContext {
// msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER),
// msg.getQueueId(), msg.getQueueOffset());
}
- eventMeshConsumer.updateOffset(topic, Arrays.asList(msg), context);
+ eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Arrays.asList(msg), context);
}
}
@@ -214,6 +227,7 @@ public class HandleMsgContext {
sb.append("handleMsgContext={")
.append("consumerGroup=").append(consumerGroup)
.append(",topic=").append(topic)
+ .append(",subscriptionItem=").append(subscriptionItem)
.append(",consumeTopicConfig=").append(consumeTopicConfig)
.append(",bizSeqNo=").append(bizSeqNo)
.append(",uniqueId=").append(uniqueId)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 1d0b6e8..2186944 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -30,6 +30,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
@@ -98,7 +99,7 @@ public class SubscribeProcessor implements HttpRequestProcessor {
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- List<String> subTopicList = subscribeRequestBody.getTopics();
+ List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
String url = subscribeRequestBody.getUrl();
String consumerGroup = EventMeshUtil.buildClientGroup(subscribeRequestHeader.getSys(),
@@ -108,8 +109,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
- for (String subTopic : subTopicList) {
- List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic);
+ for (SubscriptionItem subTopic : subTopicList) {
+ List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic.getTopic());
if (CollectionUtils.isEmpty(groupTopicClients)) {
httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic);
@@ -131,23 +132,25 @@ public class SubscribeProcessor implements HttpRequestProcessor {
consumerGroupConf = new ConsumerGroupConf(consumerGroup);
ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
consumeTopicConfig.setConsumerGroup(consumerGroup);
- consumeTopicConfig.setTopic(subTopic);
+ consumeTopicConfig.setTopic(subTopic.getTopic());
+ consumeTopicConfig.setSubscriptionItem(subTopic);
consumeTopicConfig.setUrls(new HashSet<>(Arrays.asList(url)));
consumeTopicConfig.setIdcUrls(idcUrls);
Map<String, ConsumerGroupTopicConf> map = new HashMap<>();
- map.put(subTopic, consumeTopicConfig);
+ map.put(subTopic.getTopic(), consumeTopicConfig);
consumerGroupConf.setConsumerGroupTopicConf(map);
} else {
// 已有订阅
Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
for (String key : map.keySet()) {
- if (StringUtils.equals(subTopic, key)) {
+ if (StringUtils.equals(subTopic.getTopic(), key)) {
ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
ConsumerGroupTopicConf currentTopicConf = map.get(key);
latestTopicConf.setConsumerGroup(consumerGroup);
- latestTopicConf.setTopic(subTopic);
+ latestTopicConf.setTopic(subTopic.getTopic());
+ latestTopicConf.setSubscriptionItem(subTopic);
latestTopicConf.setUrls(new HashSet<>(Arrays.asList(url)));
latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());
@@ -206,8 +209,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
}
private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup,
- List<String> topicList, String url) {
- for(String topic: topicList) {
+ List<SubscriptionItem> subscriptionItems, String url) {
+ for(SubscriptionItem item: subscriptionItems) {
Client client = new Client();
client.env = subscribeRequestHeader.getEnv();
client.dcn = subscribeRequestHeader.getDcn();
@@ -216,7 +219,7 @@ public class SubscribeProcessor implements HttpRequestProcessor {
client.ip = subscribeRequestHeader.getIp();
client.pid = subscribeRequestHeader.getPid();
client.consumerGroup = consumerGroup;
- client.topic = topic;
+ client.topic = item.getTopic();
client.url = url;
client.lastUpTime = new Date();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index 4d9f2e2..b723054 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -166,6 +166,7 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(unSubTopic);
+ latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);
latestTopicConf.setIdcUrls(idcUrls);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index af71ce8..831296a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.text.RandomStringGenerator;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
@@ -88,7 +89,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
String requestCode = "";
- if (EventMeshUtil.isService(handleMsgContext.getTopic())) {
+ if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 0bc645e..2c3a46f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -43,6 +43,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -511,9 +513,9 @@ public class ClientGroupWrapper {
logger.info("starting broadCastMsgConsumer success, group:{}", groupName);
}
- public void subscribe(String topic) throws Exception {
+ public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
AsyncMessageListener listener = null;
- if (EventMeshUtil.isBroadcast(topic)) {
+ if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
listener = new AsyncMessageListener() {
@Override
public void consume(Message message, AsyncConsumeContext context) {
@@ -534,7 +536,7 @@ public class ClientGroupWrapper {
Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false);
+ new DownStreamMsgContext(message, null, broadCastMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
while (sessionsItr.hasNext()) {
Session session = sessionsItr.next();
@@ -562,7 +564,7 @@ public class ClientGroupWrapper {
context.commit(Action.CommitMessage);
}
};
- broadCastMsgConsumer.subscribe(topic, listener);
+ broadCastMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
} else {
listener = new AsyncMessageListener() {
@Override
@@ -606,7 +608,7 @@ public class ClientGroupWrapper {
}
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false);
+ new DownStreamMsgContext(message, session, persistentMsgConsumer, ((MeshAsyncConsumeContext)context).getContext(), false, subscriptionItem);
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
@@ -615,15 +617,15 @@ public class ClientGroupWrapper {
context.commit(Action.CommitMessage);
}
};
- persistentMsgConsumer.subscribe(topic, listener);
+ persistentMsgConsumer.subscribe(subscriptionItem.getTopic(), listener);
}
}
- public void unsubscribe(String topic) throws Exception {
- if (EventMeshUtil.isBroadcast(topic)) {
- broadCastMsgConsumer.unsubscribe(topic);
+ public void unsubscribe(SubscriptionItem subscriptionItem) throws Exception {
+ if (SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
+ broadCastMsgConsumer.unsubscribe(subscriptionItem.getTopic());
} else {
- persistentMsgConsumer.unsubscribe(topic);
+ persistentMsgConsumer.unsubscribe(subscriptionItem.getTopic());
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index c45c545..c4cd170 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -34,6 +34,8 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.ThreadUtil;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -280,10 +282,10 @@ public class ClientSessionGroupMapping {
* @param session
*/
private void cleanSubscriptionInSession(Session session) throws Exception {
- for (String topic : session.getSessionContext().subscribeTopics.values()) {
- session.getClientGroupWrapper().get().removeSubscription(topic, session);
- if (!session.getClientGroupWrapper().get().hasSubscription(topic)) {
- session.getClientGroupWrapper().get().unsubscribe(topic);
+ for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
+ session.getClientGroupWrapper().get().removeSubscription(item.getTopic(), session);
+ if (!session.getClientGroupWrapper().get().hasSubscription(item.getTopic())) {
+ session.getClientGroupWrapper().get().unsubscribe(item);
}
}
}
@@ -298,7 +300,7 @@ public class ClientSessionGroupMapping {
if (unAckMsg.size() > 0 && session.getClientGroupWrapper().get().getGroupConsumerSessions().size() > 0) {
for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
DownStreamMsgContext downStreamMsgContext = entry.getValue();
- if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
+ if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt), session.getClient());
continue;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index 4ff0480..fca8d2a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -32,10 +32,9 @@ import io.openmessaging.api.SendCallback;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
@@ -162,27 +161,27 @@ public class Session {
this.listenRequestSeq = listenRequestSeq;
}
- public void subscribe(List<String> topics) throws Exception {
- for (String topic : topics) {
- sessionContext.subscribeTopics.putIfAbsent(topic, topic);
- clientGroupWrapper.get().subscribe(topic);
+ public void subscribe(List<SubscriptionItem> items) throws Exception {
+ for (SubscriptionItem item : items) {
+ sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
+ clientGroupWrapper.get().subscribe(item);
- clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(topic,
+ clientGroupWrapper.get().getMqProducerWrapper().getMeshMQProducer().getDefaultTopicRouteInfoFromNameServer(item.getTopic(),
EventMeshConstants.DEFAULT_TIME_OUT_MILLS);
- clientGroupWrapper.get().addSubscription(topic, this);
- subscribeLogger.info("subscribe|succeed|topic={}|user={}", topic, client);
+ clientGroupWrapper.get().addSubscription(item.getTopic(), this);
+ subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
}
}
- public void unsubscribe(List<String> topics) throws Exception {
- for (String topic : topics) {
- sessionContext.subscribeTopics.remove(topic);
- clientGroupWrapper.get().removeSubscription(topic, this);
+ public void unsubscribe(List<SubscriptionItem> items) throws Exception {
+ for (SubscriptionItem item : items) {
+ sessionContext.subscribeTopics.remove(item.getTopic());
+ clientGroupWrapper.get().removeSubscription(item.getTopic(), this);
- if (!clientGroupWrapper.get().hasSubscription(topic)) {
- clientGroupWrapper.get().unsubscribe(topic);
- subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", topic, client);
+ if (!clientGroupWrapper.get().hasSubscription(item.getTopic())) {
+ clientGroupWrapper.get().unsubscribe(item);
+ subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client);
}
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
index 9d5b22e..e8f851b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/SessionContext.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.session;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
public class SessionContext {
@@ -28,7 +29,7 @@ public class SessionContext {
public ConcurrentHashMap<String, String> sendTopics = new ConcurrentHashMap<String, String>();
- public ConcurrentHashMap<String, String> subscribeTopics = new ConcurrentHashMap<String, String>();
+ public ConcurrentHashMap<String, SubscriptionItem> subscribeTopics = new ConcurrentHashMap<String, SubscriptionItem>();
public long createTime = System.currentTimeMillis();
@@ -38,7 +39,7 @@ public class SessionContext {
@Override
public String toString() {
- return "SessionContext{subscribeTopics=" + subscribeTopics.keySet()
+ return "SessionContext{subscribeTopics=" + subscribeTopics
+ ",sendTopics=" + sendTopics.keySet()
+ ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) + "}";
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
index 01ba40e..8e441bd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java
@@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
@@ -51,6 +53,8 @@ public class DownStreamMsgContext implements Delayed {
public int retryTimes;
+ public SubscriptionItem subscriptionItem;
+
private long executeTime;
public long lastPushTime;
@@ -61,7 +65,7 @@ public class DownStreamMsgContext implements Delayed {
public boolean msgFromOtherEventMesh;
- public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper consumer, AbstractContext consumeConcurrentlyContext, boolean msgFromOtherEventMesh) {
+ public DownStreamMsgContext(Message msgExt, Session session, MQConsumerWrapper consumer, AbstractContext consumeConcurrentlyContext, boolean msgFromOtherEventMesh, SubscriptionItem subscriptionItem) {
this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet());
this.msgExt = msgExt;
this.session = session;
@@ -71,6 +75,7 @@ public class DownStreamMsgContext implements Delayed {
this.lastPushTime = System.currentTimeMillis();
this.executeTime = System.currentTimeMillis();
this.createTime = System.currentTimeMillis();
+ this.subscriptionItem = subscriptionItem;
String ttlStr = msgExt.getUserProperties("TTL");
long ttl = StringUtils.isNumeric(ttlStr) ? Long.parseLong(ttlStr) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
this.expireTime = System.currentTimeMillis() + ttl;
@@ -108,6 +113,7 @@ public class DownStreamMsgContext implements Delayed {
",consumer=" + consumer +
// todo ",consumerGroup=" + consumer.getClass().getConsumerGroup() +
",topic=" + msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION) +
+ ",subscriptionItem=" + subscriptionItem +
",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT) +
",executeTime=" + DateFormatUtils.format(executeTime, EventMeshConstants.DATE_FORMAT) +
",lastPushTime=" + DateFormatUtils.format(lastPushTime, EventMeshConstants.DATE_FORMAT) + '}';
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
index b1171f2..12c7cd1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.java
@@ -21,10 +21,9 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.Constants;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
@@ -63,9 +62,9 @@ public class SessionPusher {
public void push(final DownStreamMsgContext downStreamMsgContext) {
Command cmd;
- if (EventMeshUtil.isBroadcast(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
+ if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
cmd = Command.BROADCAST_MESSAGE_TO_CLIENT;
- } else if (EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION))) {
+ } else if (SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType())) {
cmd = Command.REQUEST_TO_CLIENT;
} else {
cmd = Command.ASYNC_MESSAGE_TO_CLIENT;
@@ -102,7 +101,7 @@ public class SessionPusher {
logger.warn("isolate client:{},isolateTime:{}", session.getClient(), isolateTime);
//retry
- long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
+ long delayTime = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0 : session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills;
downStreamMsgContext.delay(delayTime);
session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
} else {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
index fc619b3..b8fcf0f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.java
@@ -28,6 +28,8 @@ import io.openmessaging.api.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
@@ -74,7 +76,7 @@ public class EventMeshTcpRetryer {
return;
}
- int maxRetryTimes = EventMeshUtil.isService(downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION)) ? 1 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes;
+ int maxRetryTimes = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 1 : eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes;
if (downStreamMsgContext.retryTimes >= maxRetryTimes) {
logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", maxRetryTimes, downStreamMsgContext.retryTimes,
downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
@@ -118,7 +120,7 @@ public class EventMeshTcpRetryer {
Session rechoosen = null;
String topic = downStreamMsgContext.msgExt.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- if (!EventMeshUtil.isBroadcast(topic)) {
+ if (!SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
rechoosen = downStreamMsgContext.session.getClientGroupWrapper()
.get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getGroupName()
, topic
@@ -127,7 +129,6 @@ public class EventMeshTcpRetryer {
rechoosen = downStreamMsgContext.session;
}
-
if (rechoosen == null) {
logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index c903742..2d4b80d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -85,6 +85,7 @@ public class MessageTransferTask extends AbstractTask {
long sendTime = System.currentTimeMillis();
addTimestamp(eventMeshMessage, cmd, sendTime);
if (cmd.equals(Command.REQUEST_TO_SERVER)) {
+ //Message Attach SYNC
eventMeshMessage.getProperties().put(EventMeshConstants.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper()
.get().getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
index e26c667..ccacf56 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
@@ -22,13 +22,11 @@ import java.util.List;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
-import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,17 +49,14 @@ public class SubscribeTask extends AbstractTask {
throw new Exception("subscriptionInfo is null");
}
- List<String> topicList = new ArrayList<>();
+ List<SubscriptionItem> subscriptionItems = new ArrayList<>();
for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) {
- String topic = subscriptionInfo.getTopicList().get(i);
- if (!EventMeshUtil.isValidRMBTopic(topic)) {
- throw new Exception("invalid topic!");
- }
- topicList.add(topic);
+ SubscriptionItem item = subscriptionInfo.getTopicList().get(i);
+ subscriptionItems.add(item);
}
synchronized (session) {
- session.subscribe(topicList);
- messageLogger.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), topicList);
+ session.subscribe(subscriptionItems);
+ messageLogger.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems);
}
msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(), pkg.getHeader()
.getSeq()));
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
index 3a338ed..c35245a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/UnSubscribeTask.java
@@ -19,10 +19,12 @@ package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.collections4.MapUtils;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
@@ -46,10 +48,10 @@ public class UnSubscribeTask extends AbstractTask {
Package msg = new Package();
try {
synchronized (session) {
- List<String> topics = new ArrayList<String>();
+ List<SubscriptionItem> topics = new ArrayList<SubscriptionItem>();
if (MapUtils.isNotEmpty(session.getSessionContext().subscribeTopics)) {
- for (String topic : session.getSessionContext().subscribeTopics.keySet()) {
- topics.add(topic);
+ for (Map.Entry<String, SubscriptionItem> entry : session.getSessionContext().subscribeTopics.entrySet()) {
+ topics.add(entry.getValue());
}
session.unsubscribe(topics);
messageLogger.info("UnSubscriberTask succeed|user={}|topics={}", session.getClient(), topics);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
index 3814fb7..0ecb375 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/EventMeshUtil.java
@@ -115,64 +115,6 @@ public class EventMeshUtil {
return str + "/namesrvAddr/" + idc;
}
- public static boolean isValidRMBTopic(String topic) {
- if (StringUtils.isEmpty(topic) || !StringUtils.contains(topic, "-")) {
- return false;
- }
-
- String[] args = StringUtils.split(topic, "-");
- if (ArrayUtils.getLength(args) != 5) {
- return false;
- }
-
- String s0e = args[1];
- if (!StringUtils.equalsIgnoreCase("s", s0e) && !StringUtils.equalsIgnoreCase("e", s0e)) {
- return false;
- }
-
- String service = args[2];
- if (!StringUtils.isNumeric(service)) {
- return false;
- }
-
- return true;
- }
-
- public static String getServiceIDStr(String topic) {
- if (!isValidRMBTopic(topic)) {
- return "";
- }
-
- String[] args = StringUtils.split(topic, "-");
- return args[2];
- }
-
- public static String getPidStr(String topic) {
- if (!isValidRMBTopic(topic)) {
- return "";
- }
-
- String[] args = StringUtils.split(topic, "-");
- return args[3];
- }
-
- public static boolean isService(String topic) {
- String serviceStr = getServiceIDStr(topic);
- if (StringUtils.isEmpty(serviceStr)) {
- return false;
- }
- return "0".equals(StringUtils.substring(serviceStr, 3, 4));
- }
-
- public static boolean isBroadcast(String topic) {
- String serviceStr = getServiceIDStr(topic);
- if (StringUtils.isEmpty(serviceStr)) {
- return false;
- }
- String[] args = StringUtils.split(topic, "-");
- return "3".equals(StringUtils.substring(args[2], 3, 4)) || "4".equals(StringUtils.substring(args[2], 3, 4));
- }
-
/**
* 自定义取堆栈
*
diff --git a/eventmesh-runtime/src/test/java/client/EventMeshClient.java b/eventmesh-runtime/src/test/java/client/EventMeshClient.java
index ce4a13e..547bf61 100644
--- a/eventmesh-runtime/src/test/java/client/EventMeshClient.java
+++ b/eventmesh-runtime/src/test/java/client/EventMeshClient.java
@@ -17,9 +17,11 @@
package client;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
import client.hook.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
public interface EventMeshClient {
@@ -37,13 +39,9 @@ public interface EventMeshClient {
Package listen() throws Exception;
- Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception;
+ Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
- Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception;
-
- Package justSubscribe(String topic) throws Exception;
-
- Package justUnsubscribe(String topic) throws Exception;
+ Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
void registerPubBusiHandler(ReceiveMsgHook handler) throws Exception;
diff --git a/eventmesh-runtime/src/test/java/client/SubClient.java b/eventmesh-runtime/src/test/java/client/SubClient.java
index dff635a..e591ea6 100644
--- a/eventmesh-runtime/src/test/java/client/SubClient.java
+++ b/eventmesh-runtime/src/test/java/client/SubClient.java
@@ -17,7 +17,9 @@
package client;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import client.hook.ReceiveMsgHook;
@@ -32,13 +34,9 @@ public interface SubClient {
void reconnect() throws Exception;
- Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception;
+ Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
- Package justSubscribe(String topic) throws Exception;
-
- Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception;
-
- Package justUnsubscribe(String topic) throws Exception;
+ Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
Package listen() throws Exception;
diff --git a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
index 0eb313e..c1cc984 100644
--- a/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
+++ b/eventmesh-runtime/src/test/java/client/common/MessageUtils.java
@@ -21,12 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
public class MessageUtils {
public static int seqLength = 10;
@@ -63,10 +63,10 @@ public class MessageUtils {
return msg;
}
- public static Package subscribe(String topic) {
+ public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
- msg.setBody(generateSubscription(topic));
+ msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
return msg;
}
@@ -76,10 +76,10 @@ public class MessageUtils {
return msg;
}
- public static Package unsubscribe(String topic) {
+ public static Package unsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.UNSUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
- msg.setBody(generateSubscription(topic));
+ msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
return msg;
}
@@ -170,20 +170,20 @@ public class MessageUtils {
public static Subscription generateSubscription() {
Subscription subscription = new Subscription();
- List<String> topicList = new ArrayList<>();
- topicList.add("FT0-s-80000000-01-0");
- topicList.add("FT0-s-80000000-02-0");
- topicList.add("FT0-s-80000000-03-0");
- topicList.add("FT0-s-80000000-04-0");
- subscription.setTopicList(topicList);
+ List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+ subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+ subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-02-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+ subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-03-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+ subscriptionItems.add(new SubscriptionItem("FT0-s-80000000-04-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC));
+ subscription.setTopicList(subscriptionItems);
return subscription;
}
- public static Subscription generateSubscription(String topic) {
+ public static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
Subscription subscription = new Subscription();
- List<String> topicList = new ArrayList<>();
- topicList.add(topic);
- subscription.setTopicList(topicList);
+ List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+ subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+ subscription.setTopicList(subscriptionItems);
return subscription;
}
diff --git a/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java
index 5f58c1e..3913bcd 100644
--- a/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java
+++ b/eventmesh-runtime/src/test/java/client/impl/EventMeshClientImpl.java
@@ -17,7 +17,9 @@
package client.impl;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import client.EventMeshClient;
@@ -76,26 +78,19 @@ public class EventMeshClientImpl implements EventMeshClient {
this.subClient.heartbeat();
}
- public Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception {
- return this.subClient.justSubscribe(serviceId, scenario, dcn);
- }
public Package listen() throws Exception {
return this.subClient.listen();
}
- public Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception {
- return this.subClient.justUnsubscribe(serviceId, scenario, dcn);
- }
-
@Override
- public Package justSubscribe(String topic) throws Exception {
- return this.subClient.justSubscribe(topic);
+ public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+ return this.subClient.justSubscribe(topic, subscriptionMode, subcriptionType);
}
@Override
- public Package justUnsubscribe(String topic) throws Exception {
- return this.subClient.justUnsubscribe(topic);
+ public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+ return this.subClient.justUnsubscribe(topic, subscriptionMode, subcriptionType);
}
diff --git a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java
index 3a61b48..e771715 100644
--- a/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java
+++ b/eventmesh-runtime/src/test/java/client/impl/SubClientImpl.java
@@ -27,10 +27,11 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.OPStatus;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public class SubClientImpl extends TCPClient implements SubClient {
private ReceiveMsgHook callback;
- private List<String> topics = new ArrayList<String>();
+ private List<SubscriptionItem> subscriptionItems = new ArrayList<SubscriptionItem>();
private ScheduledFuture<?> task;
@@ -72,9 +73,9 @@ public class SubClientImpl extends TCPClient implements SubClient {
public void reconnect() throws Exception {
super.reconnect();
hello();
- if (!CollectionUtils.isEmpty(topics)) {
- for (String topic : topics) {
- Package request = MessageUtils.subscribe(topic);
+ if (!CollectionUtils.isEmpty(subscriptionItems)) {
+ for (SubscriptionItem item : subscriptionItems) {
+ Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType());
this.dispatcher(request, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}
}
@@ -117,14 +118,9 @@ public class SubClientImpl extends TCPClient implements SubClient {
this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}
- public Package justSubscribe(String serviceId, String scenario, String dcn) throws Exception {
- Package msg = MessageUtils.subscribe();
- return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
- }
-
- public Package justSubscribe(String topic) throws Exception {
- topics.add(topic);
- Package msg = MessageUtils.subscribe(topic);
+ public Package justSubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+ subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+ Package msg = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}
@@ -144,14 +140,9 @@ public class SubClientImpl extends TCPClient implements SubClient {
// this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
// }
- public Package justUnsubscribe(String topic) throws Exception {
- topics.remove(topic);
- Package msg = MessageUtils.unsubscribe(topic);
- return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
- }
-
- public Package justUnsubscribe(String serviceId, String scenario, String dcn) throws Exception {
- Package msg = MessageUtils.unsubscribe();
+ public Package justUnsubscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+ subscriptionItems.remove(topic);
+ Package msg = MessageUtils.unsubscribe(topic, subscriptionMode, subcriptionType);
return this.dispatcher(msg, ClientConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}
diff --git a/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java b/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java
index eb55d0e..fc479ca 100644
--- a/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/AsyncSubClient.java
@@ -19,6 +19,7 @@ package demo;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -26,13 +27,14 @@ import client.common.ClientConstants;
import client.common.MessageUtils;
import client.hook.ReceiveMsgHook;
import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
public class AsyncSubClient {
public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10002, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
- client.justSubscribe(ClientConstants.ASYNC_TOPIC);
+ client.justSubscribe(ClientConstants.ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java b/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java
index f3f29f6..3d9385d 100644
--- a/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/BroadCastSubClient.java
@@ -19,6 +19,7 @@ package demo;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -27,13 +28,14 @@ import client.common.ClientConstants;
import client.common.MessageUtils;
import client.hook.ReceiveMsgHook;
import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
public class BroadCastSubClient {
public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
- client.justSubscribe(ClientConstants.BROADCAST_TOPIC);
+ client.justSubscribe(ClientConstants.BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-runtime/src/test/java/demo/CCSubClient.java b/eventmesh-runtime/src/test/java/demo/CCSubClient.java
index d415743..935a554 100644
--- a/eventmesh-runtime/src/test/java/demo/CCSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/CCSubClient.java
@@ -19,6 +19,7 @@ package demo;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -26,6 +27,7 @@ import client.common.MessageUtils;
import client.common.UserAgentUtils;
import client.hook.ReceiveMsgHook;
import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
public class CCSubClient {
@@ -34,7 +36,7 @@ public class CCSubClient {
subClient.init();
subClient.heartbeat();
subClient.listen();
- subClient.justSubscribe("FT0-s-80000000-01-0");
+ subClient.justSubscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
subClient.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-runtime/src/test/java/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/demo/CClientDemo.java
index 590aa3f..61c4f52 100644
--- a/eventmesh-runtime/src/test/java/demo/CClientDemo.java
+++ b/eventmesh-runtime/src/test/java/demo/CClientDemo.java
@@ -19,8 +19,10 @@ package demo;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +46,8 @@ public class CClientDemo {
EventMeshClientImpl client = new EventMeshClientImpl("127.0.0.1", 10000);
client.init();
client.heartbeat();
- client.justSubscribe(ASYNC_TOPIC);
- client.justSubscribe(BROADCAST_TOPIC);
+ client.justSubscribe(ASYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
+ client.justSubscribe(BROADCAST_TOPIC, SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.listen();
// for (int i = 0; i < 10000; i++) {
// Package rr = null;
diff --git a/eventmesh-runtime/src/test/java/demo/SyncSubClient.java b/eventmesh-runtime/src/test/java/demo/SyncSubClient.java
index 10e13c4..9bc1141 100644
--- a/eventmesh-runtime/src/test/java/demo/SyncSubClient.java
+++ b/eventmesh-runtime/src/test/java/demo/SyncSubClient.java
@@ -19,6 +19,7 @@ package demo;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -26,13 +27,14 @@ import client.common.ClientConstants;
import client.common.MessageUtils;
import client.hook.ReceiveMsgHook;
import client.impl.SubClientImpl;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
public class SyncSubClient {
public static void main(String[] args) throws Exception {
SubClientImpl client = new SubClientImpl("127.0.0.1", 10000, MessageUtils.generateSubServer());
client.init();
client.heartbeat();
- client.justSubscribe(ClientConstants.SYNC_TOPIC);
+ client.justSubscribe(ClientConstants.SYNC_TOPIC, SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
client.registerBusiHandler(new ReceiveMsgHook() {
@Override
public void handle(Package msg, ChannelHandlerContext ctx) {
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index ede5ef0..55b7e16 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -17,8 +17,7 @@
package org.apache.eventmesh.client.http.consumer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -45,6 +44,7 @@ import org.apache.eventmesh.client.tcp.common.EventMeshThreadFactoryImpl;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.ThreadPoolFactory;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientType;
@@ -52,6 +52,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
@@ -67,7 +68,7 @@ public class LiteConsumer extends AbstractLiteClient {
protected LiteClientConfig eventMeshClientConfig;
- private List<String> subscription = Lists.newArrayList();
+ private List<SubscriptionItem> subscription = Lists.newArrayList();
private LiteMessageListener messageListener;
@@ -116,7 +117,7 @@ public class LiteConsumer extends AbstractLiteClient {
logger.info("LiteConsumer shutdown");
}
- public boolean subscribe(List<String> topicList, String url) throws Exception {
+ public boolean subscribe(List<SubscriptionItem> topicList, String url) throws Exception {
subscription.addAll(topicList);
if (!started.get()) {
start();
@@ -146,7 +147,7 @@ public class LiteConsumer extends AbstractLiteClient {
}
- private RequestParam generateSubscribeRequestParam(List<String> topicList, String url) {
+ private RequestParam generateSubscribeRequestParam(List<SubscriptionItem> topicList, String url) {
// final LiteMessage liteMessage = new LiteMessage();
// liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30))
// .setContent("subscribe message")
@@ -170,11 +171,11 @@ public class LiteConsumer extends AbstractLiteClient {
return requestParam;
}
- private RequestParam generateHeartBeatRequestParam(List<String> topics, String url) {
+ private RequestParam generateHeartBeatRequestParam(List<SubscriptionItem> topics, String url) {
List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = new ArrayList<>();
- for (String topic : topics) {
+ for (SubscriptionItem item : topics) {
HeartbeatRequestBody.HeartbeatEntity heartbeatEntity = new HeartbeatRequestBody.HeartbeatEntity();
- heartbeatEntity.topic = topic;
+ heartbeatEntity.topic = item.getTopic();
heartbeatEntity.url = url;
heartbeatEntities.add(heartbeatEntity);
}
@@ -198,7 +199,7 @@ public class LiteConsumer extends AbstractLiteClient {
return requestParam;
}
- public void heartBeat(List<String> topicList, String url) throws Exception {
+ public void heartBeat(List<SubscriptionItem> topicList, String url) throws Exception {
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -234,7 +235,15 @@ public class LiteConsumer extends AbstractLiteClient {
}
public boolean unsubscribe(List<String> topicList, String url) throws Exception {
- subscription.removeAll(topicList);
+ Set<String> unSub = new HashSet<>(topicList);
+ Iterator<SubscriptionItem> itr = subscription.iterator();
+ while(itr.hasNext()) {
+ SubscriptionItem item = itr.next();
+ if (unSub.contains(item.getTopic())) {
+ itr.remove();
+ }
+ }
+
RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url);
long startTime = System.currentTimeMillis();
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java
index 4e11260..afdb90a 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java
@@ -19,7 +19,9 @@ package org.apache.eventmesh.client.tcp;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
public interface EventMeshClient {
@@ -39,7 +41,7 @@ public interface EventMeshClient {
void listen() throws Exception;
- void subscribe(String topic) throws Exception;
+ void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
void unsubscribe() throws Exception;
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java
index 251fd93..3803edc 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimpleSubClient.java
@@ -19,6 +19,8 @@ package org.apache.eventmesh.client.tcp;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
public interface SimpleSubClient {
@@ -30,7 +32,7 @@ public interface SimpleSubClient {
void reconnect() throws Exception;
- void subscribe(String topic) throws Exception;
+ void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception;
void unsubscribe() throws Exception;
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 4122a23..87ef68a 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -21,10 +21,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.Header;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
public class MessageUtils {
@@ -55,10 +56,10 @@ public class MessageUtils {
return msg;
}
- public static Package subscribe(String topic) {
+ public static Package subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
Package msg = new Package();
msg.setHeader(new Header(Command.SUBSCRIBE_REQUEST, 0, null, generateRandomString(seqLength)));
- msg.setBody(generateSubscription(topic));
+ msg.setBody(generateSubscription(topic, subscriptionMode, subcriptionType));
return msg;
}
@@ -130,11 +131,11 @@ public class MessageUtils {
return user;
}
- private static Subscription generateSubscription(String topic) {
+ private static Subscription generateSubscription(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) {
Subscription subscription = new Subscription();
- List<String> topicList = new ArrayList<>();
- topicList.add(topic);
- subscription.setTopicList(topicList);
+ List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+ subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+ subscription.setTopicList(subscriptionItems);
return subscription;
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java
index 4cc14eb..8e645fa 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java
@@ -24,7 +24,9 @@ import org.apache.eventmesh.client.tcp.SimpleSubClient;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
public class DefaultEventMeshClient implements EventMeshClient {
@@ -95,8 +97,8 @@ public class DefaultEventMeshClient implements EventMeshClient {
}
@Override
- public void subscribe(String topic) throws Exception {
- this.subClient.subscribe(topic);
+ public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+ this.subClient.subscribe(topic, subscriptionMode, subcriptionType);
}
@Override
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
index 0579184..38d52f6 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
@@ -33,10 +33,12 @@ import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
-import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.protocol.tcp.*;
+import org.apache.eventmesh.common.protocol.tcp.Package;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +50,7 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
private ReceiveMsgHook callback;
- private List<String> topics = new ArrayList<String>();
+ private List<SubscriptionItem> subscriptionItems = new ArrayList<SubscriptionItem>();
private ScheduledFuture<?> task;
@@ -70,9 +72,9 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
public void reconnect() throws Exception {
super.reconnect();
hello();
- if (!CollectionUtils.isEmpty(topics)) {
- for (String topic : topics) {
- Package request = MessageUtils.subscribe(topic);
+ if (!CollectionUtils.isEmpty(subscriptionItems)) {
+ for (SubscriptionItem item : subscriptionItems) {
+ Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType());
this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
}
}
@@ -121,9 +123,9 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
}
- public void subscribe(String topic) throws Exception {
- topics.add(topic);
- Package request = MessageUtils.subscribe(topic);
+ public void subscribe(String topic, SubscriptionMode subscriptionMode, SubcriptionType subcriptionType) throws Exception {
+ subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subcriptionType));
+ Package request = MessageUtils.subscribe(topic, subscriptionMode, subcriptionType);
this.io(request, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
}
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java
index 319e428..12da846 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java
@@ -24,6 +24,8 @@ import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -45,7 +47,7 @@ public class AsyncSubscribe implements ReceiveMsgHook {
client.init();
client.heartbeat();
- client.subscribe("FT0-e-80010000-01-1");
+ client.subscribe("FT0-e-80010000-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java
index 8b77e1a..e21ee45 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java
@@ -24,6 +24,8 @@ import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.protocol.tcp.Package;
@@ -45,7 +47,7 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
client.init();
client.heartbeat();
- client.subscribe("FT0-e-80030001-01-3");
+ client.subscribe("FT0-e-80030001-01-3", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java
index b4fbbf7..756150a 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java
@@ -23,6 +23,8 @@ import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class SyncResponse implements ReceiveMsgHook {
client.init();
client.heartbeat();
- client.subscribe("FT0-s-80000000-01-0");
+ client.subscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
//同步RR消息
client.registerSubBusiHandler(handler);
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
index ed87688..3fcfa3f 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java
@@ -19,6 +19,7 @@
package org.apache.eventmesh.http.demo.sub.service;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -29,6 +30,9 @@ import org.apache.eventmesh.client.http.consumer.LiteConsumer;
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.ThreadUtil;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.http.demo.AsyncPublishInstance;
import org.apache.eventmesh.util.Utils;
import org.slf4j.Logger;
@@ -48,7 +52,7 @@ public class SubService implements InitializingBean {
final Properties properties = Utils.readPropertiesFile("application.properties");
- final List<String> topicList = Arrays.asList("FT0-e-80010001-01-1");
+ final List<SubscriptionItem> topicList = Arrays.asList(new SubscriptionItem("FT0-e-80010001-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC));
final String localIp = IPUtil.getLocalAddress();
final String localPort = properties.getProperty("server.port");
final String eventMeshIp = properties.getProperty("eventmesh.ip");
@@ -100,7 +104,11 @@ public class SubService implements InitializingBean {
public void cleanup() {
logger.info("start destory ....");
try {
- liteConsumer.unsubscribe(topicList, url);
+ List<String> unSubList = new ArrayList<>();
+ for (SubscriptionItem item:topicList) {
+ unSubList.add(item.getTopic());
+ }
+ liteConsumer.unsubscribe(unSubList, url);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java
index 1f7d3c8..4cde0ee 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java
@@ -24,8 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
@@ -50,7 +52,7 @@ public class AsyncSubscribe implements ReceiveMsgHook {
client.init();
client.heartbeat();
- client.subscribe("FT0-e-80010000-01-1");
+ client.subscribe("FT0-e-80010000-01-1", SubscriptionMode.CLUSTERING, SubcriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java
index 74bdaaa..cf2323a 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java
@@ -24,8 +24,10 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
@@ -50,7 +52,7 @@ public class AsyncSubscribeBroadcast implements ReceiveMsgHook {
client.init();
client.heartbeat();
- client.subscribe("FT0-e-80030000-01-3");
+ client.subscribe("FT0-e-80030000-01-3", SubscriptionMode.BROADCASTING, SubcriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
index 969163d..c4cb72e 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java
@@ -22,7 +22,9 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.client.tcp.EventMeshClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient;
+import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class SyncResponse implements ReceiveMsgHook {
client.init();
client.heartbeat();
- client.subscribe("FT0-s-80000000-01-0");
+ client.subscribe("FT0-s-80000000-01-0", SubscriptionMode.CLUSTERING, SubcriptionType.SYNC);
//同步RR消息
client.registerSubBusiHandler(handler);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org