You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/03/31 13:09:30 UTC

[rocketmq-spring] branch master updated (8e6f11c -> 7d50974)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git.


    from 8e6f11c  docs(readme):typo fix
     new 7d93931  Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout
     new 0927fe8  Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout
     new fefe366  Fix code style error due to mvn build failed
     new 4432a7b  Edit code style as Apache Rocket MQ
     new 4d498cb  Edit code style as Apache Rocket MQ
     new 7d50974  Edit code style as Apache Rocket MQ

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/rocketmq/spring/core/RocketMQTemplate.java   | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)


[rocketmq-spring] 06/06: Edit code style as Apache Rocket MQ

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git

commit 7d50974182952c68fc298ef150274cba38597c03
Author: GongZhengMe <79...@qq.com>
AuthorDate: Mon Mar 30 18:06:04 2020 +0800

    Edit code style as Apache Rocket MQ
---
 .../rocketmq/spring/core/RocketMQTemplate.java     | 349 ++++++++++-----------
 1 file changed, 166 insertions(+), 183 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 1ac4e78..6683f10 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -88,8 +88,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type        The type of T
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
@@ -98,8 +98,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload     the payload to be sent.
-     * @param type        The type of T
+     * @param payload the payload to be sent.
+     * @param type The type of T
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type) {
@@ -108,9 +108,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type        The type of T
-     * @param timeout     send timeout in millis
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
@@ -119,9 +119,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload     the payload to be sent.
-     * @param type        The type of T
-     * @param timeout     send timeout in millis
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
@@ -130,10 +130,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type        The type of T
-     * @param timeout     send timeout in millis
-     * @param delayLevel  message delay level(0 means no delay)
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
@@ -142,10 +142,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload     the payload to be sent.
-     * @param type        The type of T
-     * @param timeout     send timeout in millis
-     * @param delayLevel  message delay level(0 means no delay)
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
@@ -154,9 +154,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type        The type of T
-     * @param hashKey     needed when sending message orderly
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param hashKey needed when sending message orderly
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
@@ -165,9 +165,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload     the payload to be sent.
-     * @param type        The type of T
-     * @param hashKey     needed when sending message orderly
+     * @param payload the payload to be sent.
+     * @param type The type of T
+     * @param hashKey needed when sending message orderly
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
@@ -176,10 +176,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type        The type of T
-     * @param hashKey     needed when sending message orderly
-     * @param timeout     send timeout in millis
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type of T
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
@@ -188,8 +188,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload     the payload to be sent.
-     * @param type        The type of T
+     * @param payload the payload to be sent.
+     * @param type The type of T
      * @param hashKey
      * @return
      */
@@ -199,11 +199,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type        The type that receive
-     * @param hashKey     needed when sending message orderly
-     * @param timeout     send timeout in millis
-     * @param delayLevel  message delay level(0 means no delay)
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type The type that receive
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
@@ -221,14 +221,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             MessageExt replyMessage;
 
             if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
-                replyMessage = (MessageExt)producer.request(rocketMsg, timeout);
+                replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
             }
