Posted to commits@eventmesh.apache.org by mi...@apache.org on 2021/07/12 08:11:34 UTC
[incubator-eventmesh] branch develop updated: Merge 1.3.0 to develop. (#430)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new b7e7172 Merge 1.3.0 to develop. (#430)
b7e7172 is described below
commit b7e71724a794a371dab92b46d70c95ac50d446f6
Author: Zonglei Dong <do...@apache.org>
AuthorDate: Mon Jul 12 16:11:26 2021 +0800
Merge 1.3.0 to develop. (#430)
* merge develop into 1.3.0 branch (#401)
* upgrade gradle to 6.8.3
* [ISSUE #265]rename wemq and access to eventmesh
* [ISSUE #265]rename proxy field to eventmesh
* merge conflict
* remove warn
* [ISSUE #265] Specification of code structure and file naming
* remove eventmesh-registry module
* enabled http/tcp monitor logs
* Update README.md
* Changed com.webank to org.apache
Changed 'eventmesh-connector-api' code package name to apache.
* issue #277:refactor eventmesh-common package with org.apache
* issue #277:rename package with org.apache
* Refactor 'eventmesh-connector-rocketmq' package name to org.apache
* Update README.zh-CN.md
* [ISSUE #282]Refactor 'eventmesh-starter' package name to org.apache
* Update codeStyle.xml
* Refactor 'eventmesh-test' package name to org.apache #283
* Refactor 'eventmesh-test' package name to org.apache #283
* Refactor 'eventmesh-test' package name to org.apache #283
* Refactor 'eventmesh-test' package name to org.apache #283
* Refactor 'eventmesh-test' package name to org.apache #283
* refactor runtime module package com.webank to org.apache
* refactor(eventmesh-sdk-java):rename to org.apache(#281)
* [ISSUE #281]refactor(eventmesh-sdk-java):rename to org.apache
* add licenses of apache for runtime module
* add NOTICE
* bugfix for event-mesh-test module
* change package name to org.apache
* format README.md
* change package name to org.apache
* fix ISSUE #296:add licenses in each source file under the eventmesh-sdk-java
* [ISSUE #294]add licenses in file under eventmesh-connector-rocketmq module
* [ISSUE #293]Lack of licenses in each source file under the eventmesh-connector-api module
* [ISSUE #298]Lack of licenses in each source file under the eventmesh-test module
* [ISSUE #297]Lack of licenses in each source file under the eventmesh-starter module
* Create .asf.yaml
* Update .asf.yaml
* Update .asf.yaml (#316)
* Update README.md
* Update .asf.yaml
* bugfix build.gradle tar task (#318)
Co-authored-by: jonyang(杨军) <jo...@webank.com>
* [ISSUE #322] Rename package name "com.webank.eventmesh" to "org.apache.eventmesh" (#319)
* rename org.apache.runtime to com.webank.runtime
* rename com.webank.eventmesh to org.apache.eventmesh
* fix(docs): change the travis location
* Create DISCLAIMER-WIP
* Delete CNAME
* Delete _config.yml
* Delete package.json
* Add files via upload
* Add files via upload
* Delete eventmesh-multi-runtime.jpg
* Update README.md
* Update eventmesh-runtime-quickstart.md
* Update README.zh-CN.md
* Update eventmesh-runtime-quickstart.zh-CN.md
* [ISSUE #325]Update gradle configuration for publishing package to maven repository (#326)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* upgrade gradle to 7.0 and fix test bug. (#327)
* bugfix build.gradle tar task
* merge
* upgrade to gradle 7.0
* bugfix gradle task spotbugs
* bugfix eventmesh-connector-rocketmq testImplementation
* upgrade to gradle7.0
* refactor runtime module test and spotbugs error
* bugfix sign task
Co-authored-by: jonyang(杨军) <jo...@webank.com>
Co-authored-by: jonyangx <jo...@gmail.com>
* remove unused files
Signed-off-by: qqeasonchen <qq...@gmail.com>
* update build.gradle and gradle.properties for publish to maven repository (#330)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* Update README.md
* Update README.zh-CN.md
* update quickstart md files for gradle version (#332)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* remove dead docs
Signed-off-by: qqeasonchen <qq...@gmail.com>
* [ISSUE #329]Missing Log4j dependency (#336)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* [ISSUE #329]Missing Log4j dependency
* [ISSUE #331] Fix dead links in docs (#334)
fixed #331
* Doc modification #328 (#335)
change vm params
* Update README.md
* [Issue #337] Fix Http Test Subscriber startup issue by moving the Thread.sleep into the child thread (#338)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
Co-authored-by: j00441484 <ji...@huawei.com>
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook (#343)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Address code review comment for Subscriber Demo App
Co-authored-by: j00441484 <ji...@huawei.com>
* [ISSUE #348] Setup automated workflows for greetings (#347)
* Setup automated workflows for greetings
* Remove '@apache/eventmesh-committers'
* Add LGTM Badges ISSUE#353 (#354)
LGTM is a variant analysis platform that automatically checks code for real CVEs and vulnerabilities. Learn more at https://lgtm.com/help/lgtm/about-lgtm .
Here are some alerts in our project reported by LGTM: https://lgtm.com/projects/g/apache/incubator-eventmesh/alerts/?mode=list
I'd like to add LGTM badges in the README.md, it makes easier for people who want to get alerts and then contribute to EventMesh.
* [ISSUE #355] Setup Github workflows for CodeQL scans (#356)
* Setup CodeQL scans
* disable autorun
* add a step for setting up JDK
* add codeql
* fix step Build
* fix strategy
* add events: schedule & workflow_dispatch
* [Issue #344] Fixing racing condition issue in SubscribeProcessor and UnSubscribeProcessor (#345)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #344] Fixing racing condition issue in SubscribeProcessor and UnSubscribeProcessor
* [Issue #344] Fix import statements
* [Issue #337] Address code review comment for Subscriber Demo App
* [Issue #344] Enhance client registration logic in SubscribeProcessor and UnsubscriberProcessor
* [Issue #344] Minor code clean up in SubscribeProcessor and UnsubscriberProcessor
* [Issue #344] Fix NullPointerException in ConsumerManager occurs during subscribe/unsunscribe iteration testing
* [Issue #344] Fix bugs in subscribe/unsunscribe code path
* [Issue #344] use client.pid instead of client.ip for client comparasion in UnSubscribeProcessor
Co-authored-by: j00441484 <ji...@huawei.com>
* update eventmesh-runtime.png (#358)
* update eventmesh-runtime.png
* [Issue #333] Support multiple load balance strategy in sdk (#342)
* Support multiple load balance strategy in sdk #333
* Fix ut
* add log
* update eventmesh-panels.png (#362)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* [ISSUE #329]Missing Log4j dependency
* update eventmesh-runtime.png
* update eventmesh-panels.png
* update eventmesh-panels.png (#363)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* [ISSUE #329]Missing Log4j dependency
* update eventmesh-runtime.png
* update eventmesh-panels.png
* Migrate CI to Github Actions and enable coverage report (#365)
* add: requirements for lightweight EventMesh SDK with CloudEvents (#370)
This commit only includes a brief introduction and requirements.
Design details can be followed up in a later commit.
Signed-off-by: Yuzhou Mao <my...@umich.edu>
* Add files via upload
* Update README.md
* [Issue #368] Fix Racing condition and memory leak issue in EventMesh SDK LiteConsumer and LiteProducer (#369)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Address code review comment for Subscriber Demo App
* [Issue #368] Fix Racing condition and memory leak issue in EventMesh SDK LiteConsumer and LiteProducer
* [Issue #368] fix build issue
* [Issue #368] use try with resource statement for HttpClient
* [Issue #368] fix TLS1.1 and use TLS1.2 in HttpClient
Co-authored-by: j00441484 <ji...@huawei.com>
* [ISSUE #350]optimize flow control in downstreaming msg (#352)
* modify:optimize flow control in downstreaming msg
* modify:optimize stategy of selecting session in downstream msg
* modify:optimize msg downstream,msg store in session
* modify:fix bug:not a @Sharable handler
* [ISSUE #380] Remove gitee-mirror.yml from Github workflows (#381)
* Update README.md
* [ISSUE #310] add github action for check license (#313)
* add github action for check license
* fix syntax and name ci for Check license
* fix github action branch typo
* [ISSUE #310] Enable Github Actions for license check and fix license headers (#377)
* add github action for check license
* fix syntax and name ci for Check license
* fix github action branch typo
* enable github actions for license check
* add necessary headers
* update badges
Co-authored-by: Lan Liang <gc...@gmail.com>
* [Issue #382] Fix java.lang.NumberFormatException when parsing Long (#383)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Address code review comment for Subscriber Demo App
* [Issue #368] Fix Racing condition and memory leak issue in EventMesh SDK LiteConsumer and LiteProducer
* [Issue #368] fix build issue
* [Issue #368] use try with resource statement for HttpClient
* [Issue #368] fix TLS1.1 and use TLS1.2 in HttpClient
* [Issue #382] Fix java.lang.NumberFormatException when parsing Long
* [Issue #382] Fix java.lang.NumberFormatException when parsing Integer
Co-authored-by: j00441484 <ji...@huawei.com>
* [ISSUE #378] downstream broadcast msg asynchronously (#379)
* modify:optimize flow control in downstreaming msg
* modify:optimize stategy of selecting session in downstream msg
* modify:optimize msg downstream,msg store in session
* modify:fix bug:not a @Sharable handler
* modify:downstream broadcast msg asynchronously
closed #378
* [ISSUE #359] Split handler from controller (#359) (#360)
* [ISSUE #359] Split handler from controller (#359)
* add license header
* add ut
* [ISSUE #384] RedirectClientByIpPortHandlerTest.java doesn't have the Apache license header (#385)
close #384
* Update README.md
* Update README.zh-CN.md
* Update README.zh-CN.md
* Update README.zh-CN.md
* [Issue #386] fixing ConsumerGroup Queue Consumer Offset not synced up issue (#387)
* [Issue #337] Fix HttpSubscriber startup issue
* [Issue #337] test commit
* [Issue #337] revert test commit
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook
* [Issue #337] Address code review comment for Subscriber Demo App
* [Issue #386] fixing ConsumerGroup Queuen Consumer Offset not synced up issue
* [Issue #386] adding license header to new file
* [Issue #386] Fix license header missing issue
Co-authored-by: j00441484 <ji...@huawei.com>
* [ISSUE #366 ] remove custom-format topic concept (#388)
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* remove custom-format topic concept
* [ISSUE #366] remove custom concept [dcn&region] (#390)
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
close #366
* [ISSUE #391] Optimize interface design in eventmesh-connector-api (#392)
* modify:optimize flow control in downstreaming msg
* modify:optimize stategy of selecting session in downstream msg
* modify:optimize msg downstream,msg store in session
* modify:fix bug:not a @Sharable handler
* modify:downstream broadcast msg asynchronously
* modify:remove unneccessary interface in eventmesh-connector-api
* modify:fix conflict
* modify:add license in EventMeshAction
close #391
* miss group name set for userAgent (#395)
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* remove custom concept{dcn}
* bugfix : miss group set
* bugfix : miss group set
* [ISSUE #393]:perf topic name in test file (#394)
close #393
* support unsubscribe topics while delconsumer in http mode (#396)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* [ISSUE #329]Missing Log4j dependency
* update eventmesh-runtime.png
* support unsubscribe topics while delconsumer in http mode
* [ISSUE #397]Remove subscription session failed error (#398)
* [ISSUE #325]Update gradle configuration for publishing package to maven repository
* update build.gradle
* update build.gradle and gradle.properties
* update build.gradle and gradle.properties for publish to maven repository
* * update gradle version for instructions
* fix: dist task exception
* [ISSUE #329]Missing Log4j dependency
* update eventmesh-runtime.png
* support unsubscribe topics while delconsumer in http mode
* [ISSUE #397]Remove subscription session failed error
* [ISSUE #397]Remove subscription session failed error
close #397
Co-authored-by: jonyang(杨军) <jo...@webank.com>
Co-authored-by: MajorHe1 <he...@gmail.com>
Co-authored-by: mike_xwm <mi...@126.com>
Co-authored-by: Eason Chen <qq...@gmail.com>
Co-authored-by: Heng Du <du...@apache.org>
Co-authored-by: Udesh Liyanaarachchi <>
Co-authored-by: keranbingaa <39...@qq.com>
Co-authored-by: sunxi <su...@163.com>
Co-authored-by: sanchen <sa...@chenyiliu.com>
Co-authored-by: surilli(李慧敏) <su...@webank.com>
Co-authored-by: Lan Liang <gc...@gmail.com>
Co-authored-by: zhangxiaopengmm <xi...@163.com>
Co-authored-by: nanoxiong <xi...@163.com>
Co-authored-by: chenyi19851209 <40...@qq.com>
Co-authored-by: yangjun <ya...@gmail.com>
Co-authored-by: Steve Yurong Su <st...@outlook.com>
Co-authored-by: von gosling <vo...@apache.org>
Co-authored-by: jonyangx <jo...@gmail.com>
Co-authored-by: ruanwenjun <86...@qq.com>
Co-authored-by: jinrongluo <ka...@gmail.com>
Co-authored-by: j00441484 <ji...@huawei.com>
Co-authored-by: Yuzhou Mao <yu...@gmail.com>
Co-authored-by: lrhkobe <34...@users.noreply.github.com>
Co-authored-by: Steve Yurong Su <ro...@apache.org>
Co-authored-by: Lan <li...@163.com>
* [ISSUE #411] Enable CI workflows running on [0-9]+.[0-9]+.[0-9]+** branches (#413)
* [Issues #405]code polish and fix typo (#404)
* code polish and fix typo
* merge remote 1.3.0
* [ISSUE #374] Add unit test class. (#402)
* [ISSUE #374] add unit test for LiteMessage class.
* [ISSUE #374] add unit test for HttpCommand class.
* [ISSUE #374] add unit test for httpResponse method with REQ cmd type.
* [ISSUE-#374] Add unit test class. (#414)
* [ISSUE #374] add unit test for CommonConfiguration class.
* [ISSUE #374] add unit test for ConfigurationWraper class.
* [ISSUE #374] add unit test for Weight class.
* [ISSUE #374] add unit test for CommonConfiguration class.
* [ISSUE #367]Enhance SPI plugins (#419)
* [ISSUE #374] add unit test for http protocol header client class. (#420)
* fix typo (#423)
* [ISSUE #418]Refactor the plugin load code (#421)
* [ISSUE #418]Refactor the plugin load code
* fix ut
* [ISSUE #405]modify the doc (#424)
* modify the doc
* modify the doc
* Merege 1.3.0 to develop.
* Merege 1.3.0 to develop.
* Merge 1.3.0 to develop, fix check style problem.
Co-authored-by: wqliang <wq...@users.noreply.github.com>
Co-authored-by: jonyang(杨军) <jo...@webank.com>
Co-authored-by: MajorHe1 <he...@gmail.com>
Co-authored-by: mike_xwm <mi...@126.com>
Co-authored-by: Eason Chen <qq...@gmail.com>
Co-authored-by: Heng Du <du...@apache.org>
Co-authored-by: keranbingaa <39...@qq.com>
Co-authored-by: sunxi <su...@163.com>
Co-authored-by: sanchen <sa...@chenyiliu.com>
Co-authored-by: surilli(李慧敏) <su...@webank.com>
Co-authored-by: Lan Liang <gc...@gmail.com>
Co-authored-by: zhangxiaopengmm <xi...@163.com>
Co-authored-by: nanoxiong <xi...@163.com>
Co-authored-by: chenyi19851209 <40...@qq.com>
Co-authored-by: yangjun <ya...@gmail.com>
Co-authored-by: Steve Yurong Su <st...@outlook.com>
Co-authored-by: von gosling <vo...@apache.org>
Co-authored-by: jonyangx <jo...@gmail.com>
Co-authored-by: ruanwenjun <86...@qq.com>
Co-authored-by: jinrongluo <ka...@gmail.com>
Co-authored-by: j00441484 <ji...@huawei.com>
Co-authored-by: Yuzhou Mao <yu...@gmail.com>
Co-authored-by: lrhkobe <34...@users.noreply.github.com>
Co-authored-by: Steve Yurong Su <ro...@apache.org>
Co-authored-by: Lan <li...@163.com>
Co-authored-by: YuDong Tang <58...@qq.com>
---
.../eventmesh-runtime-quickstart.zh-CN.md | 34 +++---
.../instructions/eventmesh-runtime-quickstart.md | 36 ++++---
docs/images/project-structure.png | Bin 63401 -> 54777 bytes
eventmesh-common/gradle.properties | 2 +-
.../org/apache/eventmesh/common/ThreadUtil.java | 2 +-
.../common/config/CommonConfiguration.java | 105 ++----------------
.../loadbalance/RandomLoadBalanceSelector.java | 2 +-
.../WeightRoundRobinLoadBalanceSelector.java | 2 +-
.../apache/eventmesh/common/LiteMessageTest.java | 74 +++++++++++++
.../eventmesh/common/command/HttpCommandTest.java | 99 +++++++++++++++++
.../common/config/CommonConfigurationTest.java | 32 +++---
.../common/config/ConfigurationWraperTest.java | 29 ++---
.../eventmesh/common/loadbalance/WeightTest.java | 49 +++++----
.../protocol/http/body/BaseResponseBodyTest.java | 47 ++++----
.../http/header/BaseRequestHeaderTest.java | 33 +++---
.../http/header/BaseResponseHeaderTest.java | 21 ++--
.../header/client/AbstractRequestHeaderTest.java | 38 +++++++
.../header/client/AbstractResponseHeaderTest.java | 40 +++++++
.../header/client/HeartbeatRequestHeaderTest.java | 19 ++--
.../header/client/HeartbeatResponseHeaderTest.java | 21 ++--
.../http/header/client/RegRequestHeaderTest.java | 18 ++--
.../http/header/client/RegResponseHeaderTest.java | 17 +--
.../header/client/SubscribeRequestHeaderTest.java | 18 ++--
.../header/client/SubscribeResponseHeaderTest.java | 18 ++--
.../http/header/client/UnRegRequestHeaderTest.java | 19 ++--
.../header/client/UnRegResponseHeaderTest.java | 17 +--
.../client/UnSubscribeRequestHeaderTest.java | 18 ++--
.../client/UnSubscribeResponseHeaderTest.java | 18 ++--
.../src/test/resources/configuration.properties | 13 ++-
eventmesh-connector-api/build.gradle | 4 +-
eventmesh-connector-api/gradle.properties | 2 +-
.../eventmesh/api/consumer/MeshMQPushConsumer.java | 2 +
.../eventmesh/api/producer/MeshMQProducer.java | 2 +
.../rocketmq/config/ClientConfiguration.java | 16 +--
.../connector/rocketmq/utils/OMSUtil.java | 6 ++
...pache.eventmesh.api.consumer.MeshMQPushConsumer | 7 +-
...rg.apache.eventmesh.api.producer.MeshMQProducer | 7 +-
eventmesh-runtime/build.gradle | 4 +-
eventmesh-runtime/conf/eventmesh.properties | 5 +-
.../configuration/EventMeshHTTPConfiguration.java | 20 ++--
.../runtime/core/plugin/MQConsumerWrapper.java | 8 ++
.../runtime/core/plugin/MQProducerWrapper.java | 10 +-
.../runtime/core/plugin/PluginFactory.java | 39 +++++++
.../protocol/http/consumer/EventMeshConsumer.java | 6 +-
.../protocol/http/producer/EventMeshProducer.java | 4 +-
.../tcp/client/group/ClientGroupWrapper.java | 12 ++-
.../client/http/consumer/LiteConsumer.java | 7 +-
.../client/tcp/common/EventMeshCommon.java | 2 +-
.../eventmesh/client/tcp/common/TcpClient.java | 6 +-
.../client/tcp/impl/SimplePubClientImpl.java | 11 +-
.../client/tcp/impl/SimpleSubClientImpl.java | 8 +-
settings.gradle => eventmesh-spi/build.gradle | 7 +-
.../gradle.properties | 3 +-
.../eventmesh/spi/EventMeshExtensionFactory.java | 46 ++++----
.../eventmesh/spi/EventMeshExtensionLoader.java | 118 +++++++++++++++++++++
.../org/apache/eventmesh/spi/EventMeshSPI.java | 22 ++--
.../apache/eventmesh/spi/ExtensionException.java | 20 ++--
.../spi/EventMeshExtensionFactoryTest.java | 18 ++--
.../java/org/apache/eventmesh/spi/ExtensionA.java | 13 +--
.../org/apache/eventmesh/spi/TestExtension.java | 11 +-
.../org.apache.eventmesh.spi.TestExtension | 6 +-
.../eventmesh/http/demo/SyncRequestInstance.java | 11 +-
settings.gradle | 9 +-
63 files changed, 878 insertions(+), 435 deletions(-)
diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
index adc9703..6ae41fa 100644
--- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
+++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
@@ -44,7 +44,7 @@ sh start.sh
### 2.1 依赖
-同上述步骤 1.1
+同上述步骤 1.1,但是只能在JDK 1.8下构建
### 2.2 下载源码
@@ -62,32 +62,32 @@ sh start.sh
- eventmesh-runtime : eventmesh运行时模块
- eventmesh-sdk-java : eventmesh java客户端sdk
- eventmesh-starter : eventmesh本地启动运行项目入口
+- eventmesh-spi : eventmesh SPI加载模块
-> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件
+> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
-**2.3.2 配置VM启动参数**
+**2.3.2 配置插件**
-```java
--Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml
--Deventmesh.log.home=eventmesh-runtime/logs
--Deventmesh.home=eventmesh-runtime
--DconfPath=eventmesh-runtime/conf
-```
-> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
+在`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件
-**2.3.3 配置build.gradle文件**
+修改`confPath`目录下面的`eventMesh.properties`文件
-通过修改dependencies,compile project 项来指定项目启动后加载的插件
+加载**RocketMQ Connector**插件配置:
-修改`eventmesh-starter`模块下面的`build.gradle`文件
+```java
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
+```
-加载**RocketMQ**插件配置:
+**2.3.3 配置VM启动参数**
```java
-dependencies {
- compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+-Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml
+-Deventmesh.log.home=eventmesh-runtime/logs
+-Deventmesh.home=eventmesh-runtime
+-DconfPath=eventmesh-runtime/conf
```
+> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
**2.3.4 启动运行**
diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md
index 0536e5b..62b315c 100644
--- a/docs/en/instructions/eventmesh-runtime-quickstart.md
+++ b/docs/en/instructions/eventmesh-runtime-quickstart.md
@@ -44,7 +44,7 @@ If you see "EventMeshTCPServer[port=10000] started....", you setup runtime succe
### 2.1 dependencies
-Same with 1.1
+Same with 1.1, but it can be only compiled in JDK 1.8
### 2.2 download sources
@@ -62,33 +62,35 @@ Same with 1.2
- eventmesh-runtime : eventmesh runtime module
- eventmesh-sdk-java : eventmesh java client sdk
- eventmesh-starter : eventmesh project local start entry
+- eventmesh-spi : eventmesh SPI load module
-> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of
-related interface and implementation class under /main/resources/meta-inf/services in the corresponding module
+> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of
+related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module
-**2.3.2 Configure VM Options**
+**2.3.2 Configure plugin**
-```java
--Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml
-Deventmesh.log.home=eventmesh-runtime/logs
--Deventmesh.home=eventmesh-runtime
--DconfPath=eventmesh-runtime/conf
-```
-> ps: If you use Windows, you may need to replace the file separator to \
-**2.3.3 Configure build.gradle file**
+Specify the connector plugin that will be loaded after the project start by declaring in `eventMesh.properties`
-Specify the connector that will be loaded after the project start with updating compile project item in dependencies
-
-update `build.gradle` file under the `eventmesh-starter` module
+Modify the `eventMesh.properties` file in the `confPath` directory
load **rocketmq connector** configuration:
```java
-dependencies {
- compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
-}
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
+```
+
+**2.3.3 Configure VM Options**
+
+```java
+-Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml
+-Deventmesh.log.home=eventmesh-runtime/logs
+-Deventmesh.home=eventmesh-runtime
+-DconfPath=eventmesh-runtime/conf
```
+> ps: If you use Windows, you may need to replace the file separator to \
**2.3.4 Run**
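The quickstart change above says that connector plugins are now declared in `eventMesh.properties` and resolved through a mapping file under `META-INF/eventmesh`. This part of the patch does not show the layout of that mapping file, so the sketch below assumes a hypothetical `name=fully.qualified.ClassName` format. The class `SpiMappingSketch`, the method `resolveImplementation`, and the implementation class name are all illustrative and are not taken from the EventMesh code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: look up an implementation class name by plugin type.
// The "name=class" mapping format is an assumption, not confirmed by the patch.
final class SpiMappingSketch {

    static String resolveImplementation(String mappingFileContent, String pluginType) {
        Properties mapping = new Properties();
        try {
            // Parse the mapping text as key=value pairs.
            mapping.load(new StringReader(mappingFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String implementation = mapping.getProperty(pluginType);
        if (implementation == null || implementation.trim().isEmpty()) {
            throw new IllegalStateException("no extension registered for: " + pluginType);
        }
        return implementation.trim();
    }

    public static void main(String[] args) {
        // Made-up class name, used only to demonstrate the lookup.
        String content = "rocketmq=com.example.HypotheticalRocketMQConsumer\n";
        System.out.println(resolveImplementation(content, "rocketmq"));
    }
}
```

The lookup key would be the value of `eventMesh.connector.plugin.type`, which is what lets the connector be switched in configuration rather than in `build.gradle`.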
diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png
index 252a953..efc5249 100644
Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ
diff --git a/eventmesh-common/gradle.properties b/eventmesh-common/gradle.properties
index b4202ce..4117dff 100644
--- a/eventmesh-common/gradle.properties
+++ b/eventmesh-common/gradle.properties
@@ -16,4 +16,4 @@
#
group=org.apache.eventmesh
version=1.2.0-SNAPSHOT
-jdk=1.7
+jdk=1.8
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java
index aecfb0e..895215d 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java
@@ -37,7 +37,7 @@ public class ThreadUtil {
/**
* get current process id only once.
*
- * @return
+ * @return process id
*/
public static long getPID() {
if (currentPID >= 0) {
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 08a44cb..1ae6b26 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -17,17 +17,10 @@
package org.apache.eventmesh.common.config;
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.IPUtil;
public class CommonConfiguration {
public String eventMeshEnv = "P";
@@ -35,7 +28,7 @@ public class CommonConfiguration {
public String eventMeshCluster = "LS";
public String eventMeshName = "";
public String sysID = "5477";
-
+ public String eventMeshConnectorPluginType = "rocketmq";
public String namesrvAddr = "";
public String clientUserName = "username";
@@ -84,8 +77,11 @@ public class CommonConfiguration {
eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
- eventMeshServerIp = getLocalAddr();
+ eventMeshServerIp = IPUtil.getLocalAddress();
}
+
+ eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
+ Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
}
}
@@ -105,94 +101,7 @@ public class CommonConfiguration {
public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills";
public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
- }
-
- public static String getLocalAddr() {
- //priority of networkInterface when generating client ip
- String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
- ArrayList<String> preferList = new ArrayList<String>();
- for (String eth : priority.split("<")) {
- preferList.add(eth);
- }
- NetworkInterface preferNetworkInterface = null;
-
- try {
- Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
- while (enumeration1.hasMoreElements()) {
- final NetworkInterface networkInterface = enumeration1.nextElement();
- if (!preferList.contains(networkInterface.getName())) {
- continue;
- } else if (preferNetworkInterface == null) {
- preferNetworkInterface = networkInterface;
- }
- //get the networkInterface that has higher priority
- else if (preferList.indexOf(networkInterface.getName())
- > preferList.indexOf(preferNetworkInterface.getName())) {
- preferNetworkInterface = networkInterface;
- }
- }
-
- // Traversal Network interface to get the first non-loopback and non-private address
- ArrayList<String> ipv4Result = new ArrayList<String>();
- ArrayList<String> ipv6Result = new ArrayList<String>();
-
- if (preferNetworkInterface != null) {
- final Enumeration<InetAddress> en = preferNetworkInterface.getInetAddresses();
- getIpResult(ipv4Result, ipv6Result, en);
- } else {
- Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
- while (enumeration.hasMoreElements()) {
- final NetworkInterface networkInterface = enumeration.nextElement();
- final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
- getIpResult(ipv4Result, ipv6Result, en);
- }
- }
-
- // prefer ipv4
- if (!ipv4Result.isEmpty()) {
- for (String ip : ipv4Result) {
- if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
- continue;
- }
-
- return ip;
- }
- return ipv4Result.get(ipv4Result.size() - 1);
- } else if (!ipv6Result.isEmpty()) {
- return ipv6Result.get(0);
- }
- //If failed to find,fall back to localhost
- final InetAddress localHost = InetAddress.getLocalHost();
- return normalizeHostAddress(localHost);
- } catch (SocketException e) {
- e.printStackTrace();
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
-
- return null;
- }
-
- public static String normalizeHostAddress(final InetAddress localHost) {
- if (localHost instanceof Inet6Address) {
- return "[" + localHost.getHostAddress() + "]";
- } else {
- return localHost.getHostAddress();
- }
- }
-
- private static void getIpResult(ArrayList<String> ipv4Result, ArrayList<String> ipv6Result,
- Enumeration<InetAddress> en) {
- while (en.hasMoreElements()) {
- final InetAddress address = en.nextElement();
- if (!address.isLoopbackAddress()) {
- if (address instanceof Inet6Address) {
- ipv6Result.add(normalizeHostAddress(address));
- } else {
- ipv4Result.add(normalizeHostAddress(address));
- }
- }
- }
+ public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
}
}
\ No newline at end of file
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java
index fc741a5..fb7a992 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java
@@ -28,7 +28,7 @@ import java.util.List;
* This selector use random strategy.
* Each selection will randomly give one from the given list
*
- * @param <T>
+ * @param <T> Target type
*/
public class RandomLoadBalanceSelector<T> implements LoadBalanceSelector<T> {
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java
index d6f2009..c031000 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java
@@ -27,7 +27,7 @@ import java.util.List;
* This selector use the weighted round robin strategy to select from list.
* If the weight is greater, the probability of being selected is larger.
*
- * @param <T>
+ * @param <T> Target type
*/
public class WeightRoundRobinLoadBalanceSelector<T> implements LoadBalanceSelector<T> {
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/LiteMessageTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/LiteMessageTest.java
new file mode 100644
index 0000000..2f206a8
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/LiteMessageTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LiteMessageTest {
+
+ @Test
+ public void testGetProp() {
+ LiteMessage message = createLiteMessage();
+ Assert.assertEquals(2L, message.getProp().size());
+ }
+
+ @Test
+ public void testSetProp() {
+ LiteMessage message = createLiteMessage();
+ Map<String, String> prop = new HashMap<>();
+ prop.put("key3", "value3");
+ message.setProp(prop);
+ Assert.assertEquals(1L, message.getProp().size());
+ Assert.assertEquals("value3", message.getPropKey("key3"));
+ }
+
+ @Test
+ public void testAddProp() {
+ LiteMessage message = createLiteMessage();
+ message.addProp("key3", "value3");
+ Assert.assertEquals(3L, message.getProp().size());
+ Assert.assertEquals("value1", message.getPropKey("key1"));
+ }
+
+ @Test
+ public void testGetPropKey() {
+ LiteMessage message = createLiteMessage();
+ Assert.assertEquals("value1", message.getPropKey("key1"));
+ }
+
+ @Test
+ public void testRemoveProp() {
+ LiteMessage message = createLiteMessage();
+ message.removeProp("key1");
+ Assert.assertEquals(1L, message.getProp().size());
+ Assert.assertNull(message.getPropKey("key1"));
+ }
+
+ private LiteMessage createLiteMessage() {
+ LiteMessage result = new LiteMessage();
+ Map<String, String> prop = new HashMap<>();
+ prop.put("key1", "value1");
+ prop.put("key2", "value2");
+ result.setProp(prop);
+ return result;
+ }
+}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/command/HttpCommandTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/command/HttpCommandTest.java
new file mode 100644
index 0000000..ce5ac0c
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/command/HttpCommandTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.command;
+
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.eventmesh.common.protocol.http.body.BaseResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.Body;
+import org.apache.eventmesh.common.protocol.http.header.Header;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HttpCommandTest {
+
+ @Mock
+ private Header header;
+
+ @Mock
+ private Body body;
+
+ private HttpCommand httpCommand;
+
+ @Before
+ public void before() {
+ httpCommand = new HttpCommand("POST", "1.1", "200");
+ }
+
+ @Test
+ public void testCreateHttpCommandResponseWithHeaderAndBody() {
+ HttpCommand command = httpCommand.createHttpCommandResponse(header, body);
+ Map<String, Object> headerMap = new HashMap<>();
+ headerMap.put("key1", "value1");
+ when(header.toMap()).thenReturn(headerMap);
+ Assert.assertEquals("1.1", command.getHttpVersion());
+ Assert.assertEquals("POST", command.getHttpMethod());
+ Assert.assertEquals("200", command.getRequestCode());
+ Assert.assertEquals("value1", command.getHeader().toMap().get("key1"));
+ }
+
+ @Test
+ public void testCreateHttpCommandResponseWithRetCodeAndRetMsg() {
+ HttpCommand command = httpCommand.createHttpCommandResponse(200, "SUCCESS");
+ Assert.assertThat(((BaseResponseBody) command.getBody()).getRetCode(), is(200));
+ Assert.assertEquals("SUCCESS", ((BaseResponseBody) command.getBody()).getRetMsg());
+ }
+
+ @Test
+ public void testAbstractDesc() {
+ HttpCommand command = httpCommand.createHttpCommandResponse(header, body);
+ String desc = command.abstractDesc();
+ Assert.assertTrue(desc.startsWith("httpCommand"));
+ }
+
+ @Test
+ public void testSimpleDesc() {
+ HttpCommand command = httpCommand.createHttpCommandResponse(header, body);
+ String desc = command.simpleDesc();
+ Assert.assertTrue(desc.startsWith("httpCommand"));
+ }
+
+ @Test
+ public void testHttpResponse() throws Exception {
+ HttpCommand command = httpCommand.createHttpCommandResponse(header, body);
+ DefaultFullHttpResponse response = command.httpResponse();
+ Assert.assertEquals("keep-alive", response.headers().get(HttpHeaderNames.CONNECTION));
+ }
+
+ @Test
+ public void testHttpResponseWithREQCmdType() throws Exception {
+ DefaultFullHttpResponse response = httpCommand.httpResponse();
+ Assert.assertNull(response);
+ }
+}
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java
similarity index 51%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java
index 09dd79f..7880cdd 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java
@@ -14,23 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-sourceCompatibility = 1.8
-List metrics = [
- "io.dropwizard.metrics:metrics-core:4.1.0",
- "io.dropwizard.metrics:metrics-healthchecks:4.1.0",
- "io.dropwizard.metrics:metrics-annotation:4.1.0",
- "io.dropwizard.metrics:metrics-json:4.1.0"
-]
+package org.apache.eventmesh.common.config;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+public class CommonConfigurationTest {
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+ private CommonConfiguration configuration;
+ @Before
+ public void before() {
+ String file = ConfigurationWraperTest.class.getResource("/configuration.properties").getFile();
+ ConfigurationWraper wraper = new ConfigurationWraper(file, false);
+ configuration = new CommonConfiguration(wraper);
+ }
-dependencies {
- implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
- testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
+ @Test
+ public void testInit() {
+ configuration.init();
+ Assert.assertEquals("value1", configuration.eventMeshEnv);
+ Assert.assertEquals("value2", configuration.eventMeshIDC);
+ Assert.assertEquals("3", configuration.sysID);
+ }
}
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigurationWraperTest.java
similarity index 57%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigurationWraperTest.java
index 09dd79f..7a89efa 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigurationWraperTest.java
@@ -14,23 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-sourceCompatibility = 1.8
-List metrics = [
- "io.dropwizard.metrics:metrics-core:4.1.0",
- "io.dropwizard.metrics:metrics-healthchecks:4.1.0",
- "io.dropwizard.metrics:metrics-annotation:4.1.0",
- "io.dropwizard.metrics:metrics-json:4.1.0"
-]
+package org.apache.eventmesh.common.config;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+public class ConfigurationWraperTest {
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+ private ConfigurationWraper wraper;
+ @Before
+ public void before() {
+ String file = ConfigurationWraperTest.class.getResource("/configuration.properties").getFile();
+ wraper = new ConfigurationWraper(file, false);
+ }
-dependencies {
- implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
- testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
+ @Test
+ public void testGetProp() {
+ Assert.assertEquals("value1", wraper.getProp("eventMesh.server.env"));
+ Assert.assertEquals("value2", wraper.getProp("eventMesh.server.idc"));
+ }
}
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightTest.java
similarity index 53%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightTest.java
index 5e60e0e..bdf7a84 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightTest.java
@@ -15,27 +15,30 @@
* limitations under the License.
*/
-package org.apache.eventmesh.api.consumer;
-
-import java.util.List;
-import java.util.Properties;
-
-import io.openmessaging.api.AsyncMessageListener;
-import io.openmessaging.api.Consumer;
-import io.openmessaging.api.Message;
-
-import org.apache.eventmesh.api.AbstractContext;
-
-public interface MeshMQPushConsumer extends Consumer {
-
- void init(Properties keyValue) throws Exception;
-
- void updateOffset(List<Message> msgs, AbstractContext context);
-
-// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);
-
- void subscribe(String topic, final AsyncMessageListener listener) throws Exception;
-
- @Override
- void unsubscribe(String topic);
+package org.apache.eventmesh.common.loadbalance;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WeightTest {
+
+ @Test
+ public void testDecreaseTotal() {
+ Weight weight = new Weight(null, 0);
+ weight.decreaseTotal(1);
+ Assert.assertEquals(-1, weight.getCurrentWeight().get());
+ }
+
+ @Test
+ public void testIncreaseCurrentWeight() {
+ Weight weight = new Weight(null, 10);
+ weight.increaseCurrentWeight();
+ Assert.assertEquals(10, weight.getCurrentWeight().get());
+ }
+
+ @Test
+ public void testGetCurrentWeight() {
+ Weight weight = new Weight(null, 0);
+ Assert.assertEquals(0, weight.getCurrentWeight().get());
+ }
}
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/body/BaseResponseBodyTest.java
similarity index 50%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/body/BaseResponseBodyTest.java
index 82ca583..edba782 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/body/BaseResponseBodyTest.java
@@ -15,30 +15,25 @@
* limitations under the License.
*/
-package org.apache.eventmesh.api.producer;
-
-import java.util.Properties;
-
-import io.openmessaging.api.Message;
-import io.openmessaging.api.Producer;
-import io.openmessaging.api.SendCallback;
-
-import org.apache.eventmesh.api.RRCallback;
-
-public interface MeshMQProducer extends Producer {
-
- void init(Properties properties) throws Exception;
-
- void send(Message message, SendCallback sendCallback) throws Exception;
-
- void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
-
- Message request(Message message, long timeout) throws Exception;
-
- boolean reply(final Message message, final SendCallback sendCallback) throws Exception;
-
- void checkTopicExist(String topic) throws Exception;
-
- void setExtFields();
-
+package org.apache.eventmesh.common.protocol.http.body;
+
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class BaseResponseBodyTest {
+
+ @Test
+ public void testToMap() {
+ BaseResponseBody body = new BaseResponseBody();
+ body.setRetCode(200);
+ body.setRetMsg("SUCCESS");
+ Assert.assertTrue(body.toMap().containsKey(ProtocolKey.RETCODE));
+ Assert.assertTrue(body.toMap().containsKey(ProtocolKey.RETMSG));
+ Assert.assertTrue(body.toMap().containsKey(ProtocolKey.RESTIME));
+ Assert.assertThat(body.toMap().get(ProtocolKey.RETCODE), is(200));
+ Assert.assertThat(body.toMap().get(ProtocolKey.RETMSG), is("SUCCESS"));
+ }
}
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseRequestHeaderTest.java
similarity index 53%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseRequestHeaderTest.java
index 5e60e0e..e511f19 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseRequestHeaderTest.java
@@ -15,27 +15,26 @@
* limitations under the License.
*/
-package org.apache.eventmesh.api.consumer;
+package org.apache.eventmesh.common.protocol.http.header;
-import java.util.List;
-import java.util.Properties;
-import io.openmessaging.api.AsyncMessageListener;
-import io.openmessaging.api.Consumer;
-import io.openmessaging.api.Message;
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.junit.Assert;
+import org.junit.Test;
-import org.apache.eventmesh.api.AbstractContext;
+import java.util.HashMap;
+import java.util.Map;
-public interface MeshMQPushConsumer extends Consumer {
+import static org.hamcrest.CoreMatchers.is;
- void init(Properties keyValue) throws Exception;
+public class BaseRequestHeaderTest {
- void updateOffset(List<Message> msgs, AbstractContext context);
-
-// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);
-
- void subscribe(String topic, final AsyncMessageListener listener) throws Exception;
-
- @Override
- void unsubscribe(String topic);
+ @Test
+ public void testToMap() {
+ Map<String, Object> headerParam = new HashMap<>();
+ headerParam.put(ProtocolKey.REQUEST_CODE, "200");
+ BaseRequestHeader header = BaseRequestHeader.buildHeader(headerParam);
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE));
+ Assert.assertThat(header.toMap().get(ProtocolKey.REQUEST_CODE), is("200"));
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseResponseHeaderTest.java
similarity index 59%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseResponseHeaderTest.java
index 2d1205d..f963eea 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseResponseHeaderTest.java
@@ -15,11 +15,20 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class BaseResponseHeaderTest {
+
+ @Test
+ public void testToMap() {
+ BaseResponseHeader header = BaseResponseHeader.buildHeader("200");
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE));
+ Assert.assertThat(header.toMap().get(ProtocolKey.REQUEST_CODE), is("200"));
+ }
}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractRequestHeaderTest.java
new file mode 100644
index 0000000..115aa12
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractRequestHeaderTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.protocol.http.header.client;
+
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.common.protocol.http.header.Header;
+import org.junit.Assert;
+
+public class AbstractRequestHeaderTest {
+
+ public void assertMapContent(Header header) {
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.LANGUAGE));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.VERSION));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.ENV));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.IDC));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.SYS));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.PID));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.IP));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.USERNAME));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.PASSWD));
+ }
+}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractResponseHeaderTest.java
new file mode 100644
index 0000000..0b5b0bd
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractResponseHeaderTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.protocol.http.header.client;
+
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.apache.eventmesh.common.protocol.http.header.Header;
+import org.junit.Assert;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class AbstractResponseHeaderTest {
+
+ public void assertMapContent(Header header) {
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV));
+ Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC));
+ Assert.assertThat(header.toMap().get(ProtocolKey.REQUEST_CODE), is(200));
+ Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER), is("CLUSTER"));
+ Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP), is("127.0.0.1"));
+ Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV), is("DEV"));
+ Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC), is("IDC"));
+ }
+}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeaderTest.java
similarity index 68%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeaderTest.java
index 2d1205d..7b1afae 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeaderTest.java
@@ -15,11 +15,18 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class HeartbeatRequestHeaderTest extends AbstractRequestHeaderTest {
+
+ @Test
+ public void testToMap() {
+ HeartbeatRequestHeader header = HeartbeatRequestHeader.buildHeader(new HashMap<>());
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeaderTest.java
similarity index 60%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeaderTest.java
index 2d1205d..d27aeaf 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeaderTest.java
@@ -15,11 +15,20 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class HeartbeatResponseHeaderTest extends AbstractResponseHeaderTest {
+
+ @Test
+ public void testToMap() {
+ HeartbeatResponseHeader header = HeartbeatResponseHeader.buildHeader(200,
+ "CLUSTER", "127.0.0.1", "DEV", "IDC");
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeaderTest.java
similarity index 69%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeaderTest.java
index 2d1205d..dbeb33b 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeaderTest.java
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class RegRequestHeaderTest extends AbstractRequestHeaderTest {
+
+ @Test
+ public void testToMap() {
+ RegRequestHeader header = RegRequestHeader.buildHeader(new HashMap<>());
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeaderTest.java
similarity index 68%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeaderTest.java
index 2d1205d..503c642 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeaderTest.java
@@ -15,11 +15,16 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.junit.Test;
+
+public class RegResponseHeaderTest extends AbstractResponseHeaderTest {
+
+ @Test
+ public void testToMap() {
+ RegResponseHeader header = RegResponseHeader.buildHeader(200,
+ "CLUSTER", "127.0.0.1", "DEV", "IDC");
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeaderTest.java
similarity index 68%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeaderTest.java
index 2d1205d..e883688 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeaderTest.java
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class SubscribeRequestHeaderTest extends AbstractRequestHeaderTest {
+
+ @Test
+ public void testToMap() {
+ SubscribeRequestHeader header = SubscribeRequestHeader.buildHeader(new HashMap<>());
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeaderTest.java
similarity index 67%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeaderTest.java
index 2d1205d..9d64633 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeaderTest.java
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+
+import org.junit.Test;
+
+public class SubscribeResponseHeaderTest extends AbstractResponseHeaderTest {
+
+ @Test
+ public void testToMap() {
+ SubscribeResponseHeader header = SubscribeResponseHeader.buildHeader(200,
+ "CLUSTER", "127.0.0.1", "DEV", "IDC");
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeaderTest.java
similarity index 69%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeaderTest.java
index 2d1205d..9e77a21 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeaderTest.java
@@ -15,11 +15,18 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class UnRegRequestHeaderTest extends AbstractRequestHeaderTest {
+
+ @Test
+ public void testToMap() {
+ UnRegRequestHeader header = UnRegRequestHeader.buildHeader(new HashMap<>());
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeaderTest.java
similarity index 68%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeaderTest.java
index 2d1205d..0c7b016 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeaderTest.java
@@ -15,11 +15,16 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.junit.Test;
+
+public class UnRegResponseHeaderTest extends AbstractResponseHeaderTest {
+
+ @Test
+ public void testToMap() {
+ UnRegResponseHeader header = UnRegResponseHeader.buildHeader(200,
+ "CLUSTER", "127.0.0.1", "DEV", "IDC");
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeaderTest.java
similarity index 68%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeaderTest.java
index 2d1205d..5ee7c9e 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeaderTest.java
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class UnSubscribeRequestHeaderTest extends AbstractRequestHeaderTest {
+
+ @Test
+ public void testToMap() {
+ UnSubscribeRequestHeader header = UnSubscribeRequestHeader.buildHeader(new HashMap<>());
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeaderTest.java
similarity index 67%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeaderTest.java
index 2d1205d..3e8cb70 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeaderTest.java
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.common.protocol.http.header.client;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+
+import org.junit.Test;
+
+public class UnSubscribeResponseHeaderTest extends AbstractResponseHeaderTest {
+
+ @Test
+ public void testToMap() {
+ UnSubscribeResponseHeader header = UnSubscribeResponseHeader.buildHeader(200,
+ "CLUSTER", "127.0.0.1", "DEV", "IDC");
+ assertMapContent(header);
+ }
}
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-common/src/test/resources/configuration.properties
similarity index 79%
copy from eventmesh-connector-api/gradle.properties
copy to eventmesh-common/src/test/resources/configuration.properties
index ae30087..76f29f2 100644
--- a/eventmesh-connector-api/gradle.properties
+++ b/eventmesh-common/src/test/resources/configuration.properties
@@ -14,8 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-group=org.apache.eventmesh
-version=1.2.0-SNAPSHOT
-jdk=1.7
-mavenUserName=
-mavenPassword=
\ No newline at end of file
+
+eventMesh.server.env=value1
+eventMesh.server.idc=value2
+eventMesh.sysid=3
+eventMesh.server.cluster=value4
+eventMesh.server.name=value5
+eventMesh.server.hostIp=value6
+eventMesh.connector.plugin.type=rocketmq
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-api/build.gradle
index 2d1205d..157048e 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-connector-api/build.gradle
@@ -20,6 +20,6 @@ List open_message = [
]
dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+ implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
+ testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
}
diff --git a/eventmesh-connector-api/gradle.properties b/eventmesh-connector-api/gradle.properties
index ae30087..9d1744e 100644
--- a/eventmesh-connector-api/gradle.properties
+++ b/eventmesh-connector-api/gradle.properties
@@ -16,6 +16,6 @@
#
group=org.apache.eventmesh
version=1.2.0-SNAPSHOT
-jdk=1.7
+jdk=1.8
mavenUserName=
mavenPassword=
\ No newline at end of file
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
index 5e60e0e..4ac1edb 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java
@@ -25,7 +25,9 @@ import io.openmessaging.api.Consumer;
import io.openmessaging.api.Message;
import org.apache.eventmesh.api.AbstractContext;
+import org.apache.eventmesh.spi.EventMeshSPI;
+@EventMeshSPI
public interface MeshMQPushConsumer extends Consumer {
void init(Properties keyValue) throws Exception;
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
index 82ca583..c717385 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
@@ -24,7 +24,9 @@ import io.openmessaging.api.Producer;
import io.openmessaging.api.SendCallback;
import org.apache.eventmesh.api.RRCallback;
+import org.apache.eventmesh.spi.EventMeshSPI;
+@EventMeshSPI
public interface MeshMQProducer extends Producer {
void init(Properties properties) throws Exception;
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
index 11a1858..6fa4104 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java
@@ -33,7 +33,7 @@ public class ClientConfiguration {
public Integer ackWindow = 1000;
public Integer pubWindow = 100;
public long consumeTimeout = 0L;
- public Integer pollNameServerInteval = 10 * 1000;
+ public Integer pollNameServerInterval = 10 * 1000;
public Integer heartbeatBrokerInterval = 30 * 1000;
public Integer rebalanceInterval = 20 * 1000;
@@ -104,18 +104,18 @@ public class ClientConfiguration {
String clientPollNamesrvIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL);
if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL));
- pollNameServerInteval = Integer.valueOf(clientPollNamesrvIntervalStr);
+ pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr);
}
- String clientHeartbeatBrokerIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVEL);
+ String clientHeartbeatBrokerIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL);
if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) {
- Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVEL));
+ Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL));
heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr);
}
- String clientRebalanceIntervalIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVEL);
+ String clientRebalanceIntervalIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL);
if (StringUtils.isNotEmpty(clientRebalanceIntervalIntervalStr)) {
- Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVEL));
+ Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL));
rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr);
}
}
@@ -144,9 +144,9 @@ public class ClientConfiguration {
public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL = "eventMesh.server.rocketmq.client.pollNameServerInterval";
- public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVEL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval";
+ public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval";
- public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVEL = "eventMesh.server.rocketmq.client.rebalanceInterval";
+ public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval";
}
}
\ No newline at end of file
diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
index ba35acf..906be5f 100644
--- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
+++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java
@@ -221,6 +221,9 @@ public class OMSUtil {
/**
* Convert a RocketMQ SEND_OK SendResult instance to an OMS SendResult.
+ *
+ * @param rmqResult RocketMQ result
+ * @return send result
*/
public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
SendResult sendResult = new SendResult();
@@ -241,6 +244,9 @@ public class OMSUtil {
/**
* Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
+ *
+ * @param <T> Target type
+ * @return Iterator
*/
public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
return new Iterator<T>() {
diff --git a/eventmesh-common/gradle.properties b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
similarity index 90%
copy from eventmesh-common/gradle.properties
copy to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
index b4202ce..0df2e28 100644
--- a/eventmesh-common/gradle.properties
+++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -13,7 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-group=org.apache.eventmesh
-version=1.2.0-SNAPSHOT
-jdk=1.7
+
+rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
\ No newline at end of file
diff --git a/eventmesh-common/gradle.properties b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
similarity index 90%
copy from eventmesh-common/gradle.properties
copy to eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
index b4202ce..ef4959d 100644
--- a/eventmesh-common/gradle.properties
+++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -13,7 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-group=org.apache.eventmesh
-version=1.2.0-SNAPSHOT
-jdk=1.7
+
+rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
\ No newline at end of file
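Note on the two descriptor files above: each maps a plugin name (`rocketmq`) to an implementation class under `META-INF/eventmesh/<interface name>`. The loader that reads them is not shown in this part of the diff, so the following is only a minimal sketch of how such a `name=class` file could be parsed, assuming the standard `java.util.Properties` format. The class `ExtensionDescriptorSketch` and its `parse` method are hypothetical names used for illustration, not part of EventMesh.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical illustration only; not the EventMesh loader.
public class ExtensionDescriptorSketch {

    // Parses "pluginName=implementationClass" lines.
    // Lines starting with '#' (such as the license header) and blank lines are skipped.
    static Properties parse(String descriptorText) throws IOException {
        Properties mapping = new Properties();
        mapping.load(new StringReader(descriptorText));
        return mapping;
    }

    public static void main(String[] args) throws IOException {
        String descriptor = "# license header lines are treated as comments\n"
                + "\n"
                + "rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl";

        Properties mapping = parse(descriptor);

        // Known plugin name resolves to the implementation class name.
        System.out.println(mapping.getProperty("rocketmq"));
        // Unknown plugin name yields null, which a caller has to handle.
        System.out.println(mapping.getProperty("kafka"));
    }
}
```

Keeping the key equal to the value of `eventMesh.connector.plugin.type` is what lets a single configuration entry select both the producer and the consumer implementation.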
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 09dd79f..e5bb065 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -31,6 +31,6 @@ List open_message = [
dependencies {
- implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
- testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
+ implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
+ testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
}
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 035b950..45fc193 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
-#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
\ No newline at end of file
+#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
+
+#connector plugin
+eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
index a73c24a..55e6896 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java
@@ -48,7 +48,7 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
public int eventMeshServerRetryThreadNum = 2;
- public int eventMeshServerPullRegistryIntervel = 30000;
+ public int eventMeshServerPullRegistryInterval = 30000;
public int eventMeshServerAsyncAccumulationThreshold = 1000;
@@ -62,7 +62,7 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
public int eventMeshServerClientManageBlockQSize = 1000;
- public int eventMeshServerBusyCheckIntervel = 1000;
+ public int eventMeshServerBusyCheckInterval = 1000;
public boolean eventMeshServerConsumerEnabled = false;
@@ -126,9 +126,9 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
eventMeshServerClientManageThreadNum = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerClientManageThreadNumStr));
}
- String eventMeshServerPullRegistryIntervelStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_PULL_REGISTRY_INTERVEL);
- if (StringUtils.isNotEmpty(eventMeshServerPullRegistryIntervelStr) && StringUtils.isNumeric(eventMeshServerPullRegistryIntervelStr)) {
- eventMeshServerPullRegistryIntervel = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerPullRegistryIntervelStr));
+ String eventMeshServerPullRegistryIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_PULL_REGISTRY_INTERVAL);
+ if (StringUtils.isNotEmpty(eventMeshServerPullRegistryIntervalStr) && StringUtils.isNumeric(eventMeshServerPullRegistryIntervalStr)) {
+ eventMeshServerPullRegistryInterval = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerPullRegistryIntervalStr));
}
String eventMeshServerAdminThreadNumStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ADMIN_THREAD_NUM);
@@ -161,9 +161,9 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
eventMeshServerClientManageBlockQSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerClientManageBlockQSizeStr));
}
- String eventMeshServerBusyCheckIntervelStr = configurationWraper.getProp(ConfKeys.KEY_EVENTMESH_BUSY_CHECK_INTERVEL);
- if (StringUtils.isNotEmpty(eventMeshServerBusyCheckIntervelStr) && StringUtils.isNumeric(eventMeshServerBusyCheckIntervelStr)) {
- eventMeshServerBusyCheckIntervel = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBusyCheckIntervelStr));
+ String eventMeshServerBusyCheckIntervalStr = configurationWraper.getProp(ConfKeys.KEY_EVENTMESH_BUSY_CHECK_INTERVAL);
+ if (StringUtils.isNotEmpty(eventMeshServerBusyCheckIntervalStr) && StringUtils.isNumeric(eventMeshServerBusyCheckIntervalStr)) {
+ eventMeshServerBusyCheckInterval = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBusyCheckIntervalStr));
}
String eventMeshServerConsumerEnabledStr = configurationWraper.getProp(ConfKeys.KEY_EVENTMESH_CONSUMER_ENABLED);
@@ -195,7 +195,7 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
public static String KEYS_EVENTMESH_ASYNC_ACCUMULATION_THRESHOLD = "eventMesh.server.async.accumulation.threshold";
- public static String KEY_EVENTMESH_BUSY_CHECK_INTERVEL = "eventMesh.server.busy.check.intervel";
+ public static String KEY_EVENTMESH_BUSY_CHECK_INTERVAL = "eventMesh.server.busy.check.interval";
public static String KEYS_EVENTMESH_SENDMSG_THREAD_NUM = "eventMesh.server.sendmsg.threads.num";
@@ -211,7 +211,7 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration {
public static String KEY_EVENTMESH_RETRY_THREAD_NUM = "eventMesh.server.retry.threads.num";
- public static String KEYS_EVENTMESH_PULL_REGISTRY_INTERVEL = "eventMesh.server.pull.registry.intervel";
+ public static String KEYS_EVENTMESH_PULL_REGISTRY_INTERVAL = "eventMesh.server.pull.registry.interval";
public static String KEY_EVENTMESH_RETRY_BLOCKQ_SIZE = "eventMesh.server.retry.blockQ.size";
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
index 080b7af..b09f9ed 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java
@@ -35,6 +35,14 @@ public class MQConsumerWrapper extends MQWrapper {
protected MeshMQPushConsumer meshMQPushConsumer;
+ public MQConsumerWrapper(String connectorPluginType) {
+ this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
+ if (meshMQPushConsumer == null) {
+ logger.error("can't load the meshMQPushConsumer plugin, please check.");
+ throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
+ }
+ }
+
public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
index 082ab3b..03c38a7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
@@ -34,6 +34,14 @@ public class MQProducerWrapper extends MQWrapper {
protected MeshMQProducer meshMQProducer;
+ public MQProducerWrapper(String connectorPluginType) {
+ this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
+ if (meshMQProducer == null) {
+ logger.error("can't load the meshMQProducer plugin, please check.");
+ throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
+ }
+ }
+
public synchronized void init(Properties keyValue) throws Exception {
if (inited.get()) {
return;
@@ -44,8 +52,8 @@ public class MQProducerWrapper extends MQWrapper {
logger.error("can't load the meshMQProducer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
}
- meshMQProducer.init(keyValue);
+ meshMQProducer.init(keyValue);
inited.compareAndSet(false, true);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
new file mode 100644
index 0000000..b114953
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.plugin;
+
+import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
+import org.apache.eventmesh.api.producer.MeshMQProducer;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+public class PluginFactory {
+
+ public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
+ return getPlugin(MeshMQProducer.class, connectorPluginName);
+ }
+
+ public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
+ return getPlugin(MeshMQPushConsumer.class, connectorPluginName);
+ }
+
+ private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
+ return EventMeshExtensionFactory.getExtension(pluginType, pluginName);
+ }
+}
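Note on the lookup pattern in `PluginFactory` and the wrapper constructors: the plugin is resolved by name, and the wrappers fail fast when the lookup returns `null`. The sketch below illustrates that pattern in isolation. It assumes a hypothetical in-memory registry instead of `EventMeshExtensionFactory`, whose internals are not part of this diff; `PluginLookupSketch`, `Producer`, `getProducer` and `requireProducer` are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration of name-keyed plugin lookup with a fail-fast check.
public class PluginLookupSketch {

    // Stand-in for a plugin interface such as MeshMQProducer.
    interface Producer {
        String name();
    }

    // Stand-in for a concrete connector implementation.
    static class RocketProducer implements Producer {
        @Override
        public String name() {
            return "rocketmq-producer";
        }
    }

    // Assumed registry: plugin name -> factory for a new instance.
    private static final Map<String, Supplier<Producer>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("rocketmq", RocketProducer::new);
    }

    // Returns a new plugin instance, or null when the name is not registered.
    static Producer getProducer(String pluginType) {
        Supplier<Producer> factory = REGISTRY.get(pluginType);
        return factory == null ? null : factory.get();
    }

    // Mirrors the null check in the wrapper constructors: fail at construction time.
    static Producer requireProducer(String pluginType) {
        Producer producer = getProducer(pluginType);
        if (producer == null) {
            throw new IllegalStateException("cannot load producer plugin: " + pluginType);
        }
        return producer;
    }

    public static void main(String[] args) {
        System.out.println(requireProducer("rocketmq").name());
        try {
            requireProducer("kafka");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing in the constructor rather than in a later `init` call surfaces a misconfigured `eventMesh.connector.plugin.type` as soon as the wrapper is created.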
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
index 8620e68..ef051b0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java
@@ -66,13 +66,15 @@ public class EventMeshConsumer {
private ConsumerGroupConf consumerGroupConf;
- private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper persistentMqConsumer;
- private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper broadcastMqConsumer;
public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.consumerGroupConf = consumerGroupConf;
+ this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
+ this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
}
private MessageHandler httpMessageHandler;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index cf41ca2..fe32180 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -69,7 +69,7 @@ public class EventMeshProducer {
return true;
}
- protected MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
+ protected MQProducerWrapper mqProducerWrapper;
public MQProducerWrapper getMqProducerWrapper() {
return mqProducerWrapper;
@@ -85,7 +85,7 @@ public class EventMeshProducer {
//TODO for defibus
keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC);
-
+ mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.eventMeshConnectorPluginType);
mqProducerWrapper.init(keyValue);
inited.compareAndSet(false, true);
logger.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName());
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
index 5829e49..310ea6d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java
@@ -50,6 +50,7 @@ import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
+import org.apache.eventmesh.runtime.core.plugin.PluginFactory;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
@@ -96,14 +97,16 @@ public class ClientGroupWrapper {
public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE);
- private MQConsumerWrapper persistentMsgConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper persistentMsgConsumer;
- private MQConsumerWrapper broadCastMsgConsumer = new MQConsumerWrapper();
+ private MQConsumerWrapper broadCastMsgConsumer;
private ConcurrentHashMap<String, Set<Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<String, Set<Session>>();
public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE);
+ private MQProducerWrapper mqProducerWrapper;
+
public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup,
EventMeshTCPServer eventMeshTCPServer,
DownstreamDispatchStrategy downstreamDispatchStrategy) {
@@ -115,6 +118,9 @@ public class ClientGroupWrapper {
this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer();
this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor();
this.downstreamDispatchStrategy = downstreamDispatchStrategy;
+ this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
+ this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType);
}
public ConcurrentHashMap<String, Set<Session>> getTopic2sessionInGroupMapping() {
@@ -163,8 +169,6 @@ public class ClientGroupWrapper {
return true;
}
- private MQProducerWrapper mqProducerWrapper = new MQProducerWrapper();
-
public MQProducerWrapper getMqProducerWrapper() {
return mqProducerWrapper;
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
index 7841a67..61d3c48 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java
@@ -96,7 +96,7 @@ public class LiteConsumer extends AbstractLiteClient {
// this.remotingServer = new RemotingServer(this.consumeExecutor);
}
- private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
+ private final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE);
@Override
public void start() throws Exception {
@@ -226,15 +226,14 @@ public class LiteConsumer extends AbstractLiteClient {
EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class);
- if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) {
- } else {
+ if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) {
throw new EventMeshException(ret.getRetCode(), ret.getRetMsg());
}
} catch (Exception e) {
logger.error("send heartBeat error", e);
}
}
- }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS);
+ }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
public boolean unsubscribe(List<String> topicList, String url) throws Exception {
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java
index 65aa40e..04c275d 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java
@@ -31,7 +31,7 @@ public class EventMeshCommon {
/**
* Heartbeat interval on the client side
*/
- public static int HEATBEAT = 30 * 1000;
+ public static int HEARTBEAT = 30 * 1000;
/**
* Interval between cleanups of discarded RR
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
index 7df1e29..5513e13 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
@@ -50,7 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class TcpClient implements Closeable {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
public int clientNo = (new Random()).nextInt(1000);
@@ -67,8 +67,6 @@ public abstract class TcpClient implements Closeable {
protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true));
- private ScheduledFuture<?> task;
-
public TcpClient(String host, int port) {
this.host = host;
this.port = port;
@@ -119,7 +117,7 @@ public abstract class TcpClient implements Closeable {
if (channel.isWritable()) {
channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
- logger.warn("send msg failed", future.isSuccess(), future.cause());
+ logger.warn("send msg failed", future.cause());
}
});
} else {
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java
index da5691a..df05f4b 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
public class SimplePubClientImpl extends TcpClient implements SimplePubClient {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private UserAgent userAgent;
@@ -90,10 +90,10 @@ public class SimplePubClientImpl extends TcpClient implements SimplePubClient {
}
Package msg = MessageUtils.heartBeat();
io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- } catch (Exception e) {
+ } catch (Exception ignore) {
}
}
- }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS);
+ }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
private void goodbye() throws Exception {
@@ -176,16 +176,13 @@ public class SimplePubClientImpl extends TcpClient implements SimplePubClient {
Package pkg = MessageUtils.responseToClientAck(msg);
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
-
+ //TODO
}
RequestContext context = contexts.get(RequestContext._key(msg));
if (context != null) {
contexts.remove(context.getKey());
context.finish(msg);
- return;
- } else {
- return;
}
}
}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
index 38d52f6..7e341ca 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
- private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
private UserAgent userAgent;
@@ -101,10 +101,10 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
}
Package msg = MessageUtils.heartBeat();
SimpleSubClientImpl.this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- } catch (Exception e) {
+ } catch (Exception ignore) {
}
}
- }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS);
+ }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
private void goodbye() throws Exception {
@@ -172,10 +172,8 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient {
if (context != null) {
contexts.remove(context.getKey());
context.finish(msg);
- return;
} else {
logger.error("msg ignored,context not found.|{}|{}", cmd, msg);
- return;
}
}
}
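Note on the HEATBEAT to HEARTBEAT rename in the client diffs above: the constant is passed as both the initial delay and the period of a fixed-rate task. The following is a minimal, self-contained sketch of that scheduling pattern, not EventMesh code. The class name `HeartbeatSketch`, the short 50 ms interval, and the counter that stands in for sending a heartbeat package are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {

    // Hypothetical interval; the diff above uses EventMeshCommon.HEARTBEAT (30 * 1000 ms).
    public static final int HEARTBEAT_MILLIS = 50;

    /**
     * Schedules a fixed-rate heartbeat, waits until it has fired `beats` times
     * (or the timeout expires), then cancels it and returns the number sent.
     */
    public static int runHeartbeats(int beats, long timeoutMillis) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(beats);
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            try {
                // Stand-in for building and sending the heartbeat package.
                sent.incrementAndGet();
                done.countDown();
            } catch (Exception ignore) {
                // An exception escaping the task would suppress all later runs.
            }
        }, HEARTBEAT_MILLIS, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS);
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Stop the periodic task and release the worker thread.
            task.cancel(false);
            scheduler.shutdownNow();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats sent: " + runHeartbeats(3, 2000));
    }
}
```

Catching the exception inside the task matters with `scheduleAtFixedRate`: if one run throws, the executor cancels every subsequent run, which is presumably why the clients above swallow the exception (`catch (Exception ignore)`).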
diff --git a/settings.gradle b/eventmesh-spi/build.gradle
similarity index 77%
copy from settings.gradle
copy to eventmesh-spi/build.gradle
index ea31ffd..d973dce 100644
--- a/settings.gradle
+++ b/eventmesh-spi/build.gradle
@@ -13,9 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-rootProject.name = 'EventMesh'
-String jdkVersion = "${jdk}"
-include 'eventmesh-runtime','eventmesh-connector-rocketmq','eventmesh-sdk-java','eventmesh-common','eventmesh-connector-api','eventmesh-starter','eventmesh-test'
-
+ */
\ No newline at end of file
diff --git a/eventmesh-common/gradle.properties b/eventmesh-spi/gradle.properties
similarity index 97%
copy from eventmesh-common/gradle.properties
copy to eventmesh-spi/gradle.properties
index b4202ce..d0503c3 100644
--- a/eventmesh-common/gradle.properties
+++ b/eventmesh-spi/gradle.properties
@@ -16,4 +16,5 @@
#
group=org.apache.eventmesh
version=1.2.0-SNAPSHOT
-jdk=1.7
+jdk=1.8
+snapshot=false
diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
similarity index 50%
copy from eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
copy to eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
index 82ca583..6aea9db 100644
--- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java
@@ -15,30 +15,24 @@
* limitations under the License.
*/
-package org.apache.eventmesh.api.producer;
-
-import java.util.Properties;
-
-import io.openmessaging.api.Message;
-import io.openmessaging.api.Producer;
-import io.openmessaging.api.SendCallback;
-
-import org.apache.eventmesh.api.RRCallback;
-
-public interface MeshMQProducer extends Producer {
-
- void init(Properties properties) throws Exception;
-
- void send(Message message, SendCallback sendCallback) throws Exception;
-
- void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
-
- Message request(Message message, long timeout) throws Exception;
-
- boolean reply(final Message message, final SendCallback sendCallback) throws Exception;
-
- void checkTopicExist(String topic) throws Exception;
-
- void setExtFields();
-
+package org.apache.eventmesh.spi;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+public enum EventMeshExtensionFactory {
+ ;
+
+ public static <T> T getExtension(Class<T> extensionType, String extensionName) {
+ if (extensionType == null) {
+ throw new ExtensionException("extensionType is null");
+ }
+ if (StringUtils.isEmpty(extensionName)) {
+ throw new ExtensionException("extensionName is null");
+ }
+ if (!extensionType.isInterface() || !extensionType.isAnnotationPresent(EventMeshSPI.class)) {
+ throw new ExtensionException(String.format("extensionType:%s is invalided", extensionType));
+ }
+ return EventMeshExtensionLoader.getExtension(extensionType, extensionName);
+ }
}
diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
new file mode 100644
index 0000000..89696e0
--- /dev/null
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.spi;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+public enum EventMeshExtensionLoader {
+ ;
+
+ private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class);
+
+ private static final ConcurrentHashMap<Class<?>, ConcurrentHashMap<String, Class<?>>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16);
+
+ private static final ConcurrentHashMap<String, Object> EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16);
+
+ private static final String EVENTMESH_EXTENSION_DIR = "META-INF/eventmesh/";
+
+ @SuppressWarnings("unchecked")
+ public static <T> T getExtension(Class<T> extensionType, String extensionName) {
+ if (!hasLoadExtensionClass(extensionType)) {
+ loadExtensionClass(extensionType);
+ }
+ if (!hasInitializeExtension(extensionName)) {
+ initializeExtension(extensionType, extensionName);
+ }
+ return (T) EXTENSION_INSTANCE_CACHE.get(extensionName);
+ }
+
+ private static <T> void initializeExtension(Class<T> extensionType, String extensionName) {
+ ConcurrentHashMap<String, Class<?>> extensionClassMap = EXTENSION_CLASS_LOAD_CACHE.get(extensionType);
+ if (extensionClassMap == null) {
+ throw new ExtensionException(String.format("Extension type:%s has not been loaded", extensionType));
+ }
+ if (!extensionClassMap.containsKey(extensionName)) {
+ throw new ExtensionException(String.format("Extension name:%s has not been loaded", extensionName));
+ }
+ Class<?> aClass = extensionClassMap.get(extensionName);
+ try {
+ Object extensionObj = aClass.newInstance();
+ logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName);
+ EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new ExtensionException("Extension initialize error", e);
+ }
+ }
+
+ public static <T> void loadExtensionClass(Class<T> extensionType) {
+ String extensionFileName = EVENTMESH_EXTENSION_DIR + extensionType.getName();
+ ClassLoader classLoader = EventMeshExtensionLoader.class.getClassLoader();
+ try {
+ Enumeration<URL> extensionUrls = classLoader.getResources(extensionFileName);
+ if (extensionUrls != null) {
+ while (extensionUrls.hasMoreElements()) {
+ URL url = extensionUrls.nextElement();
+ loadResources(url, extensionType);
+ }
+ }
+ } catch (IOException e) {
+ throw new ExtensionException("load extension class error", e);
+ }
+
+
+ }
+
+ private static <T> void loadResources(URL url, Class<T> extensionType) throws IOException {
+ try (InputStream inputStream = url.openStream()) {
+ Properties properties = new Properties();
+ properties.load(inputStream);
+ properties.forEach((extensionName, extensionClass) -> {
+ String extensionNameStr = (String) extensionName;
+ String extensionClassStr = (String) extensionClass;
+ try {
+ Class<?> targetClass = Class.forName(extensionClassStr);
+ logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass);
+ if (!extensionType.isAssignableFrom(targetClass)) {
+ throw new ExtensionException(
+ String.format("class: %s is not subClass of %s", targetClass, extensionType));
+ }
+ EXTENSION_CLASS_LOAD_CACHE.computeIfAbsent(extensionType, k -> new ConcurrentHashMap<>())
+ .put(extensionNameStr, targetClass);
+ } catch (ClassNotFoundException e) {
+ throw new ExtensionException("load extension class error", e);
+ }
+ });
+ }
+ }
+
+ private static <T> boolean hasLoadExtensionClass(Class<T> extensionType) {
+ return EXTENSION_CLASS_LOAD_CACHE.containsKey(extensionType);
+ }
+
+ private static boolean hasInitializeExtension(String extensionName) {
+ return EXTENSION_INSTANCE_CACHE.containsKey(extensionName);
+ }
+}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshSPI.java
similarity index 67%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshSPI.java
index 2d1205d..0ea72d4 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshSPI.java
@@ -15,11 +15,21 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.spi;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Just as a marker for SPI
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface EventMeshSPI {
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
}
+
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/ExtensionException.java
similarity index 70%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-spi/src/main/java/org/apache/eventmesh/spi/ExtensionException.java
index 2d1205d..874f03d 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/ExtensionException.java
@@ -15,11 +15,19 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.spi;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+public class ExtensionException extends RuntimeException {
+
+ public ExtensionException(Exception e) {
+ super(e);
+ }
+
+ public ExtensionException(String message) {
+ super(message);
+ }
+
+ public ExtensionException(String message, Exception e) {
+ super(message, e);
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/EventMeshExtensionFactoryTest.java
similarity index 73%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-spi/src/test/java/org/apache/eventmesh/spi/EventMeshExtensionFactoryTest.java
index 2d1205d..649f4b1 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/EventMeshExtensionFactoryTest.java
@@ -15,11 +15,15 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.spi;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
-}
+import org.junit.Test;
+
+public class EventMeshExtensionFactoryTest {
+
+ @Test
+ public void getExtension() {
+ TestExtension extensionA = EventMeshExtensionFactory.getExtension(TestExtension.class, "extensionA");
+ extensionA.hello();
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/ExtensionA.java
similarity index 78%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-spi/src/test/java/org/apache/eventmesh/spi/ExtensionA.java
index 2d1205d..03513e6 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/ExtensionA.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.spi;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+public class ExtensionA implements TestExtension {
+
+ @Override
+ public void hello() {
+ System.out.println("I am ExtensionA");
+ }
}
diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/TestExtension.java
similarity index 78%
copy from eventmesh-connector-api/build.gradle
copy to eventmesh-spi/src/test/java/org/apache/eventmesh/spi/TestExtension.java
index 2d1205d..c0c9888 100644
--- a/eventmesh-connector-api/build.gradle
+++ b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/TestExtension.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-List open_message = [
- "io.openmessaging:openmessaging-api:2.2.1-pubsub"
-]
+package org.apache.eventmesh.spi;
-dependencies {
- implementation open_message,project(":eventmesh-common")
- testImplementation open_message,project(":eventmesh-common")
+@EventMeshSPI
+public interface TestExtension {
+
+ void hello();
}
diff --git a/eventmesh-common/gradle.properties b/eventmesh-spi/src/test/resources/META-INF/eventmesh/org.apache.eventmesh.spi.TestExtension
similarity index 92%
copy from eventmesh-common/gradle.properties
copy to eventmesh-spi/src/test/resources/META-INF/eventmesh/org.apache.eventmesh.spi.TestExtension
index b4202ce..3862ccb 100644
--- a/eventmesh-common/gradle.properties
+++ b/eventmesh-spi/src/test/resources/META-INF/eventmesh/org.apache.eventmesh.spi.TestExtension
@@ -13,7 +13,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-group=org.apache.eventmesh
-version=1.2.0-SNAPSHOT
-jdk=1.7
+
+extensionA=org.apache.eventmesh.spi.ExtensionA
\ No newline at end of file
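Note on the new eventmesh-spi module above: extensions are declared in a `name=className` properties file under META-INF/eventmesh/, named after the extension interface, and resolved by name at runtime. The sketch below illustrates that lookup in a self-contained form. It is not the committed loader: `SpiLookupSketch`, `Greeter`, and `EnglishGreeter` are hypothetical names, the descriptor is passed in as text instead of being read from the classpath, and the `@EventMeshSPI` annotation check is omitted.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class SpiLookupSketch {

    /** Hypothetical extension point, standing in for an @EventMeshSPI interface. */
    public interface Greeter {
        String greet();
    }

    /** Hypothetical implementation; it needs an accessible no-arg constructor. */
    public static class EnglishGreeter implements Greeter {
        @Override
        public String greet() {
            return "hello";
        }
    }

    // Instances are cached per "type#name", so two extension types may reuse a name.
    private static final Map<String, Object> INSTANCES = new ConcurrentHashMap<>();

    /**
     * Looks up `name` in a "name=className" descriptor, instantiates the class
     * on first use, and returns the cached instance afterwards.
     */
    public static <T> T getExtension(Class<T> type, String name, String descriptor) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptor));
        } catch (IOException e) {
            throw new IllegalStateException("cannot read descriptor", e);
        }
        String className = props.getProperty(name);
        if (className == null) {
            throw new IllegalArgumentException("no extension named " + name);
        }
        Object instance = INSTANCES.computeIfAbsent(type.getName() + "#" + name, key -> {
            try {
                Class<?> target = Class.forName(className);
                if (!type.isAssignableFrom(target)) {
                    throw new IllegalArgumentException(target + " does not implement " + type);
                }
                return target.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot create " + className, e);
            }
        });
        return type.cast(instance);
    }

    public static void main(String[] args) {
        String descriptor = "english=" + EnglishGreeter.class.getName();
        Greeter greeter = getExtension(Greeter.class, "english", descriptor);
        System.out.println(greeter.greet());
    }
}
```

One deliberate difference from the diff: `EXTENSION_INSTANCE_CACHE` in `EventMeshExtensionLoader` is keyed by `extensionName` alone, whereas this sketch keys the cache by type and name. That is a design choice of the sketch, made so that two extension types sharing a name cannot return each other's instance.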
diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
index 9d3af8f..329f2bc 100644
--- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
+++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java
@@ -34,10 +34,15 @@ public class SyncRequestInstance {
public static void main(String[] args) throws Exception {
LiteProducer liteProducer = null;
+ String eventMeshIPPort = "127.0.0.1:10105";
+ String topic = "EventMesh.SyncRequestInstance";
try {
- String eventMeshIPPort = args[0];
-
- final String topic = args[1];
+ if (args.length > 0 && StringUtils.isNotBlank(args[0])) {
+ eventMeshIPPort = args[0];
+ }
+ if (args.length > 1 && StringUtils.isNotBlank(args[1])) {
+ topic = args[1];
+ }
if (StringUtils.isBlank(eventMeshIPPort)) {
// if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105
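Note on the SyncRequestInstance hunk above: the demo no longer fails when started without arguments, because each positional argument now falls back to a default. A minimal sketch of the same fallback, factored into a helper, follows. `ArgDefaultsSketch` and `argOrDefault` are hypothetical names, and `trim().isEmpty()` is used as an approximation of `StringUtils.isNotBlank` so the example has no commons-lang dependency.

```java
public class ArgDefaultsSketch {

    /**
     * Returns args[index] when it is present and not blank, otherwise `fallback`.
     * Assumes index is non-negative.
     */
    public static String argOrDefault(String[] args, int index, String fallback) {
        if (args != null && index < args.length && args[index] != null
                && !args[index].trim().isEmpty()) {
            return args[index];
        }
        return fallback;
    }

    public static void main(String[] args) {
        // Default values copied from the diff above.
        String eventMeshIPPort = argOrDefault(args, 0, "127.0.0.1:10105");
        String topic = argOrDefault(args, 1, "EventMesh.SyncRequestInstance");
        System.out.println(eventMeshIPPort + " " + topic);
    }
}
```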
diff --git a/settings.gradle b/settings.gradle
index ea31ffd..2b5e0af 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,5 +17,12 @@
rootProject.name = 'EventMesh'
String jdkVersion = "${jdk}"
-include 'eventmesh-runtime','eventmesh-connector-rocketmq','eventmesh-sdk-java','eventmesh-common','eventmesh-connector-api','eventmesh-starter','eventmesh-test'
+include 'eventmesh-runtime'
+include 'eventmesh-connector-rocketmq'
+include 'eventmesh-sdk-java'
+include 'eventmesh-common'
+include 'eventmesh-connector-api'
+include 'eventmesh-starter'
+include 'eventmesh-test'
+include 'eventmesh-spi'