You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/10/14 14:11:59 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #550] Remove
unused jar in plugin module (#551)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new b652be9 [ISSUE #550] Remove unused jar in plugin module (#551)
b652be9 is described below
commit b652be9f3cfee746272e87f8fc4b7f8b52bb05e5
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Oct 14 22:11:53 2021 +0800
[ISSUE #550] Remove unused jar in plugin module (#551)
* optimize build.gradle, remove unused jar in plugin module
* resolve license check
* format code
* fix doc
---
build.gradle | 1 -
.../instructions/eventmesh-runtime-quickstart.md | 4 +-
.../instructions/eventmesh-runtime-quickstart.md | 2 +-
eventmesh-common/build.gradle | 40 +-
.../eventmesh/common/command/HttpCommand.java | 52 +--
.../eventmesh/common/exception/JsonException.java | 23 +-
.../http/body/client/HeartbeatRequestBody.java | 36 +-
.../protocol/http/body/client/RegRequestBody.java | 31 +-
.../http/body/client/SubscribeRequestBody.java | 29 +-
.../http/body/client/UnRegRequestBody.java | 31 +-
.../http/body/client/UnSubscribeRequestBody.java | 29 +-
.../http/body/message/PushMessageRequestBody.java | 44 +--
.../http/body/message/ReplyMessageRequestBody.java | 42 ++-
.../body/message/SendMessageBatchRequestBody.java | 49 +--
.../http/body/message/SendMessageRequestBody.java | 49 +--
.../apache/eventmesh/common/utils/JsonUtils.java | 83 +++++
.../eventmesh-connector-api/build.gradle | 4 +-
.../eventmesh-connector-rocketmq/build.gradle | 35 +-
.../connector/rocketmq/producer/ProducerImpl.java | 84 +++--
.../eventmesh-connector-standalone/build.gradle | 4 +-
eventmesh-examples/build.gradle | 6 +
.../http/demo/sub/controller/SubController.java | 16 +-
.../eventmesh-registry-api/build.gradle | 2 +-
.../eventmesh-registry-namesrv/build.gradle | 2 +-
eventmesh-runtime/build.gradle | 29 +-
.../protocol/http/consumer/ConsumerManager.java | 120 +++---
.../http/processor/SendSyncMessageProcessor.java | 230 +++++++-----
.../http/processor/SubscribeProcessor.java | 135 ++++---
.../http/processor/UnSubscribeProcessor.java | 175 +++++----
.../core/protocol/http/processor/inf/Client.java | 48 +--
.../protocol/http/push/AsyncHTTPPushRequest.java | 136 ++++---
.../tcp/client/group/ClientGroupWrapper.java | 405 ++++++++++-----------
eventmesh-schema-registry/build.gradle | 16 -
.../eventmesh-schema-registry-server/build.gradle | 3 -
eventmesh-sdk-java/build.gradle | 18 +
.../client/http/consumer/LiteConsumer.java | 169 +++++----
.../client/http/producer/LiteProducer.java | 176 +++++----
.../producer/RRCallbackResponseHandlerAdapter.java | 45 ++-
.../eventmesh-security-acl/build.gradle | 2 +-
.../eventmesh-security-api/build.gradle | 2 +-
eventmesh-spi/build.gradle | 2 +-
.../spi/loader/JarExtensionClassLoader.java | 52 ++-
style/checkStyle.xml | 11 +-
tool/license/allowed-licenses.txt | 4 +-
44 files changed, 1387 insertions(+), 1089 deletions(-)
diff --git a/build.gradle b/build.gradle
index 889d2d5..32efd44 100644
--- a/build.gradle
+++ b/build.gradle
@@ -389,7 +389,6 @@ subprojects {
dependency "org.apache.logging.log4j:log4j-web:2.13.3"
dependency "com.lmax:disruptor:3.4.2"
- dependency "com.alibaba:fastjson:1.2.71"
dependency "com.fasterxml.jackson.core:jackson-databind:2.11.0"
dependency "com.fasterxml.jackson.core:jackson-core:2.11.0"
diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.md b/docs/cn/instructions/eventmesh-runtime-quickstart.md
index 6750674..684a0f6 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.md
@@ -20,7 +20,7 @@ Gradle至少为7.0, 推荐 7.0.*
```$ xslt
unzip EventMesh-master.zip
cd / *您的部署路径* /EventMesh-master
-gradle clean dist copyConnectorPlugin tar -x test
+gradle clean dist
```
您将在目录/ *您的部署路径* /EventMesh-master/eventmesh-runtime/dist中获得**eventmesh-runtime_1.0.0.tar.gz**
@@ -68,7 +68,7 @@ sh start.sh
> 插件实例需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件,文件名为SPI接口全类名.
> 文件内容为插件实例名到插件实例的映射, 具体可以参考eventmesh-connector-rocketmq插件模块
-插件可以从classpath和插件目录下面加载. 在本地开发阶段可以将使用的插件在eventmesh-starter模块build.gradle中进行声明,或者执行gradle的copyConnectorPlugin任务
+插件可以从classpath和插件目录下面加载. 在本地开发阶段可以将使用的插件在eventmesh-starter模块build.gradle中进行声明,或者执行gradle的dist任务
将插件拷贝至dist/plugin目录下, eventmesh默认会加载项目下dist/plugin目录下的插件, 加载目录可以通过-DeventMeshPluginDir=your_plugin_directory来改变插件目录.
运行时需要使用的插件实例可以在eventmesh.properties中进行配置.如果需要使用rocketmq插件实行快速启动,需要在eventmesh-starter模块build.gradle中进行如下声明
```
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index e3f7228..dd33e72 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -20,7 +20,7 @@ You will get **EventMesh-master.zip**
```$xslt
unzip EventMesh-master.zip
cd /*YOUR DEPLOY PATH*/EventMesh-master
-gradle clean dist copyConnectorPlugin tar -x test
+gradle clean dist
```
You will get **EventMesh_1.3.0-SNAPSHOT.tar.gz** in directory /* YOUR DEPLOY PATH */EventMesh-master/build
diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle
index 3e160e0..b552781 100644
--- a/eventmesh-common/build.gradle
+++ b/eventmesh-common/build.gradle
@@ -18,36 +18,31 @@
dependencies {
api "org.apache.commons:commons-lang3"
api "org.apache.commons:commons-collections4"
- api "commons-io:commons-io"
- api "org.apache.commons:commons-text"
-
api "com.google.guava:guava"
-
api "org.slf4j:slf4j-api"
- api "org.apache.logging.log4j:log4j-api"
- api "org.apache.logging.log4j:log4j-core"
- api "org.apache.logging.log4j:log4j-core"
- api "org.apache.logging.log4j:log4j-slf4j-impl"
- api "org.apache.logging.log4j:log4j-web"
+ api "junit:junit"
+ api "org.assertj:assertj-core"
- api "com.lmax:disruptor"
- api "com.alibaba:fastjson"
+ implementation "commons-io:commons-io"
+ implementation "org.apache.commons:commons-text"
- api "com.fasterxml.jackson.core:jackson-databind"
- api "com.fasterxml.jackson.core:jackson-core"
- api "com.fasterxml.jackson.core:jackson-annotations"
+ implementation "org.apache.logging.log4j:log4j-api"
+ implementation "org.apache.logging.log4j:log4j-core"
+ implementation "org.apache.logging.log4j:log4j-core"
+ implementation "org.apache.logging.log4j:log4j-slf4j-impl"
+ implementation "org.apache.logging.log4j:log4j-web"
- api "org.apache.httpcomponents:httpclient"
+ implementation "com.lmax:disruptor"
- api "io.netty:netty-all"
+ implementation "com.fasterxml.jackson.core:jackson-databind"
+ implementation "com.fasterxml.jackson.core:jackson-core"
+ implementation "com.fasterxml.jackson.core:jackson-annotations"
- api "junit:junit"
- api "com.github.stefanbirkner:system-rules"
- api "org.assertj:assertj-core"
+ implementation "org.apache.httpcomponents:httpclient"
+
+ implementation "io.netty:netty-all"
- api "org.mockito:mockito-core"
- api "org.powermock:powermock-module-junit4"
- api "org.powermock:powermock-api-mockito2"
+ implementation "com.github.stefanbirkner:system-rules"
testImplementation "org.apache.commons:commons-lang3"
testImplementation "org.apache.commons:commons-collections4"
@@ -64,7 +59,6 @@ dependencies {
testImplementation "org.apache.logging.log4j:log4j-web"
testImplementation "com.lmax:disruptor"
- testImplementation "com.alibaba:fastjson"
testImplementation "com.fasterxml.jackson.core:jackson-databind"
testImplementation "com.fasterxml.jackson.core:jackson-core"
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
index 65bc770..b6c61d6 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/command/HttpCommand.java
@@ -17,20 +17,27 @@
package org.apache.eventmesh.common.command;
-import com.alibaba.fastjson.JSON;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http.*;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.http.body.BaseResponseBody;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader;
import org.apache.eventmesh.common.protocol.http.header.Header;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+
public class HttpCommand {
private static AtomicLong requestId = new AtomicLong(0);
@@ -178,18 +185,18 @@ public class HttpCommand {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("httpCommand={")
- .append(cmdType).append(",")
- .append(httpMethod).append("/").append(httpVersion).append(",")
- .append("requestCode=").append(requestCode).append(",")
- .append("opaque=").append(opaque).append(",");
+ .append(cmdType).append(",")
+ .append(httpMethod).append("/").append(httpVersion).append(",")
+ .append("requestCode=").append(requestCode).append(",")
+ .append("opaque=").append(opaque).append(",");
if (cmdType == CmdType.RES) {
sb.append("cost=").append(resTime - reqTime).append(",");
}
sb.append("header=").append(header).append(",")
- .append("body=").append(body)
- .append("}");
+ .append("body=").append(body)
+ .append("}");
return sb.toString();
}
@@ -197,17 +204,17 @@ public class HttpCommand {
public String abstractDesc() {
StringBuilder sb = new StringBuilder();
sb.append("httpCommand={")
- .append(cmdType).append(",")
- .append(httpMethod).append("/").append(httpVersion).append(",")
- .append("requestCode=").append(requestCode).append(",")
- .append("opaque=").append(opaque).append(",");
+ .append(cmdType).append(",")
+ .append(httpMethod).append("/").append(httpVersion).append(",")
+ .append("requestCode=").append(requestCode).append(",")
+ .append("opaque=").append(opaque).append(",");
if (cmdType == CmdType.RES) {
sb.append("cost=").append(resTime - reqTime).append(",");
}
sb.append("header=").append(header).append(",")
- .append("bodySize=").append(body.toString().length()).append("}");
+ .append("bodySize=").append(body.toString().length()).append("}");
return sb.toString();
}
@@ -215,9 +222,9 @@ public class HttpCommand {
public String simpleDesc() {
StringBuilder sb = new StringBuilder();
sb.append("httpCommand={")
- .append(cmdType).append(",")
- .append(httpMethod).append("/").append(httpVersion).append(",")
- .append("requestCode=").append(requestCode).append("}");
+ .append(cmdType).append(",")
+ .append(httpMethod).append("/").append(httpVersion).append(",")
+ .append("requestCode=").append(requestCode).append("}");
return sb.toString();
}
@@ -228,10 +235,11 @@ public class HttpCommand {
}
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- HttpResponseStatus.OK,
- Unpooled.wrappedBuffer(JSON.toJSONString(this.getBody()).getBytes(Constants.DEFAULT_CHARSET)));
+ HttpResponseStatus.OK,
+ Unpooled.wrappedBuffer(
+ JsonUtils.serialize(this.getBody()).getBytes(Constants.DEFAULT_CHARSET)));
response.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN +
- "; charset=" + Constants.DEFAULT_CHARSET);
+ "; charset=" + Constants.DEFAULT_CHARSET);
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
Map<String, Object> customHeader = this.getHeader().toMap();
diff --git a/eventmesh-schema-registry/build.gradle b/eventmesh-common/src/main/java/org/apache/eventmesh/common/exception/JsonException.java
similarity index 69%
copy from eventmesh-schema-registry/build.gradle
copy to eventmesh-common/src/main/java/org/apache/eventmesh/common/exception/JsonException.java
index 96a33a1..50f4bfa 100644
--- a/eventmesh-schema-registry/build.gradle
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/exception/JsonException.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-plugins {
-}
-
-group 'org.apache.eventmesh'
-version '1.2.0-SNAPSHOT'
+package org.apache.eventmesh.common.exception;
-repositories {
- mavenCentral()
-}
+/**
+ * Json format exception, see {@link org.apache.eventmesh.common.utils.JsonUtils}.
+ */
+public class JsonException extends RuntimeException {
-dependencies {
-}
+ public JsonException(String message) {
+ super(message);
+ }
-test {
- useJUnitPlatform()
+ public JsonException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
index 751b632..0804098 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/HeartbeatRequestBody.java
@@ -17,21 +17,22 @@
package org.apache.eventmesh.common.protocol.http.body.client;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
public class HeartbeatRequestBody extends Body {
- public static final String CLIENTTYPE = "clientType";
+ public static final String CLIENTTYPE = "clientType";
public static final String HEARTBEATENTITIES = "heartbeatEntities";
- public static final String CONSUMERGROUP = "consumerGroup";
+ public static final String CONSUMERGROUP = "consumerGroup";
private String consumerGroup;
@@ -67,16 +68,19 @@ public class HeartbeatRequestBody extends Body {
HeartbeatRequestBody body = new HeartbeatRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
- body.setHeartbeatEntities(JSONArray.parseArray(MapUtils.getString(bodyParam, HEARTBEATENTITIES), HeartbeatEntity.class));
+ body.setHeartbeatEntities(JsonUtils
+ .deserialize(MapUtils.getString(bodyParam, HEARTBEATENTITIES),
+ new TypeReference<List<HeartbeatEntity>>() {
+ }));
return body;
}
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<String, Object>();
+ Map<String, Object> map = new HashMap<>();
map.put(CLIENTTYPE, clientType);
map.put(CONSUMERGROUP, consumerGroup);
- map.put(HEARTBEATENTITIES, JSON.toJSONString(heartbeatEntities));
+ map.put(HEARTBEATENTITIES, JsonUtils.serialize(heartbeatEntities));
return map;
}
@@ -90,10 +94,10 @@ public class HeartbeatRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("heartbeatEntity={")
- .append("topic=").append(topic).append(",")
- .append("serviceId=").append(serviceId).append(",")
- .append("instanceId=").append(instanceId).append(",")
- .append("url=").append(url).append("}");
+ .append("topic=").append(topic).append(",")
+ .append("serviceId=").append(serviceId).append(",")
+ .append("instanceId=").append(instanceId).append(",")
+ .append("url=").append(url).append("}");
return sb.toString();
}
}
@@ -102,8 +106,8 @@ public class HeartbeatRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("heartbeatRequestBody={")
- .append("consumerGroup=").append(consumerGroup).append(",")
- .append("clientType=").append(clientType).append("}");
+ .append("consumerGroup=").append(consumerGroup).append(",")
+ .append("clientType=").append(clientType).append("}");
return sb.toString();
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
index 78bb684..21a7beb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/RegRequestBody.java
@@ -17,16 +17,17 @@
package org.apache.eventmesh.common.protocol.http.body.client;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
public class RegRequestBody extends Body {
@@ -70,25 +71,27 @@ public class RegRequestBody extends Body {
RegRequestBody body = new RegRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
- body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
+ body.setTopics(JsonUtils.deserialize(MapUtils.getString(bodyParam, TOPICS),
+ new TypeReference<List<SubscriptionItem>>() {
+ }));
return body;
}
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<String, Object>();
+ Map<String, Object> map = new HashMap<>();
map.put(CLIENTTYPE, clientType);
map.put(ENDPOINT, endPoint);
- map.put(TOPICS, JSON.toJSONString(topics));
+ map.put(TOPICS, JsonUtils.serialize(topics));
return map;
}
@Override
public String toString() {
- return "regRequestBody{" +
- "clientType='" + clientType + '\'' +
- ", endPoint='" + endPoint + '\'' +
- ", topics=" + topics +
- '}';
+ return "regRequestBody{"
+ + "clientType='" + clientType + '\''
+ + ", endPoint='" + endPoint + '\''
+ + ", topics=" + topics
+ + '}';
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
index 72e2e91..156cc3c 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/SubscribeRequestBody.java
@@ -17,16 +17,17 @@
package org.apache.eventmesh.common.protocol.http.body.client;
+import org.apache.eventmesh.common.protocol.SubscriptionItem;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.SubscriptionItem;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
public class SubscribeRequestBody extends Body {
@@ -69,7 +70,9 @@ public class SubscribeRequestBody extends Body {
public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
SubscribeRequestBody body = new SubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
- body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
+ body.setTopics(JsonUtils.deserialize(MapUtils.getString(bodyParam, TOPIC),
+ new TypeReference<List<SubscriptionItem>>() {
+ }));
body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
return body;
}
@@ -78,17 +81,17 @@ public class SubscribeRequestBody extends Body {
public Map<String, Object> toMap() {
Map<String, Object> map = new HashMap<String, Object>();
map.put(URL, url);
- map.put(TOPIC, JSON.toJSONString(topics));
+ map.put(TOPIC, JsonUtils.serialize(topics));
map.put(CONSUMERGROUP, consumerGroup);
return map;
}
@Override
public String toString() {
- return "subscribeBody{" +
- "consumerGroup='" + consumerGroup + '\'' +
- ", url='" + url + '\'' +
- ", topics=" + topics +
- '}';
+ return "subscribeBody{"
+ + "consumerGroup='" + consumerGroup + '\''
+ + ", url='" + url + '\''
+ + ", topics=" + topics
+ + '}';
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java
index fe44595..b20f4c2 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnRegRequestBody.java
@@ -17,15 +17,16 @@
package org.apache.eventmesh.common.protocol.http.body.client;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
public class UnRegRequestBody extends Body {
@@ -56,15 +57,17 @@ public class UnRegRequestBody extends Body {
public static UnRegRequestBody buildBody(Map<String, Object> bodyParam) {
UnRegRequestBody body = new UnRegRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
- body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), UnRegTopicEntity.class));
+ body.setTopics(JsonUtils.deserialize(MapUtils.getString(bodyParam, TOPICS),
+ new TypeReference<List<UnRegTopicEntity>>() {
+ }));
return body;
}
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<String, Object>();
+ Map<String, Object> map = new HashMap<>();
map.put(CLIENTTYPE, clientType);
- map.put(TOPICS, JSON.toJSONString(topics));
+ map.put(TOPICS, JsonUtils.serialize(topics));
return map;
}
@@ -72,9 +75,9 @@ public class UnRegRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("regRequestBody={")
- .append("clientType=").append(clientType)
- .append("topics=").append(topics)
- .append("}");
+ .append("clientType=").append(clientType)
+ .append("topics=").append(topics)
+ .append("}");
return sb.toString();
}
@@ -87,9 +90,9 @@ public class UnRegRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("unRegTopicEntity={")
- .append("topic=").append(topic).append(",")
- .append("serviceId=").append(serviceId).append(",")
- .append("instanceId=").append(instanceId).append("}");
+ .append("topic=").append(topic).append(",")
+ .append("serviceId=").append(serviceId).append(",")
+ .append("instanceId=").append(instanceId).append("}");
return sb.toString();
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
index 756f1a9..5904a96 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/client/UnSubscribeRequestBody.java
@@ -17,15 +17,16 @@
package org.apache.eventmesh.common.protocol.http.body.client;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
public class UnSubscribeRequestBody extends Body {
@@ -68,26 +69,28 @@ public class UnSubscribeRequestBody extends Body {
public static UnSubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
UnSubscribeRequestBody body = new UnSubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
- body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
+ body.setTopics(JsonUtils
+ .deserialize(MapUtils.getString(bodyParam, TOPIC), new TypeReference<List<String>>() {
+ }));
body.setConsumerGroup(MapUtils.getString(bodyParam, CONSUMERGROUP));
return body;
}
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<String, Object>();
+ Map<String, Object> map = new HashMap<>();
map.put(URL, url);
- map.put(TOPIC, JSON.toJSONString(topics));
+ map.put(TOPIC, JsonUtils.serialize(topics));
map.put(CONSUMERGROUP, consumerGroup);
return map;
}
@Override
public String toString() {
- return "unSubscribeRequestBody{" +
- "consumerGroup='" + consumerGroup + '\'' +
- ", url='" + url + '\'' +
- ", topics=" + topics +
- '}';
+ return "unSubscribeRequestBody{"
+ + "consumerGroup='" + consumerGroup + '\''
+ + ", url='" + url + '\''
+ + ", topics=" + topics
+ + '}';
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java
index accefa6..fad1e95 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/PushMessageRequestBody.java
@@ -17,23 +17,24 @@
package org.apache.eventmesh.common.protocol.http.body.message;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
public class PushMessageRequestBody extends Body {
- public static final String RANDOMNO = "randomNo";
- public static final String TOPIC = "topic";
- public static final String BIZSEQNO = "bizSeqNo";
- public static final String UNIQUEID = "uniqueId";
- public static final String CONTENT = "content";
+ public static final String RANDOMNO = "randomNo";
+ public static final String TOPIC = "topic";
+ public static final String BIZSEQNO = "bizSeqNo";
+ public static final String UNIQUEID = "uniqueId";
+ public static final String CONTENT = "content";
public static final String EXTFIELDS = "extFields";
private String randomNo;
@@ -96,7 +97,6 @@ public class PushMessageRequestBody extends Body {
this.extFields = extFields;
}
- @SuppressWarnings("unchecked")
public static PushMessageRequestBody buildBody(final Map<String, Object> bodyParam) {
PushMessageRequestBody pushMessageRequestBody = new PushMessageRequestBody();
pushMessageRequestBody.setContent(MapUtils.getString(bodyParam, CONTENT));
@@ -107,20 +107,22 @@ public class PushMessageRequestBody extends Body {
String extFields = MapUtils.getString(bodyParam, EXTFIELDS);
if (StringUtils.isNotBlank(extFields)) {
- pushMessageRequestBody.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
+ pushMessageRequestBody.setExtFields(
+ JsonUtils.deserialize(extFields, new TypeReference<HashMap<String, String>>() {
+ }));
}
return pushMessageRequestBody;
}
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<String, Object>();
+ Map<String, Object> map = new HashMap<>();
map.put(RANDOMNO, randomNo);
map.put(TOPIC, topic);
map.put(CONTENT, content);
map.put(BIZSEQNO, bizSeqNo);
map.put(UNIQUEID, uniqueId);
- map.put(EXTFIELDS, JSON.toJSONString(extFields));
+ map.put(EXTFIELDS, JsonUtils.serialize(extFields));
return map;
}
@@ -129,12 +131,12 @@ public class PushMessageRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("pushMessageRequestBody={")
- .append("randomNo=").append(randomNo).append(",")
- .append("topic=").append(topic).append(",")
- .append("bizSeqNo=").append(bizSeqNo).append(",")
- .append("uniqueId=").append(uniqueId).append(",")
- .append("content=").append(content).append(",")
- .append("extFields=").append(extFields).append("}");
+ .append("randomNo=").append(randomNo).append(",")
+ .append("topic=").append(topic).append(",")
+ .append("bizSeqNo=").append(bizSeqNo).append(",")
+ .append("uniqueId=").append(uniqueId).append(",")
+ .append("content=").append(content).append(",")
+ .append("extFields=").append(extFields).append("}");
return sb.toString();
}
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
index e2de019..93fa8b5 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/ReplyMessageRequestBody.java
@@ -17,23 +17,24 @@
package org.apache.eventmesh.common.protocol.http.body.message;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
public class ReplyMessageRequestBody extends Body {
- public static final String ORIGTOPIC = "origTopic";
- public static final String BIZSEQNO = "bizSeqNo";
- public static final String UNIQUEID = "uniqueId";
- public static final String CONTENT = "content";
- public static final String EXTFIELDS = "extFields";
+ public static final String ORIGTOPIC = "origTopic";
+ public static final String BIZSEQNO = "bizSeqNo";
+ public static final String UNIQUEID = "uniqueId";
+ public static final String CONTENT = "content";
+ public static final String EXTFIELDS = "extFields";
public static final String PRODUCERGROUP = "producerGroup";
private String bizSeqNo;
@@ -96,7 +97,6 @@ public class ReplyMessageRequestBody extends Body {
this.producerGroup = producerGroup;
}
- @SuppressWarnings("unchecked")
public static ReplyMessageRequestBody buildBody(Map<String, Object> bodyParam) {
ReplyMessageRequestBody body = new ReplyMessageRequestBody();
body.setBizSeqNo(MapUtils.getString(bodyParam, BIZSEQNO));
@@ -105,7 +105,9 @@ public class ReplyMessageRequestBody extends Body {
body.setOrigTopic(MapUtils.getString(bodyParam, ORIGTOPIC));
String extFields = MapUtils.getString(bodyParam, EXTFIELDS);
if (StringUtils.isNotBlank(extFields)) {
- body.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
+ body.setExtFields(
+ JsonUtils.deserialize(extFields, new TypeReference<HashMap<String, String>>() {
+ }));
}
body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
return body;
@@ -115,12 +117,12 @@ public class ReplyMessageRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("replyMessageRequestBody={")
- .append("bizSeqNo=").append(bizSeqNo).append(",")
- .append("uniqueId=").append(uniqueId).append(",")
- .append("origTopic=").append(origTopic).append(",")
- .append("content=").append(content).append(",")
- .append("producerGroup=").append(producerGroup).append(",")
- .append("extFields=").append(extFields).append("}");
+ .append("bizSeqNo=").append(bizSeqNo).append(",")
+ .append("uniqueId=").append(uniqueId).append(",")
+ .append("origTopic=").append(origTopic).append(",")
+ .append("content=").append(content).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
+ .append("extFields=").append(extFields).append("}");
return sb.toString();
}
@@ -131,7 +133,7 @@ public class ReplyMessageRequestBody extends Body {
map.put(ORIGTOPIC, origTopic);
map.put(UNIQUEID, uniqueId);
map.put(CONTENT, content);
- map.put(EXTFIELDS, JSON.toJSONString(extFields));
+ map.put(EXTFIELDS, JsonUtils.serialize(extFields));
map.put(PRODUCERGROUP, producerGroup);
return map;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
index 576e352..0b544c4 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageBatchRequestBody.java
@@ -17,22 +17,23 @@
package org.apache.eventmesh.common.protocol.http.body.message;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+import com.fasterxml.jackson.core.type.TypeReference;
public class SendMessageBatchRequestBody extends Body {
- public static final String BATCHID = "batchId";
- public static final String CONTENTS = "contents";
- public static final String SIZE = "size";
+ public static final String BATCHID = "batchId";
+ public static final String CONTENTS = "contents";
+ public static final String SIZE = "size";
public static final String PRODUCERGROUP = "producerGroup";
private String batchId;
@@ -82,10 +83,10 @@ public class SendMessageBatchRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("sendMessageBatchRequestBody={")
- .append("batchId=").append(batchId).append(",")
- .append("size=").append(size).append(",")
- .append("producerGroup=").append(producerGroup).append(",")
- .append("contents=").append(JSON.toJSONString(contents)).append("}");
+ .append("batchId=").append(batchId).append(",")
+ .append("size=").append(size).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
+ .append("contents=").append(JsonUtils.serialize(contents)).append("}");
return sb.toString();
}
@@ -100,27 +101,29 @@ public class SendMessageBatchRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("batchMessageEntity={")
- .append("bizSeqNo=").append(bizSeqNo).append(",")
- .append("topic=").append(topic).append(",")
- .append("msg=").append(msg).append(",")
- .append("ttl=").append(ttl).append(",")
- .append("tag=").append(tag).append("}");
+ .append("bizSeqNo=").append(bizSeqNo).append(",")
+ .append("topic=").append(topic).append(",")
+ .append("msg=").append(msg).append(",")
+ .append("ttl=").append(ttl).append(",")
+ .append("tag=").append(tag).append("}");
return sb.toString();
}
}
public static SendMessageBatchRequestBody buildBody(final Map<String, Object> bodyParam) {
String batchId = MapUtils.getString(bodyParam,
- BATCHID);
+ BATCHID);
String size = StringUtils.isBlank(MapUtils.getString(bodyParam,
- SIZE)) ? "1" : MapUtils.getString(bodyParam,
- SIZE);
+ SIZE)) ? "1" : MapUtils.getString(bodyParam,
+ SIZE);
String contents = MapUtils.getString(bodyParam,
- CONTENTS, null);
+ CONTENTS, null);
SendMessageBatchRequestBody body = new SendMessageBatchRequestBody();
body.setBatchId(batchId);
if (StringUtils.isNotBlank(contents)) {
- body.setContents(JSONArray.parseArray(contents, BatchMessageEntity.class));
+ body.setContents(
+ JsonUtils.deserialize(contents, new TypeReference<List<BatchMessageEntity>>() {
+ }));
}
body.setSize(size);
body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
index 5c302e8..196a9cc 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/body/message/SendMessageRequestBody.java
@@ -17,24 +17,26 @@
package org.apache.eventmesh.common.protocol.http.body.message;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.fastjson.JSONObject;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.common.protocol.http.body.Body;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
public class SendMessageRequestBody extends Body {
- public static final String TOPIC = "topic";
- public static final String BIZSEQNO = "bizSeqNo";
- public static final String UNIQUEID = "uniqueId";
- public static final String CONTENT = "content";
- public static final String TTL = "ttl";
- public static final String TAG = "tag";
- public static final String EXTFIELDS = "extFields";
+ public static final String TOPIC = "topic";
+ public static final String BIZSEQNO = "bizSeqNo";
+ public static final String UNIQUEID = "uniqueId";
+ public static final String CONTENT = "content";
+ public static final String TTL = "ttl";
+ public static final String TAG = "tag";
+ public static final String EXTFIELDS = "extFields";
public static final String PRODUCERGROUP = "producerGroup";
private String topic;
@@ -117,7 +119,6 @@ public class SendMessageRequestBody extends Body {
this.producerGroup = producerGroup;
}
- @SuppressWarnings("unchecked")
public static SendMessageRequestBody buildBody(Map<String, Object> bodyParam) {
SendMessageRequestBody body = new SendMessageRequestBody();
body.setTopic(MapUtils.getString(bodyParam, TOPIC));
@@ -128,7 +129,9 @@ public class SendMessageRequestBody extends Body {
body.setContent(MapUtils.getString(bodyParam, CONTENT));
String extFields = MapUtils.getString(bodyParam, EXTFIELDS);
if (StringUtils.isNotBlank(extFields)) {
- body.setExtFields((HashMap<String, String>) JSONObject.parseObject(extFields, HashMap.class));
+ body.setExtFields(
+ JsonUtils.deserialize(extFields, new TypeReference<HashMap<String, String>>() {
+ }));
}
body.setProducerGroup(MapUtils.getString(bodyParam, PRODUCERGROUP));
return body;
@@ -136,7 +139,7 @@ public class SendMessageRequestBody extends Body {
@Override
public Map<String, Object> toMap() {
- Map<String, Object> map = new HashMap<String, Object>();
+ Map<String, Object> map = new HashMap<>();
map.put(TOPIC, topic);
map.put(BIZSEQNO, bizSeqNo);
map.put(UNIQUEID, uniqueId);
@@ -152,14 +155,14 @@ public class SendMessageRequestBody extends Body {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("sendMessageRequestBody={")
- .append("topic=").append(topic).append(",")
- .append("bizSeqNo=").append(bizSeqNo).append(",")
- .append("uniqueId=").append(uniqueId).append(",")
- .append("content=").append(content).append(",")
- .append("ttl=").append(ttl).append(",")
- .append("tag=").append(tag).append(",")
- .append("producerGroup=").append(producerGroup).append(",")
- .append("extFields=").append(extFields).append("}");
+ .append("topic=").append(topic).append(",")
+ .append("bizSeqNo=").append(bizSeqNo).append(",")
+ .append("uniqueId=").append(uniqueId).append(",")
+ .append("content=").append(content).append(",")
+ .append("ttl=").append(ttl).append(",")
+ .append("tag=").append(tag).append(",")
+ .append("producerGroup=").append(producerGroup).append(",")
+ .append("extFields=").append(extFields).append("}");
return sb.toString();
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
new file mode 100644
index 0000000..a9c3fc1
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.apache.eventmesh.common.exception.JsonException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+/**
+ * Json serialize or deserialize utils.
+ */
+public class JsonUtils {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ static {
+ OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ }
+
+ /**
+ * Serialize object to json string.
+ *
+ * @param obj obj
+ * @return json string
+ */
+ public static String serialize(Object obj) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(obj);
+ } catch (JsonProcessingException e) {
+ throw new JsonException("serialize to json error", e);
+ }
+ }
+
+ /**
+ * Deserialize json string to object.
+ *
+ * @param str json string
+ * @param clz object class
+ * @param <T> object type
+ * @return object
+ */
+ public static <T> T deserialize(String str, Class<T> clz) {
+ try {
+ return OBJECT_MAPPER.readValue(str, clz);
+ } catch (JsonProcessingException e) {
+ throw new JsonException("deserialize json string to object error", e);
+ }
+ }
+
+ /**
+ * Deserialize json string to object.
+ *
+ * @param str json string
+ * @param typeReference object type reference
+ * @param <T> object type
+ * @return object
+ */
+ public static <T> T deserialize(String str, TypeReference<T> typeReference) {
+ try {
+ return OBJECT_MAPPER.readValue(str, typeReference);
+ } catch (JsonProcessingException e) {
+ throw new JsonException("deserialize json string to object error", e);
+ }
+ }
+}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
index 4b88cfd..962a46a 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle
@@ -16,7 +16,8 @@
*/
dependencies {
- api project(":eventmesh-spi")
+ compileOnly project(":eventmesh-spi")
+ compileOnly project(":eventmesh-common")
api 'io.openmessaging:openmessaging-api'
api 'io.dropwizard.metrics:metrics-core'
api "io.dropwizard.metrics:metrics-healthchecks"
@@ -24,6 +25,7 @@ dependencies {
api "io.dropwizard.metrics:metrics-json"
testImplementation project(":eventmesh-spi")
+ testImplementation project(":eventmesh-common")
testImplementation 'io.openmessaging:openmessaging-api'
testImplementation 'io.dropwizard.metrics:metrics-core'
testImplementation "io.dropwizard.metrics:metrics-healthchecks"
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
index 5123f8d..7f603d8 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/build.gradle
@@ -27,26 +27,33 @@ configurations {
}
List rocketmq = [
- "org.apache.rocketmq:rocketmq-client:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-common:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-store:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-test:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
- "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-client:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-common:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-store:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-test:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
+ "org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
]
dependencies {
- api project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ compileOnly project(":eventmesh-common")
+ implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation rocketmq
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation project(":eventmesh-common")
+
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.powermock:powermock-module-junit4"
+ testImplementation "org.powermock:powermock-api-mockito2"
+
testImplementation rocketmq
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
index c34a991..dbad619 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
+++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
@@ -17,31 +17,29 @@
package org.apache.eventmesh.connector.rocketmq.producer;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.api.Message;
-import io.openmessaging.api.MessageBuilder;
-import io.openmessaging.api.OnExceptionContext;
-import io.openmessaging.api.Producer;
-import io.openmessaging.api.SendCallback;
-import io.openmessaging.api.SendResult;
-import io.openmessaging.api.exception.OMSRuntimeException;
-
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
+
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.RequestCallback;
-import org.apache.rocketmq.client.producer.RequestResponseFuture;
-import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.MessageBuilder;
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.Producer;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+import io.openmessaging.api.exception.OMSRuntimeException;
public class ProducerImpl extends AbstractOMSProducer implements Producer {
@@ -63,7 +61,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
super.getRocketmqProducer().setPollNameServerInterval(60000);
super.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory()
- .getNettyClientConfig().setClientAsyncSemaphoreValue(eventMeshServerAsyncAccumulationThreshold);
+ .getNettyClientConfig()
+ .setClientAsyncSemaphoreValue(eventMeshServerAsyncAccumulationThreshold);
super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10);
}
@@ -73,11 +72,12 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
try {
- org.apache.rocketmq.client.producer.SendResult sendResultRMQ = this.rocketmqProducer.send(msgRMQ);
- message.setMsgID(sendResultRMQ.getMsgId());
+ org.apache.rocketmq.client.producer.SendResult sendResultRmq =
+ this.rocketmqProducer.send(msgRMQ);
+ message.setMsgID(sendResultRmq.getMsgId());
SendResult sendResult = new SendResult();
- sendResult.setTopic(sendResultRMQ.getMessageQueue().getTopic());
- sendResult.setMessageId(sendResultRMQ.getMsgId());
+ sendResult.setTopic(sendResultRmq.getMessageQueue().getTopic());
+ sendResult.setMessageId(sendResultRmq.getMsgId());
return sendResult;
} catch (Exception e) {
log.error(String.format("Send message Exception, %s", message), e);
@@ -114,7 +114,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
public void request(Message message, RRCallback rrCallback, long timeout)
- throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
@@ -133,7 +133,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
public void onException(Throwable e) {
String topic = message.getTopic();
String msgId = message.getMsgID();
- OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
+ OMSRuntimeException onsEx =
+ ProducerImpl.this.checkProducerException(topic, msgId, e);
OnExceptionContext context = new OnExceptionContext();
context.setTopic(topic);
context.setMessageId(msgId);
@@ -144,25 +145,28 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
};
}
- private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message, final SendCallback sendCallback) {
- org.apache.rocketmq.client.producer.SendCallback rmqSendCallback = new org.apache.rocketmq.client.producer.SendCallback() {
- @Override
- public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
- sendCallback.onSuccess(OMSUtil.sendResultConvert(sendResult));
- }
-
- @Override
- public void onException(Throwable e) {
- String topic = message.getTopic();
- String msgId = message.getMsgID();
- OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
- OnExceptionContext context = new OnExceptionContext();
- context.setTopic(topic);
- context.setMessageId(msgId);
- context.setException(onsEx);
- sendCallback.onException(context);
- }
- };
+ private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(
+ final Message message, final SendCallback sendCallback) {
+ org.apache.rocketmq.client.producer.SendCallback rmqSendCallback =
+ new org.apache.rocketmq.client.producer.SendCallback() {
+ @Override
+ public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+ sendCallback.onSuccess(OMSUtil.sendResultConvert(sendResult));
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ String topic = message.getTopic();
+ String msgId = message.getMsgID();
+ OMSRuntimeException onsEx =
+ ProducerImpl.this.checkProducerException(topic, msgId, e);
+ OnExceptionContext context = new OnExceptionContext();
+ context.setTopic(topic);
+ context.setMessageId(msgId);
+ context.setException(onsEx);
+ sendCallback.onException(context);
+ }
+ };
return rmqSendCallback;
}
diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle
index b9e1423..74fec4d 100644
--- a/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle
+++ b/eventmesh-connector-plugin/eventmesh-connector-standalone/build.gradle
@@ -16,7 +16,9 @@
*/
dependencies {
- api project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ compileOnly project(":eventmesh-common")
+ implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation project(":eventmesh-common")
}
\ No newline at end of file
diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle
index 32bfc3c..f62d019 100644
--- a/eventmesh-examples/build.gradle
+++ b/eventmesh-examples/build.gradle
@@ -22,11 +22,17 @@ dependencies {
// compile log4j2, sl4j
// testCompile log4j2, sl4j
implementation project(":eventmesh-sdk-java")
+ implementation project(":eventmesh-common")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation 'org.springframework.boot:spring-boot-starter-web'
+ implementation 'io.netty:netty-all'
+
testImplementation project(":eventmesh-sdk-java")
+ testImplementation project(":eventmesh-common")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
testImplementation 'org.springframework.boot:spring-boot-starter-web'
+ testImplementation 'io.netty:netty-all'
+
}
configurations.all {
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
index 92ca09d..aa400ce 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java
@@ -17,9 +17,12 @@
package org.apache.eventmesh.http.demo.sub.controller;
-import com.alibaba.fastjson.JSONObject;
-
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.http.demo.sub.service.SubService;
+
+import java.util.HashMap;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,7 +31,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-
@RestController
@RequestMapping("/sub")
public class SubController {
@@ -40,12 +42,12 @@ public class SubController {
@RequestMapping(value = "/test", method = RequestMethod.POST)
public String subTest(@RequestBody String message) {
- logger.info("=======receive message======= {}", JSONObject.toJSONString(message));
+ logger.info("=======receive message======= {}", JsonUtils.serialize(message));
subService.consumeMessage(message);
- JSONObject result = new JSONObject();
- result.put("retCode", 1);
- return result.toJSONString();
+ Map<String, Object> map = new HashMap<>();
+ map.put("retCode", 1);
+ return JsonUtils.serialize(map);
}
}
diff --git a/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle b/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle
index 0d41042..926cdba 100644
--- a/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle
+++ b/eventmesh-registry-plugin/eventmesh-registry-api/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- api project(":eventmesh-spi")
+ compileOnly project(":eventmesh-spi")
testImplementation project(":eventmesh-spi")
}
diff --git a/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle b/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle
index ec1fbb2..33422de 100644
--- a/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle
+++ b/eventmesh-registry-plugin/eventmesh-registry-namesrv/build.gradle
@@ -15,7 +15,7 @@
* limitations under the License.
*/
dependencies {
- api project(":eventmesh-registry-plugin:eventmesh-registry-api")
+ implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
}
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index df1d212..2a58e49 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -16,13 +16,19 @@
*/
dependencies {
- api 'io.opentelemetry:opentelemetry-api'
- api 'io.opentelemetry:opentelemetry-sdk'
- api 'io.opentelemetry:opentelemetry-sdk-metrics'
- api 'io.opentelemetry:opentelemetry-exporter-prometheus'
- api 'io.prometheus:simpleclient'
- api 'io.prometheus:simpleclient_httpserver'
- api 'io.cloudevents:cloudevents-core'
+ implementation 'io.opentelemetry:opentelemetry-api'
+ implementation 'io.opentelemetry:opentelemetry-sdk'
+ implementation 'io.opentelemetry:opentelemetry-sdk-metrics'
+ implementation 'io.opentelemetry:opentelemetry-exporter-prometheus'
+ implementation 'io.prometheus:simpleclient'
+ implementation 'io.prometheus:simpleclient_httpserver'
+ implementation 'io.cloudevents:cloudevents-core'
+
+ implementation "org.apache.httpcomponents:httpclient"
+ implementation 'io.netty:netty-all'
+
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-spi")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone")
implementation project(":eventmesh-security-plugin:eventmesh-security-api")
@@ -30,10 +36,19 @@ dependencies {
implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")
+ testImplementation project(":eventmesh-common")
+ testImplementation project(":eventmesh-spi")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone")
testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl")
testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv")
+
+ testImplementation "org.apache.httpcomponents:httpclient"
+ testImplementation "io.netty:netty-all"
+
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.powermock:powermock-module-junit4"
+ testImplementation "org.powermock:powermock-api-mockito2"
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
index 8857481..4c23663 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/ConsumerManager.java
@@ -17,6 +17,16 @@
package org.apache.eventmesh.runtime.core.protocol.http.consumer;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
+import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
+import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
+import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupTopicConfChangeEvent;
+import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
+
+import org.apache.commons.lang3.StringUtils;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,30 +40,27 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
-import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupTopicConf;
-import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupStateEvent;
-import org.apache.eventmesh.runtime.core.consumergroup.event.ConsumerGroupTopicConfChangeEvent;
-import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.eventbus.Subscribe;
+
public class ConsumerManager {
private EventMeshHTTPServer eventMeshHTTPServer;
- private ConcurrentHashMap<String /** consumerGroup */, ConsumerGroupManager> consumerTable = new ConcurrentHashMap<String, ConsumerGroupManager>();
+ /**
+ * consumerGroup to ConsumerGroupManager.
+ */
+ private ConcurrentHashMap<String, ConsumerGroupManager> consumerTable =
+ new ConcurrentHashMap<>();
private static final int DEFAULT_UPDATE_TIME = 3 * 30 * 1000;
public Logger logger = LoggerFactory.getLogger(this.getClass());
- private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private ScheduledExecutorService scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
public ConsumerManager(EventMeshHTTPServer eventMeshHTTPServer) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
@@ -71,7 +78,8 @@ public class ConsumerManager {
public void run() {
logger.info("clientInfo check start.....");
synchronized (eventMeshHTTPServer.localClientInfoMapping) {
- Map<String, List<Client>> clientInfoMap = eventMeshHTTPServer.localClientInfoMapping;
+ Map<String, List<Client>> clientInfoMap =
+ eventMeshHTTPServer.localClientInfoMapping;
if (clientInfoMap.size() > 0) {
for (String key : clientInfoMap.keySet()) {
String consumerGroup = key.split("@")[0];
@@ -82,9 +90,11 @@ public class ConsumerManager {
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
//The time difference is greater than 3 heartbeat cycles
- if (System.currentTimeMillis() - client.lastUpTime.getTime() > DEFAULT_UPDATE_TIME) {
- logger.warn("client {} lastUpdate time {} over three heartbeat cycles",
- JSONObject.toJSONString(client), client.lastUpTime);
+ if (System.currentTimeMillis() - client.lastUpTime.getTime()
+ > DEFAULT_UPDATE_TIME) {
+ logger.warn(
+ "client {} lastUpdate time {} over three heartbeat cycles",
+ JsonUtils.serialize(client), client.lastUpTime);
clientIterator.remove();
isChange = true;
}
@@ -92,13 +102,15 @@ public class ConsumerManager {
if (isChange) {
if (clientList.size() > 0) {
//change url
- logger.info("consumerGroup {} client info changing", consumerGroup);
+ logger.info("consumerGroup {} client info changing",
+ consumerGroup);
Map<String, List<String>> idcUrls = new HashMap<>();
Set<String> clientUrls = new HashSet<>();
for (Client client : clientList) {
clientUrls.add(client.url);
if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+ idcUrls.get(client.idc)
+ .add(StringUtils.deleteWhitespace(client.url));
} else {
List<String> urls = new ArrayList<>();
urls.add(client.url);
@@ -106,14 +118,19 @@ public class ConsumerManager {
}
}
synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
- ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
- Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
+ ConsumerGroupConf consumerGroupConf =
+ eventMeshHTTPServer.localConsumerGroupMapping
+ .get(consumerGroup);
+ Map<String, ConsumerGroupTopicConf> map =
+ consumerGroupConf.getConsumerGroupTopicConf();
for (String topicKey : map.keySet()) {
if (StringUtils.equals(topic, topicKey)) {
- ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
+ ConsumerGroupTopicConf latestTopicConf =
+ new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(topic);
- latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
+ latestTopicConf.setSubscriptionItem(
+ map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);
latestTopicConf.setIdcUrls(idcUrls);
@@ -121,26 +138,31 @@ public class ConsumerManager {
map.put(topic, latestTopicConf);
}
}
- eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
- logger.info("consumerGroup {} client info changed, consumerGroupConf {}", consumerGroup,
- JSONObject.toJSONString(consumerGroupConf));
+ eventMeshHTTPServer.localConsumerGroupMapping
+ .put(consumerGroup, consumerGroupConf);
+ logger.info(
+ "consumerGroup {} client info changed, "
+ + "consumerGroupConf {}", consumerGroup,
+ JsonUtils.serialize(consumerGroupConf));
try {
- notifyConsumerManager(consumerGroup, consumerGroupConf, eventMeshHTTPServer.localConsumerGroupMapping);
+ notifyConsumerManager(consumerGroup, consumerGroupConf);
} catch (Exception e) {
e.printStackTrace();
}
}
} else {
- logger.info("consumerGroup {} client info removed", consumerGroup);
+ logger.info("consumerGroup {} client info removed",
+ consumerGroup);
//remove
try {
- notifyConsumerManager(consumerGroup, null, eventMeshHTTPServer.localConsumerGroupMapping);
+ notifyConsumerManager(consumerGroup, null);
} catch (Exception e) {
e.printStackTrace();
}
- eventMeshHTTPServer.localConsumerGroupMapping.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s));
+ eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+ .removeIf(s -> StringUtils.equals(consumerGroup, s));
}
}
@@ -154,9 +176,11 @@ public class ConsumerManager {
/**
* notify ConsumerManager groupLevel
*/
- public void notifyConsumerManager(String consumerGroup, ConsumerGroupConf latestConsumerGroupConfig,
- ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping) throws Exception {
- ConsumerGroupManager cgm = eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
+ public void notifyConsumerManager(String consumerGroup,
+ ConsumerGroupConf latestConsumerGroupConfig)
+ throws Exception {
+ ConsumerGroupManager cgm =
+ eventMeshHTTPServer.getConsumerManager().getConsumer(consumerGroup);
if (latestConsumerGroupConfig == null) {
ConsumerGroupStateEvent notification = new ConsumerGroupStateEvent();
notification.action = ConsumerGroupStateEvent.ConsumerGroupStateAction.DELETE;
@@ -207,8 +231,10 @@ public class ConsumerManager {
* @param consumerGroupConfig
* @throws Exception
*/
- public synchronized void addConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
- ConsumerGroupManager cgm = new ConsumerGroupManager(eventMeshHTTPServer, consumerGroupConfig);
+ public synchronized void addConsumer(String consumerGroup,
+ ConsumerGroupConf consumerGroupConfig) throws Exception {
+ ConsumerGroupManager cgm =
+ new ConsumerGroupManager(eventMeshHTTPServer, consumerGroupConfig);
cgm.init();
cgm.start();
consumerTable.put(consumerGroup, cgm);
@@ -217,8 +243,10 @@ public class ConsumerManager {
/**
* restart consumer
*/
- public synchronized void restartConsumer(String consumerGroup, ConsumerGroupConf consumerGroupConfig) throws Exception {
- if(consumerTable.containsKey(consumerGroup)) {
+ public synchronized void restartConsumer(String consumerGroup,
+ ConsumerGroupConf consumerGroupConfig)
+ throws Exception {
+ if (consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.get(consumerGroup);
cgm.refresh(consumerGroupConfig);
}
@@ -239,9 +267,10 @@ public class ConsumerManager {
*/
public synchronized void delConsumer(String consumerGroup) throws Exception {
logger.info("start delConsumer with consumerGroup {}", consumerGroup);
- if(consumerTable.containsKey(consumerGroup)) {
+ if (consumerTable.containsKey(consumerGroup)) {
ConsumerGroupManager cgm = consumerTable.remove(consumerGroup);
- logger.info("start unsubscribe topic with consumer group manager {}", JSONObject.toJSONString(cgm));
+ logger.info("start unsubscribe topic with consumer group manager {}",
+ JsonUtils.serialize(cgm));
cgm.unsubscribe(consumerGroup);
cgm.shutdown();
}
@@ -252,25 +281,30 @@ public class ConsumerManager {
public void onChange(ConsumerGroupTopicConfChangeEvent event) {
try {
logger.info("onChange event:{}", event);
- if (event.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.NEW) {
+ if (event.action
+ == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.NEW) {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
}
- manager.getConsumerGroupConfig().getConsumerGroupTopicConf().put(event.topic, event.newTopicConf);
+ manager.getConsumerGroupConfig().getConsumerGroupTopicConf()
+ .put(event.topic, event.newTopicConf);
return;
}
- if (event.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.CHANGE) {
+ if (event.action
+ == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.CHANGE) {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
}
- manager.getConsumerGroupConfig().getConsumerGroupTopicConf().replace(event.topic, event.newTopicConf);
+ manager.getConsumerGroupConfig().getConsumerGroupTopicConf()
+ .replace(event.topic, event.newTopicConf);
return;
}
- if (event.action == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.DELETE) {
+ if (event.action
+ == ConsumerGroupTopicConfChangeEvent.ConsumerGroupTopicConfChangeAction.DELETE) {
ConsumerGroupManager manager = getConsumer(event.consumerGroup);
if (Objects.isNull(manager)) {
return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 38f3f25..79f391e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -17,10 +17,6 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import com.alibaba.fastjson.JSON;
-import io.netty.channel.ChannelHandlerContext;
-import io.openmessaging.api.Message;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
@@ -32,6 +28,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -43,10 +40,16 @@ import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageConte
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.OMSUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
+import io.netty.channel.ChannelHandlerContext;
+import io.openmessaging.api.Message;
public class SendSyncMessageProcessor implements HttpRequestProcessor {
@@ -65,48 +68,59 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
}
@Override
- public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
+ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+ throws Exception {
HttpCommand responseEventMeshCommand;
- cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+ cmdLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
- SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
- SendMessageRequestBody sendMessageRequestBody = (SendMessageRequestBody) asyncContext.getRequest().getBody();
+ SendMessageRequestHeader sendMessageRequestHeader =
+ (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
+ SendMessageRequestBody sendMessageRequestBody =
+ (SendMessageRequestBody) asyncContext.getRequest().getBody();
SendMessageResponseHeader sendMessageResponseHeader =
- SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
- IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
+ SendMessageResponseHeader
+ .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+ IPUtil.getLocalAddress(),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
if (StringUtils.isBlank(sendMessageRequestHeader.getIdc())
- || StringUtils.isBlank(sendMessageRequestHeader.getPid())
- || !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
- || StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
+ || StringUtils.isBlank(sendMessageRequestHeader.getPid())
+ || !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
+ || StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
if (StringUtils.isBlank(sendMessageRequestBody.getBizSeqNo())
- || StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
- || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
- || StringUtils.isBlank(sendMessageRequestBody.getTopic())
- || StringUtils.isBlank(sendMessageRequestBody.getContent())
- || (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
+ || StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
+ || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
+ || StringUtils.isBlank(sendMessageRequestBody.getTopic())
+ || StringUtils.isBlank(sendMessageRequestBody.getContent())
+ || (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
//do acl check
- if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String user = sendMessageRequestHeader.getUsername();
String pass = sendMessageRequestHeader.getPasswd();
@@ -115,12 +129,13 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
String topic = sendMessageRequestBody.getTopic();
try {
Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
- }catch (Exception e){
- //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+ } catch (Exception e) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
+ e.getMessage()));
asyncContext.onComplete(responseEventMeshCommand);
aclLogger.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
return;
@@ -129,67 +144,84 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
- .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
+ .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS,
+ TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR.getErrMsg()));
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPDiscard();
asyncContext.onComplete(responseEventMeshCommand);
return;
}
String producerGroup = sendMessageRequestBody.getProducerGroup();
- EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
+ EventMeshProducer eventMeshProducer =
+ eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.getStarted().get()) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg()));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
- if (StringUtils.isNotBlank(sendMessageRequestBody.getTtl()) && StringUtils.isNumeric(sendMessageRequestBody.getTtl())) {
+ if (StringUtils.isNotBlank(sendMessageRequestBody.getTtl())
+ && StringUtils.isNumeric(sendMessageRequestBody.getTtl())) {
ttl = sendMessageRequestBody.getTtl();
}
Message omsMsg = new Message();
try {
// body
- omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
+ omsMsg.setBody(
+ sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
// topic
omsMsg.setTopic(sendMessageRequestBody.getTopic());
- omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION, sendMessageRequestBody.getTopic());
+ omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION,
+ sendMessageRequestBody.getTopic());
if (!StringUtils.isBlank(sendMessageRequestBody.getTag())) {
omsMsg.putUserProperties("Tag", sendMessageRequestBody.getTag());
}
// ttl
omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_TIMEOUT, ttl);
// bizNo
- omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS, sendMessageRequestBody.getBizSeqNo());
+ omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS,
+ sendMessageRequestBody.getBizSeqNo());
omsMsg.putUserProperties("msgType", "persistent");
- omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
-// omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
if (messageLogger.isDebugEnabled()) {
- messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
- sendMessageRequestBody.getTopic());
+ messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}",
+ sendMessageRequestBody.getBizSeqNo(),
+ sendMessageRequestBody.getTopic());
}
- omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
} catch (Exception e) {
- messageLogger.error("msg2MQMsg err, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
+ messageLogger
+ .error("msg2MQMsg err, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
sendMessageRequestBody.getTopic(), e);
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
- final SendMessageContext sendMessageContext = new SendMessageContext(sendMessageRequestBody.getBizSeqNo(), omsMsg, eventMeshProducer, eventMeshHTTPServer);
+ final SendMessageContext sendMessageContext =
+ new SendMessageContext(sendMessageRequestBody.getBizSeqNo(), omsMsg, eventMeshProducer,
+ eventMeshHTTPServer);
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsg();
long startTime = System.currentTimeMillis();
@@ -202,46 +234,58 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
httpLogger.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
- eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+ eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+ System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
} catch (Exception ex) {
+ // ignore
}
}
};
LiteMessage liteMessage = new LiteMessage(sendMessageRequestBody.getBizSeqNo(),
- sendMessageRequestBody.getUniqueId(), sendMessageRequestBody.getTopic(),
- sendMessageRequestBody.getContent())
- .setProp(sendMessageRequestBody.getExtFields());
+ sendMessageRequestBody.getUniqueId(), sendMessageRequestBody.getTopic(),
+ sendMessageRequestBody.getContent())
+ .setProp(sendMessageRequestBody.getExtFields());
try {
eventMeshProducer.request(sendMessageContext, new RRCallback() {
@Override
public void onSuccess(Message omsMsg) {
- omsMsg.getUserProperties().put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, omsMsg.getSystemProperties("BORN_TIMESTAMP"));
- omsMsg.getUserProperties().put(EventMeshConstants.STORE_TIMESTAMP, omsMsg.getSystemProperties("STORE_TIMESTAMP"));
- omsMsg.getUserProperties().put(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
- messageLogger.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- System.currentTimeMillis() - startTime,
- sendMessageRequestBody.getTopic(),
- sendMessageRequestBody.getBizSeqNo(),
- sendMessageRequestBody.getUniqueId());
+ omsMsg.getUserProperties().put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP,
+ omsMsg.getSystemProperties("BORN_TIMESTAMP"));
+ omsMsg.getUserProperties().put(EventMeshConstants.STORE_TIMESTAMP,
+ omsMsg.getSystemProperties("STORE_TIMESTAMP"));
+ omsMsg.getUserProperties().put(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
+ messageLogger.info(
+ "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+ + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+ sendMessageRequestBody.getTopic(),
+ sendMessageRequestBody.getBizSeqNo(),
+ sendMessageRequestBody.getUniqueId());
try {
- final String rtnMsg = new String(omsMsg.getBody(), EventMeshConstants.DEFAULT_CHARSET);
- omsMsg.getUserProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ final String rtnMsg =
+ new String(omsMsg.getBody(), EventMeshConstants.DEFAULT_CHARSET);
+ omsMsg.getUserProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
HttpCommand succ = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
- JSON.toJSONString(new SendMessageResponseBody.ReplyMessage(omsMsg.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), rtnMsg,
- OMSUtil.combineProp(omsMsg.getSystemProperties(),
- omsMsg.getUserProperties()))
- )));
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
+ JsonUtils.serialize(new SendMessageResponseBody.ReplyMessage(omsMsg
+ .getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION),
+ rtnMsg,
+ OMSUtil.combineProp(omsMsg.getSystemProperties(),
+ omsMsg.getUserProperties()))
+ )));
asyncContext.onComplete(succ, handler);
} catch (Exception ex) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(
+ EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(ex, 2)));
asyncContext.onComplete(err, handler);
messageLogger.warn("message|mq2eventMesh|RSP", ex);
}
@@ -250,35 +294,41 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
@Override
public void onException(Throwable e) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err, handler);
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
- messageLogger.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- System.currentTimeMillis() - startTime,
- sendMessageRequestBody.getTopic(),
- sendMessageRequestBody.getBizSeqNo(),
- sendMessageRequestBody.getUniqueId(), e);
+ messageLogger.error(
+ "message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
+ + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime,
+ sendMessageRequestBody.getTopic(),
+ sendMessageRequestBody.getBizSeqNo(),
+ sendMessageRequestBody.getUniqueId(), e);
}
}, Integer.valueOf(ttl));
} catch (Exception ex) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- sendMessageResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2)));
+ sendMessageResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(ex, 2)));
asyncContext.onComplete(err);
eventMeshHTTPServer.getHttpRetryer().pushRetry(sendMessageContext.delay(10000));
long endTime = System.currentTimeMillis();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
- messageLogger.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- endTime - startTime,
- sendMessageRequestBody.getTopic(),
- sendMessageRequestBody.getBizSeqNo(),
- sendMessageRequestBody.getUniqueId(), ex);
+ messageLogger.error(
+ "message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+ endTime - startTime,
+ sendMessageRequestBody.getTopic(),
+ sendMessageRequestBody.getBizSeqNo(),
+ sendMessageRequestBody.getUniqueId(), ex);
}
return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 14e185e..4be4fb2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -17,17 +17,6 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import com.alibaba.fastjson.JSONObject;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
@@ -38,6 +27,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.SubscribeRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.SubscribeResponseHeader;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -49,9 +39,23 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelHandlerContext;
+
public class SubscribeProcessor implements HttpRequestProcessor {
public Logger httpLogger = LoggerFactory.getLogger("http");
@@ -65,63 +69,76 @@ public class SubscribeProcessor implements HttpRequestProcessor {
}
@Override
- public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
+ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+ throws Exception {
HttpCommand responseEventMeshCommand;
- httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
- SubscribeRequestHeader subscribeRequestHeader = (SubscribeRequestHeader) asyncContext.getRequest().getHeader();
- SubscribeRequestBody subscribeRequestBody = (SubscribeRequestBody) asyncContext.getRequest().getBody();
+ httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+ SubscribeRequestHeader subscribeRequestHeader =
+ (SubscribeRequestHeader) asyncContext.getRequest().getHeader();
+ SubscribeRequestBody subscribeRequestBody =
+ (SubscribeRequestBody) asyncContext.getRequest().getBody();
SubscribeResponseHeader subscribeResponseHeader =
- SubscribeResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
- IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
-
+ SubscribeResponseHeader
+ .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+ IPUtil.getLocalAddress(),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(subscribeRequestHeader.getIdc())
- || StringUtils.isBlank(subscribeRequestHeader.getPid())
- || !StringUtils.isNumeric(subscribeRequestHeader.getPid())
- || StringUtils.isBlank(subscribeRequestHeader.getSys())) {
+ || StringUtils.isBlank(subscribeRequestHeader.getPid())
+ || !StringUtils.isNumeric(subscribeRequestHeader.getPid())
+ || StringUtils.isBlank(subscribeRequestHeader.getSys())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- subscribeResponseHeader,
- SubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ subscribeResponseHeader,
+ SubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
//validate body
if (StringUtils.isBlank(subscribeRequestBody.getUrl())
- || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())
- || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) {
+ || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())
+ || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- subscribeResponseHeader,
- SubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+ subscribeResponseHeader,
+ SubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
//do acl check
- if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String user = subscribeRequestHeader.getUsername();
String pass = subscribeRequestHeader.getPasswd();
String subsystem = subscribeRequestHeader.getSys();
int requestCode = Integer.valueOf(subscribeRequestHeader.getCode());
- for(SubscriptionItem item : subTopicList) {
+ for (SubscriptionItem item : subTopicList) {
try {
- Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(), requestCode);
+ Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(),
+ requestCode);
} catch (Exception e) {
- //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- subscribeResponseHeader,
- SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ subscribeResponseHeader,
+ SendMessageResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(),
+ e.getMessage()));
asyncContext.onComplete(responseEventMeshCommand);
- aclLogger.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+ aclLogger
+ .warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
return;
}
}
@@ -135,7 +152,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
for (SubscriptionItem subTopic : subTopicList) {
- List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + subTopic.getTopic());
+ List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+ .get(consumerGroup + "@" + subTopic.getTopic());
if (CollectionUtils.isEmpty(groupTopicClients)) {
httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic);
@@ -151,7 +169,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
idcUrls.put(client.idc, urls);
}
}
- ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+ ConsumerGroupConf consumerGroupConf =
+ eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
if (consumerGroupConf == null) {
// new subscription
consumerGroupConf = new ConsumerGroupConf(consumerGroup);
@@ -168,7 +187,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
consumerGroupConf.setConsumerGroupTopicConf(map);
} else {
// already subscribed
- Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
+ Map<String, ConsumerGroupTopicConf> map =
+ consumerGroupConf.getConsumerGroupTopicConf();
for (String key : map.keySet()) {
if (StringUtils.equals(subTopic.getTopic(), key)) {
ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
@@ -191,8 +211,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
- eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup),
- eventMeshHTTPServer.localConsumerGroupMapping);
+ eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
+ eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
@Override
@@ -202,26 +222,32 @@ public class SubscribeProcessor implements HttpRequestProcessor {
httpLogger.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
- eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+ eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+ System.currentTimeMillis()
+ - asyncContext.getRequest().getReqTime());
} catch (Exception ex) {
+ // ignore
}
}
};
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+ EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- subscribeResponseHeader,
- SubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ subscribeResponseHeader,
+ SubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err);
long endTime = System.currentTimeMillis();
- httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- endTime - startTime,
- JSONObject.toJSONString(subscribeRequestBody.getTopics()),
- subscribeRequestBody.getUrl(), e);
+ httpLogger.error(
+ "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}"
+ + "|bizSeqNo={}|uniqueId={}", endTime - startTime,
+ JsonUtils.serialize(subscribeRequestBody.getTopics()),
+ subscribeRequestBody.getUrl(), e);
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
}
@@ -234,8 +260,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
}
private void registerClient(SubscribeRequestHeader subscribeRequestHeader, String consumerGroup,
- List<SubscriptionItem> subscriptionItems, String url) {
- for(SubscriptionItem item: subscriptionItems) {
+ List<SubscriptionItem> subscriptionItems, String url) {
+ for (SubscriptionItem item : subscriptionItems) {
Client client = new Client();
client.env = subscribeRequestHeader.getEnv();
client.idc = subscribeRequestHeader.getIdc();
@@ -250,7 +276,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
String groupTopicKey = client.consumerGroup + "@" + client.topic;
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
- List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+ List<Client> localClients =
+ eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
if (StringUtils.equals(localClient.url, client.url)) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
index e5feb76..df3921d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java
@@ -17,18 +17,6 @@
package org.apache.eventmesh.runtime.core.protocol.http.processor;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import com.alibaba.fastjson.JSONObject;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.UnSubscribeRequestBody;
@@ -37,6 +25,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.UnSubscribeRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.UnSubscribeResponseHeader;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -47,9 +36,24 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelHandlerContext;
+
public class UnSubscribeProcessor implements HttpRequestProcessor {
public Logger httpLogger = LoggerFactory.getLogger("http");
@@ -61,40 +65,50 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
}
@Override
- public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
+ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
+ throws Exception {
HttpCommand responseEventMeshCommand;
- httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
- EventMeshConstants.PROTOCOL_HTTP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
- UnSubscribeRequestHeader unSubscribeRequestHeader = (UnSubscribeRequestHeader) asyncContext.getRequest().getHeader();
- UnSubscribeRequestBody unSubscribeRequestBody = (UnSubscribeRequestBody) asyncContext.getRequest().getBody();
+ httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}",
+ RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())),
+ EventMeshConstants.PROTOCOL_HTTP,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), IPUtil.getLocalAddress());
+ UnSubscribeRequestHeader unSubscribeRequestHeader =
+ (UnSubscribeRequestHeader) asyncContext.getRequest().getHeader();
+ UnSubscribeRequestBody unSubscribeRequestBody =
+ (UnSubscribeRequestBody) asyncContext.getRequest().getBody();
UnSubscribeResponseHeader unSubscribeResponseHeader =
- UnSubscribeResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
- IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
- eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
-
+ UnSubscribeResponseHeader
+ .buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
+ IPUtil.getLocalAddress(),
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
+ eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//validate header
if (StringUtils.isBlank(unSubscribeRequestHeader.getIdc())
- || StringUtils.isBlank(unSubscribeRequestHeader.getPid())
- || !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())
- || StringUtils.isBlank(unSubscribeRequestHeader.getSys())) {
+ || StringUtils.isBlank(unSubscribeRequestHeader.getPid())
+ || !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())
+ || StringUtils.isBlank(unSubscribeRequestHeader.getSys())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- unSubscribeResponseHeader,
- UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
+ unSubscribeResponseHeader,
+ UnSubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
//validate body
if (StringUtils.isBlank(unSubscribeRequestBody.getUrl())
- || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())
- || StringUtils.isBlank(unSubscribeRequestBody.getConsumerGroup())) {
+ || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())
+ || StringUtils.isBlank(unSubscribeRequestBody.getConsumerGroup())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- unSubscribeResponseHeader,
- UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
+ unSubscribeResponseHeader,
+ UnSubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
@@ -115,8 +129,10 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
httpLogger.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
- eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
+ eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(
+ System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
} catch (Exception ex) {
+ // ignore
}
}
};
@@ -127,12 +143,14 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);
for (String unSubTopic : unSubTopicList) {
- List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping.get(consumerGroup + "@" + unSubTopic);
+ List<Client> groupTopicClients = eventMeshHTTPServer.localClientInfoMapping
+ .get(consumerGroup + "@" + unSubTopic);
Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
Client client = clientIterator.next();
- if (StringUtils.equals(client.pid, pid) && StringUtils.equals(client.url, unSubscribeUrl)) {
- httpLogger.warn("client {} start unsubscribe", JSONObject.toJSONString(client));
+ if (StringUtils.equals(client.pid, pid)
+ && StringUtils.equals(client.url, unSubscribeUrl)) {
+ httpLogger.warn("client {} start unsubscribe", JsonUtils.serialize(client));
clientIterator.remove();
}
}
@@ -145,7 +163,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
if (!StringUtils.equals(unSubscribeUrl, client.url)) {
clientUrls.add(client.url);
if (idcUrls.containsKey(client.idc)) {
- idcUrls.get(client.idc).add(StringUtils.deleteWhitespace(client.url));
+ idcUrls.get(client.idc)
+ .add(StringUtils.deleteWhitespace(client.url));
} else {
List<String> urls = new ArrayList<>();
urls.add(client.url);
@@ -155,15 +174,19 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
}
synchronized (eventMeshHTTPServer.localConsumerGroupMapping) {
- ConsumerGroupConf consumerGroupConf = eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
- Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
+ ConsumerGroupConf consumerGroupConf =
+ eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup);
+ Map<String, ConsumerGroupTopicConf> map =
+ consumerGroupConf.getConsumerGroupTopicConf();
for (String topicKey : map.keySet()) {
// only modify the topic to subscribe
if (StringUtils.equals(unSubTopic, topicKey)) {
- ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
+ ConsumerGroupTopicConf latestTopicConf =
+ new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(unSubTopic);
- latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
+ latestTopicConf
+ .setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);
latestTopicConf.setIdcUrls(idcUrls);
@@ -171,7 +194,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
map.put(unSubTopic, latestTopicConf);
}
}
- eventMeshHTTPServer.localConsumerGroupMapping.put(consumerGroup, consumerGroupConf);
+ eventMeshHTTPServer.localConsumerGroupMapping
+ .put(consumerGroup, consumerGroupConf);
}
} else {
isChange = false;
@@ -181,51 +205,64 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
long startTime = System.currentTimeMillis();
if (isChange) {
try {
- eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup),
- eventMeshHTTPServer.localConsumerGroupMapping);
+ eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
+ eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup));
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+ EventMeshRetCode.SUCCESS.getRetCode(),
+ EventMeshRetCode.SUCCESS.getErrMsg());
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- unSubscribeResponseHeader,
- UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ unSubscribeResponseHeader,
+ UnSubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err);
long endTime = System.currentTimeMillis();
- httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- endTime - startTime,
- JSONObject.toJSONString(unSubscribeRequestBody.getTopics()),
- unSubscribeRequestBody.getUrl(), e);
+ httpLogger.error(
+ "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ + "|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime,
+ JsonUtils.serialize(unSubscribeRequestBody.getTopics()),
+ unSubscribeRequestBody.getUrl(), e);
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
- eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
+ eventMeshHTTPServer.metrics.summaryMetrics
+ .recordSendMsgCost(endTime - startTime);
}
} else {
//remove
try {
- eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, null, eventMeshHTTPServer.localConsumerGroupMapping);
+ eventMeshHTTPServer.getConsumerManager()
+ .notifyConsumerManager(consumerGroup, null);
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
- EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg());
+ EventMeshRetCode.SUCCESS.getRetCode(),
+ EventMeshRetCode.SUCCESS.getErrMsg());
asyncContext.onComplete(responseEventMeshCommand, handler);
// clean ClientInfo
- eventMeshHTTPServer.localClientInfoMapping.keySet().removeIf(s -> StringUtils.contains(s, consumerGroup));
+ eventMeshHTTPServer.localClientInfoMapping.keySet()
+ .removeIf(s -> StringUtils.contains(s, consumerGroup));
// clean ConsumerGroupInfo
- eventMeshHTTPServer.localConsumerGroupMapping.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s));
+ eventMeshHTTPServer.localConsumerGroupMapping.keySet()
+ .removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
- unSubscribeResponseHeader,
- UnSubscribeResponseBody.buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
- EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
+ unSubscribeResponseHeader,
+ UnSubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(e, 2)));
asyncContext.onComplete(err);
long endTime = System.currentTimeMillis();
- httpLogger.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- endTime - startTime,
- JSONObject.toJSONString(unSubscribeRequestBody.getTopics()),
- unSubscribeRequestBody.getUrl(), e);
+ httpLogger.error(
+ "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ + "|topic={}|bizSeqNo={}|uniqueId={}", endTime - startTime,
+ JsonUtils.serialize(unSubscribeRequestBody.getTopics()),
+ unSubscribeRequestBody.getUrl(), e);
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
- eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
+ eventMeshHTTPServer.metrics.summaryMetrics
+ .recordSendMsgCost(endTime - startTime);
}
}
}
@@ -236,9 +273,10 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
return false;
}
- private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader, String consumerGroup,
- List<String> topicList, String url) {
- for(String topic: topicList) {
+ private void registerClient(UnSubscribeRequestHeader unSubscribeRequestHeader,
+ String consumerGroup,
+ List<String> topicList, String url) {
+ for (String topic : topicList) {
Client client = new Client();
client.env = unSubscribeRequestHeader.getEnv();
client.idc = unSubscribeRequestHeader.getIdc();
@@ -252,7 +290,8 @@ public class UnSubscribeProcessor implements HttpRequestProcessor {
String groupTopicKey = client.consumerGroup + "@" + client.topic;
if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) {
- List<Client> localClients = eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
+ List<Client> localClients =
+ eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey);
boolean isContains = false;
for (Client localClient : localClients) {
if (StringUtils.equals(localClient.url, client.url)) {
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
index 1b834d9..58711b1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/inf/Client.java
@@ -20,10 +20,6 @@ package org.apache.eventmesh.runtime.core.protocol.http.processor.inf;
import java.util.Date;
-import com.alibaba.fastjson.JSONObject;
-
-import org.apache.commons.lang3.StringUtils;
-
public class Client {
public String env;
@@ -48,44 +44,20 @@ public class Client {
public Date lastUpTime;
- public static Client buildClientFromJSONObject(JSONObject jsonObject) {
- if (jsonObject == null) {
- return null;
- }
-
- Client client = null;
- try {
- client = new Client();
-
- client.env = StringUtils.trim(jsonObject.getString("env"));
- client.consumerGroup = StringUtils.trim(jsonObject.getString("groupName"));
- client.topic = StringUtils.trim(jsonObject.getString("topic"));
- client.url = StringUtils.trim(jsonObject.getString("url"));
- client.sys = StringUtils.trim(jsonObject.getString("sys"));
- client.idc = StringUtils.trim(jsonObject.getString("idc"));
- client.ip = StringUtils.trim(jsonObject.getString("ip"));
- client.pid = StringUtils.trim(jsonObject.getString("pid"));
- client.hostname = StringUtils.trim(jsonObject.getString("hostname"));
- client.apiVersion = StringUtils.trim(jsonObject.getString("apiversion"));
- } catch (Exception ex) {
- }
- return client;
- }
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("endPoint={env=").append(env)
- .append(",idc=").append(idc)
- .append(",consumerGroup=").append(consumerGroup)
- .append(",topic=").append(topic)
- .append(",url=").append(url)
- .append(",sys=").append(sys)
- .append(",ip=").append(ip)
- .append(",pid=").append(pid)
- .append(",hostname=").append(hostname)
- .append(",apiVersion=").append(apiVersion)
- .append(",registerTime=").append("}");
+ .append(",idc=").append(idc)
+ .append(",consumerGroup=").append(consumerGroup)
+ .append(",topic=").append(topic)
+ .append(",url=").append(url)
+ .append(",sys=").append(sys)
+ .append(",ip=").append(ip)
+ .append(",pid=").append(pid)
+ .append(",hostname=").append(hostname)
+ .append(",apiVersion=").append(apiVersion)
+ .append(",registerTime=").append("}");
return sb.toString();
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index f50c3ee..e0e0349 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -17,33 +17,23 @@
package org.apache.eventmesh.runtime.core.protocol.http.push;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.RandomStringUtil;
+import org.apache.eventmesh.common.exception.JsonException;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.OMSUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
@@ -52,9 +42,21 @@ import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Sets;
+
public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public Logger messageLogger = LoggerFactory.getLogger("message");
@@ -67,7 +69,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public String currPushUrl;
- public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
+ public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext,
+ Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
super(handleMsgContext);
this.waitingRequests = waitingRequests;
}
@@ -94,37 +97,50 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
builder.addHeader(ProtocolKey.REQUEST_CODE, requestCode);
builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
- builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshCluster);
+ builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
+ handleMsgContext.getEventMeshHTTPServer()
+ .getEventMeshHttpConfiguration().eventMeshCluster);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtil.getLocalAddress());
- builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
- builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC, handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
+ builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
+ handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
+ builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
+ handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
- handleMsgContext.getMsg().getUserProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
+ handleMsgContext.getMsg().getUserProperties()
+ .put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
String content = "";
try {
- content = new String(handleMsgContext.getMsg().getBody(), EventMeshConstants.DEFAULT_CHARSET);
+ content =
+ new String(handleMsgContext.getMsg().getBody(), EventMeshConstants.DEFAULT_CHARSET);
} catch (Exception ex) {
return;
}
- List<NameValuePair> body = new ArrayList<NameValuePair>();
+ List<NameValuePair> body = new ArrayList<>();
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
- body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, RandomStringUtil.generateNum(20)));
+ body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
+ RandomStringUtil.generateNum(20)));
} else {
- body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO, handleMsgContext.getBizSeqNo()));
+ body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
+ handleMsgContext.getBizSeqNo()));
}
if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
- body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, RandomStringUtil.generateNum(20)));
+ body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
+ RandomStringUtil.generateNum(20)));
} else {
- body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID, handleMsgContext.getUniqueId()));
+ body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
+ handleMsgContext.getUniqueId()));
}
- body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO, handleMsgContext.getMsgRandomNo()));
+ body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
+ handleMsgContext.getMsgRandomNo()));
body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
- body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS, JSON.toJSONString(OMSUtil.getMessageProp(handleMsgContext.getMsg()))));
+ body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
+ JsonUtils.serialize(OMSUtil.getMessageProp(handleMsgContext.getMsg()))));
try {
builder.setEntity(new UrlEncodedFormEntity(body));
@@ -139,7 +155,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
addToWaitingMap(this);
cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
- IPUtil.getLocalAddress(), currPushUrl);
+ IPUtil.getLocalAddress(), currPushUrl);
try {
httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
@@ -150,8 +166,10 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPPushTimeCost(cost);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
eventMeshHTTPServer.metrics.summaryMetrics.recordHttpPushMsgFailed();
- messageLogger.info("message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ messageLogger.info(
+ "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+ + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
delayRetry();
if (isComplete()) {
@@ -160,14 +178,18 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
} else {
String res = "";
try {
- res = EntityUtils.toString(response.getEntity(), Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
+ res = EntityUtils.toString(response.getEntity(),
+ Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
} catch (IOException e) {
handleMsgContext.finish();
return new Object();
}
ClientRetCode result = processResponseContent(res);
- messageLogger.info("message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}", result, currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ messageLogger.info(
+ "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+ + "|uniqueId={}|cost={}",
+ result, currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
if (result == ClientRetCode.OK) {
complete();
if (isComplete()) {
@@ -195,10 +217,13 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
});
if (messageLogger.isDebugEnabled()) {
- messageLogger.debug("message|eventMesh2client|url={}|topic={}|msg={}", currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getMsg());
+ messageLogger.debug("message|eventMesh2client|url={}|topic={}|msg={}", currPushUrl,
+ handleMsgContext.getTopic(),
+ handleMsgContext.getMsg());
} else {
- messageLogger.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}", currPushUrl, handleMsgContext.getTopic(),
+ messageLogger
+ .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
+ currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
}
} catch (IOException e) {
@@ -215,13 +240,16 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("asyncPushRequest={")
- .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
- .append(",startIdx=").append(startIdx)
- .append(",retryTimes=").append(retryTimes)
- .append(",uniqueId=").append(handleMsgContext.getUniqueId())
- .append(",executeTime=").append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
- .append(",lastPushTime=").append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
- .append(",createTime=").append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
+ .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
+ .append(",startIdx=").append(startIdx)
+ .append(",retryTimes=").append(retryTimes)
+ .append(",uniqueId=").append(handleMsgContext.getUniqueId())
+ .append(",executeTime=")
+ .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
+ .append(",lastPushTime=")
+ .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
+ .append(",createTime=")
+ .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
return sb.toString();
}
@@ -231,21 +259,26 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
}
try {
- JSONObject ret = JSONObject.parseObject(content);
- Integer retCode = ret.getInteger("retCode");
+ Map<String, Object> ret =
+ JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
+ });
+ Integer retCode = (Integer) ret.get("retCode");
if (retCode != null && ClientRetCode.contains(retCode)) {
return ClientRetCode.get(retCode);
}
return ClientRetCode.FAIL;
} catch (NumberFormatException e) {
- messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
- } catch (JSONException e) {
- messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ } catch (JsonException e) {
+ messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
} catch (Throwable t) {
- messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl, handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
}
}
@@ -255,7 +288,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
return;
}
- waitingRequests.put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
+ waitingRequests
+ .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
return;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 87f7b9b..02c2388 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -17,16 +17,13 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;
-import com.alibaba.fastjson.JSON;
-import io.openmessaging.api.*;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -40,15 +37,31 @@ import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStre
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.HttpTinyClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.*;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.openmessaging.api.AsyncConsumeContext;
+import io.openmessaging.api.AsyncMessageListener;
+import io.openmessaging.api.Message;
+import io.openmessaging.api.OnExceptionContext;
+import io.openmessaging.api.SendCallback;
+import io.openmessaging.api.SendResult;
+
public class ClientGroupWrapper {
public static Logger logger = LoggerFactory.getLogger(ClientGroupWrapper.class);
@@ -88,7 +101,8 @@ public class ClientGroupWrapper {
private MQConsumerWrapper broadCastMsgConsumer;
- private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();
+ private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping =
+ new ConcurrentHashMap<String, Set<Session>>();
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
@@ -105,9 +119,12 @@ public class ClientGroupWrapper {
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
- this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
- this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
- this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.persistentMsgConsumer = new MQConsumerWrapper(
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.broadCastMsgConsumer = new MQConsumerWrapper(
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.mqProducerWrapper = new MQProducerWrapper(
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
}
public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -128,13 +145,14 @@ public class ClientGroupWrapper {
return has;
}
- public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback) throws Exception {
+ public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback)
+ throws Exception {
mqProducerWrapper.send(upStreamMsgContext.getMsg(), sendCallback);
return true;
}
public void request(UpStreamMsgContext upStreamMsgContext, RRCallback rrCallback, long timeout)
- throws Exception {
+ throws Exception {
mqProducerWrapper.request(upStreamMsgContext.getMsg(), rrCallback, timeout);
}
@@ -147,10 +165,12 @@ public class ClientGroupWrapper {
@Override
public void onException(OnExceptionContext context) {
- String bizSeqNo = upStreamMsgContext.getMsg().getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
+ String bizSeqNo = upStreamMsgContext.getMsg()
+ .getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_KEYS);
logger.error("reply err! topic:{}, bizSeqNo:{}, client:{}",
- upStreamMsgContext.getMsg().getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), bizSeqNo,
- upStreamMsgContext.getSession().getClient(), context.getException());
+ upStreamMsgContext.getMsg()
+ .getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION), bizSeqNo,
+ upStreamMsgContext.getSession().getClient(), context.getException());
}
});
return true;
@@ -161,7 +181,8 @@ public class ClientGroupWrapper {
}
public boolean addSubscription(String topic, Session session) throws Exception {
- if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ if (session == null || !StringUtils.equalsIgnoreCase(consumerGroup,
+ EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("addSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -175,12 +196,16 @@ public class ClientGroupWrapper {
}
r = topic2sessionInGroupMapping.get(topic).add(session);
if (r) {
- logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+ logger.info("addSubscription success, group:{} topic:{} client:{}", consumerGroup,
+ topic, session.getClient());
} else {
- logger.warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+ logger
+ .warn("addSubscription fail, group:{} topic:{} client:{}", consumerGroup, topic,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+ logger
+ .error("addSubscription error! topic:{} client:{}", topic, session.getClient(), e);
throw new Exception("addSubscription fail");
} finally {
this.groupLock.writeLock().unlock();
@@ -190,7 +215,8 @@ public class ClientGroupWrapper {
public boolean removeSubscription(String topic, Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(consumerGroup,
+ EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("removeSubscription param error,topic:{},session:{}", topic, session);
return false;
}
@@ -201,17 +227,23 @@ public class ClientGroupWrapper {
if (topic2sessionInGroupMapping.containsKey(topic)) {
r = topic2sessionInGroupMapping.get(topic).remove(session);
if (r) {
- logger.info("removeSubscription remove session success, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+ logger.info(
+ "removeSubscription remove session success, group:{} topic:{} client:{}",
+ consumerGroup, topic, session.getClient());
} else {
- logger.warn("removeSubscription remove session failed, group:{} topic:{} client:{}", consumerGroup, topic, session.getClient());
+ logger.warn(
+ "removeSubscription remove session failed, group:{} topic:{} client:{}",
+ consumerGroup, topic, session.getClient());
}
}
if (CollectionUtils.size(topic2sessionInGroupMapping.get(topic)) == 0) {
topic2sessionInGroupMapping.remove(topic);
- logger.info("removeSubscription remove topic success, group:{} topic:{}", consumerGroup, topic);
+ logger.info("removeSubscription remove topic success, group:{} topic:{}",
+ consumerGroup, topic);
}
} catch (Exception e) {
- logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(), e);
+ logger.error("removeSubscription error! topic:{} client:{}", topic, session.getClient(),
+ e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -224,9 +256,9 @@ public class ClientGroupWrapper {
}
Properties keyValue = new Properties();
-// KeyValue keyValue = OMS.newKeyValue();
keyValue.put("producerGroup", producerGroup);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil
+ .buildMeshTcpClientID(sysId, "PUB", eventMeshTCPConfiguration.eventMeshCluster));
//TODO for defibus
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
@@ -264,7 +296,8 @@ public class ClientGroupWrapper {
public boolean addGroupConsumerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(consumerGroup,
+ EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("addGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -274,10 +307,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.add(session);
if (r) {
- logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup, session.getClient());
+ logger.info("addGroupConsumerSession success, group:{} client:{}", consumerGroup,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup, session.getClient(), e);
+ logger.error("addGroupConsumerSession error! group:{} client:{}", consumerGroup,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -286,7 +321,8 @@ public class ClientGroupWrapper {
public boolean addGroupProducerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(producerGroup, EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(producerGroup,
+ EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
logger.error("addGroupProducerSession param error,session:{}", session);
return false;
}
@@ -296,10 +332,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.add(session);
if (r) {
- logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup, session.getClient());
+ logger.info("addGroupProducerSession success, group:{} client:{}", producerGroup,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup, session.getClient(), e);
+ logger.error("addGroupProducerSession error! group:{} client:{}", producerGroup,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -308,7 +346,8 @@ public class ClientGroupWrapper {
public boolean removeGroupConsumerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(consumerGroup, EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(consumerGroup,
+ EventMeshUtil.buildClientGroup(session.getClient().getConsumerGroup()))) {
logger.error("removeGroupConsumerSession param error,session:{}", session);
return false;
}
@@ -318,10 +357,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupConsumerSessions.remove(session);
if (r) {
- logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup, session.getClient());
+ logger.info("removeGroupConsumerSession success, group:{} client:{}", consumerGroup,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup, session.getClient(), e);
+ logger.error("removeGroupConsumerSession error! group:{} client:{}", consumerGroup,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -330,7 +371,8 @@ public class ClientGroupWrapper {
public boolean removeGroupProducerSession(Session session) {
if (session == null
- || !StringUtils.equalsIgnoreCase(producerGroup, EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
+ || !StringUtils.equalsIgnoreCase(producerGroup,
+ EventMeshUtil.buildClientGroup(session.getClient().getProducerGroup()))) {
logger.error("removeGroupProducerSession param error,session:{}", session);
return false;
}
@@ -340,10 +382,12 @@ public class ClientGroupWrapper {
this.groupLock.writeLock().lockInterruptibly();
r = groupProducerSessions.remove(session);
if (r) {
- logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup, session.getClient());
+ logger.info("removeGroupProducerSession success, group:{} client:{}", producerGroup,
+ session.getClient());
}
} catch (Exception e) {
- logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup, session.getClient(), e);
+ logger.error("removeGroupProducerSession error! group:{} client:{}", producerGroup,
+ session.getClient(), e);
} finally {
this.groupLock.writeLock().unlock();
}
@@ -360,73 +404,11 @@ public class ClientGroupWrapper {
keyValue.put("isBroadcast", "false");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil
+ .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
persistentMsgConsumer.init(keyValue);
-// persistentMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
-//
-// @Override
-// public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context) {
-//
-// if (msg == null)
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//
-// eventMeshTcpMonitor.getMq2eventMeshMsgNum().incrementAndGet();
-// String topic = msg.getTopic();
-// msg.putUserProperty(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-// msg.putUserProperty(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, accessConfiguration.eventMeshServerIp);
-// msg.putUserProperty(EventMeshConstants.BORN_TIMESTAMP, String.valueOf(msg.getBornTimestamp()));
-// msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(msg.getStoreTimestamp()));
-//
-// if (!EventMeshUtil.isValidRMBTopic(topic)) {
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-//
-// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions);
-// String bizSeqNo = EventMeshUtil.getMessageBizSeq(msg);
-// if(session == null){
-// try {
-// Integer sendBackTimes = MapUtils.getInteger(msg.getProperties(), EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, new Integer(0));
-// String sendBackFromEventMeshIp = MapUtils.getString(msg.getProperties(), EventMeshConstants.EVENTMESH_SEND_BACK_IP, "");
-// logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}", groupName, topic, bizSeqNo);
-//
-// if (sendBackTimes >= eventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackMaxTimes) {
-// logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getAccessConfiguration().eventMeshTcpSendBackMaxTimes, groupName, topic, bizSeqNo);
-// } else {
-// sendBackTimes++;
-// msg.putUserProperty(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, sendBackTimes.toString());
-// msg.putUserProperty(EventMeshConstants.EVENTMESH_SEND_BACK_IP, sendBackFromEventMeshIp);
-// sendMsgBackToBroker(msg, bizSeqNo);
-// }
-// } catch (Exception e){
-// logger.warn("handle msg exception when no session found", e);
-// }
-//
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-//
-// DownStreamMsgContext downStreamMsgContext =
-// new DownStreamMsgContext(msg, session, persistentMsgConsumer, (EventMeshConsumeConcurrentlyContext)context, false);
-//
-// if(downstreamMap.size() < eventMeshTCPServer.getAccessConfiguration().eventMeshTcpDownStreamMapSize){
-// downstreamMap.putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext);
-// }else{
-// logger.warn("downStreamMap is full,group:{}", groupName);
-// }
-//
-// if (session.isCanDownStream()) {
-// session.downstreamMsg(downStreamMsgContext);
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH;
-// }
-//
-// logger.warn("session is busy,dispatch retry,seq:{}, session:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.session.getClient(), bizSeqNo);
-// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-// downStreamMsgContext.delay(delayTime);
-// eventMeshTcpRetryer.pushRetry(downStreamMsgContext);
-//
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH;
-// }
-// });
+
inited4Persistent.compareAndSet(false, true);
logger.info("init persistentMsgConsumer success, group:{}", consumerGroup);
}
@@ -449,59 +431,10 @@ public class ClientGroupWrapper {
keyValue.put("isBroadcast", "true");
keyValue.put("consumerGroup", consumerGroup);
keyValue.put("eventMeshIDC", eventMeshTCPConfiguration.eventMeshIDC);
- keyValue.put("instanceName", EventMeshUtil.buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
+ keyValue.put("instanceName", EventMeshUtil
+ .buildMeshTcpClientID(sysId, "SUB", eventMeshTCPConfiguration.eventMeshCluster));
broadCastMsgConsumer.init(keyValue);
-// broadCastMsgConsumer.registerMessageListener(new EventMeshMessageListenerConcurrently() {
-// @Override
-// public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMeshConsumeConcurrentlyContext context) {
-// if (msg == null)
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-//
-// eventMeshTcpMonitor.getMq2eventMeshMsgNum().incrementAndGet();
-//
-// String topic = msg.getTopic();
-//
-// msg.putUserProperty(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
-// msg.putUserProperty(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, accessConfiguration.eventMeshServerIp);
-// msg.putUserProperty(EventMeshConstants.BORN_TIMESTAMP, String.valueOf(msg.getBornTimestamp()));
-// msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(msg.getStoreTimestamp()));
-//
-// if (!EventMeshUtil.isValidRMBTopic(topic)) {
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-//
-// if(CollectionUtils.isEmpty(groupConsumerSessions)){
-// logger.warn("found no session to downstream broadcast msg");
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-// }
-//
-// Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
-//
-// while (sessionsItr.hasNext()) {
-// Session session = sessionsItr.next();
-//
-// if (!session.isAvailable(topic)) {
-// logger.warn("downstream broadcast msg,session is not available,client:{}",session.getClient());
-// continue;
-// }
-//
-// DownStreamMsgContext downStreamMsgContext =
-// new DownStreamMsgContext(msg, session, broadCastMsgConsumer, context, false);
-//
-// if (session.isCanDownStream()) {
-// session.downstreamMsg(downStreamMsgContext);
-// continue;
-// }
-//
-// logger.warn("downstream broadcast msg,session is busy,dispatch retry,seq:{}, session:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.session.getClient(), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
-// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills;
-// downStreamMsgContext.delay(delayTime);
-// eventMeshTcpRetryer.pushRetry(downStreamMsgContext);
-// }
-//
-// return EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH;
-// }
-// });
+
inited4Broadcast.compareAndSet(false, true);
logger.info("init broadCastMsgConsumer success, group:{}", consumerGroup);
}
@@ -523,15 +456,17 @@ public class ClientGroupWrapper {
public void consume(Message message, AsyncConsumeContext context) {
eventMeshTcpMonitor.getMq2EventMeshMsgNum().incrementAndGet();
- String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
- message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
-
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
+ String topic =
+ message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+ message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
+ message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp);
+
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+ (EventMeshAsyncConsumeContext) context;
if (CollectionUtils.isEmpty(groupConsumerSessions)) {
logger.warn("found no session to downstream broadcast msg");
-// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-// context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}
@@ -539,31 +474,35 @@ public class ClientGroupWrapper {
Iterator<Session> sessionsItr = groupConsumerSessions.iterator();
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(message, null, broadCastMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
+ new DownStreamMsgContext(message, null, broadCastMsgConsumer,
+ eventMeshAsyncConsumeContext.getAbstractContext(), false,
+ subscriptionItem);
while (sessionsItr.hasNext()) {
Session session = sessionsItr.next();
if (!session.isAvailable(topic)) {
- logger.warn("downstream broadcast msg,session is not available,client:{}", session.getClient());
+ logger
+ .warn("downstream broadcast msg,session is not available,client:{}",
+ session.getClient());
continue;
}
downStreamMsgContext.session = session;
//downstream broadcast msg asynchronously
- eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService().submit(new Runnable() {
- @Override
- public void run() {
- //msg put in eventmesh,waiting client ack
- session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
- session.downstreamMsg(downStreamMsgContext);
- }
- });
+ eventMeshTCPServer.getBroadcastMsgDownstreamExecutorService()
+ .submit(new Runnable() {
+ @Override
+ public void run() {
+ //msg put in eventmesh,waiting client ack
+ session.getPusher()
+ .unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
+ session.downstreamMsg(downStreamMsgContext);
+ }
+ });
}
-// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-// context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
}
};
@@ -573,51 +512,72 @@ public class ClientGroupWrapper {
@Override
public void consume(Message message, AsyncConsumeContext context) {
eventMeshTcpMonitor.getMq2EventMeshMsgNum().incrementAndGet();
- String topic = message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
- message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
- message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, eventMeshTCPConfiguration.eventMeshServerIp);
-
- EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) context;
- Session session = downstreamDispatchStrategy.select(consumerGroup, topic, groupConsumerSessions);
+ String topic =
+ message.getSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION);
+ message.getSystemProperties().put(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()));
+ message.getSystemProperties().put(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp);
+
+ EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext =
+ (EventMeshAsyncConsumeContext) context;
+ Session session = downstreamDispatchStrategy
+ .select(consumerGroup, topic, groupConsumerSessions);
String bizSeqNo = EventMeshUtil.getMessageBizSeq(message);
if (session == null) {
try {
Integer sendBackTimes = new Integer(0);
String sendBackFromEventMeshIp = "";
- if (StringUtils.isNotBlank(message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES))) {
- sendBackTimes = Integer.valueOf(message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES));
+ if (StringUtils.isNotBlank(message.getSystemProperties(
+ EventMeshConstants.EVENTMESH_SEND_BACK_TIMES))) {
+ sendBackTimes = Integer.valueOf(message.getSystemProperties(
+ EventMeshConstants.EVENTMESH_SEND_BACK_TIMES));
}
- if (StringUtils.isNotBlank(message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP))) {
- sendBackFromEventMeshIp = message.getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
+ if (StringUtils.isNotBlank(message
+ .getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP))) {
+ sendBackFromEventMeshIp = message
+ .getSystemProperties(EventMeshConstants.EVENTMESH_SEND_BACK_IP);
}
- logger.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", consumerGroup, topic, bizSeqNo, sendBackTimes, sendBackFromEventMeshIp);
-
- if (sendBackTimes >= eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
- logger.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes, consumerGroup, topic, bizSeqNo);
+ logger.error(
+ "found no session to downstream msg,groupName:{}, topic:{}, "
+ + "bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}",
+ consumerGroup, topic, bizSeqNo, sendBackTimes,
+ sendBackFromEventMeshIp);
+
+ if (sendBackTimes >= eventMeshTCPServer
+ .getEventMeshTCPConfiguration().eventMeshTcpSendBackMaxTimes) {
+ logger.error(
+ "sendBack to broker over max times:{}, groupName:{}, topic:{}, "
+ + "bizSeqNo:{}", eventMeshTCPServer
+ .getEventMeshTCPConfiguration()
+ .eventMeshTcpSendBackMaxTimes,
+ consumerGroup, topic, bizSeqNo);
} else {
sendBackTimes++;
- message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, sendBackTimes.toString());
- message.getSystemProperties().put(EventMeshConstants.EVENTMESH_SEND_BACK_IP, eventMeshTCPConfiguration.eventMeshServerIp);
+ message.getSystemProperties()
+ .put(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES,
+ sendBackTimes.toString());
+ message.getSystemProperties()
+ .put(EventMeshConstants.EVENTMESH_SEND_BACK_IP,
+ eventMeshTCPConfiguration.eventMeshServerIp);
sendMsgBackToBroker(message, bizSeqNo);
}
} catch (Exception e) {
logger.warn("handle msg exception when no session found", e);
}
-// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
-// context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
}
DownStreamMsgContext downStreamMsgContext =
- new DownStreamMsgContext(message, session, persistentMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, subscriptionItem);
+ new DownStreamMsgContext(message, session, persistentMsgConsumer,
+ eventMeshAsyncConsumeContext.getAbstractContext(), false,
+ subscriptionItem);
//msg put in eventmesh,waiting client ack
session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
session.downstreamMsg(downStreamMsgContext);
-// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
-// context.ack();
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
}
};
@@ -690,7 +650,8 @@ public class ClientGroupWrapper {
return downstreamDispatchStrategy;
}
- public void setDownstreamDispatchStrategy(DownstreamDispatchStrategy downstreamDispatchStrategy) {
+ public void setDownstreamDispatchStrategy(
+ DownstreamDispatchStrategy downstreamDispatchStrategy) {
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
}
@@ -700,23 +661,25 @@ public class ClientGroupWrapper {
private String pushMsgToEventMesh(Message msg, String ip, int port) throws Exception {
StringBuilder targetUrl = new StringBuilder();
- targetUrl.append("http://").append(ip).append(":").append(port).append("/eventMesh/msg/push");
+ targetUrl.append("http://").append(ip).append(":").append(port)
+ .append("/eventMesh/msg/push");
HttpTinyClient.HttpResult result = null;
try {
- logger.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl.toString(), msg.toString());
+ logger.info("pushMsgToEventMesh,targetUrl:{},msg:{}", targetUrl.toString(),
+ msg.toString());
List<String> paramValues = new ArrayList<String>();
paramValues.add("msg");
- paramValues.add(JSON.toJSONString(msg));
+ paramValues.add(JsonUtils.serialize(msg));
paramValues.add("group");
paramValues.add(consumerGroup);
result = HttpTinyClient.httpPost(
- targetUrl.toString(),
- null,
- paramValues,
- "UTF-8",
- 3000);
+ targetUrl.toString(),
+ null,
+ paramValues,
+ "UTF-8",
+ 3000);
} catch (Exception e) {
logger.error("httpPost " + targetUrl + " is fail,", e);
//throw new RuntimeException("httpPost " + targetUrl + " is fail," , e);
@@ -727,7 +690,8 @@ public class ClientGroupWrapper {
return result.content;
} else {
- throw new Exception("httpPost targetUrl[" + targetUrl + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
+ throw new Exception("httpPost targetUrl[" + targetUrl
+ + "] is not OK when getContentThroughHttp, httpResult: " + result + ".");
}
}
@@ -742,22 +706,23 @@ public class ClientGroupWrapper {
long startTime = System.currentTimeMillis();
long taskExcuteTime = startTime;
- send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime), new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- logger.info("consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
- }
+ send(new UpStreamMsgContext(null, msg, null, startTime, taskExcuteTime),
+ new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ logger.info(
+ "consumerGroup:{} consume fail, sendMessageBack success, bizSeqno:{}, "
+ + "topic:{}", consumerGroup, bizSeqNo, topic);
+ }
- @Override
- public void onException(OnExceptionContext context) {
- logger.warn("consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", consumerGroup, bizSeqNo, topic);
- }
+ @Override
+ public void onException(OnExceptionContext context) {
+ logger.warn(
+ "consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{},"
+ + " topic:{}", consumerGroup, bizSeqNo, topic);
+ }
-// @Override
-// public void onException(Throwable e) {
-// logger.warn("consumerGroup:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", groupName, bizSeqNo, topic);
-// }
- });
+ });
eventMeshTcpMonitor.getEventMesh2mqMsgNum().incrementAndGet();
} catch (Exception e) {
logger.warn("try send msg back to broker failed");
diff --git a/eventmesh-schema-registry/build.gradle b/eventmesh-schema-registry/build.gradle
index 96a33a1..6a9c064 100644
--- a/eventmesh-schema-registry/build.gradle
+++ b/eventmesh-schema-registry/build.gradle
@@ -15,19 +15,3 @@
* limitations under the License.
*/
-plugins {
-}
-
-group 'org.apache.eventmesh'
-version '1.2.0-SNAPSHOT'
-
-repositories {
- mavenCentral()
-}
-
-dependencies {
-}
-
-test {
- useJUnitPlatform()
-}
diff --git a/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle b/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle
index d97e5d8..15e630e 100644
--- a/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle
+++ b/eventmesh-schema-registry/eventmesh-schema-registry-server/build.gradle
@@ -15,9 +15,6 @@
* limitations under the License.
*/
-group 'org.apache.eventmesh'
-version '1.2.0-SNAPSHOT'
-
configurations {
compileOnly {
extendsFrom annotationProcessor
diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle
index c11729d..e57a607 100644
--- a/eventmesh-sdk-java/build.gradle
+++ b/eventmesh-sdk-java/build.gradle
@@ -17,5 +17,23 @@
dependencies {
implementation project(":eventmesh-common")
+ implementation project(":eventmesh-common")
+
+ implementation "com.fasterxml.jackson.core:jackson-databind"
+ implementation "com.fasterxml.jackson.core:jackson-core"
+ implementation "com.fasterxml.jackson.core:jackson-annotations"
+
+ implementation "io.netty:netty-all"
+ implementation "org.apache.httpcomponents:httpclient"
+
testImplementation project(":eventmesh-common")
+ testImplementation project(":eventmesh-common")
+
+ testImplementation "com.fasterxml.jackson.core:jackson-databind"
+ testImplementation "com.fasterxml.jackson.core:jackson-core"
+ testImplementation "com.fasterxml.jackson.core:jackson-annotations"
+
+ testImplementation "io.netty:netty-all"
+ testImplementation "org.apache.httpcomponents:httpclient"
+
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index 61d3c48..7915d40 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -17,25 +17,6 @@
package org.apache.eventmesh.client.http.consumer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.*;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import io.netty.handler.codec.http.HttpMethod;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.client.http.AbstractLiteClient;
import org.apache.eventmesh.client.http.EventMeshRetObj;
import org.apache.eventmesh.client.http.RemotingServer;
@@ -57,12 +38,28 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
-import org.apache.eventmesh.common.protocol.tcp.Subscription;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import io.netty.handler.codec.http.HttpMethod;
+
public class LiteConsumer extends AbstractLiteClient {
public Logger logger = LoggerFactory.getLogger(LiteConsumer.class);
@@ -77,11 +74,13 @@ public class LiteConsumer extends AbstractLiteClient {
private LiteMessageListener messageListener;
- protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
+ protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4,
+ new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception {
super(liteClientConfig);
- this.consumeExecutor = ThreadPoolFactory.createThreadPoolExecutor(liteClientConfig.getConsumeThreadCore(),
+ this.consumeExecutor =
+ ThreadPoolFactory.createThreadPoolExecutor(liteClientConfig.getConsumeThreadCore(),
liteClientConfig.getConsumeThreadMax(), "eventMesh-client-consume-");
this.eventMeshClientConfig = liteClientConfig;
// this.remotingServer = new RemotingServer(10106, consumeExecutor);
@@ -100,7 +99,8 @@ public class LiteConsumer extends AbstractLiteClient {
@Override
public void start() throws Exception {
- Preconditions.checkState(eventMeshClientConfig != null, "eventMeshClientConfig can't be null");
+ Preconditions
+ .checkState(eventMeshClientConfig != null, "eventMeshClientConfig can't be null");
Preconditions.checkState(consumeExecutor != null, "consumeExecutor can't be null");
// Preconditions.checkState(messageListener != null, "messageListener can't be null");
logger.info("LiteConsumer starting");
@@ -134,15 +134,18 @@ public class LiteConsumer extends AbstractLiteClient {
String target = selectEventMesh();
String subRes = "";
- try (CloseableHttpClient httpClient = setHttpClient()){
+ try (CloseableHttpClient httpClient = setHttpClient()) {
subRes = HttpUtil.post(httpClient, target, subscribeParam);
}
if (logger.isDebugEnabled()) {
- logger.debug("subscribe message by await, targetEventMesh:{}, cost:{}ms, subscribeParam:{}, rtn:{}", target, System.currentTimeMillis() - startTime, JSON.toJSONString(subscribeParam), subRes);
+ logger.debug(
+ "subscribe message by await, targetEventMesh:{}, cost:{}ms, subscribeParam:{}, "
+ + "rtn:{}", target, System.currentTimeMillis() - startTime,
+ JsonUtils.serialize(subscribeParam), subRes);
}
- EventMeshRetObj ret = JSON.parseObject(subRes, EventMeshRetObj.class);
+ EventMeshRetObj ret = JsonUtils.deserialize(subRes, EventMeshRetObj.class);
if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
return Boolean.TRUE;
@@ -152,53 +155,58 @@ public class LiteConsumer extends AbstractLiteClient {
}
- private RequestParam generateSubscribeRequestParam(List<SubscriptionItem> topicList, String url) {
+ private RequestParam generateSubscribeRequestParam(List<SubscriptionItem> topicList,
+ String url) {
// final LiteMessage liteMessage = new LiteMessage();
// liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30))
// .setContent("subscribe message")
// .setUniqueId(RandomStringUtils.randomNumeric(30));
RequestParam requestParam = new RequestParam(HttpMethod.POST);
- requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.SUBSCRIBE.getRequestCode()))
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
- .addBody(SubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
- .addBody(SubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
- .addBody(SubscribeRequestBody.URL, url);
+ requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+ String.valueOf(RequestCode.SUBSCRIBE.getRequestCode()))
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+ .addBody(SubscribeRequestBody.TOPIC, JsonUtils.serialize(topicList))
+ .addBody(SubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+ .addBody(SubscribeRequestBody.URL, url);
return requestParam;
}
private RequestParam generateHeartBeatRequestParam(List<SubscriptionItem> topics, String url) {
List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = new ArrayList<>();
for (SubscriptionItem item : topics) {
- HeartbeatRequestBody.HeartbeatEntity heartbeatEntity = new HeartbeatRequestBody.HeartbeatEntity();
+ HeartbeatRequestBody.HeartbeatEntity heartbeatEntity =
+ new HeartbeatRequestBody.HeartbeatEntity();
heartbeatEntity.topic = item.getTopic();
heartbeatEntity.url = url;
heartbeatEntities.add(heartbeatEntity);
}
RequestParam requestParam = new RequestParam(HttpMethod.POST);
- requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.HEARTBEAT.getRequestCode()))
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
- .addBody(HeartbeatRequestBody.CLIENTTYPE, ClientType.SUB.name())
- .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
- .addBody(HeartbeatRequestBody.HEARTBEATENTITIES, JSON.toJSONString(heartbeatEntities));
+ requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+ String.valueOf(RequestCode.HEARTBEAT.getRequestCode()))
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+ .addBody(HeartbeatRequestBody.CLIENTTYPE, ClientType.SUB.name())
+ .addBody(HeartbeatRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+ .addBody(HeartbeatRequestBody.HEARTBEATENTITIES,
+ JsonUtils.serialize(heartbeatEntities));
return requestParam;
}
@@ -221,10 +229,12 @@ public class LiteConsumer extends AbstractLiteClient {
}
if (logger.isDebugEnabled()) {
- logger.debug("heartBeat message by await, targetEventMesh:{}, cost:{}ms, rtn:{}", target, System.currentTimeMillis() - startTime, res);
+ logger.debug(
+ "heartBeat message by await, targetEventMesh:{}, cost:{}ms, rtn:{}",
+ target, System.currentTimeMillis() - startTime, res);
}
- EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+ EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
@@ -239,7 +249,7 @@ public class LiteConsumer extends AbstractLiteClient {
public boolean unsubscribe(List<String> topicList, String url) throws Exception {
Set<String> unSub = new HashSet<>(topicList);
Iterator<SubscriptionItem> itr = subscription.iterator();
- while(itr.hasNext()) {
+ while (itr.hasNext()) {
SubscriptionItem item = itr.next();
if (unSub.contains(item.getTopic())) {
itr.remove();
@@ -257,10 +267,13 @@ public class LiteConsumer extends AbstractLiteClient {
}
if (logger.isDebugEnabled()) {
- logger.debug("unSubscribe message by await, targetEventMesh:{}, cost:{}ms, unSubscribeParam:{}, rtn:{}", target, System.currentTimeMillis() - startTime, JSON.toJSONString(unSubscribeParam), unSubRes);
+ logger.debug(
+ "unSubscribe message by await, targetEventMesh:{}, cost:{}ms, unSubscribeParam:{}, "
+ + "rtn:{}", target, System.currentTimeMillis() - startTime,
+ JsonUtils.serialize(unSubscribeParam), unSubRes);
}
- EventMeshRetObj ret = JSON.parseObject(unSubRes, EventMeshRetObj.class);
+ EventMeshRetObj ret = JsonUtils.deserialize(unSubRes, EventMeshRetObj.class);
if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
return Boolean.TRUE;
@@ -271,24 +284,26 @@ public class LiteConsumer extends AbstractLiteClient {
private RequestParam generateUnSubscribeRequestParam(List<String> topicList, String url) {
RequestParam requestParam = new RequestParam(HttpMethod.POST);
- requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.UNSUBSCRIBE.getRequestCode()))
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
- .addBody(UnSubscribeRequestBody.TOPIC, JSONObject.toJSONString(topicList))
- .addBody(UnSubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
- .addBody(UnSubscribeRequestBody.URL, url);
+ requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+ String.valueOf(RequestCode.UNSUBSCRIBE.getRequestCode()))
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, eventMeshClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, eventMeshClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, eventMeshClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, eventMeshClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, eventMeshClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshClientConfig.getPassword())
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+ .addBody(UnSubscribeRequestBody.TOPIC, JsonUtils.serialize(topicList))
+ .addBody(UnSubscribeRequestBody.CONSUMERGROUP, eventMeshClientConfig.getConsumerGroup())
+ .addBody(UnSubscribeRequestBody.URL, url);
return requestParam;
}
- public void registerMessageListener(LiteMessageListener messageListener) throws EventMeshException {
+ public void registerMessageListener(LiteMessageListener messageListener)
+ throws EventMeshException {
this.messageListener = messageListener;
remotingServer.registerMessageListener(this.messageListener);
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
index a240b95..376e6e6 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java
@@ -17,29 +17,11 @@
package org.apache.eventmesh.client.http.producer;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Preconditions;
-
-import io.netty.handler.codec.http.HttpMethod;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.client.http.AbstractLiteClient;
import org.apache.eventmesh.client.http.EventMeshRetObj;
import org.apache.eventmesh.client.http.conf.LiteClientConfig;
import org.apache.eventmesh.client.http.http.HttpUtil;
import org.apache.eventmesh.client.http.http.RequestParam;
-import org.apache.eventmesh.client.http.ssl.MyX509TrustManager;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.LiteMessage;
@@ -49,14 +31,21 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
+import io.netty.handler.codec.http.HttpMethod;
+
public class LiteProducer extends AbstractLiteClient {
public Logger logger = LoggerFactory.getLogger(LiteProducer.class);
@@ -70,7 +59,8 @@ public class LiteProducer extends AbstractLiteClient {
@Override
public void start() throws Exception {
Preconditions.checkState(liteClientConfig != null, "liteClientConfig can't be null");
- Preconditions.checkState(liteClientConfig.getLiteEventMeshAddr() != null, "liteClientConfig.liteServerAddr can't be null");
+ Preconditions.checkState(liteClientConfig.getLiteEventMeshAddr() != null,
+ "liteClientConfig.liteServerAddr can't be null");
if (started.get()) {
return;
}
@@ -100,27 +90,29 @@ public class LiteProducer extends AbstractLiteClient {
start();
}
Preconditions.checkState(StringUtils.isNotBlank(message.getTopic()),
- "eventMeshMessage[topic] invalid");
+ "eventMeshMessage[topic] invalid");
Preconditions.checkState(StringUtils.isNotBlank(message.getContent()),
- "eventMeshMessage[content] invalid");
+ "eventMeshMessage[content] invalid");
RequestParam requestParam = new RequestParam(HttpMethod.POST);
- requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
- .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
- .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
- .addBody(SendMessageRequestBody.CONTENT, message.getContent())
- .addBody(SendMessageRequestBody.TTL, message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
- .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
- .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
+ requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+ String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(Constants.DEFAULT_HTTP_TIME_OUT)
+ .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
+ .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
+ .addBody(SendMessageRequestBody.CONTENT, message.getContent())
+ .addBody(SendMessageRequestBody.TTL,
+ message.getPropKey(Constants.EVENTMESH_MESSAGE_CONST_TTL))
+ .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
+ .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
@@ -132,10 +124,10 @@ public class LiteProducer extends AbstractLiteClient {
if (logger.isDebugEnabled()) {
logger.debug("publish async message, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
- target, System.currentTimeMillis() - startTime, message, res);
+ target, System.currentTimeMillis() - startTime, message, res);
}
- EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+ EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
return Boolean.TRUE;
@@ -157,27 +149,28 @@ public class LiteProducer extends AbstractLiteClient {
start();
}
Preconditions.checkState(StringUtils.isNotBlank(message.getTopic()),
- "eventMeshMessage[topic] invalid");
+ "eventMeshMessage[topic] invalid");
Preconditions.checkState(StringUtils.isNotBlank(message.getContent()),
- "eventMeshMessage[content] invalid");
+ "eventMeshMessage[content] invalid");
RequestParam requestParam = new RequestParam(HttpMethod.POST);
- requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(timeout)
- .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
- .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
- .addBody(SendMessageRequestBody.CONTENT, message.getContent())
- .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
- .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
- .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
+ requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+ String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(timeout)
+ .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
+ .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
+ .addBody(SendMessageRequestBody.CONTENT, message.getContent())
+ .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
+ .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
+ .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
@@ -188,16 +181,18 @@ public class LiteProducer extends AbstractLiteClient {
}
if (logger.isDebugEnabled()) {
- logger.debug("publish sync message by await, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}", target, System.currentTimeMillis() - startTime, message, res);
+ logger.debug(
+ "publish sync message by await, targetEventMesh:{}, cost:{}ms, message:{}, rtn:{}",
+ target, System.currentTimeMillis() - startTime, message, res);
}
- EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+ EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
LiteMessage eventMeshMessage = new LiteMessage();
SendMessageResponseBody.ReplyMessage replyMessage =
- JSON.parseObject(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
+ JsonUtils.deserialize(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
eventMeshMessage.setContent(replyMessage.body).setProp(replyMessage.properties)
- .setTopic(replyMessage.topic);
+ .setTopic(replyMessage.topic);
return eventMeshMessage;
}
@@ -209,39 +204,42 @@ public class LiteProducer extends AbstractLiteClient {
start();
}
Preconditions.checkState(StringUtils.isNotBlank(message.getTopic()),
- "eventMeshMessage[topic] invalid");
+ "eventMeshMessage[topic] invalid");
Preconditions.checkState(StringUtils.isNotBlank(message.getContent()),
- "eventMeshMessage[content] invalid");
+ "eventMeshMessage[content] invalid");
Preconditions.checkState(ObjectUtils.allNotNull(rrCallback),
- "rrCallback invalid");
+ "rrCallback invalid");
RequestParam requestParam = new RequestParam(HttpMethod.POST);
- requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
- .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
- .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
- .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
- .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
- .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
- .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
- .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
- .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
- .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
- .setTimeout(timeout)
- .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
- .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
- .addBody(SendMessageRequestBody.CONTENT, message.getContent())
- .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
- .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
- .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
+ requestParam.addHeader(ProtocolKey.REQUEST_CODE,
+ String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()))
+ .addHeader(ProtocolKey.ClientInstanceKey.ENV, liteClientConfig.getEnv())
+ .addHeader(ProtocolKey.ClientInstanceKey.IDC, liteClientConfig.getIdc())
+ .addHeader(ProtocolKey.ClientInstanceKey.IP, liteClientConfig.getIp())
+ .addHeader(ProtocolKey.ClientInstanceKey.PID, liteClientConfig.getPid())
+ .addHeader(ProtocolKey.ClientInstanceKey.SYS, liteClientConfig.getSys())
+ .addHeader(ProtocolKey.ClientInstanceKey.USERNAME, liteClientConfig.getUserName())
+ .addHeader(ProtocolKey.ClientInstanceKey.PASSWD, liteClientConfig.getPassword())
+ .addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
+ .addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
+ .setTimeout(timeout)
+ .addBody(SendMessageRequestBody.PRODUCERGROUP, liteClientConfig.getProducerGroup())
+ .addBody(SendMessageRequestBody.TOPIC, message.getTopic())
+ .addBody(SendMessageRequestBody.CONTENT, message.getContent())
+ .addBody(SendMessageRequestBody.TTL, String.valueOf(timeout))
+ .addBody(SendMessageRequestBody.BIZSEQNO, message.getBizSeqNo())
+ .addBody(SendMessageRequestBody.UNIQUEID, message.getUniqueId());
long startTime = System.currentTimeMillis();
String target = selectEventMesh();
try (CloseableHttpClient httpClient = setHttpClient()) {
- HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
+ HttpUtil.post(httpClient, null, target, requestParam,
+ new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout));
}
if (logger.isDebugEnabled()) {
- logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, System.currentTimeMillis() - startTime, message);
+ logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target,
+ System.currentTimeMillis() - startTime, message);
}
}
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java
index aba948b..1dcefe5 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/RRCallbackResponseHandlerAdapter.java
@@ -17,27 +17,30 @@
package org.apache.eventmesh.client.http.producer;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-
-import com.alibaba.fastjson.JSON;
-
-import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.client.http.EventMeshRetObj;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.LiteMessage;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
+import org.apache.eventmesh.common.utils.JsonUtils;
+
+import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * RRCallbackResponseHandlerAdapter.
+ */
public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String> {
public Logger logger = LoggerFactory.getLogger(RRCallbackResponseHandlerAdapter.class);
@@ -50,7 +53,8 @@ public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String>
private long timeout;
- public RRCallbackResponseHandlerAdapter(LiteMessage liteMessage, RRCallback rrCallback, long timeout) {
+ public RRCallbackResponseHandlerAdapter(LiteMessage liteMessage, RRCallback rrCallback,
+ long timeout) {
this.liteMessage = liteMessage;
this.rrCallback = rrCallback;
this.timeout = timeout;
@@ -58,26 +62,29 @@ public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String>
}
@Override
- public String handleResponse(HttpResponse response) throws ClientProtocolException, IOException {
+ public String handleResponse(HttpResponse response)
+ throws ClientProtocolException, IOException {
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
rrCallback.onException(new EventMeshException(response.toString()));
return response.toString();
}
if (System.currentTimeMillis() - createTime > timeout) {
- String err = String.format("response too late, bizSeqNo=%s, uniqueId=%s, createTime=%s, ttl=%s, cost=%sms",
- liteMessage.getBizSeqNo(),
- liteMessage.getUniqueId(),
- DateFormatUtils.format(createTime, Constants.DATE_FORMAT),
- timeout,
- System.currentTimeMillis() - createTime);
+ String err = String.format(
+ "response too late, bizSeqNo=%s, uniqueId=%s, createTime=%s, ttl=%s, cost=%sms",
+ liteMessage.getBizSeqNo(),
+ liteMessage.getUniqueId(),
+ DateFormatUtils.format(createTime, Constants.DATE_FORMAT),
+ timeout,
+ System.currentTimeMillis() - createTime);
logger.warn(err);
rrCallback.onException(new EventMeshException(err));
return err;
}
- String res = EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
- EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
+ String res =
+ EntityUtils.toString(response.getEntity(), Charset.forName(Constants.DEFAULT_CHARSET));
+ EventMeshRetObj ret = JsonUtils.deserialize(res, EventMeshRetObj.class);
if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
rrCallback.onException(new EventMeshException(ret.getRetCode(), ret.getRetMsg()));
return res;
@@ -86,9 +93,9 @@ public class RRCallbackResponseHandlerAdapter implements ResponseHandler<String>
LiteMessage liteMessage = new LiteMessage();
try {
SendMessageResponseBody.ReplyMessage replyMessage =
- JSON.parseObject(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
+ JsonUtils.deserialize(ret.getRetMsg(), SendMessageResponseBody.ReplyMessage.class);
liteMessage.setContent(replyMessage.body).setProp(replyMessage.properties)
- .setTopic(replyMessage.topic);
+ .setTopic(replyMessage.topic);
rrCallback.onSuccess(liteMessage);
} catch (Exception ex) {
rrCallback.onException(new EventMeshException(ex));
diff --git a/eventmesh-security-plugin/eventmesh-security-acl/build.gradle b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
index 1a18796..1ee3c75 100644
--- a/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- api project(":eventmesh-security-plugin:eventmesh-security-api")
+ implementation project(":eventmesh-security-plugin:eventmesh-security-api")
testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
index 0d41042..926cdba 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- api project(":eventmesh-spi")
+ compileOnly project(":eventmesh-spi")
testImplementation project(":eventmesh-spi")
}
diff --git a/eventmesh-spi/build.gradle b/eventmesh-spi/build.gradle
index f5ce59f..7c61ebc 100644
--- a/eventmesh-spi/build.gradle
+++ b/eventmesh-spi/build.gradle
@@ -15,6 +15,6 @@
* limitations under the License.
*/
dependencies {
- api project(":eventmesh-common")
+ compileOnly project(":eventmesh-common")
testImplementation project(":eventmesh-common")
}
\ No newline at end of file
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
index 3692bc1..0ac3d40 100644
--- a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/loader/JarExtensionClassLoader.java
@@ -17,12 +17,8 @@
package org.apache.eventmesh.spi.loader;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
import org.apache.eventmesh.spi.EventMeshSPI;
import org.apache.eventmesh.spi.ExtensionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -31,6 +27,7 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -38,6 +35,12 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
/**
* Load extension from '${eventMeshPluginDir}', the default loading directory is './plugin'
*/
@@ -46,30 +49,36 @@ public class JarExtensionClassLoader implements ExtensionClassLoader {
private static final Logger logger = LoggerFactory.getLogger(JarExtensionClassLoader.class);
private static final ConcurrentHashMap<Class<?>, Map<String, Class<?>>> EXTENSION_CLASS_CACHE =
- new ConcurrentHashMap<>(16);
+ new ConcurrentHashMap<>(16);
- private static final String EVENTMESH_EXTENSION_PLUGIN_DIR = System.getProperty("eventMeshPluginDir",
+ private static final String EVENTMESH_EXTENSION_PLUGIN_DIR =
+ System.getProperty("eventMeshPluginDir",
Joiner.on(File.separator).join(Lists.newArrayList(".", "plugin")));
// META-INF/eventmesh
- private static final String EVENTMESH_EXTENSION_META_DIR = Paths.get("META-INF", "eventmesh").toString();
+ private static final String EVENTMESH_EXTENSION_META_DIR =
+ Paths.get("META-INF", "eventmesh").toString();
@Override
- public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType, String extensionInstanceName) {
- return EXTENSION_CLASS_CACHE.computeIfAbsent(extensionType, t -> doLoadExtensionClass(t, extensionInstanceName));
+ public <T> Map<String, Class<?>> loadExtensionClass(Class<T> extensionType,
+ String extensionInstanceName) {
+ return EXTENSION_CLASS_CACHE
+ .computeIfAbsent(extensionType, t -> doLoadExtensionClass(t, extensionInstanceName));
}
- private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType, String extensionInstanceName) {
+ private <T> Map<String, Class<?>> doLoadExtensionClass(Class<T> extensionType,
+ String extensionInstanceName) {
Map<String, Class<?>> extensionMap = new HashMap<>(16);
- EventMeshSPI eventMeshSPIAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
+ EventMeshSPI eventMeshSpiAnnotation = extensionType.getAnnotation(EventMeshSPI.class);
String pluginDir = Paths.get(
- EVENTMESH_EXTENSION_PLUGIN_DIR,
- eventMeshSPIAnnotation.eventMeshExtensionType().getExtensionTypeName(),
- extensionInstanceName
+ EVENTMESH_EXTENSION_PLUGIN_DIR,
+ eventMeshSpiAnnotation.eventMeshExtensionType().getExtensionTypeName(),
+ extensionInstanceName
).toString();
- String extensionFileName = EVENTMESH_EXTENSION_META_DIR + File.separator + extensionType.getName();
+ String extensionFileName =
+ EVENTMESH_EXTENSION_META_DIR + File.separator + extensionType.getName();
EventMeshUrlClassLoader urlClassLoader = EventMeshUrlClassLoader.getInstance();
urlClassLoader.addUrls(loadJarPathFromResource(pluginDir));
try {
@@ -106,10 +115,15 @@ public class JarExtensionClassLoader implements ExtensionClassLoader {
pluginUrls.addAll(loadJarPathFromResource(file.getPath()));
}
}
+ // TODO: Sort the path here just to guarantee load the ConsumeMessageConcurrentlyService
+ // defined in EventMesh rather than defined in rocketmq
+ pluginUrls.sort(Comparator.comparing(URL::getPath));
return pluginUrls;
}
- private static <T> Map<String, Class<?>> loadResources(URLClassLoader urlClassLoader, URL url, Class<T> extensionType) throws IOException {
+ private static <T> Map<String, Class<?>> loadResources(URLClassLoader urlClassLoader, URL url,
+ Class<T> extensionType)
+ throws IOException {
Map<String, Class<?>> extensionMap = new HashMap<>();
try (InputStream inputStream = url.openStream()) {
Properties properties = new Properties();
@@ -119,11 +133,13 @@ public class JarExtensionClassLoader implements ExtensionClassLoader {
String extensionClassStr = (String) extensionClass;
try {
Class<?> targetClass = urlClassLoader.loadClass(extensionClassStr);
- logger.info("load extension class success, extensionType: {}, extensionClass: {}",
+ logger
+ .info("load extension class success, extensionType: {}, extensionClass: {}",
extensionType, targetClass);
if (!extensionType.isAssignableFrom(targetClass)) {
throw new ExtensionException(
- String.format("class: %s is not subClass of %s", targetClass, extensionType));
+ String.format("class: %s is not subClass of %s", targetClass,
+ extensionType));
}
extensionMap.put(extensionNameStr, targetClass);
} catch (ClassNotFoundException e) {
diff --git a/style/checkStyle.xml b/style/checkStyle.xml
index 98ec78f..8c0a35d 100644
--- a/style/checkStyle.xml
+++ b/style/checkStyle.xml
@@ -327,20 +327,20 @@
<module name="AtclauseOrder">
<property name="tagOrder" value="@param, @return, @throws, @deprecated"/>
<property name="target"
- value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
+ value="CLASS_DEF, ENUM_DEF, METHOD_DEF, VARIABLE_DEF"/>
</module>
<module name="JavadocMethod">
<property name="accessModifiers" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
<property name="allowedAnnotations" value="Override, Test"/>
- <property name="tokens" value="METHOD_DEF, CTOR_DEF, ANNOTATION_FIELD_DEF, COMPACT_CTOR_DEF"/>
+ <property name="tokens" value="METHOD_DEF, ANNOTATION_FIELD_DEF"/>
</module>
<module name="MissingJavadocMethod">
<property name="scope" value="public"/>
<property name="minLineCount" value="2"/>
<property name="allowedAnnotations" value="Override, Test"/>
- <property name="tokens" value="METHOD_DEF, CTOR_DEF, ANNOTATION_FIELD_DEF,
+ <property name="tokens" value="METHOD_DEF,
COMPACT_CTOR_DEF"/>
</module>
<module name="MissingJavadocType">
@@ -362,11 +362,6 @@
<module name="CommentsIndentation">
<property name="tokens" value="SINGLE_LINE_COMMENT, BLOCK_COMMENT_BEGIN"/>
</module>
- <module name="SuppressionXpathFilter">
- <property name="file" value="${org.checkstyle.google.suppressionxpathfilter.config}"
- default="checkstyle-xpath-suppressions.xml" />
- <property name="optional" value="true"/>
- </module>
</module>
</module>
diff --git a/tool/license/allowed-licenses.txt b/tool/license/allowed-licenses.txt
index 976d6c3..580554d 100644
--- a/tool/license/allowed-licenses.txt
+++ b/tool/license/allowed-licenses.txt
@@ -24,7 +24,7 @@
},
{
"moduleLicense": "Apache License, Version 2.0",
- "moduleVersion": "1.2.71",
+ "moduleVersion": "1.2.69",
"moduleName": "com.alibaba:fastjson"
},
{
@@ -372,7 +372,7 @@
},
{
"moduleLicense": "Apache License 2.0",
- "moduleVersion": "3.24.0-GA",
+ "moduleVersion": "3.20.0-GA",
"moduleName": "org.javassist:javassist"
},
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org