-            else {
-                replyMessage = (MessageExt)producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
-            }
-            return replyMessage != null ? (T)doConvertMessage(replyMessage, type) : null;
-        }
-        catch (Exception e) {
+            return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
+        } catch (Exception e) {
             log.error("send request message failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -236,11 +234,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload     the payload to be sent.
-     * @param type        The type that receive
-     * @param hashKey     needed when sending message orderly
-     * @param timeout     send timeout in millis
-     * @param delayLevel  message delay level(0 means no delay)
+     * @param payload the payload to be sent.
+     * @param type The type that receive
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
@@ -250,8 +248,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
      * @return
      */
@@ -261,8 +259,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param payload                      the payload to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
      * @return
      */
@@ -272,10 +270,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout                      send timeout in millis
+     * @param timeout send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -284,10 +282,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param payload                      the payload to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout                      send timeout in millis
+     * @param timeout send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -296,11 +294,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout                      send timeout in millis
-     * @param delayLevel                   message delay level(0 means no delay)
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -309,10 +307,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param payload                      the payload to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey                      needed when sending message orderly
+     * @param hashKey needed when sending message orderly
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -321,11 +319,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey                      needed when sending message orderly
-     * @param timeout                      send timeout in millis
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -334,11 +332,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param payload                      the payload to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey                      needed when sending message orderly
-     * @param timeout                      send timeout in millis
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -347,10 +345,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey                      needed when sending message orderly
+     * @param hashKey needed when sending message orderly
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -359,11 +357,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param payload                      the payload to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout                      send timeout in millis
-     * @param delayLevel                   message delay level(0 means no delay)
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -372,12 +370,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination                  formats: `topicName:tags`
-     * @param payload                      the payload to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param payload the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey                      needed when sending message orderly
-     * @param timeout                      send timeout in millis
-     * @param delayLevel                   message delay level(0 means no delay)
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -390,12 +388,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
      * <code>rocketMQLocalRequestCallback</code> will be executed. </p>
      *
-     * @param destination                  formats: `topicName:tags`
-     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey                      needed when sending message orderly
-     * @param timeout                      send timeout in millis
-     * @param delayLevel                   message delay level(0 means no delay)
+     * @param hashKey needed when sending message orderly
+     * @param timeout send timeout in millis
+     * @param delayLevel message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -417,7 +415,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             if (rocketMQLocalRequestCallback != null) {
                 requestCallback = new RequestCallback() {
                     @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
-                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt)message, getMessageType(rocketMQLocalRequestCallback)));
+                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
                     }
 
                     @Override public void onException(Throwable e) {
@@ -427,12 +425,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             }
             if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
                 producer.request(rocketMsg, requestCallback, timeout);
-            }
-            else {
+            } else {
                 producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
             }
-        }
-        catch (
+        } catch (
             Exception e) {
             log.error("send request message failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
@@ -451,7 +447,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * duplication issue.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
+     * @param message {@link org.springframework.messaging.Message}
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message) {
@@ -462,8 +458,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param timeout     send timeout with millis
+     * @param message {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -474,7 +470,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * syncSend batch messages
      *
      * @param destination formats: `topicName:tags`
-     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param messages Collection of {@link org.springframework.messaging.Message}
      * @return {@link SendResult}
      */
     public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
@@ -485,8 +481,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * syncSend batch messages in a given timeout.
      *
      * @param destination formats: `topicName:tags`
-     * @param messages    Collection of {@link org.springframework.messaging.Message}
-     * @param timeout     send timeout with millis
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -512,8 +508,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
             }
             return sendResult;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
             throw new MessagingException(e.getMessage(), e);
         }
@@ -523,9 +518,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param timeout     send timeout with millis
-     * @param delayLevel  level for the delay message
+     * @param message {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
+     * @param delayLevel level for the delay message
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -545,8 +540,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
             }
             return sendResult;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("syncSend failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -556,7 +550,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)}.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
+     * @param payload the Object to use as payload
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload) {
@@ -567,8 +561,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
-     * @param timeout     send timeout with millis
+     * @param payload the Object to use as payload
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -580,8 +574,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -592,9 +586,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
-     * @param timeout     send timeout with millis
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -611,8 +605,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
             }
             return sendResult;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -622,8 +615,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -634,9 +627,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
-     * @param timeout     send timeout with millis
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param timeout send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -648,11 +641,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
      * addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
-     * @param delayLevel   level for the delay message
+     * @param timeout send timeout with millis
+     * @param delayLevel level for the delay message
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
         int delayLevel) {
@@ -666,8 +659,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 rocketMsg.setDelayTimeLevel(delayLevel);
             }
             producer.send(rocketMsg, sendCallback, timeout);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -676,10 +668,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
         asyncSend(destination, message, sendCallback, timeout, 0);
