You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2023/02/25 11:43:00 UTC

[incubator-eventmesh] branch master updated: [ISSUE #3242]Refactor EventMeshProducer

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new abfc7b973 [ISSUE #3242]Refactor EventMeshProducer
     new 693b3c4de Merge pull request #3243 from mxsm/eventmesh-3242
abfc7b973 is described below

commit abfc7b973c4b4d037ee2367f7cf59c90b76b5dbc
Author: mxsm <lj...@gmail.com>
AuthorDate: Sat Feb 25 10:01:44 2023 +0800

    [ISSUE #3242]Refactor EventMeshProducer
---
 .../http/processor/BatchSendMessageProcessor.java  |  2 +-
 .../processor/BatchSendMessageV2Processor.java     |  2 +-
 .../http/processor/ReplyMessageProcessor.java      |  2 +-
 .../http/processor/SendAsyncEventProcessor.java    |  2 +-
 .../http/processor/SendAsyncMessageProcessor.java  |  2 +-
 .../processor/SendAsyncRemoteEventProcessor.java   |  2 +-
 .../http/processor/SendSyncMessageProcessor.java   |  2 +-
 .../protocol/http/producer/EventMeshProducer.java  | 32 ++++++++++------------
 .../protocol/http/producer/ProducerManager.java    |  2 +-
 9 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index 5150f54a1..8ae17b524 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -205,7 +205,7 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
 
         batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
 
-        if (!batchEventMeshProducer.getStarted().get()) {
+        if (!batchEventMeshProducer.isStarted()) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
                 sendMessageBatchResponseHeader,
                 SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR.getRetCode(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 467399ae7..c8ad12d93 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -211,7 +211,7 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
         EventMeshProducer batchEventMeshProducer =
             eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
         batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
-        if (!batchEventMeshProducer.getStarted().get()) {
+        if (!batchEventMeshProducer.isStarted()) {
             responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageBatchV2ResponseHeader,
                 SendMessageBatchV2ResponseBody
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
index c5b122bb3..7b508d69a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java
@@ -168,7 +168,7 @@ public class ReplyMessageProcessor implements HttpRequestProcessor {
 
         EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
                 replyMessageResponseHeader,
                 ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 5366973b7..0f4168de7 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -195,7 +195,7 @@ public class SendAsyncEventProcessor implements AsyncHttpProcessor {
 
         final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
                 responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
             return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index e2cd2bb3d..fccbc07f6 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -211,7 +211,7 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
         EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             responseEventMeshCommand = request.createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR.getRetCode(),
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
index 7a70348bb..c6ea1a93c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java
@@ -237,7 +237,7 @@ public class SendAsyncRemoteEventProcessor implements AsyncHttpProcessor {
 
         final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
                 responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
             return;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 3b018189d..bb7e4d4dd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -214,7 +214,7 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
         final EventMeshProducer eventMeshProducer =
             eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
                 sendMessageResponseHeader,
                 SendMessageResponseBody
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
index 931d232cb..6dfc4cdbe 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
@@ -41,14 +41,16 @@ public class EventMeshProducer {
         return inited;
     }
 
-    public AtomicBoolean getStarted() {
-        return started;
+    public boolean isStarted() {
+        return started.get();
     }
 
     protected ProducerGroupConf producerGroupConfig;
 
     protected EventMeshHTTPConfiguration eventMeshHttpConfiguration;
 
+    protected MQProducerWrapper mqProducerWrapper;
+
     public void send(SendMessageContext sendMsgContext, SendCallback sendCallback) throws Exception {
         mqProducerWrapper.send(sendMsgContext.getEvent(), sendCallback);
     }
@@ -63,14 +65,15 @@ public class EventMeshProducer {
         return true;
     }
 
-    protected MQProducerWrapper mqProducerWrapper;
-
     public MQProducerWrapper getMqProducerWrapper() {
         return mqProducerWrapper;
     }
 
-    public synchronized void init(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
-        ProducerGroupConf producerGroupConfig) throws Exception {
+    public void init(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
+                                  ProducerGroupConf producerGroupConfig) throws Exception {
+        if (!inited.compareAndSet(false, true)) {
+            return;
+        }
         this.producerGroupConfig = producerGroupConfig;
         this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
 
@@ -83,32 +86,27 @@ public class EventMeshProducer {
         keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.getEventMeshIDC());
         mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.getEventMeshConnectorPluginType());
         mqProducerWrapper.init(keyValue);
-        inited.compareAndSet(false, true);
         log.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName());
     }
 
 
-    public synchronized void start() throws Exception {
-        if (started.get()) {
+    public void start() throws Exception {
+
+        if (!started.compareAndSet(false, true)) {
             return;
         }
-
         mqProducerWrapper.start();
-        started.compareAndSet(false, true);
         log.info("EventMeshProducer [{}] started.............", producerGroupConfig.getGroupName());
     }
 
-    public synchronized void shutdown() throws Exception {
-        if (!inited.get()) {
+    public void shutdown() throws Exception {
+        if (!inited.compareAndSet(true, false)) {
             return;
         }
-
-        if (!started.get()) {
+        if (!started.compareAndSet(true, false)) {
             return;
         }
         mqProducerWrapper.shutdown();
-        inited.compareAndSet(true, false);
-        started.compareAndSet(true, false);
         log.info("EventMeshProducer [{}] shutdown.............", producerGroupConfig.getGroupName());
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
index dce75e5f8..3be9b98df 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/ProducerManager.java
@@ -58,7 +58,7 @@ public class ProducerManager {
 
         eventMeshProducer = producerTable.get(producerGroup);
 
-        if (!eventMeshProducer.getStarted().get()) {
+        if (!eventMeshProducer.isStarted()) {
             eventMeshProducer.start();
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org