You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/11/08 09:36:29 UTC

[GitHub] [rocketmq] qqeasonchen commented on a change in pull request #1422: [RIP-16]Support request/response pattern

qqeasonchen commented on a change in pull request #1422: [RIP-16]Support request/response pattern
URL: https://github.com/apache/rocketmq/pull/1422#discussion_r344088310
 
 

 ##########
 File path: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 ##########
 @@ -1319,6 +1337,245 @@ public SendResult send(Message msg,
         return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
     }
 
+    public Message request(Message msg,
+        long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
+            RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setSendReqeustOk(false);
+                    requestResponseFuture.putResponseMessage(null);
+                    requestResponseFuture.setCause(e);
+                }
+            }, timeout - cost);
+
+            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+            if (responseMessage == null) {
+                if (requestResponseFuture.isSendRequestOk()) {
+                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+                } else {
+                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+                }
+            }
+            return responseMessage;
+        } finally {
+            RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
+        }
+    }
+
+    public void request(Message msg, final RequestCallback requestCallback, long timeout) throws RemotingException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
+            RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setCause(e);
+                    requestFail(requestUniqId);
+                }
+            }, timeout - cost);
+        } catch (Exception ex) {
+            log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
+            throw new RemotingSendRequestException(msg.getTopic(), ex);
+        }
+    }
+
+    public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException, RequestTimeoutException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
+            RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setSendReqeustOk(false);
+                    requestResponseFuture.putResponseMessage(null);
+                    requestResponseFuture.setCause(e);
+                }
+            }, timeout - cost);
+
+            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+            if (responseMessage == null) {
+                if (requestResponseFuture.isSendRequestOk()) {
+                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+                } else {
+                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+                }
+            }
+            return responseMessage;
+        } finally {
+            RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
+        }
+    }
+
+    public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final RequestCallback requestCallback, final long timeout) throws RemotingException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
+            RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setCause(e);
+                    requestFail(requestUniqId);
+                }
+            }, timeout - cost);
+        } catch (Exception ex) {
+            log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
+            throw new RemotingSendRequestException(msg.getTopic(), ex);
+        }
+    }
+
+    public Message request(final Message msg, final MessageQueue mq, final long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, null);
+            RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setSendReqeustOk(false);
+                    requestResponseFuture.putResponseMessage(null);
+                    requestResponseFuture.setCause(e);
+                }
+            }, null, timeout - cost);
+
+            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+            if (responseMessage == null) {
+                if (requestResponseFuture.isSendRequestOk()) {
+                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+                } else {
+                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+                }
+            }
+            return responseMessage;
+        } finally {
+            RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
+        }
+    }
+
+    public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+        throws RemotingException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String requestUniqId = msg.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(requestUniqId, timeout, requestCallback);
+            RequestFutureTable.getRequestFutureTable().put(requestUniqId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setCause(e);
+                    requestFail(requestUniqId);
+                }
+            }, null, timeout - cost);
+        } catch (Exception ex) {
+            log.warn("send request message to <{}> failed.", msg.getTopic(), ex);
+            throw new RemotingSendRequestException(msg.getTopic(), ex);
+        }
+    }
+
+    private void requestFail(final String requestUniqId) {
+        RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(requestUniqId);
+        if (responseFuture != null) {
+            responseFuture.setSendReqeustOk(false);
+            responseFuture.putResponseMessage(null);
+            try {
+                responseFuture.executeRequestCallback();
+            } catch (Exception e) {
+                log.warn("execute requestCallback in requestFail, and callback throw", e);
+            }
+        }
+    }
+
+    private void prepareSendRequest(final Message msg, long timeout) {
+        String requestUniqId = RequestIdUtil.createUniqueRequestId();
+        String requestClientId = this.getmQClientFactory().getClientId();
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId);
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO, requestClientId);
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
 
 Review comment:
   In addition, putting TTL into message is helpful to know timeout of a request when tracing by message.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services