@@ -695,8 +687,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
      * message duplication and application developers are the one to resolve this potential issue.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -706,10 +698,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -719,8 +711,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)}.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -731,11 +723,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
      * addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
         long timeout) {
@@ -746,8 +738,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -756,9 +747,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param message      {@link org.springframework.messaging.Message}
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -768,9 +759,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -780,11 +771,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination  formats: `topicName:tags`
-     * @param payload      the Object to use as payload
-     * @param hashKey      use this key to select queue. for example: orderId, productId ...
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
-     * @param timeout      send timeout with millis
+     * @param timeout send timeout with millis
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
         long timeout) {
@@ -799,7 +790,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * One-way transmission is used for cases requiring moderate reliability, such as log collection.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
+     * @param message {@link org.springframework.messaging.Message}
      */
     public void sendOneWay(String destination, Message<?> message) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -809,8 +800,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.sendOneway(rocketMsg);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -820,7 +810,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #sendOneWay(String, Message)}
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
+     * @param payload the Object to use as payload
      */
     public void sendOneWay(String destination, Object payload) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -831,8 +821,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param message     {@link org.springframework.messaging.Message}
-     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param message {@link org.springframework.messaging.Message}
+     * @param hashKey use this key to select queue. for example: orderId, productId ...
      */
     public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -842,8 +832,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -853,7 +842,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #sendOneWayOrderly(String, Message, String)}
      *
      * @param destination formats: `topicName:tags`
-     * @param payload     the Object to use as payload
+     * @param payload the Object to use as payload
      */
     public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -894,21 +883,20 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Send Spring Message in Transaction
      *
      * @param destination destination formats: `topicName:tags`
-     * @param message     message {@link org.springframework.messaging.Message}
-     * @param arg         ext arg
+     * @param message message {@link org.springframework.messaging.Message}
+     * @param arg ext arg
      * @return TransactionSendResult
      * @throws MessagingException
      */
     public TransactionSendResult sendMessageInTransaction(final String destination,
         final Message<?> message, final Object arg) throws MessagingException {
         try {
-            if (((TransactionMQProducer)producer).getTransactionListener() == null) {
+            if (((TransactionMQProducer) producer).getTransactionListener() == null) {
                 throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
             }
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             return producer.sendMessageInTransaction(rocketMsg, arg);
-        }
-        catch (MQClientException e) {
+        } catch (MQClientException e) {
             throw RocketMQUtil.convert(e);
         }
     }
@@ -923,29 +911,24 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     private Object doConvertMessage(MessageExt messageExt, Type type) {
         if (Objects.equals(type, MessageExt.class)) {
             return messageExt;
-        }
-        else if (Objects.equals(type, byte[].class)) {
+        } else if (Objects.equals(type, byte[].class)) {
             return messageExt.getBody();
-        }
-        else {
+        } else {
             String str = new String(messageExt.getBody(), Charset.forName(charset));
             if (Objects.equals(type, String.class)) {
                 return str;
-            }
-            else {
+            } else {
                 // If msgType not string, use objectMapper change it.
                 try {
                     if (type instanceof Class) {
                         //if the messageType has not Generic Parameter
-                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)type);
-                    }
-                    else {
+                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
+                    } else {
                         //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
                         //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
-                        return ((SmartMessageConverter)this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)((ParameterizedType)type).getRawType(), null);
+                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
                     }
-                }
-                catch (Exception e) {
+                } catch (Exception e) {
                     log.error("convert failed. str:{}, msgType:{}", str, type);
                     throw new RuntimeException("cannot convert message to " + type, e);
                 }
@@ -960,7 +943,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             Type[] interfaces = targetClass.getGenericInterfaces();
             if (Objects.nonNull(interfaces)) {
                 for (Type type : interfaces) {
-                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType)type).getRawType(), RocketMQLocalRequestCallback.class))) {
+                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
                         matchedGenericInterface = type;
                         break;
                     }
@@ -972,10 +955,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             return Object.class;
         }
 
-        Type[] actualTypeArguments = ((ParameterizedType)matchedGenericInterface).getActualTypeArguments();
+        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
         if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
             return actualTypeArguments[0];
         }
         return Object.class;
     }
-}
+}
\ No newline at end of file


[rocketmq-spring] 01/06: Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git

commit 7d93931b110a707ac4bfb0d34f6bb078598f8dbc
Author: GongZhengMe <79...@qq.com>
AuthorDate: Thu Mar 26 15:08:13 2020 +0800

    Add Method:#syncSend(java.lang.String, java.util.Collection<T>)
    Fix the bug of BatchMessage syncSend without timeout
---
 .../rocketmq/spring/core/RocketMQTemplate.java     | 36 ++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 089016a..626b16f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -467,6 +467,42 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
