You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@eventmesh.apache.org by GitBox <gi...@apache.org> on 2021/08/24 09:32:05 UTC

[GitHub] [incubator-eventmesh] ruanwenjun commented on a change in pull request #508: [ISSUE #490] Support service invocation

ruanwenjun commented on a change in pull request #508:
URL: https://github.com/apache/incubator-eventmesh/pull/508#discussion_r694682202



##########
File path: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java
##########
@@ -33,9 +33,9 @@
 
     void send(Message message, SendCallback sendCallback) throws Exception;
 
-    void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
+    void request(Message message, RRCallback rrCallback, long timeout) throws Exception;
 
-    Message request(Message message, long timeout) throws Exception;
+//    Message request(Message message, long timeout) throws Exception;

Review comment:
       ```suggestion
   ```

##########
File path: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java
##########
@@ -80,14 +80,14 @@ public void send(Message message, SendCallback sendCallback) throws Exception {
         meshMQProducer.send(message, sendCallback);
     }
 
-    public void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout)
+    public void request(Message message, RRCallback rrCallback, long timeout)
             throws Exception {
-        meshMQProducer.request(message, sendCallback, rrCallback, timeout);
+        meshMQProducer.request(message, rrCallback, timeout);
     }
 
-    public Message request(Message message, long timeout) throws Exception {
-        return meshMQProducer.request(message, timeout);
-    }
+//    public Message request(Message message, long timeout) throws Exception {
+//        return meshMQProducer.request(message, timeout);
+//    }

Review comment:
       ```suggestion
   ```

##########
File path: eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
##########
@@ -100,6 +113,49 @@ public void sendAsync(Message message, SendCallback sendCallback) {
         }
     }
 
+    public void request(Message message, RRCallback rrCallback, long timeout)
+            throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+//        try {
+            this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
+            org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
+            rocketmqProducer.request(msgRMQ, rrCallbackConvert(message, rrCallback), timeout);
+
+//        }catch (Exception e){
+//            String topic = message.getTopic();
+//            String msgId = message.getMsgID();
+//            OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
+//            OnExceptionContext context = new OnExceptionContext();
+//            context.setTopic(topic);
+//            context.setMessageId(msgId);
+//            context.setException(onsEx);
+//            sendCallback.onException(context);
+//        }
+
+    }

Review comment:
       ```suggestion
       public void request(Message message, RRCallback rrCallback, long timeout)
               throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
               this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
               org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
               rocketmqProducer.request(msgRMQ, rrCallbackConvert(message, rrCallback), timeout);
       }
   ```

##########
File path: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java
##########
@@ -55,14 +55,14 @@ public void send(SendMessageContext sendMsgContext, SendCallback sendCallback) t
         mqProducerWrapper.send(sendMsgContext.getMsg(), sendCallback);
     }
 
-    public void request(SendMessageContext sendMsgContext, SendCallback sendCallback, RRCallback rrCallback, long timeout)
+    public void request(SendMessageContext sendMsgContext, RRCallback rrCallback, long timeout)
             throws Exception {
-        mqProducerWrapper.request(sendMsgContext.getMsg(), sendCallback, rrCallback, timeout);
+        mqProducerWrapper.request(sendMsgContext.getMsg(), rrCallback, timeout);
     }
 
-    public Message request(SendMessageContext sendMessageContext, long timeout) throws Exception {
-        return mqProducerWrapper.request(sendMessageContext.getMsg(), timeout);
-    }
+//    public Message request(SendMessageContext sendMessageContext, long timeout) throws Exception {
+//        return mqProducerWrapper.request(sendMessageContext.getMsg(), timeout);
+//    }

Review comment:
       ```suggestion
   ```

##########
File path: eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java
##########
@@ -100,6 +113,49 @@ public void sendAsync(Message message, SendCallback sendCallback) {
         }
     }
 
+    public void request(Message message, RRCallback rrCallback, long timeout)
+            throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+//        try {
+            this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
+            org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
+            rocketmqProducer.request(msgRMQ, rrCallbackConvert(message, rrCallback), timeout);
+
+//        }catch (Exception e){
+//            String topic = message.getTopic();
+//            String msgId = message.getMsgID();
+//            OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
+//            OnExceptionContext context = new OnExceptionContext();
+//            context.setTopic(topic);
+//            context.setMessageId(msgId);
+//            context.setException(onsEx);
+//            sendCallback.onException(context);
+//        }
+
+    }

Review comment:
       It might be better to remove the unused code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org