You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/02/25 11:43:00 UTC
[incubator-eventmesh] branch master updated: [ISSUE #3242]Refactor EventMeshProducer
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new abfc7b973 [ISSUE #3242]Refactor EventMeshProducer
new 693b3c4de Merge pull request #3243 from mxsm/eventmesh-3242
abfc7b973 is described below
commit abfc7b973c4b4d037ee2367f7cf59c90b76b5dbc
Author: mxsm <lj...@gmail.com>
AuthorDate: Sat Feb 25 10:01:44 2023 +0800
[ISSUE #3242]Refactor EventMeshProducer
---
.../http/processor/BatchSendMessageProcessor.java | 2 +-
.../processor/BatchSendMessageV2Processor.java | 2 +-
.../http/processor/ReplyMessageProcessor.java | 2 +-
.../http/processor/SendAsyncEventProcessor.java | 2 +-
.../http/processor/SendAsyncMessageProcessor.java | 2 +-
.../processor/SendAsyncRemoteEventProcessor.java | 2 +-
.../http/processor/SendSyncMessageProcessor.java | 2 +-
.../protocol/http/producer/EventMeshProducer.java | 32 ++++++++++------------
.../protocol/http/producer/ProducerManager.java | 2 +-
9 files changed, 23 insertions(+), 25 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 5150f54a1..8ae17b524 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -205,7 +205,7 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
- if (!batchEventMeshProducer.getStarted().get()) {
+ if (!batchEventMeshProducer.isStarted()) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageBatchResponseHeader,
SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR.getRetCode(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 467399ae7..c8ad12d93 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -211,7 +211,7 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
EventMeshProducer batchEventMeshProducer =
eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
- if (!batchEventMeshProducer.getStarted().get()) {
+ if (!batchEventMeshProducer.isStarted()) {
responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageBatchV2ResponseHeader,
SendMessageBatchV2ResponseBody
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index c5b122bb3..7b508d69a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -168,7 +168,7 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
replyMessageResponseHeader,
ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 5366973b7..0f4168de7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -195,7 +195,7 @@ public class SendAsyncEventProcessor implements AsyncHttpProcessor {
final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index e2cd2bb3d..fccbc07f6 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -211,7 +211,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
responseEventMeshCommand = request.createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 7a70348bb..c6ea1a93c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -237,7 +237,7 @@ public class SendAsyncRemoteEventProcessor implements AsyncHttpProcessor {
final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 3b018189d..bb7e4d4dd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -214,7 +214,7 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
final EventMeshProducer eventMeshProducer =
eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index 931d232cb..6dfc4cdbe 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -41,14 +41,16 @@ public class EventMeshProducer {
return inited;
}
- public AtomicBoolean getStarted() {
- return started;
+ public boolean isStarted() {
+ return started.get();
}
protected ProducerGroupConf producerGroupConfig;
protected EventMeshHTTPConfiguration eventMeshHttpConfiguration;
+ protected MQProducerWrapper mqProducerWrapper;
+
public void send(SendMessageContext sendMsgContext, SendCallback sendCallback) throws Exception {
mqProducerWrapper.send(sendMsgContext.getEvent(), sendCallback);
}
@@ -63,14 +65,15 @@ public class EventMeshProducer {
return true;
}
- protected MQProducerWrapper mqProducerWrapper;
-
public MQProducerWrapper getMqProducerWrapper() {
return mqProducerWrapper;
}
- public synchronized void init(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
- ProducerGroupConf producerGroupConfig) throws Exception {
+ public void init(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
+ ProducerGroupConf producerGroupConfig) throws Exception {
+ if (!inited.compareAndSet(false, true)) {
+ return;
+ }
this.producerGroupConfig = producerGroupConfig;
this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
@@ -83,32 +86,27 @@ public class EventMeshProducer {
keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.getEventMeshIDC());
mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.getEventMeshConnectorPluginType());
mqProducerWrapper.init(keyValue);
- inited.compareAndSet(false, true);
log.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName());
}
- public synchronized void start() throws Exception {
- if (started.get()) {
+ public void start() throws Exception {
+
+ if (!started.compareAndSet(false, true)) {
return;
}
-
mqProducerWrapper.start();
- started.compareAndSet(false, true);
log.info("EventMeshProducer [{}] started.............", producerGroupConfig.getGroupName());
}
- public synchronized void shutdown() throws Exception {
- if (!inited.get()) {
+ public void shutdown() throws Exception {
+ if (!inited.compareAndSet(true, false)) {
return;
}
-
- if (!started.get()) {
+ if (!started.compareAndSet(true, false)) {
return;
}
mqProducerWrapper.shutdown();
- inited.compareAndSet(true, false);
- started.compareAndSet(true, false);
log.info("EventMeshProducer [{}] shutdown.............", producerGroupConfig.getGroupName());
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
index dce75e5f8..3be9b98df 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
@@ -58,7 +58,7 @@ public class ProducerManager {
eventMeshProducer = producerTable.get(producerGroup);
- if (!eventMeshProducer.getStarted().get()) {
+ if (!eventMeshProducer.isStarted()) {
eventMeshProducer.start();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org