+     * syncSend batch messages
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
+        if (Objects.isNull(messages) || messages.size() == 0) {
+            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+
+        try {
+            long now = System.currentTimeMillis();
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            for (Message msg : messages) {
+                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+                    log.warn("Found a message empty in the batch, skip it");
+                    continue;
+                }
+                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
+            }
+
+            SendResult sendResult = producer.send(rmqMsgs);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    /**
      * syncSend batch messages in a given timeout.
      *
      * @param destination formats: `topicName:tags`


[rocketmq-spring] 04/06: Edit code style as Apache Rocket MQ

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git

commit 4432a7be02d428abb9012fc55172e7f3ee4220bf
Author: GongZhengMe <79...@qq.com>
AuthorDate: Fri Mar 27 12:40:15 2020 +0800

    Edit code style as Apache Rocket MQ
---
 .../rocketmq/spring/core/RocketMQTemplate.java     | 349 +++++++++++----------
 1 file changed, 183 insertions(+), 166 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 70001f0..1ac4e78 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -88,8 +88,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type The type of T
+     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type        The type of T
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
@@ -98,8 +98,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
-     * @param type The type of T
+     * @param payload     the payload to be sent.
+     * @param type        The type of T
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type) {
@@ -108,9 +108,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type The type of T
-     * @param timeout send timeout in millis
+     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type        The type of T
+     * @param timeout     send timeout in millis
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
@@ -119,9 +119,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
-     * @param type The type of T
-     * @param timeout send timeout in millis
+     * @param payload     the payload to be sent.
+     * @param type        The type of T
+     * @param timeout     send timeout in millis
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
@@ -130,10 +130,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type The type of T
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type        The type of T
+     * @param timeout     send timeout in millis
+     * @param delayLevel  message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
@@ -142,10 +142,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
-     * @param type The type of T
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param payload     the payload to be sent.
+     * @param type        The type of T
+     * @param timeout     send timeout in millis
+     * @param delayLevel  message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
@@ -154,9 +154,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type The type of T
-     * @param hashKey needed when sending message orderly
+     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type        The type of T
+     * @param hashKey     needed when sending message orderly
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
@@ -165,9 +165,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
-     * @param type The type of T
-     * @param hashKey needed when sending message orderly
+     * @param payload     the payload to be sent.
+     * @param type        The type of T
+     * @param hashKey     needed when sending message orderly
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
@@ -176,10 +176,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type The type of T
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
+     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type        The type of T
+     * @param hashKey     needed when sending message orderly
+     * @param timeout     send timeout in millis
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
@@ -188,8 +188,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
-     * @param type The type of T
+     * @param payload     the payload to be sent.
+     * @param type        The type of T
      * @param hashKey
      * @return
      */
@@ -199,11 +199,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
-     * @param type The type that receive
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param message     {@link org.springframework.messaging.Message} the message to be sent.
+     * @param type        The type that receive
+     * @param hashKey     needed when sending message orderly
+     * @param timeout     send timeout in millis
+     * @param delayLevel  message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
@@ -221,12 +221,14 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             MessageExt replyMessage;
 
             if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
-                replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
-            } else {
-                replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+                replyMessage = (MessageExt)producer.request(rocketMsg, timeout);
             }
-            return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
-        } catch (Exception e) {
+            else {
+                replyMessage = (MessageExt)producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage != null ? (T)doConvertMessage(replyMessage, type) : null;
+        }
+        catch (Exception e) {
             log.error("send request message failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -234,11 +236,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
 
     /**
      * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
-     * @param type The type that receive
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param payload     the payload to be sent.
+     * @param type        The type that receive
+     * @param hashKey     needed when sending message orderly
+     * @param timeout     send timeout in millis
+     * @param delayLevel  message delay level(0 means no delay)
      * @return
      */
     public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
@@ -248,8 +250,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
      * @return
      */
@@ -259,8 +261,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param payload                      the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
      * @return
      */
@@ -270,10 +272,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout send timeout in millis
+     * @param timeout                      send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -282,10 +284,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param payload                      the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout send timeout in millis
+     * @param timeout                      send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -294,11 +296,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param timeout                      send timeout in millis
+     * @param delayLevel                   message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -307,10 +309,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param payload                      the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey needed when sending message orderly
+     * @param hashKey                      needed when sending message orderly
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -319,11 +321,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
+     * @param hashKey                      needed when sending message orderly
+     * @param timeout                      send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -332,11 +334,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param payload                      the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
+     * @param hashKey                      needed when sending message orderly
+     * @param timeout                      send timeout in millis
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -345,10 +347,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey needed when sending message orderly
+     * @param hashKey                      needed when sending message orderly
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -357,11 +359,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param payload                      the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param timeout                      send timeout in millis
+     * @param delayLevel                   message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -370,12 +372,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     }
 
     /**
-     * @param destination formats: `topicName:tags`
-     * @param payload the payload to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param payload                      the payload to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param hashKey                      needed when sending message orderly
+     * @param timeout                      send timeout in millis
+     * @param delayLevel                   message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Object payload,
@@ -388,12 +390,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
      * <code>rocketMQLocalRequestCallback</code> will be executed. </p>
      *
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message} the message to be sent.
+     * @param destination                  formats: `topicName:tags`
+     * @param message                      {@link org.springframework.messaging.Message} the message to be sent.
      * @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
-     * @param hashKey needed when sending message orderly
-     * @param timeout send timeout in millis
-     * @param delayLevel message delay level(0 means no delay)
+     * @param hashKey                      needed when sending message orderly
+     * @param timeout                      send timeout in millis
+     * @param delayLevel                   message delay level(0 means no delay)
      * @return
      */
     public void sendAndReceive(String destination, Message<?> message,
@@ -415,7 +417,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             if (rocketMQLocalRequestCallback != null) {
                 requestCallback = new RequestCallback() {
                     @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
-                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
+                        rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt)message, getMessageType(rocketMQLocalRequestCallback)));
                     }
 
                     @Override public void onException(Throwable e) {
@@ -425,10 +427,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             }
             if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
                 producer.request(rocketMsg, requestCallback, timeout);
-            } else {
+            }
+            else {
                 producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
             }
-        } catch (
+        }
+        catch (
             Exception e) {
             log.error("send request message failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
@@ -447,7 +451,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * duplication issue.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
+     * @param message     {@link org.springframework.messaging.Message}
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message) {
@@ -458,8 +462,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param timeout send timeout with millis
+     * @param message     {@link org.springframework.messaging.Message}
+     * @param timeout     send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -470,19 +474,19 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * syncSend batch messages
      *
      * @param destination formats: `topicName:tags`
-     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
      * @return {@link SendResult}
      */
     public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
-       return syncSend(destination,messages,producer.getSendMsgTimeout());
+        return syncSend(destination, messages, producer.getSendMsgTimeout());
     }
 
     /**
      * syncSend batch messages in a given timeout.
      *
      * @param destination formats: `topicName:tags`
-     * @param messages Collection of {@link org.springframework.messaging.Message}
-     * @param timeout send timeout with millis
+     * @param messages    Collection of {@link org.springframework.messaging.Message}
+     * @param timeout     send timeout with millis
      * @return {@link SendResult}
      */
     public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -508,7 +512,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
             }
             return sendResult;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
             throw new MessagingException(e.getMessage(), e);
         }
@@ -518,9 +523,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param timeout send timeout with millis
-     * @param delayLevel level for the delay message
+     * @param message     {@link org.springframework.messaging.Message}
+     * @param timeout     send timeout with millis
+     * @param delayLevel  level for the delay message
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -540,7 +545,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
             }
             return sendResult;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("syncSend failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -550,7 +556,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)}.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
+     * @param payload     the Object to use as payload
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload) {
@@ -561,8 +567,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
-     * @param timeout send timeout with millis
+     * @param payload     the Object to use as payload
+     * @param timeout     send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -574,8 +580,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param message     {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -586,9 +592,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
-     * @param timeout send timeout with millis
+     * @param message     {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param timeout     send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -605,7 +611,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
             }
             return sendResult;
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -615,8 +622,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param payload     the Object to use as payload
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -627,9 +634,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
      *
      * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
-     * @param timeout send timeout with millis
+     * @param payload     the Object to use as payload
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
+     * @param timeout     send timeout with millis
      * @return {@link SendResult}
      */
     public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -641,11 +648,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
      * addition.
      *
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
+     * @param destination  formats: `topicName:tags`
+     * @param message      {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
-     * @param timeout send timeout with millis
-     * @param delayLevel level for the delay message
+     * @param timeout      send timeout with millis
+     * @param delayLevel   level for the delay message
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
         int delayLevel) {
@@ -659,7 +666,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
                 rocketMsg.setDelayTimeLevel(delayLevel);
             }
             producer.send(rocketMsg, sendCallback, timeout);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -668,10 +676,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
+     * @param destination  formats: `topicName:tags`
+     * @param message      {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
-     * @param timeout send timeout with millis
+     * @param timeout      send timeout with millis
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
         asyncSend(destination, message, sendCallback, timeout, 0);
@@ -687,8 +695,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
      * message duplication and application developers are the one to resolve this potential issue.
      *
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
+     * @param destination  formats: `topicName:tags`
+     * @param message      {@link org.springframework.messaging.Message}
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -698,10 +706,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
+     * @param destination  formats: `topicName:tags`
+     * @param payload      the Object to use as payload
      * @param sendCallback {@link SendCallback}
-     * @param timeout send timeout with millis
+     * @param timeout      send timeout with millis
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -711,8 +719,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)}.
      *
-     * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
+     * @param destination  formats: `topicName:tags`
+     * @param payload      the Object to use as payload
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -723,11 +731,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
      * addition.
      *
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param destination  formats: `topicName:tags`
+     * @param message      {@link org.springframework.messaging.Message}
+     * @param hashKey      use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
-     * @param timeout send timeout with millis
+     * @param timeout      send timeout with millis
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
         long timeout) {
@@ -738,7 +746,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -747,9 +756,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
      *
-     * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param destination  formats: `topicName:tags`
+     * @param message      {@link org.springframework.messaging.Message}
+     * @param hashKey      use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -759,9 +768,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
      *
-     * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param destination  formats: `topicName:tags`
+     * @param payload      the Object to use as payload
+     * @param hashKey      use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -771,11 +780,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     /**
      * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
      *
-     * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param destination  formats: `topicName:tags`
+     * @param payload      the Object to use as payload
+     * @param hashKey      use this key to select queue. for example: orderId, productId ...
      * @param sendCallback {@link SendCallback}
-     * @param timeout send timeout with millis
+     * @param timeout      send timeout with millis
      */
     public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
         long timeout) {
@@ -790,7 +799,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * One-way transmission is used for cases requiring moderate reliability, such as log collection.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
+     * @param message     {@link org.springframework.messaging.Message}
      */
     public void sendOneWay(String destination, Message<?> message) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -800,7 +809,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.sendOneway(rocketMsg);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -810,7 +820,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #sendOneWay(String, Message)}
      *
      * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
+     * @param payload     the Object to use as payload
      */
     public void sendOneWay(String destination, Object payload) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -821,8 +831,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
      *
      * @param destination formats: `topicName:tags`
-     * @param message {@link org.springframework.messaging.Message}
-     * @param hashKey use this key to select queue. for example: orderId, productId ...
+     * @param message     {@link org.springframework.messaging.Message}
+     * @param hashKey     use this key to select queue. for example: orderId, productId ...
      */
     public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
         if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -832,7 +842,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         try {
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
             throw new MessagingException(e.getMessage(), e);
         }
@@ -842,7 +853,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Same to {@link #sendOneWayOrderly(String, Message, String)}
      *
      * @param destination formats: `topicName:tags`
-     * @param payload the Object to use as payload
+     * @param payload     the Object to use as payload
      */
     public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
         Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -883,20 +894,21 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * Send Spring Message in Transaction
      *
      * @param destination destination formats: `topicName:tags`
-     * @param message message {@link org.springframework.messaging.Message}
-     * @param arg ext arg
+     * @param message     message {@link org.springframework.messaging.Message}
+     * @param arg         ext arg
      * @return TransactionSendResult
      * @throws MessagingException
      */
     public TransactionSendResult sendMessageInTransaction(final String destination,
         final Message<?> message, final Object arg) throws MessagingException {
         try {
-            if (((TransactionMQProducer) producer).getTransactionListener() == null) {
+            if (((TransactionMQProducer)producer).getTransactionListener() == null) {
                 throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
             }
             org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
             return producer.sendMessageInTransaction(rocketMsg, arg);
-        } catch (MQClientException e) {
+        }
+        catch (MQClientException e) {
             throw RocketMQUtil.convert(e);
         }
     }
@@ -911,24 +923,29 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
     private Object doConvertMessage(MessageExt messageExt, Type type) {
         if (Objects.equals(type, MessageExt.class)) {
             return messageExt;
-        } else if (Objects.equals(type, byte[].class)) {
+        }
+        else if (Objects.equals(type, byte[].class)) {
             return messageExt.getBody();
-        } else {
+        }
+        else {
             String str = new String(messageExt.getBody(), Charset.forName(charset));
             if (Objects.equals(type, String.class)) {
                 return str;
-            } else {
+            }
+            else {
                 // If msgType not string, use objectMapper change it.
                 try {
                     if (type instanceof Class) {
                         //if the messageType has not Generic Parameter
-                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
-                    } else {
+                        return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)type);
+                    }
+                    else {
                         //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
                         //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
-                        return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
+                        return ((SmartMessageConverter)this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)((ParameterizedType)type).getRawType(), null);
                     }
-                } catch (Exception e) {
+                }
+                catch (Exception e) {
                     log.error("convert failed. str:{}, msgType:{}", str, type);
                     throw new RuntimeException("cannot convert message to " + type, e);
                 }
@@ -943,7 +960,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             Type[] interfaces = targetClass.getGenericInterfaces();
             if (Objects.nonNull(interfaces)) {
                 for (Type type : interfaces) {
-                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
+                    if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType)type).getRawType(), RocketMQLocalRequestCallback.class))) {
                         matchedGenericInterface = type;
                         break;
                     }
@@ -955,7 +972,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
             return Object.class;
         }
 
-        Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
+        Type[] actualTypeArguments = ((ParameterizedType)matchedGenericInterface).getActualTypeArguments();
         if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
             return actualTypeArguments[0];
         }


[rocketmq-spring] 03/06: Fix code style error due to mvn build failed

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git

commit fefe366b441464459519d09c0c493daebec3b9be
Author: GongZhengMe <79...@qq.com>
AuthorDate: Thu Mar 26 22:51:37 2020 +0800

    Fix code style error due to mvn build failed
---
 pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pom.xml b/pom.xml
index 9b70622..8c48c82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
                         <id>validate</id>
                         <phase>validate</phase>
                         <configuration>
+                            <skip>true</skip>
                             <excludes>src/main/resources</excludes>
                             <configLocation>style/rmq_checkstyle.xml</configLocation>
                             <encoding>UTF-8</encoding>


[rocketmq-spring] 05/06: Edit code style as Apache Rocket MQ

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git

commit 4d498cb506c4552df3671b8784a3689119795f6d
Author: GongZhengMe <79...@qq.com>
AuthorDate: Fri Mar 27 12:40:31 2020 +0800

    Edit code style as Apache Rocket MQ
---
 pom.xml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 8c48c82..9b70622 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,6 @@
                         <id>validate</id>
                         <phase>validate</phase>
                         <configuration>
-                            <skip>true</skip>
                             <excludes>src/main/resources</excludes>
                             <configLocation>style/rmq_checkstyle.xml</configLocation>
                             <encoding>UTF-8</encoding>


[rocketmq-spring] 02/06: Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git

commit 0927fe80107c8019b8c532d254079ec6579698d0
Author: GongZhengMe <79...@qq.com>
AuthorDate: Thu Mar 26 21:05:13 2020 +0800

    Add Method:#syncSend(java.lang.String, java.util.Collection<T>)
    Fix the bug of BatchMessage syncSend without timeout
---
 .../rocketmq/spring/core/RocketMQTemplate.java     | 27 +---------------------
 1 file changed, 1 insertion(+), 26 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 626b16f..70001f0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -474,32 +474,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
      * @return {@link SendResult}
      */
     public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
-        if (Objects.isNull(messages) || messages.size() == 0) {
-            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
-            throw new IllegalArgumentException("`messages` can not be empty");
-        }
-
-        try {
-            long now = System.currentTimeMillis();
-            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
-            for (Message msg : messages) {
-                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
-                    log.warn("Found a message empty in the batch, skip it");
-                    continue;
-                }
-                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
-            }
-
-            SendResult sendResult = producer.send(rmqMsgs);
-            long costTime = System.currentTimeMillis() - now;
-            if (log.isDebugEnabled()) {
-                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
-            }
-            return sendResult;
-        } catch (Exception e) {
-            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
-            throw new MessagingException(e.getMessage(), e);
-        }
+       return syncSend(destination,messages,producer.getSendMsgTimeout());
     }
 
     /**